Skip to main content Link Menu Expand (external link) Document Search Copy Copied

Sweeping a Pipeline

In this tutorial, we’ll see how to run a pipeline sweep using a Python script and how to pull down the resulting data for analysis.

Prerequisites

You will need recent versions of ursactl and rdflib installed. You can install them with pip.

This tutorial depends on the transforms and generators created in the Create and Run a Simple Pipeline tutorial. We recommend you complete that tutorial before starting this one. That tutorial will walk you through setting up a project, creating local files defining the pipeline and its components, and uploading those definitions to a project in the platform.

Planning the Sweep

We will be running two pipelines: the first to create a base population of people and a second to pair them up into couples. This minimizes the number of triples we have to create and lets us change only one variable at a time.

We will start by assigning gender to each of 1,000, 2,000, and 4,000 people. Then we’ll take the resulting dataset and use it as the seed for the second pipeline where we pair people into couples.

Creating the Pipelines

We will use two different pipelines using generators from the Create and Run a Simple Pipeline tutorial.


seeders:
  - generator:
      path: seeders/people
      configuration:
        count: "{{peopleCount}}"
generators:
  assign-gender:
    generator:
      path: mappers/people/gender
path: people/base
---
seeders:
  - dataset:
      path: "{{dataset}}"
      configuration: {}
generators:
  pairing-people:
    generator:
      path: reducers/people/pairer
      configuration:
        sameValueMatchChance: "{{sameGenderMatchChance}}"
path: people/pairer

Creating the Python Sweep Script

We will use the Python library to run the sweep. Rather than give you the whole script up front, let’s start simple and expand.

Run a single sweep and wait for completion

We’ll start by running a single sweep and waiting for it to complete.

When we construct a Project() object without any parameters, it will use the default project from the ursactl configuration file. We can use the project object to find resources in the project. In this example, we’re using the people/base pipeline.

Class Attribute Description
Pipeline sweep() runs a pipeline sweep on the server, returning an object representing the sweep
Project pipeline() creates a Pipeline object representing the pipeline on the platform
from ursactl.core.project import Project


def run():
    base_pipeline = Project().pipeline('people/base')

    base_parameters = {'peopleCount': [1000, 2000, 4000]}

    # build our base populations
    with base_pipeline.sweep(sweep_parameters=base_parameters) as base_sweep:
        print("Base sweep complete.\n")

        run_pairing_pipeline(base_sweep)


# next step in the tutorial...
def run_pairing_pipeline(base_sweep):
    pass


if __name__ == '__main__':
    run()

This is all we need to run a pipeline sweep and wait for it to complete. We’ll see how to use the results in the next section.

Use the datasets to run a second sweep

Once we’ve generated our base datasets, we can use those to run another sweep with the people/pairer pipeline.

We can update our script by adding a body to the run_pairing_pipeline function. This will kick off a second sweep over the datasets we just generated as well as the values of sameGenderMatchChance that we want to try.

Class Attribute Description
Dataset path the path of the dataset in the project
PipelineRun dataset the dataset resulting from the pipeline run, if the run is completed
PipelineRun parameters the parameters used to create the pipeline run
PipelineSweep pipeline_runs the list of pipeline runs that are part of the sweep
def run_pairing_pipeline(base_sweep):
    pairer_pipeline = Project().pipeline('people/pairer')

    # track which dataset has how many people
    counts = {
        run.dataset.path: run.parameters['peopleCount']
        for run in base_sweep.pipeline_runs}

    pairer_parameters = {
        'dataset': list(counts.keys()),
        'sameGenderMatchChance': [0.0, 0.25, 0.5, 0.75, 1.0]}

    # pair our populations
    with pairer_pipeline.sweep(sweep_parameters=pairer_parameters) as pairer_sweep:
        print("Pairer sweep complete.\n")

        analyze_results(pairer_sweep, counts)


def analyze_results(pairer_sweep, counts):
    pass

The pipeline_runs property of the sweep object gives us a list of the runs that were created. Each run has a dataset property that gives us the dataset that was created. We can use that to create the parameters for the second sweep.

Each run also has the parameters that were used to create it. This lets us associate the number of people in the dataset with the dataset path. We want to use that later when we analyze the results of the second sweep.

We’ll see how to analyze the results in the next section.

Analyze the results

Once we have the second sweep complete, we want to download the results and analyze them.

We can update our script by adding a body to the analyze_results function. This will download the results of the second sweep, convert them to RDF graph objects, and analyze them.

Class Attribute Description
Dataset content the content of the dataset
Dataset content_type the MIME type of the dataset
from rdflib import Graph

