
Create and Run a Simple Pipeline

In this tutorial, we’ll see how we can create a set of generators and use them to synthesize a simple dataset with things that have color and people that have gender and first names.

We’ll be using the command line interface (CLI). Install and configure the CLI if you haven’t yet.

We’re going to create a set of things that have a random color and a set of people with a random gender and name. We want the name to depend on the person’s gender, so we need two generators: one for male names and one for female names. We want the genders and colors to be equally likely, but we want the names to be selected based on popularity.

Before we jump into creating things, let’s sketch out what we want to do. To run a pipeline, we need a set of seeders and generators. For each generator, we need a transform.

The result looks something like the following:

[Diagram: dependency graph for the pipeline. Edges: Pipeline -> PeopleSeeder, ThingSeeder, ColorThings, GenderPeople, NameMalePeople, NameFemalePeople, PairPeople; PeopleSeeder -> GenderPeople, IdentitySeeder; ThingSeeder -> ColorThings, IdentitySeeder; ColorThings -> RandomChoiceMapper; GenderPeople -> NameMalePeople, NameFemalePeople, PairPeople, RandomChoiceMapper; NameMalePeople -> WeightedRandomChoiceMapper; NameFemalePeople -> WeightedRandomChoiceMapper; PairPeople -> PairingReducer.]

In this diagram, the stronger lines show definition dependencies (e.g., pipelines depend on generators), and the weaker lines show sequencing dependencies (e.g., ThingSeeder has to run before ColorThings).

Because of the way the platform works, pipelines can share generators, and generators can share transforms. So we can build up a library of transforms and generators as we develop pipelines.

For this tutorial, let’s create a new project named tutorial-project:

ursactl create project tutorial-project

See “Setting a Default Project” for setting tutorial-project as your default project. If you don’t set a default project, you’ll need to provide the project name when you run commands by adding --project tutorial-project to the command.

We can create a local directory to hold all of our files that we can later sync to the project.

ursactl init --dir tutorial-files

This will create a set of directories under the tutorial-files directory that we can later sync to our project.

We’re using a different directory name than the project so it’s clear when we’re referencing the project and when we’re referencing the directory. You can use another name as long as you replace tutorial-files with your directory name in the commands that follow.

Defining the Transforms

We need to define four transforms:

  • Identity Seeder
  • Random Choice Mapper
  • Weighted Random Choice Mapper
  • Pairing Reducer

We can use the Lua and Markdown support in the ursactl tool to make this easier. Because transforms are defined using Lua, we can take advantage of syntax highlighting in our editors when using either.

Identity Seeder

The identity seeder doesn’t create any new information. We use it to create a generator that just inserts some static data for each of the entities we’re seeding into the dataset.

Let’s save the following into tutorial-files/data/transforms/identity-seeder.lua.

--[[
# Identity Seeder

@type seeder
@path seeders/identity

The identity seeder transform lets us create seed generators
that insert static data into the dataset for each entity
created.

--]]

function seed(count)
  return {}
end

Random Choice Mapper

The random choice mapper emits a single value for each match in the dataset.

Let’s save the following into tutorial-files/data/transforms/random-choice-mapper.lua.

--[[
# Random Choice Mapper

@type mapper
@path mappers/random/choice

This mapping transform selects from a list of choices
with uniform probability.

--]]

function init(configuration)
    choices = configuration["choices"]
end

function process(row)
    return choices[math.random(#choices)]
end

configurationSchema = {
    type = "object",
    properties = {
        choices = {
            type = "array",
            minItems = 1,
            maxItems = 50,
            items = {
                type = "string",
                minLength = 1,
                maxLength = 255
            }
        }
    }
}

This example shows how the configuration schema works. It’s just a JSON schema describing what the transform expects. The platform will check generator configurations against this schema.
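To make the schema concrete, here is a minimal Python sketch of the checks that the schema above describes. The function name `check_choices` is hypothetical, and the platform's own validator may behave and report errors differently; this only mirrors the constraints listed in configurationSchema.

```python
def check_choices(configuration):
    """Return a list of problems found in a random-choice configuration."""
    # The schema lists `choices` under `properties` but not as required,
    # so a configuration without it passes the schema.
    if "choices" not in configuration:
        return []
    choices = configuration["choices"]
    if not isinstance(choices, list):
        return ["choices must be an array"]
    problems = []
    if not 1 <= len(choices) <= 50:
        problems.append("choices must have between 1 and 50 items")
    for index, item in enumerate(choices):
        if not isinstance(item, str):
            problems.append(f"choices[{index}] must be a string")
        elif not 1 <= len(item) <= 255:
            problems.append(f"choices[{index}] must be 1 to 255 characters long")
    return problems


print(check_choices({"choices": ["red", "blue"]}))
print(check_choices({"choices": []}))
```

An empty list of problems means the configuration satisfies the constraints shown in the schema.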

Weighted Random Choice Mapper

The weighted random choice mapper emits a value at random based on relative weights of the choices.

Let’s save the following into tutorial-files/data/transforms/weighted-random-choice-mapper.md.

---
type: mapper
path: mappers/random/weighted-choice
---
# Weighted Random Choice Transform

## Implementation

```lua
function init(configuration)
    -- note that this only supports a dictionary for now
    choices = configuration["choices"]
    total_weight = 0
    for k in pairs(choices) do
        total_weight = total_weight + choices[k]
    end
end

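function process(row)
    -- draw an integer in 1..total_weight, then walk the choices,
    -- subtracting each weight until the draw falls inside one
    -- (this walk assumes the weights are integers)
    local choice = math.random(total_weight)
    for k in pairs(choices) do
        local w = choices[k]
        if choice <= w then
            return k
        else
            choice = choice - w
        end
    end
end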
```

## Configuration Schema

```yaml
type: object
properties:
  choices:
    oneOf:
      # a list of {item, weight} tuples
      - type: array
        minItems: 1
        maxItems: 50
        prefixItems:
          - type: string
            minLength: 1
            maxLength: 255
          - type: number
      # a dictionary where the keys point to the weights
      - type: object
        minProperties: 1
        maxProperties: 50
        patternProperties:
          "^.{1,255}$":
            type: number
      # or a dataset reference to a CSV file
      # the "reference" points to the value
      # the "weight" points to the weight
      # this is the most likely to change
      - type: object
        required: [fromDatasetPath, reference, weight]
        properties:
          fromDatasetPath:
            type: string
          reference:
            oneOf:
              - type: string
              - type: integer
          weight:
            oneOf:
              - type: string
              - type: integer
      # or a reference to an RDF dataset
      - type: object
        required: [fromDatasetPath, query]
        properties:
          fromDatasetPath:
            type: string
          query:
            type: string
```
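The selection walk in the Lua implementation above can be traced by hand with a small Python sketch. This is an illustration only, not part of the transform file: the `weighted_choice` name and its `draw` parameter are hypothetical, and the weights are assumed to be integers.

```python
import random


def weighted_choice(choices, draw=None):
    """Pick a key from a {value: weight} dict in proportion to its weight."""
    total_weight = sum(choices.values())
    # The Lua code draws an integer in 1..total_weight. `draw` lets a caller
    # supply that number directly so the walk can be checked by hand.
    if draw is None:
        draw = random.randint(1, total_weight)
    for value, weight in choices.items():
        if draw <= weight:
            return value
        draw -= weight
    raise ValueError("draw was outside 1..total_weight")


colors = {"red": 1, "green": 2, "blue": 3}
picks = [weighted_choice(colors, draw) for draw in range(1, 7)]
print(picks)  # ['red', 'green', 'green', 'blue', 'blue', 'blue']
```

Each of the six possible draws lands on exactly one choice, so a choice with weight 3 is selected three times as often as a choice with weight 1.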

Pairing Reducer

Finally, let’s see what a reducing transformation looks like. Such a transformation has three parts: an initializer, a row processor, and a finalizer.

The main difference between a reducer and a mapper is that a reducer can return any number of outputs for each row it processes (even none). Once every row has been processed, the pipeline calls the finalizer to handle any remaining work.

Let’s save the following into tutorial-files/data/transforms/pairing-reducer.lua.

--[[
# Pairing Reducer

@type reducer
@path reducers/pairer
@input entity the subject of the record being paired
@input matchValue the value on which matches are based
@output entity1 one of the entities being paired
@output entity2 one of the entities being paired

Forms pairs of entities based on a matching property and a percentage chance that pairs
will be formed with the same value of the property vs. different values.

This transform produces zero or one pairs on each call to `process`.

--]]

configurationSchema = {
    type = "object",
    properties = {
        sameValueMatchChance = {
            type = "number",
            minimum = 0.0,
            maximum = 1.0
        },
        matchOnRowChance = {
            type = "number",
            minimum = 0.0,
            maximum = 1.0
        }
    }
}

chance_for_same_value_match = 0.0
chance_for_match_made = 1.0
pending = {}

function init(configuration)
    chance_for_same_value_match = configuration["sameValueMatchChance"]
    chance_for_match_made = configuration["matchOnRowChance"]
end

function process(row)
    table.insert(pending, row)
    if chance_for_match_made > math.random() then
        return find_match()
    end
end

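function finish()
    -- pair up whoever is still pending; anyone left after that is unmatched
    -- (`matches` avoids overwriting Lua's built-in `pairs` function)
    local matches = {}
    local pair = find_match()
    -- a pair table has only named fields, so `#pair` is always 0;
    -- test for nil instead
    while pair do
        table.insert(matches, pair)
        pair = find_match()
    end
    if #matches > 0 then
        return matches
    end
end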

function find_match()
    -- look for a pair if we can find one
    if #pending < 2 then return end

    local looking_for_matching_value = (chance_for_same_value_match > math.random())

    if #pending == 2 then
        if pending[1]["matchValue"] == pending[2]["matchValue"] then
            if looking_for_matching_value then
                local val = {entity1=pending[1]["entity"], entity2=pending[2]["entity"]}
                pending = {}
                return val
            end
            return
        end
        local val = {entity1=pending[1]["entity"], entity2=pending[2]["entity"]}
        pending = {}
        return val
    end

    -- look for a suitable pair from the back
    local selected = table.remove(pending)
    for i=#pending,1,-1 do
        local matched = selected["matchValue"] == pending[i]["matchValue"]
        if looking_for_matching_value == matched then
            local match = table.remove(pending, i)
            return {entity1=selected["entity"], entity2=match["entity"]}
        end
    end
    -- no suitable pair found - put the initial entity back
    table.insert(pending, selected)
end
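The initializer, row processor, and finalizer lifecycle can be sketched as a small driver loop in Python. This tutorial doesn't show how the platform actually invokes a transform, so `run_reducer` and the toy `PairInOrder` reducer are hypothetical and only illustrate the calling pattern.

```python
class PairInOrder:
    """Toy reducer: pairs rows in arrival order, ignoring matchValue."""

    def init(self, configuration):
        self.pending = []

    def process(self, row):
        self.pending.append(row["entity"])
        if len(self.pending) == 2:
            first, second = self.pending
            self.pending = []
            return {"entity1": first, "entity2": second}
        return None  # no output for this row

    def finish(self):
        # Anything still pending has no partner, so emit nothing.
        return []


def run_reducer(reducer, configuration, rows):
    """Call init once, process once per row, then finish; collect outputs."""
    outputs = []
    reducer.init(configuration)
    for row in rows:
        result = reducer.process(row)
        if result:
            outputs.append(result)
    outputs.extend(reducer.finish() or [])
    return outputs


rows = [{"entity": f"entity:{n}", "matchValue": "x"} for n in range(1, 6)]
pairs = run_reducer(PairInOrder(), {}, rows)
print(pairs)
```

With five input rows, the toy reducer emits two pairs and leaves the fifth entity unmatched, which shows that the number of outputs need not equal the number of rows.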

Defining the Seeders

We want two seeders: one to create things and another to create people. We’ll use the https://schema.org/ ontology since it’s simple and sufficient for what we’re doing.

The search engine industry created Schema.Org to make it easier for search engines to extract information from normal web pages. It was never designed for academic use. We’re using it here because it’s easy to use for illustrating how pipelines work.

For both seeders, we’ll use the identity seeder. This seeder doesn’t create any information. Instead, it takes the entity identifier and inserts it into the dataset using the configured template.

We’ll put all of our seeders into a single file, separated by three dashes (---). This is how YAML files can contain multiple documents. When ursactl finds a YAML file with multiple documents, it treats it as if it were a set of different files. It will create a separate seeder for each document in the file.

Let’s put these in tutorial-files/data/generators/seeders.yaml.

People Seeder

path: seeders/people
transform:
  path: seeders/identity
  configuration:
    count: 10
load: "<<entity>> a <https://schema.org/Person> ."

You can change the count to something else, but the count here is just the default. We can override it later in the pipeline definition.

The load key is a Turtle template. Each of the <<...>> placeholders is replaced with the value matching the key.
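The placeholder substitution can be sketched in a few lines of Python. The `render` function is hypothetical, and the sketch assumes each value has already been formatted as a Turtle term (for example, `<entity:21>`); how the platform formats terms is not covered here.

```python
import re


def render(template, bindings):
    """Replace each <<key>> placeholder with the matching value in bindings."""
    return re.sub(r"<<(\w+)>>", lambda m: bindings[m.group(1)], template)


line = render(
    "<<entity>> a <https://schema.org/Person> .",
    {"entity": "<entity:21>"},  # assumed to be a ready-made Turtle term
)
print(line)  # <entity:21> a <https://schema.org/Person> .
```

The single angle brackets around the schema.org IRI are left alone because only doubled brackets mark a placeholder.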

Thing Seeder

Save the following content into the same file, separated by --- on its own line.

path: seeders/things
transform:
  path: seeders/identity
  configuration:
    count: 10
load: "<<entity>> a <https://schema.org/Thing> ."

This is the same as the people seeder except that it creates “Things” rather than “People.”

Your seeder file should look like the following:

path: seeders/people
transform:
  path: seeders/identity
  configuration:
    count: 10
load: "<<entity>> a <https://schema.org/Person> ."
---
path: seeders/things
transform:
  path: seeders/identity
  configuration:
    count: 10
load: "<<entity>> a <https://schema.org/Thing> ."
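To illustrate how a file like this breaks into separate documents, here is a rough Python sketch that splits text on lines containing only three dashes. It is a simplification: a real YAML parser handles more cases, and `split_documents` is a hypothetical helper, not something ursactl exposes.

```python
def split_documents(text):
    """Split YAML text into documents on lines that contain only '---'."""
    documents, current = [], []
    for line in text.splitlines():
        if line.rstrip() == "---":
            documents.append("\n".join(current))
            current = []
        else:
            current.append(line)
    documents.append("\n".join(current))
    # Drop empty documents, such as one before a leading '---'.
    return [doc for doc in documents if doc.strip()]


seeders_yaml = """\
path: seeders/people
load: "<<entity>> a <https://schema.org/Person> ."
---
path: seeders/things
load: "<<entity>> a <https://schema.org/Thing> ."
"""
documents = split_documents(seeders_yaml)
print(len(documents))  # 2
```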

Defining the Mappers

Mappers augment a dataset by running a query and inserting new triples for each result.

We want to create mappers that will give colors to things, genders to people, and then names to people based on their gender.

We’ll save all of the mappers to tutorial-files/data/generators/mappers.yaml. Remember to separate them by the three dashes (---).

Coloring Things

We’re using the uniform random choice transform for this generator. It selects from the choices randomly with equal weight. The selection is available in the template as the value.

path: mappers/things/color
extract: |
  SELECT ?entity WHERE {
    ?entity a <https://schema.org/Thing> .
  }
transform:
  path: mappers/random/choice
  configuration:
    choices: [red, orange, yellow, green, blue, indigo, violet]
load: "<<entity>> <https://example.org/color> <<value>> ."

The extract key is a SPARQL query that returns a match for each thing we want augmented with a color. The variables in the SELECT clause (here, ?entity) will be available in the Turtle template.
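Putting the three parts of a mapper together, the flow is: run the query, pick a value for each result row, and fill in the template. The Python sketch below simulates that flow with hard-coded rows in place of a real SPARQL query. The `run_mapper` function is hypothetical, and writing the value as a quoted string is an assumption based on the sample output later in this tutorial.

```python
import random


def run_mapper(rows, choices, template, rng):
    """For each query result, pick a value and fill in the load template."""
    triples = []
    for row in rows:
        bindings = dict(row)
        # Assumption: literal values are written as quoted Turtle strings.
        bindings["value"] = '"' + rng.choice(choices) + '"'
        line = template
        for key, term in bindings.items():
            line = line.replace("<<" + key + ">>", term)
        triples.append(line)
    return triples


rows = [{"entity": "<entity:1>"}, {"entity": "<entity:2>"}]
triples = run_mapper(
    rows,
    ["red", "orange", "yellow", "green", "blue", "indigo", "violet"],
    "<<entity>> <https://example.org/color> <<value>> .",
    random.Random(0),  # seeded so repeated runs give the same picks
)
print("\n".join(triples))
```

Each row produces exactly one new triple, which is what distinguishes a mapper from a reducer.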

Assigning a Gender

This uses the same uniform random choice transform as the thing colorer.

path: mappers/people/gender
extract: |
  SELECT ?entity WHERE {
    ?entity a <https://schema.org/Person> .
  }
transform:
  path: mappers/random/choice
  configuration:
    choices: [male, female]
load: "<<entity>> <https://schema.org/gender> <<value>> ."

Assigning a Name

We have two generators for names: one for men and one for women. We’ve chosen to use the top 10 names from 1970-1979 in Minnesota.

path: mappers/people/names/female
extract: |
  SELECT ?entity WHERE {
    ?entity a <https://schema.org/Person> ;
            <https://schema.org/gender> "female" .
  }
transform:
  path: mappers/random/weighted-choice
  configuration:
    choices:
      Jennifer: 30712
      Amy: 15246
      Michelle: 13294
      Lisa: 12395
      Melissa: 12230
      Angela: 11898
      Heather: 11873
      Kimberly: 10895
      Nicole: 9484
      Kelly: 8721
load: "<<entity>> <https://schema.org/givenName> <<value>> ."
---
path: mappers/people/names/male
extract: |
  SELECT ?entity WHERE {
    ?entity a <https://schema.org/Person> ;
            <https://schema.org/gender> "male" .
  }
transform:
  path: mappers/random/weighted-choice
  configuration:
    choices:
      Michael: 35138
      Jason: 26413
      David: 21812
      Christopher: 20721
      James: 20042
      Robert: 19747
      Matthew: 17540
      John: 16659
      Brian: 16605
      Scott: 13412
load: "<<entity>> <https://schema.org/givenName> <<value>> ."
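Because the weights are relative, each name's probability is its weight divided by the sum of all weights. The short Python calculation below works this out for the female names; it uses only the numbers from the configuration above.

```python
female_names = {
    "Jennifer": 30712, "Amy": 15246, "Michelle": 13294, "Lisa": 12395,
    "Melissa": 12230, "Angela": 11898, "Heather": 11873, "Kimberly": 10895,
    "Nicole": 9484, "Kelly": 8721,
}
total = sum(female_names.values())
probabilities = {name: weight / total for name, weight in female_names.items()}
for name, p in probabilities.items():
    print(f"{name}: {p:.1%}")
```

Jennifer, for example, is selected roughly 22% of the time, while Kelly is selected a little over 6% of the time.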

Defining the Reducers

Reducers augment a dataset by running a query and inserting new triples based on the data from the query. The reducer receives each row and can respond with no data, a single row of data, or multiple rows.

We want to create a reducer that pairs people based on gender.

We’ll save all of the reducers to tutorial-files/data/generators/reducers.yaml.

Pairing People

We want to pair people up somewhat randomly, with a chance that people will be paired with someone of the same gender. The default configuration will try to find a match 80% of the time when a new row is given to the reducer. If the reducer tries to find a match, it will try to find a same-gender match 50% of the time. We’ll see later, in the pipeline definition, how to make that chance configurable.

path: reducers/people/pairer
extract: |
  SELECT ?entity ?matchValue WHERE {
    ?entity a <https://schema.org/Person> ;
            <https://schema.org/gender> ?matchValue .
  }
transform:
  path: reducers/pairer
  configuration:
    matchOnRowChance: 0.8
    sameValueMatchChance: 0.5
load: |
  <<entity1>> <https://schema.org/spouse> <<entity2>> .
  <<entity2>> <https://schema.org/spouse> <<entity1>> .

Defining the Pipeline

We want to build a pipeline that ties all of our generators together. In this example, the counts are parameters, so we can generate datasets of different sizes without defining a new pipeline.

Save the following content into tutorial-files/data/pipelines/pipeline.yaml.


seeders:
  - generator:
      path: seeders/things
      configuration:
        count: "{{thingsCount}}"
  - generator:
      path: seeders/people
      configuration:
        count: "{{peopleCount}}"
generators:
  color-things:
    generator:
      path: mappers/things/color
  name-men:
    generator:
      path: mappers/people/names/male
    dependsOn: [assign-gender]
  assign-gender:
    generator:
      path: mappers/people/gender
  name-women:
    generator:
      path: mappers/people/names/female
    dependsOn: [assign-gender]
  pairing-people:
    generator:
      path: reducers/people/pairer
      configuration:
        sameValueMatchChance: "{{sameGenderMatchChance}}"
    dependsOn: [assign-gender]
path: things-n-people

We set up our seeders and generators to create the data that we want. The platform will run all of the seeders in parallel. The generators may depend on each other, so we indicate how the generators depend on each other. This lets the platform order the generators and run as many as possible at the same time.
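One way to picture the ordering is as a topological sort over the dependsOn lists. The Python sketch below uses the standard library's `graphlib` to group the generators from the pipeline above into waves that could run at the same time. This is an illustration under that assumption, not a description of the platform's scheduler, which may start each generator as soon as its own dependencies finish.

```python
from graphlib import TopologicalSorter  # Python 3.9+

# Each generator maps to the generators it depends on.
depends_on = {
    "color-things": [],
    "assign-gender": [],
    "name-men": ["assign-gender"],
    "name-women": ["assign-gender"],
    "pairing-people": ["assign-gender"],
}

sorter = TopologicalSorter(depends_on)
sorter.prepare()
waves = []
while sorter.is_active():
    ready = sorted(sorter.get_ready())  # everything runnable right now
    waves.append(ready)
    sorter.done(*ready)
print(waves)
# [['assign-gender', 'color-things'], ['name-men', 'name-women', 'pairing-people']]
```

The order in which generators are listed in the file doesn't matter: name-men appears before assign-gender in the pipeline definition but still lands in the second wave.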

We overrode the count for both seeders with pipeline parameters, so the number of things and people is set when we run the pipeline. You can pass any counts you like, though there is a limit of 10,000 for each seeder right now.
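The {{...}} placeholders in the pipeline definition are filled in from the parameters supplied for a run. A minimal Python sketch of that substitution follows. The `resolve` function is hypothetical, and the sketch leaves the substituted value as a string; whether the platform converts it to a number is not shown in this tutorial.

```python
import re


def resolve(value, params):
    """Recursively replace {{name}} placeholders in a configuration value."""
    if isinstance(value, dict):
        return {key: resolve(item, params) for key, item in value.items()}
    if isinstance(value, list):
        return [resolve(item, params) for item in value]
    if isinstance(value, str):
        return re.sub(r"\{\{(\w+)\}\}", lambda m: str(params[m.group(1)]), value)
    return value


configuration = {"count": "{{thingsCount}}"}
print(resolve(configuration, {"thingsCount": 20}))  # {'count': '20'}
```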

Uploading to the Platform

We want to upload the transforms and generators first because the platform will check that they exist when we upload the pipeline. The sync function is smart enough to manage this for us. It will sync transforms before generators, and generators before pipelines.

ursactl sync --dir tutorial-files

If you get errors during the sync process, rerun the command with the --dry-run option to see what would be uploaded. If there are missing resources, check the files defining those resources for any errors.

Running the Pipeline

Now that our generators and pipeline are uploaded to the platform, we can run the pipeline and see what we get. In this case, we’re creating twenty things and ten people.

ursactl run pipeline things-n-people --param "thingsCount=20" --param "peopleCount=10" --param "sameGenderMatchChance=0.05"

This will print out some information showing a run id and a job status of queued.

Once the pipeline run is completed, we’ll be able to see it in the list of datasets (ursactl list datasets). Pipeline runs store datasets with a path that is the same as the pipeline path extended with the date and time that the run ended.

Assuming the shell variable TIMESTAMP holds that date and time, we can download the generated data.

ursactl get dataset things-n-people/$TIMESTAMP things-and-people.ttl

This will download the contents into the local file things-and-people.ttl.

It should look something like the following.

@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> .
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .

<entity:1>
    a <https://schema.org/Thing> ;
    <https://example.org/color> "indigo" .

<entity:10>
    a <https://schema.org/Thing> ;
    <https://example.org/color> "violet" .

<entity:21>
    a <https://schema.org/Person> ;
    <https://schema.org/gender> "male" ;
    <https://schema.org/givenName> "Brian" .

<entity:22>
    a <https://schema.org/Person> ;
    <https://schema.org/gender> "female" ;
    <https://schema.org/givenName> "Lisa" .

Sweeping the Pipeline

If you want to see how changing the sameGenderMatchChance configuration value changes the dataset, you can sweep the value through a list:

ursactl run pipeline things-n-people \
  --param "thingsCount=10" \
  --param "peopleCount=500" \
  --sweep-param "sameGenderMatchChance=[0.0, 0.05, 0.10, 0.20, 0.40, 0.60, 0.80, 0.90, 0.95, 1.0]"

This will run the pipeline ten times, once for each value of sameGenderMatchChance.
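Conceptually, a sweep expands into one parameter set per value, with the fixed parameters shared across every run. The Python sketch below shows that expansion. The `expand_sweep` helper is hypothetical, and reading the list as JSON is an assumption about the syntax rather than a statement of how ursactl parses it.

```python
import json


def expand_sweep(fixed, sweep_name, sweep_values):
    """Return one parameter set per sweep value, sharing the fixed parameters."""
    return [{**fixed, sweep_name: value} for value in sweep_values]


values = json.loads("[0.0, 0.05, 0.10, 0.20, 0.40, 0.60, 0.80, 0.90, 0.95, 1.0]")
runs = expand_sweep(
    {"thingsCount": 10, "peopleCount": 500},
    "sameGenderMatchChance",
    values,
)
print(len(runs))  # 10
```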