Create and Run a Simple Pipeline
In this tutorial, we’ll see how we can create a set of generators and use them to synthesize a simple dataset with things that have color and people that have gender and first names.
We’ll be using the command line interface (CLI). Install and configure the CLI if you haven’t yet.
We’re going to create a set of things that have a random color and a set of people with a random gender and name. We want the name to depend on the person’s gender, so we need two generators: one for male names and one for female names. We want the genders and colors to be equally likely, but we want the names to be selected based on popularity.
Before we jump into creating things, let’s sketch out what we want to do. To run a pipeline, we need a set of seeders and generators. For each generator, we need a transform.
The result looks something like the following:

*(Diagram: the pipeline depends on its generators, each generator depends on a transform, and sequencing arrows order the generator runs.)*
In this diagram, the stronger lines show definition dependencies (e.g., pipelines depend on generators), and the weaker lines show sequencing dependencies (e.g., ThingSeeder has to run before ColorThings).
Because of the way the platform works, pipelines can share generators, and generators can share transforms. So we can build up a library of transforms and generators as we develop pipelines.
For this tutorial, let’s create a new project named `tutorial-project`:

```sh
ursactl create project tutorial-project
```

See “Setting a Default Project” for setting `tutorial-project` as your default project. If you don’t set a default project, you’ll need to provide the project name when you run commands by adding `--project tutorial-project` to the command.
We can create a local directory to hold all of our files that we can later sync to the project:

```sh
ursactl init --dir tutorial-files
```

We’re using a different directory name than the project so it’s clear when we’re referencing the project and when we’re referencing the directory. This will create a set of directories under the `tutorial-files` directory. You can name the directory something else as long as you change the commands that use `tutorial-files` to match.
Defining the Transforms
We need to define four transforms:
- Identity Seeder
- Random Choice Mapper
- Weighted Random Choice Mapper
- Pairing Reducer
We can use the Lua and Markdown support in the `ursactl` tool to make this easier. Because transforms are defined using Lua, we can take advantage of syntax highlighting in our editors when using either format.
Identity Seeder
The identity seeder doesn’t create any new information. We use it to create a generator that just inserts some static data for each of the entities we’re seeding into the dataset.
Let’s save the following into `tutorial-files/data/transforms/identity-seeder.lua`.
```lua
--[[
# Identity Seeder
@type seeder
@path seeders/identity
The identity seeder transform lets us create seed generators
that insert static data into the dataset for each entity
created.
--]]
function seed(count)
  return {}
end
```
Random Choice Mapper
The random choice mapper emits a single value for each match in the dataset.
Let’s save the following into `tutorial-files/data/transforms/random-choice-mapper.lua`.
```lua
--[[
# Random Choice Mapper
@type mapper
@path mappers/random/choice
This mapping transform selects from a list of choices
with uniform probability.
--]]
function init(configuration)
  choices = configuration["choices"]
end

function process(row)
  return choices[math.random(#choices)]
end

configurationSchema = {
  type = "object",
  properties = {
    choices = {
      type = "array",
      minItems = 1,
      maxItems = 50,
      items = {
        type = "string",
        minLength = 1,
        maxLength = 255
      }
    }
  }
}
```
This example shows how the configuration schema works. It’s just a JSON schema describing what the transform expects. The platform will check generator configurations against this schema.
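Before uploading, you can get a feel for how this transform behaves by exercising it in a local Lua interpreter. The direct calls below are only a sketch of the transform contract (`init` called once with a configuration, `process` called per row); they are not the platform’s own API.

```lua
-- Load the transform, then call its functions directly (illustrative only).
dofile("tutorial-files/data/transforms/random-choice-mapper.lua")

init({ choices = { "red", "green", "blue" } })
for _ = 1, 5 do
  print(process({}))  -- each call prints one of "red", "green", or "blue"
end
```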
Weighted Random Choice Mapper
The weighted random choice mapper emits a value at random based on relative weights of the choices.
Let’s save the following into `tutorial-files/data/transforms/weighted-random-choice-mapper.md`.
````markdown
---
type: mapper
path: mappers/random/weighted-choice
---

# Weighted Random Choice Transform

## Implementation

```lua
function init(configuration)
  -- note that this only supports a dictionary for now
  choices = configuration["choices"]
  total_weight = 0
  for k in pairs(choices) do
    total_weight = total_weight + choices[k]
  end
end

function process(row)
  -- walk the choices, subtracting each weight until the random
  -- draw falls within the current choice's weight
  local choice = math.random(total_weight)
  for k in pairs(choices) do
    local w = choices[k]
    if choice <= w then
      return k
    else
      choice = choice - w
    end
  end
end
```

## Configuration Schema

```yaml
type: object
properties:
  choices:
    oneOf:
      # a list of {item, weight} tuples
      - type: array
        minItems: 1
        maxItems: 50
        prefixItems:
          - type: string
            minLength: 1
            maxLength: 255
          - type: number
      # a dictionary where the keys point to the weights
      - type: object
        minProperties: 1
        maxProperties: 50
        patternProperties:
          "^.{1,255}$":
            type: number
      # or a dataset reference to a CSV file
      # the "reference" points to the value
      # the "weights" points to the weight
      # this is the most likely to change
      - type: object
        required: [fromDatasetPath, reference, weight]
        properties:
          fromDatasetPath:
            type: string
          reference:
            oneOf:
              - type: string
              - type: integer
          weight:
            oneOf:
              - type: string
              - type: integer
      # or a reference to an RDF dataset
      - type: object
        required: [fromDatasetPath, query]
        properties:
          fromDatasetPath:
            type: string
          query:
            type: string
```
````
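To see why the cumulative walk in `process` respects the weights, consider a small case: with choices `{a = 1, b = 3}`, `total_weight` is 4, so `math.random(4)` returns 1, 2, 3, or 4 with equal probability, and exactly three of those four outcomes land on `b`. The sketch below is a hypothetical local check, not part of the tutorial files; it assumes you’ve copied the Lua block above into a local file named `weighted.lua`.

```lua
-- Hypothetical local check of the weighted choice logic.
dofile("weighted.lua")

init({ choices = { a = 1, b = 3 } })  -- total_weight becomes 4
local counts = { a = 0, b = 0 }
for _ = 1, 10000 do
  local k = process({})
  counts[k] = counts[k] + 1
end
-- b should win roughly three times as often as a
print(counts.a, counts.b)
```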
Pairing Reducer
Finally, let’s see what a reducing transform looks like. Such a transform has three parts: an initializer, a row processor, and a finalizer. The main difference between a reducer and a mapper is that a reducer can return any number of outputs for each row processed (even none), and once all of the rows have been processed, the pipeline calls the finalizer to handle any remaining work.
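Concretely, a reducer defines the three functions sketched below. The function names match the pairing reducer that follows; the bodies here are just placeholders.

```lua
-- Bare reducer skeleton (illustrative).
function init(configuration)
  -- called once with the generator's configuration
end

function process(row)
  -- called once per row; may return nothing, one output, or several
end

function finish()
  -- called after the last row; may return any remaining outputs
end
```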
Let’s save the following into `tutorial-files/data/transforms/pairing-reducer.lua`.
```lua
--[[
# Pairing Reducer
@type reducer
@path reducers/pairer
@input entity the subject of the record being paired
@input matchValue the value on which matches are based
@output entity1 one of the entities being paired
@output entity2 one of the entities being paired
Forms pairs of entities based on a matching property and a percentage chance that pairs
will be formed with the same value of the property vs. different values.
This transform produces zero or one pairs on each call to `process`.
--]]
configurationSchema = {
  type = "object",
  properties = {
    sameValueMatchChance = {
      type = "number",
      minimum = 0.0,
      maximum = 1.0
    },
    matchOnRowChance = {
      type = "number",
      minimum = 0.0,
      maximum = 1.0
    }
  }
}

chance_for_same_value_match = 0.0
chance_for_match_made = 1.0
pending = {}

function init(configuration)
  chance_for_same_value_match = configuration["sameValueMatchChance"]
  chance_for_match_made = configuration["matchOnRowChance"]
end

function process(row)
  table.insert(pending, row)
  if chance_for_match_made > math.random() then
    return find_match()
  end
end

function finish()
  -- drain any pairs that can still be made; anyone left over is
  -- considered unmatched (the variable is named matched_pairs to
  -- avoid shadowing Lua's built-in pairs function)
  local matched_pairs = {}
  local pair = find_match()
  while pair do
    table.insert(matched_pairs, pair)
    pair = find_match()
  end
  if #matched_pairs > 0 then
    return matched_pairs
  end
end

function find_match()
  -- look for a pair if we can find one
  if #pending < 2 then return end
  local looking_for_matching_value = (chance_for_same_value_match > math.random())
  if #pending == 2 then
    if pending[1]["matchValue"] == pending[2]["matchValue"] then
      if looking_for_matching_value then
        local val = {entity1=pending[1]["entity"], entity2=pending[2]["entity"]}
        pending = {}
        return val
      end
      return
    end
    local val = {entity1=pending[1]["entity"], entity2=pending[2]["entity"]}
    pending = {}
    return val
  end
  -- look for a suitable pair from the back
  local selected = table.remove(pending)
  for i = #pending, 1, -1 do
    local matched = selected["matchValue"] == pending[i]["matchValue"]
    if looking_for_matching_value == matched then
      local match = table.remove(pending, i)
      return {entity1=selected["entity"], entity2=match["entity"]}
    end
  end
  -- no suitable pair found - put the initial entity back
  table.insert(pending, selected)
end
```
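As a rough way to see the reducer in action, you can feed it a few rows from a local Lua interpreter. This is only a sketch of the calling sequence (`init`, repeated `process` calls, then `finish`), not the platform’s API.

```lua
-- Hypothetical local exercise of the pairing reducer.
dofile("tutorial-files/data/transforms/pairing-reducer.lua")

init({ sameValueMatchChance = 0.5, matchOnRowChance = 0.8 })

local rows = {
  { entity = "<entity:1>", matchValue = "male" },
  { entity = "<entity:2>", matchValue = "female" },
  { entity = "<entity:3>", matchValue = "female" },
  { entity = "<entity:4>", matchValue = "male" },
}

-- pairs may be emitted as rows arrive...
for _, row in ipairs(rows) do
  local pair = process(row)
  if pair then print(pair.entity1, pair.entity2) end
end

-- ...and any remaining matchable pairs come back from finish()
for _, pair in ipairs(finish() or {}) do
  print(pair.entity1, pair.entity2)
end
```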
Defining the Seeders
We want two seeders: one to create things and another to create people. We’ll use the https://schema.org/ ontology since it’s simple and sufficient for what we’re doing.
The search engine industry created Schema.org to make it easier for search engines to extract information from normal web pages. It was never designed for academic use. We’re using it here because it’s easy to use for illustrating how pipelines work.
For both seeders, we’ll use the identity seeder. This seeder doesn’t create any information. Instead, it takes the entity identifier and inserts it into the dataset using the configured template.
We’ll put all of our seeders into a single file, separated by three dashes (`---`). This is how YAML files can contain multiple documents. When `ursactl` finds a YAML file with multiple documents, it treats it as if it were a set of different files, creating a separate seeder for each document in the file.
Let’s put these in `tutorial-files/data/generators/seeders.yaml`.
People Seeder
```yaml
path: seeders/people
transform:
  path: seeders/identity
  configuration:
    count: 10
load: "<<entity>> a <https://schema.org/Person> ."
```
You can change the count to something else, but the count here is just the default. We can override it later in the pipeline definition.
The `load` key is a Turtle template. Each of the `<<...>>` placeholders is replaced with the value matching the key; here, the entity identifier replaces `<<entity>>`.
Thing Seeder
Save the following content into the same file, separated by `---` on its own line.
```yaml
path: seeders/things
transform:
  path: seeders/identity
  configuration:
    count: 10
load: "<<entity>> a <https://schema.org/Thing> ."
```
This is the same as the people seeder except that it creates “Things” rather than “People.”
Your seeder file should look like the following:
```yaml
path: seeders/people
transform:
  path: seeders/identity
  configuration:
    count: 10
load: "<<entity>> a <https://schema.org/Person> ."
---
path: seeders/things
transform:
  path: seeders/identity
  configuration:
    count: 10
load: "<<entity>> a <https://schema.org/Thing> ."
```
Defining the Mappers
Mappers augment a dataset by running a query and inserting new triples for each result.
We want to create mappers that will give colors to things, genders to people, and then names to people based on their gender.
We’ll save all of the mappers to `tutorial-files/data/generators/mappers.yaml`. Remember to separate them with three dashes (`---`).
Coloring Things
We’re using the uniform random choice transform for this generator. It selects from the choices randomly with equal weight. The selected choice is available in the template as `<<value>>`.
```yaml
path: mappers/things/color
extract: |
  SELECT ?entity WHERE {
    ?entity a <https://schema.org/Thing> .
  }
transform:
  path: mappers/random/choice
  configuration:
    choices: [red, orange, yellow, green, blue, indigo, violet]
load: "<<entity>> <https://example.org/color> <<value>> ."
```
The `extract` key is a SPARQL query that returns a match for each thing we want augmented with a color. The variables in the `SELECT` statement (here, `?entity`) will be available in the Turtle template.
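To connect this back to the transforms: each result row of the `extract` query is what the platform hands to the transform’s `process` function, one Lua table per row, keyed by the `SELECT` variable names without the leading `?`. For this query, a row would plausibly look like the sketch below (the identifier is made up for illustration):

```lua
-- Hypothetical shape of one extract-query result row as seen by process(row):
row = { entity = "entity:7" }  -- one key per SELECT variable
```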
Assigning a Gender
This uses the same uniform random choice transform as the thing colorer.
```yaml
path: mappers/people/gender
extract: |
  SELECT ?entity WHERE {
    ?entity a <https://schema.org/Person> .
  }
transform:
  path: mappers/random/choice
  configuration:
    choices: [male, female]
load: "<<entity>> <https://schema.org/gender> <<value>> ."
```
Assigning a Name
We have two generators for names: one for men and one for women. We’ve chosen to use the top 10 names from 1970-1979 in Minnesota.
```yaml
path: mappers/people/names/female
extract: |
  SELECT ?entity WHERE {
    ?entity a <https://schema.org/Person> ;
      <https://schema.org/gender> "female" .
  }
transform:
  path: mappers/random/weighted-choice
  configuration:
    choices:
      Jennifer: 30712
      Amy: 15246
      Michelle: 13294
      Lisa: 12395
      Melissa: 12230
      Angela: 11898
      Heather: 11873
      Kimberly: 10895
      Nicole: 9484
      Kelly: 8721
load: "<<entity>> <https://schema.org/givenName> <<value>> ."
---
path: mappers/people/names/male
extract: |
  SELECT ?entity WHERE {
    ?entity a <https://schema.org/Person> ;
      <https://schema.org/gender> "male" .
  }
transform:
  path: mappers/random/weighted-choice
  configuration:
    choices:
      Michael: 35138
      Jason: 26413
      David: 21812
      Christopher: 20721
      James: 20042
      Robert: 19747
      Matthew: 17540
      John: 16659
      Brian: 16605
      Scott: 13412
load: "<<entity>> <https://schema.org/givenName> <<value>> ."
```
Defining the Reducers
Reducers augment a dataset by running a query and inserting new triples based on the data from the query. The reducer receives each row and can respond with no data, a single row of data, or multiple rows.
We want to create a reducer that pairs people based on gender.
We’ll save the reducer to `tutorial-files/data/generators/reducers.yaml`.
Pairing People
We want to pair people up somewhat randomly, with a chance that people will be paired with someone of the same gender. The default configuration will try to find a match 80% of the time when a new row is given to the reducer. When the reducer does try to find a match, it will look for a same-gender match 50% of the time; so, on average, about 40% of rows trigger a same-gender match attempt. We’ll see later how to make that chance configurable in the pipeline.
```yaml
path: reducers/people/pairer
extract: |
  SELECT ?entity ?matchValue WHERE {
    ?entity a <https://schema.org/Person> ;
      <https://schema.org/gender> ?matchValue .
  }
transform:
  path: reducers/pairer
  configuration:
    matchOnRowChance: 0.8
    sameValueMatchChance: 0.5
load: |
  <<entity1>> <https://schema.org/spouse> <<entity2>> .
  <<entity2>> <https://schema.org/spouse> <<entity1>> .
```
Defining the Pipeline
We want to build a pipeline that ties all of our generators together. In this example, we make the counts parameters so we can generate datasets of different sizes without having to define a new pipeline.
Save the following content into `tutorial-files/data/pipelines/pipeline.yaml`.
```yaml
seeders:
  - generator:
      path: seeders/things
      configuration:
        count: "{{thingsCount}}"
  - generator:
      path: seeders/people
      configuration:
        count: "{{peopleCount}}"
generators:
  color-things:
    generator:
      path: mappers/things/color
  name-men:
    generator:
      path: mappers/people/names/male
    dependsOn: [assign-gender]
  assign-gender:
    generator:
      path: mappers/people/gender
  name-women:
    generator:
      path: mappers/people/names/female
    dependsOn: [assign-gender]
  pairing-people:
    generator:
      path: reducers/people/pairer
      configuration:
        sameValueMatchChance: "{{sameGenderMatchChance}}"
    dependsOn: [assign-gender]
path: things-n-people
```
We set up our seeders and generators to create the data that we want. The platform will run all of the seeders in parallel. The generators may depend on each other, so we indicate how the generators depend on each other. This lets the platform order the generators and run as many as possible at the same time.
We overrode the default count for each seeder so that the counts come from pipeline parameters. You can set the counts for things and people to anything you like when you run the pipeline, though there is a limit of 10,000 for each seeder right now.
Uploading to the Platform
We want to upload the transforms and generators first because the platform will check that they exist when we upload the pipeline.
The `sync` command is smart enough to manage this for us. It will sync transforms before generators, and generators before pipelines.

```sh
ursactl sync --dir tutorial-files
```
If you get errors during the sync process, rerun the command with the `--dry-run` option to see what would be uploaded. If there are missing resources, check the files defining those resources for errors.
Running the Pipeline
Now that our generators and pipeline are uploaded to the platform, we can run the pipeline and see what we get. In this case, we’re creating twenty things and ten people.
```sh
ursactl run pipeline things-n-people --param "thingsCount=20" --param "peopleCount=10" --param "sameGenderMatchChance=0.05"
```
This will print out some information showing a run id and a job status of `queued`. Once the pipeline run is completed, we’ll be able to see it in the list of datasets (`ursactl list datasets`).
Pipeline runs store datasets with a path that is the same as the pipeline path extended with the date and time that the run ended. Assuming the date and time is in `TIMESTAMP`, we can download the generated data:

```sh
ursactl get dataset things-n-people/$TIMESTAMP things-and-people.ttl
```
This will download the contents into the local file `things-and-people.ttl`. It should look something like the following.
```ttl
@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> .
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .

<entity:1>
  a <https://schema.org/Thing> ;
  <https://example.org/color> "indigo" .

<entity:10>
  a <https://schema.org/Thing> ;
  <https://example.org/color> "violet" .

<entity:21>
  a <https://schema.org/Person> ;
  <https://schema.org/gender> "male" ;
  <https://schema.org/givenName> "Brian" .

<entity:22>
  a <https://schema.org/Person> ;
  <https://schema.org/gender> "female" ;
  <https://schema.org/givenName> "Lisa" .
```
Sweeping the Pipeline
If you want to see how changing the `sameGenderMatchChance` configuration value changes the dataset, you can sweep the value through a list:

```sh
ursactl run pipeline things-n-people \
  --param "thingsCount=10" \
  --param "peopleCount=500" \
  --sweep-param "sameGenderMatchChance=[0.0, 0.05, 0.10, 0.20, 0.40, 0.60, 0.80, 0.90, 0.95, 1.0]"
```
This will run the pipeline ten times, once for each value of `sameGenderMatchChance`.