def analyze_results(pairer_sweep, counts):
    results = []
    for run in pairer_sweep.pipeline_runs:
        count = counts[run.parameters['dataset']]
        chance = run.parameters['sameGenderMatchChance']
        same, different = analyze_dataset(run.dataset)
        results.append([
            chance * 100,
            count,
            round(100 * same / count, 1),
            round(100 * different / count, 1),
            round(100 * (count - same - different) / count, 1)])

    print('| Chance | Count | Same    | Diff    | None    |')
    print('| ------ | ----- | ------- | ------- | ------- |')

    for row in sorted(results):
        print('| {:5}% | {:5} | {:6}% | {:6}% | {:6}% |'.format(*row))


def analyze_dataset(dataset):
    g = Graph()
    g.parse(data=dataset.content, format=dataset.content_type)
    same = 0
    different = 0
    for row in g.query("""
            SELECT ?gender1 ?gender2 WHERE {
                ?entity1 <https://schema.org/spouse> ?entity2 .
                ?entity1 <https://schema.org/gender> ?gender1 .
                ?entity2 <https://schema.org/gender> ?gender2 .
            }
            """):
        if row.gender1 == row.gender2:
            same += 1
        else:
            different += 1
    return same, different

Running the script will produce a table similar to the following:

| Chance | Count | Same    | Diff    | None    |
| ------ | ----- | ------- | ------- | ------- |
|   0.0% |  1000 |    0.0% |   99.0% |    1.0% |
|   0.0% |  2000 |    0.0% |   97.3% |    2.7% |
|   0.0% |  4000 |    0.0% |   99.5% |    0.4% |
|  25.0% |  1000 |   27.2% |   71.8% |    1.0% |
|  25.0% |  2000 |   24.5% |   75.2% |    0.3% |
|  25.0% |  4000 |   24.7% |   75.2% |    0.1% |
|  50.0% |  1000 |   43.8% |   55.8% |    0.4% |
|  50.0% |  2000 |   44.3% |   55.4% |    0.3% |
|  50.0% |  4000 |   44.5% |   55.8% |    0.0% |
|  75.0% |  1000 |   63.4% |   36.6% |    0.0% |
|  75.0% |  2000 |   57.8% |   42.0% |    0.2% |
|  75.0% |  4000 |   59.8% |   40.1% |    0.1% |
| 100.0% |  1000 |   66.2% |   33.8% |    0.0% |
| 100.0% |  2000 |   68.4% |   31.6% |    0.0% |
| 100.0% |  4000 |   67.7% |   32.2% |    0.1% |

Next Steps

This tutorial showed how to use the Python library to run a pipeline sweep and analyze the results. You can use this framework to run more complex sweeps and analyze the results. For example, if you are developing a transform or a generator, you can use this framework to run a sweep over a range of parameters to see how the results change as part of the validation process.

Complete Script

Here’s the complete script.

from ursactl.core.project import Project
from rdflib import Graph


def run():
    base_pipeline = Project().pipeline('people/base')

    base_parameters = {'peopleCount': [1000, 2000, 4000]}

    # build our base populations
    with base_pipeline.sweep(sweep_parameters=base_parameters) as base_sweep:
        print("Base sweep complete.\n")

        run_pairing_pipeline(base_sweep)


def run_pairing_pipeline(base_sweep):
    pairer_pipeline = Project().pipeline('people/pairer')

    # track which dataset has how many people
    counts = {
        run.dataset.path: run.parameters['peopleCount']
        for run in base_sweep.pipeline_runs}

    pairer_parameters = {
        'dataset': list(counts.keys()),
        'sameGenderMatchChance': [0.0, 0.25, 0.5, 0.75, 1.0]}

    # pair our populations
    with pairer_pipeline.sweep(sweep_parameters=pairer_parameters) as pairer_sweep:
        print("Pairer sweep complete.\n")

        analyze_results(pairer_sweep, counts)


def analyze_results(pairer_sweep, counts):
    results = []
    for run in pairer_sweep.pipeline_runs:
        count = counts[run.parameters['dataset']]
        chance = run.parameters['sameGenderMatchChance']
        same, different = analyze_dataset(run.dataset)
        results.append([
            chance * 100,
            count,
            round(100 * same / count, 2),
            round(100 * different / count, 2),
            round(100 * (count - same - different) / count, 2)])

    print('| Chance | Count | Same    | Diff    | None    |')
    print('| ------ | ----- | ------- | ------- | ------- |')

    for row in sorted(results):
        print('| {:5}% | {:5} | {:6}% | {:6}% | {:6}% |'.format(*row))


def analyze_dataset(dataset):
    g = Graph()
    g.parse(data=dataset.content, format=dataset.content_type)
    same = 0
    different = 0
    for row in g.query("""
            SELECT ?gender1 ?gender2 WHERE {
                ?entity1 <https://schema.org/spouse> ?entity2 .
                ?entity1 <https://schema.org/gender> ?gender1 .
                ?entity2 <https://schema.org/gender> ?gender2 .
            }
            """):
        if row.gender1 == row.gender2:
            same += 1
        else:
            different += 1
    return same, different


if __name__ == '__main__':
    run()