Sweeping a Pipeline
In this tutorial, we’ll see how to run a pipeline sweep using a Python script and how to pull down the resulting data for analysis.
Prerequisites
You will need recent versions of ursactl and rdflib installed. You can install both with pip (for example, pip install ursactl rdflib).
This tutorial depends on the transforms and generators created in the Create and Run a Simple Pipeline tutorial. We recommend you complete that tutorial before starting this one. That tutorial will walk you through setting up a project, creating local files defining the pipeline and its components, and uploading those definitions to a project in the platform.
Planning the Sweep
We will be running two pipelines: the first to create a base population of people and the second to pair them up into couples. This minimizes the number of triples we have to create and lets us change only one variable at a time.
We will start by generating populations of 1,000, 2,000, and 4,000 people and assigning a gender to each person. Then we’ll use each resulting dataset as the seed for the second pipeline, which pairs people into couples.
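A sweep runs its pipeline once for every combination of the parameter values we supply. With the three population sizes above and the five values of sameGenderMatchChance we’ll try in the pairing sweep, that works out to 3 base runs and 15 pairing runs. The sketch below only illustrates that expansion; the platform enumerates the combinations for us when we call sweep() later in the tutorial.

from itertools import product

# Illustration only: the parameter values we will sweep later in this tutorial.
people_counts = [1000, 2000, 4000]
same_gender_match_chances = [0.0, 0.25, 0.5, 0.75, 1.0]

# The pairing sweep varies both the seed dataset (one per base run) and the
# match chance, so it expands to the cross product of the two lists.
pairing_runs = list(product(people_counts, same_gender_match_chances))
print(len(pairing_runs))  # 15 pairing runs: 3 base datasets x 5 match chances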
Creating the Pipelines
We will define two pipelines, both built from the generators created in the Create and Run a Simple Pipeline tutorial.
seeders:
  - generator:
      path: seeders/people
      configuration:
        count: "{{peopleCount}}"
generators:
  assign-gender:
    generator:
      path: mappers/people/gender
path: people/base
---
seeders:
  - dataset:
      path: "{{dataset}}"
      configuration: {}
generators:
  pairing-people:
    generator:
      path: reducers/people/pairer
      configuration:
        sameValueMatchChance: "{{sameGenderMatchChance}}"
path: people/pairer
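The {{peopleCount}}, {{dataset}}, and {{sameGenderMatchChance}} placeholders are filled in with the parameter values chosen for each run of a sweep. The substitution happens on the platform; the snippet below is only a rough sketch of the effect for a single combination of values.

import re

# Rough sketch of the placeholder substitution; the real templating is
# performed by the platform when it expands the sweep.
template = 'count: "{{peopleCount}}"'
parameters = {'peopleCount': 2000}

rendered = re.sub(r'\{\{(\w+)\}\}',
                  lambda match: str(parameters[match.group(1)]),
                  template)
print(rendered)  # count: "2000"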
Creating the Python Sweep Script
We will use the Python library to run the sweep. Rather than give you the whole script up front, let’s start simple and expand.
Run a single sweep and wait for completion
We’ll start by running a single sweep and waiting for it to complete.
When we construct a Project() object without any parameters, it will use the default project from the ursactl configuration file. We can use the project object to find resources in the project. In this example, we’re using the people/base pipeline.
Class | Attribute | Description |
---|---|---|
Pipeline | sweep() | runs a pipeline sweep on the server, returning an object representing the sweep |
Project | pipeline() | creates a Pipeline object representing the pipeline on the platform |
from ursactl.core.project import Project


def run():
    base_pipeline = Project().pipeline('people/base')
    base_parameters = {'peopleCount': [1000, 2000, 4000]}
    # build our base populations
    with base_pipeline.sweep(sweep_parameters=base_parameters) as base_sweep:
        print("Base sweep complete.\n")
        run_pairing_pipeline(base_sweep)


# next step in the tutorial...
def run_pairing_pipeline(base_sweep):
    pass


if __name__ == '__main__':
    run()
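If you want to confirm what the base sweep produced before moving on, you can add a small helper like the one below and call it with base_sweep inside the with block. The helper name print_sweep_runs is just an illustration, not part of the library; it only uses the pipeline_runs, parameters, and dataset attributes described in the next section.

# Hypothetical helper for inspecting a completed sweep: one run per
# value of peopleCount, each with its own output dataset.
def print_sweep_runs(sweep):
    for pipeline_run in sweep.pipeline_runs:
        print(pipeline_run.parameters, pipeline_run.dataset.path)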
This is all we need to run a pipeline sweep and wait for it to complete. We’ll see how to use the results in the next section.
Use the datasets to run a second sweep
Once we’ve generated our base datasets, we can use those to run another sweep with the people/pairer
pipeline.
We can update our script by adding a body to the run_pairing_pipeline
function.
This will kick off a second sweep over the datasets we just generated
as well as the values of sameGenderMatchChance
that we want to try.
Class | Attribute | Description |
---|---|---|
Dataset | path | the path of the dataset in the project |
PipelineRun | dataset | the dataset resulting from the pipeline run, if the run is completed |
PipelineRun | parameters | the parameters used to create the pipeline run |
PipelineSweep | pipeline_runs | the list of pipeline runs that are part of the sweep |
def run_pairing_pipeline(base_sweep):
    pairer_pipeline = Project().pipeline('people/pairer')
    # track which dataset has how many people
    counts = {
        run.dataset.path: run.parameters['peopleCount']
        for run in base_sweep.pipeline_runs}
    pairer_parameters = {
        'dataset': list(counts.keys()),
        'sameGenderMatchChance': [0.0, 0.25, 0.5, 0.75, 1.0]}
    # pair our populations
    with pairer_pipeline.sweep(sweep_parameters=pairer_parameters) as pairer_sweep:
        print("Pairer sweep complete.\n")
        analyze_results(pairer_sweep, counts)


def analyze_results(pairer_sweep, counts):
    pass
The pipeline_runs property of the sweep object gives us a list of the runs that were created. Each run has a dataset property that gives us the dataset it produced, and we can use those datasets to build the parameters for the second sweep.
Each run also carries the parameters that were used to create it. This lets us associate each dataset path with the number of people in that dataset, which we’ll need later when we analyze the results of the second sweep.
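For example, the counts dictionary might end up looking something like the following. The dataset paths shown here are made up; the platform assigns the real ones when the runs complete.

# Hypothetical contents of `counts`; the actual dataset paths will differ.
counts = {
    'datasets/base-run-1': 1000,
    'datasets/base-run-2': 2000,
    'datasets/base-run-3': 4000,
}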
We’ll see how to analyze the results in the next section.
Analyze the results
Once the second sweep is complete, we want to download the results and analyze them. We can update our script by adding a body to the analyze_results function. This will download the results of the second sweep, convert each dataset to an RDF graph, and summarize the pairings.
Class | Attribute | Description |
---|---|---|
Dataset | content | the content of the dataset |
Dataset | content_type | the MIME type of the dataset |
from rdflib import Graph


def analyze_results(pairer_sweep, counts):
    results = []
    for run in pairer_sweep.pipeline_runs:
        count = counts[run.parameters['dataset']]
        chance = run.parameters['sameGenderMatchChance']
        same, different = analyze_dataset(run.dataset)
        results.append([
            chance * 100,
            count,
            round(100 * same / count, 1),
            round(100 * different / count, 1),
            round(100 * (count - same - different) / count, 1)])
    print('| Chance | Count | Same | Diff | None |')
    print('| ------ | ----- | ------- | ------- | ------- |')
    for row in sorted(results):
        print('| {:5}% | {:5} | {:6}% | {:6}% | {:6}% |'.format(*row))


def analyze_dataset(dataset):
    g = Graph()
    g.parse(data=dataset.content, format=dataset.content_type)
    same = 0
    different = 0
    for row in g.query("""
            SELECT ?gender1 ?gender2 WHERE {
                ?entity1 <https://schema.org/spouse> ?entity2 .
                ?entity1 <https://schema.org/gender> ?gender1 .
                ?entity2 <https://schema.org/gender> ?gender2 .
            }
            """):
        if row.gender1 == row.gender2:
            same += 1
        else:
            different += 1
    return same, different
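If you want to sanity-check analyze_dataset without pulling anything from the platform, you can hand it a small stand-in object that provides the content and content_type attributes it reads. The Turtle data and the SimpleNamespace stand-in below are purely illustrative.

from types import SimpleNamespace

# Illustrative data only: one couple of different genders, with the spouse
# relationship recorded in both directions.
SAMPLE_TTL = """
@prefix ex: <http://example.org/> .
@prefix schema: <https://schema.org/> .

ex:alice schema:gender "female" ; schema:spouse ex:bob .
ex:bob   schema:gender "male"   ; schema:spouse ex:alice .
"""

fake_dataset = SimpleNamespace(content=SAMPLE_TTL, content_type='text/turtle')
print(analyze_dataset(fake_dataset))  # (0, 2)

Because the query matches each ordered spouse pair, a couple recorded in both directions contributes two rows to the tally.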
Running the script will produce a table similar to the following:
| Chance | Count | Same | Diff | None |
| ------ | ----- | ------- | ------- | ------- |
| 0.0% | 1000 | 0.0% | 99.0% | 1.0% |
| 0.0% | 2000 | 0.0% | 97.3% | 2.7% |
| 0.0% | 4000 | 0.0% | 99.5% | 0.4% |
| 25.0% | 1000 | 27.2% | 71.8% | 1.0% |
| 25.0% | 2000 | 24.5% | 75.2% | 0.3% |
| 25.0% | 4000 | 24.7% | 75.2% | 0.1% |
| 50.0% | 1000 | 43.8% | 55.8% | 0.4% |
| 50.0% | 2000 | 44.3% | 55.4% | 0.3% |
| 50.0% | 4000 | 44.5% | 55.8% | 0.0% |
| 75.0% | 1000 | 63.4% | 36.6% | 0.0% |
| 75.0% | 2000 | 57.8% | 42.0% | 0.2% |
| 75.0% | 4000 | 59.8% | 40.1% | 0.1% |
| 100.0% | 1000 | 66.2% | 33.8% | 0.0% |
| 100.0% | 2000 | 68.4% | 31.6% | 0.0% |
| 100.0% | 4000 | 67.7% | 32.2% | 0.1% |
Next Steps
This tutorial showed how to use the Python library to run a pipeline sweep and analyze the results. You can build on this framework to run more complex sweeps. For example, if you are developing a transform or a generator, you can sweep over a range of parameters and check how the results change as part of your validation process.
Complete Script
Here’s the complete script.
from ursactl.core.project import Project
from rdflib import Graph


def run():
    base_pipeline = Project().pipeline('people/base')
    base_parameters = {'peopleCount': [1000, 2000, 4000]}
    # build our base populations
    with base_pipeline.sweep(sweep_parameters=base_parameters) as base_sweep:
        print("Base sweep complete.\n")
        run_pairing_pipeline(base_sweep)


def run_pairing_pipeline(base_sweep):
    pairer_pipeline = Project().pipeline('people/pairer')
    # track which dataset has how many people
    counts = {
        run.dataset.path: run.parameters['peopleCount']
        for run in base_sweep.pipeline_runs}
    pairer_parameters = {
        'dataset': list(counts.keys()),
        'sameGenderMatchChance': [0.0, 0.25, 0.5, 0.75, 1.0]}
    # pair our populations
    with pairer_pipeline.sweep(sweep_parameters=pairer_parameters) as pairer_sweep:
        print("Pairer sweep complete.\n")
        analyze_results(pairer_sweep, counts)


def analyze_results(pairer_sweep, counts):
    results = []
    for run in pairer_sweep.pipeline_runs:
        count = counts[run.parameters['dataset']]
        chance = run.parameters['sameGenderMatchChance']
        same, different = analyze_dataset(run.dataset)
        results.append([
            chance * 100,
            count,
            round(100 * same / count, 1),
            round(100 * different / count, 1),
            round(100 * (count - same - different) / count, 1)])
    print('| Chance | Count | Same | Diff | None |')
    print('| ------ | ----- | ------- | ------- | ------- |')
    for row in sorted(results):
        print('| {:5}% | {:5} | {:6}% | {:6}% | {:6}% |'.format(*row))


def analyze_dataset(dataset):
    g = Graph()
    g.parse(data=dataset.content, format=dataset.content_type)
    same = 0
    different = 0
    for row in g.query("""
            SELECT ?gender1 ?gender2 WHERE {
                ?entity1 <https://schema.org/spouse> ?entity2 .
                ?entity1 <https://schema.org/gender> ?gender1 .
                ?entity2 <https://schema.org/gender> ?gender2 .
            }
            """):
        if row.gender1 == row.gender2:
            same += 1
        else:
            different += 1
    return same, different


if __name__ == '__main__':
    run()