Tutorial

This guide helps you start working with hypergol. This tutorial assumes you use GitHub for source control, but using other hosting services should be straightforward as well. Further examples of CLI usage can be found in the run_project_tests.sh script.

Creating a project

After installing hypergol into your virtual environment, you are ready to create your first project. Before that, create an empty repository on GitHub without README.md and .gitignore; both will be generated by hypergol. Once the project is created, follow the commands below starting with git init.

Make sure you are in the directory you intend to create the project into:

$ python3 -m hypergol.cli.create_project <ProjectName>

The project name must be camel case, and the command will create a snake-case directory. See the (insert link here) documentation for creating a project from a Python interactive shell or Jupyter notebooks.

The example assumes that you created an empty (no README.md and .gitignore) repository on GitHub named project_name. Once this is done, the next step is to create the project’s own virtual environment. This enables you to encapsulate all the dependencies your project relies on. To do this, execute the following steps (don’t forget to deactivate your current environment):

$ deactivate
$ cd <project_name>
$ git init
$ git add .
$ git commit -m "first commit"
$ git remote add origin git@github.com:<your_user_name>/<project_name>.git
$ git push -u origin master
$ ./make_venv.sh
$ source .venv/bin/activate

If you have dependencies that you will use in the future (e.g. numpy), add them to requirements.txt and call:

$ pip3 install -r requirements.txt

Creating data model classes

The data model is the description of your project’s data that your code operates on. Instead of raw numpy arrays or pandas dataframes, hypergol stores all data in these classes. This enables us to create hierarchical structures and store them in files recursively, so you don’t need to worry about saving and loading complex data structures. This also helps you reason about and iterate on your code in case you need to change one of the objects and update a pipeline accordingly. Again, see the (insert link here) documentation on how to create classes from interactive shells.

Creating a simple class

$ python3 -m hypergol.cli.create_data_model ExampleClass classId:int:id value:float name:str creation:datetime

This will create the following class (use the --dryrun switch to display the code instead of writing it into data_models/example_class.py):

from datetime import datetime
from hypergol import BaseData


class ExampleClass(BaseData):

    def __init__(self, classId: int, value: float, name: str, creation: datetime):
        self.classId = classId
        self.value = value
        self.name = name
        self.creation = creation

    def get_id(self):
        return (self.classId, )

    def to_data(self):
        data = self.__dict__.copy()
        data['creation'] = data['creation'].isoformat()
        return data

    @classmethod
    def from_data(cls, data):
        data['creation'] = datetime.fromisoformat(data['creation'])
        return cls(**data)

As you can see, hypergol generated the class with the necessary imports (datetime) and the correct serialisation functions, so the object can be converted to a JSON-serialisable format, saved to disk and loaded back.

Also, the id field of the classId:int:id argument made that field this class’s id. It is assumed that this uniquely identifies the object, so comparison will happen based on this field (or fields). Multiple fields can be marked as id, which results in their tuple being the id of the class. You do not necessarily need to specify an id field, but only classes with ids can be the type of a dataset (see later, insert link here) and therefore be stored in files; other classes can only be saved if they are part of another id’d class (also known as weak entities). Only int and str fields can be marked as id.
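
As a quick illustration (using the generated class above from the project directory), get_id() returns the tuple of the id fields and the generated serialisation functions round-trip the object:

from datetime import datetime
from data_models.example_class import ExampleClass

example = ExampleClass(classId=1, value=0.5, name='first', creation=datetime.now())
print(example.get_id())                       # (1, ) - the tuple of the field(s) marked as id

# round-trip through the generated to_data()/from_data() functions
restored = ExampleClass.from_data(example.to_data())
print(restored.get_id() == example.get_id())  # True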

Creating classes that depend on other classes

Let’s look at a more complicated example that involves hierarchical structures as well.

Again, use the --dryrun switch to display the code instead of writing it out. Because this example depends on another data model class, hypergol will check whether that class exists and fail if it doesn’t, so rerun the previous example if you used the --dryrun switch there.

$ python3 -m hypergol.cli.create_data_model OtherExample classId:int:id name:str "values:List[ExampleClass]" "dates:List[date]"

Don’t forget the double quotes, or your shell will fail to process the square brackets correctly. This will result in the following code in data_models/other_example.py:

from typing import List
from datetime import date
from hypergol import BaseData
from data_models.example_class import ExampleClass


class OtherExample(BaseData):

    def __init__(self, classId: int, name: str, values: List[ExampleClass], dates: List[date]):
        self.classId = classId
        self.name = name
        self.values = values
        self.dates = dates

    def get_id(self):
        return (self.classId, )

    def to_data(self):
        data = self.__dict__.copy()
        data['values'] = [v.to_data() for v in data['values']]
        data['dates'] = [v.isoformat() for v in data['dates']]
        return data

    @classmethod
    def from_data(cls, data):
        data['values'] = [ExampleClass.from_data(v) for v in data['values']]
        data['dates'] = [date.fromisoformat(v) for v in data['dates']]
        return cls(**data)

As you can see, it imports the correct typing class (List), the correct temporal dependency (date) and the correct data model dependency (data_models.example_class.ExampleClass). Serialisation and deserialisation of the lists are handled correctly as well.

You can see that in the tests/ directory a test file was created for each class. Hypergol automatically creates unit tests for these classes and enables you to add further ones. You can run the tests with ./run_tests.sh. It uses the nosetests framework and by default verifies the integrity of saving and loading each class, which is relevant if you edit the generated code. If your class uses other data model classes as well, the tests will initially fail; replace the None arguments with proper class initialisations to make the tests pass. Run them after each change to the data model classes to ensure the changes are correct. Also, write new tests if you add other member functions.

Once the code is generated, you can edit it freely and hypergol will not touch it again.

Creating datasets

Let’s head over to a Jupyter notebook and investigate how we can store data in Datasets.

First set things up:

import sys
sys.path.insert(0, '<full_path>/<project_name>')
from hypergol import Dataset
from data_models.example_class import ExampleClass
from data_models.other_example import OtherExample
from datetime import date
from datetime import datetime

Create a dataset and specify where you want to store the data. Hypergol enables a “git-like” workflow for large datasets where data lineage is tracked through a series of commits that record the state of the codebase at the time of each dataset’s creation. First we deal with the general syntax and get back to version control later; hence repoData is None.

Defining datasets

First define the dataset itself:

dataset=Dataset(
    dataType=OtherExample,
    location='<full_path>/<data_location>',
    project='<project_name>',
    branch='<test_branch>',
    name='otherExamples',
    chunkCount=16,
    repoData=None
)

Each dataset consists of a set of chunks, the number of which is determined by chunkCount (valid values are 16, 256 and 4096); this determines how granularly it can be processed in parallel by the pipelines later on. Each chunk is identified by a “chunkId”, a 1, 2 or 3 digit hexadecimal number. Each object determines a hash_id, which is by default the object’s id, but this can be changed by overriding the get_hash_id() function in the class. This hash_id is hashed with the SHA1 algorithm (the same one git uses for commits) into a 40 digit hexadecimal number, and all objects whose hash starts with the same 1, 2 or 3 digits end up in the same chunk. This distributes objects evenly across chunks, which will be important in parallel processing. Each chunk is a gzipped text file in which each line is a separate JSON string ended with a single newline.
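
The mapping from object to chunk can be sketched roughly as follows. This is an illustration only, not hypergol’s internal code; the exact string that gets hashed is an implementation detail:

import hashlib

# an object's hash_id (by default its id) is SHA1-hashed into a 40 digit hexadecimal number;
# the first 1/2/3 digits (for chunkCount 16/256/4096) select the chunk the object lands in
hashId = str((42, ))
digest = hashlib.sha1(hashId.encode('utf-8')).hexdigest()
chunkId = digest[:2]    # chunkCount=256 -> 2 hex digits -> 256 possible chunks
print(digest, chunkId)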

Writing datasets

with dataset.open('w') as datasetWriter:
    for k in range(100):
        datasetWriter.append(OtherExample(
            classId=k,
            name=str(k),
            values=[
                ExampleClass(classId=k, name=str(k), creation=datetime.now()),
                ExampleClass(classId=k, name=str(k), creation=datetime.now())
            ],
            dates=[date.today(), date.today()]
        ))

A dataset can be opened for writing with open('w'); this returns a datasetWriter object to which you can append the objects you want to store in the dataset. If you try to append an object of a type other than the dataset’s type, an exception will be raised. The recommended way to write into a dataset is with a context manager, in the manner shown above. Datasets can be opened directly as well with:

datasetWriter = dataset.open('w')
...
datasetWriter.append(otherExample)
...
datasetWriter.close()

It is your responsibility to call close(); if you don’t, the dataset will be corrupted and cannot be used. When a dataset is opened for writing, it creates a .def file with all the information that is relevant at creation time: dependencies on other datasets, the current commit of the repository and the creation time. When a dataset is closed after writing, it creates a .chk file with the hashes of the contents of the chunks and the hash of the .def file. Dependencies between datasets can be created by calling add_dependency(otherDataset), which adds the otherDataset’s .chk file’s hash to the current dataset (this happens automatically in pipelines). This makes it possible to consistently retrace all information required to recreate the dataset in question.
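
Outside of pipelines a dependency can be recorded by hand. A minimal sketch, assuming the derivedExamples dataset below is created for the copy and add_dependency() is called before it is opened for writing so the hash can be recorded in its .def file:

# hypothetical: derive a new dataset from the one created above and record the dependency
derivedExamples = Dataset(
    dataType=OtherExample,
    location='<full_path>/<data_location>',
    project='<project_name>',
    branch='<test_branch>',
    name='derivedExamples',
    chunkCount=16,
    repoData=None
)
derivedExamples.add_dependency(dataset)
with dataset.open('r') as datasetReader, derivedExamples.open('w') as datasetWriter:
    for value in datasetReader:
        datasetWriter.append(value)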

Reading datasets

with dataset.open('r') as datasetReader:
    for data in datasetReader:
        print(data)

A dataset can be opened for reading with open('r'); this returns a datasetReader object which you can iterate over to access the stored objects. The order of the objects is undefined. The dataset can be opened without a context manager, and no close() needs to be called if the entire set is loaded:

allData = list(dataset.open('r'))

Creating a task class

Task classes are computational elements that run on data model classes or create data model classes (in Hypergol terminology, a “source”); the pipeline takes care of handling the datasets and the multiple threads where the processing happens. You only write the code that needs to run.

There are two types of tasks:

Source

This is the entry point into a processing pipeline; it doesn’t have an input dataset. Its purpose is to format any data into a dataset that can be processed in parallel by later tasks.

$ python3 -m hypergol.cli.create_task LoadData OtherExample --source

This will create tasks/load_data.py with the skeleton of the code. You need to implement three functions whose signatures are given in the generated code:

def get_jobs(self):
    return [Job(id, total, parameters) for k, parameter in enumerate(...)]

def source_iterator(self, parameters):
    # read
    yield (data, )

def run(self, data):
    self.output.append(OtherExample(...))

The pipeline will generate the jobs by calling get_jobs(), create a Task class instance for each and pass the job to its execute function. For example, if your source data is in multiple files, create one job for each file and pass the file’s path in the parameters argument. The pipeline then calls the iterator with the parameters, and it starts yielding data which is passed to the run() function, which in turn processes it and creates a data model object. This object is appended to a specific member of the task, self.output, which ensures that it is saved into the task’s output dataset. Source classes are derived from the Task class as well and their mechanism is the same, apart from more functions needing to be implemented. See the detailed workings of the Task class below.
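
As a hedged sketch (the file layout, the column names and the filePaths member are all hypothetical, and the Job arguments follow the stub above), the three functions could be filled in for CSV input like this:

import csv

from hypergol import Job
from data_models.other_example import OtherExample


# inside the generated LoadData class in tasks/load_data.py;
# it assumes the task was constructed with a filePaths argument stored on self

def get_jobs(self):
    return [
        Job(k, len(self.filePaths), {'filePath': filePath})
        for k, filePath in enumerate(self.filePaths)
    ]

def source_iterator(self, parameters):
    with open(parameters['filePath'], 'rt') as csvFile:
        for row in csv.DictReader(csvFile):
            yield (row, )

def run(self, row):
    self.output.append(OtherExample(
        classId=int(row['classId']),
        name=row['name'],
        values=[],
        dates=[]
    ))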

Task

If a task’s inputs are Hypergol datasets, then most of the above functionality is taken care of automatically; you only need to focus on the run() function. To structure your code correctly, it’s worth splitting long operations into shorter ones, especially if there is a logical separation between the parts. Tasks can have multiple inputs, but only one output dataset, and all the inputs must have the same number of chunks.

For example, if you process text with NLP, you can have domain classes such as RawText, CleanText, ProcessedText and LabelledText. For these, you can create datasets such as rawTexts, cleanTexts, processedTexts and labelledTexts, and tasks such as Source, CleanTextCreatorTask, TextProcessorTask and LabellerTask. This logically encapsulates each step in its own task, so maintenance will be easier.

To create a task class, execute in the terminal (list the data model classes to import after the task name):

$ python3 -m hypergol.cli.create_task ExampleTask OtherExample

This will create tasks/example_task.py with stubs for two functions, init() and run(). The task can have multiple inputs, but only one output dataset, and all the inputs must have the same number of chunks. It can also have a list of “loaded” datasets that will be entirely loaded before any run() calls.

One of the main constraints of Python parallelisation is that all objects passed to a thread must be picklable. This means that certain classes cannot be member variables and must be constructed in a delayed manner. Hypergol’s Task class takes care of this; see the Delayed section.

The pipeline will execute the task in the following way:

  • At construction:
    • Adds the input datasets to the output dataset as dependencies.

    • Creates a dataset factory for the temporary datasets.

  • Calls the task’s get_jobs() function that will:
    • Create a job for each chunk of the input dataset.

    • Add the right input chunks to each job.

  • A copy of a task object is created in a thread.

  • Initialise the task:
    • Constructs any Delayed input.

    • Calls the user defined init() function (Load any large variable here, e.g., spacy models).

  • Open input chunks for reading:
    • “Loaded” ones are entirely loaded and available as a list. (more on this later)

  • Create a temporary dataset for the output.
    • Because the output objects’ hashes might not match the input chunk’s id, all chunks are opened for writing at once.

    • The temporary dataset will be the task’s output member variable and in the run function you need to append to this.

  • Create the sourceIterator; this will yield the input data that, together with the loaded data, will be passed to the run() function.

  • Call the iterator in a loop and call the run() function with the yielded data.

  • Close the input chunks.

  • Call the user defined finish_job() function.

  • At this point the thread stops and execution returns to the main thread to “finalise” the output dataset.
    • Finalise copies data from all temporary datasets in a multithreaded way.

    • Calculates the chunks’ hashes.

    • Creates the output dataset (with .def and .chk files as well).

    • Deletes all temporary datasets.

    • Calls the user defined finish_task() function.

To facilitate this, you need to implement just two functions:

def init(self):
    # TODO: initialise members that are NOT "Delayed" here (e.g. load a spacy model)
    pass

def run(self, exampleInputObject1, exampleInputObject2):
    raise NotImplementedError(f'{self.__class__.__name__} must implement run()')
    self.output.append(data)

If you don’t have any heavy-duty initialisation, you can delete the init() function. The run() function will get a tuple of objects, one from each input dataset per iteration (this must match the order of the input datasets in the instantiation of the task in the pipeline), and should append any output values to self.output, which will be saved into the output dataset.
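
A hedged sketch of a filled-in run() function (the transformation itself is made up; it assumes the task’s single input dataset contains OtherExample objects and its output dataset stores ExampleClass objects):

from datetime import datetime

from data_models.example_class import ExampleClass


# inside the generated ExampleTask class in tasks/example_task.py

def run(self, otherExample):
    self.output.append(ExampleClass(
        classId=otherExample.classId,
        value=float(len(otherExample.values)),
        name=otherExample.name.upper(),
        creation=datetime.now()
    ))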

Loaded Inputs

Requiring the input and the output to share the same id is a strong restriction. To circumvent this limitation, each task can define a set of datasets in loadedInputs. The objects from these are loaded into lists in the task’s self.loadedData field and are available both in the init() and in the run() functions. The init() function is ideal for converting these into a map, for example:

def __init__(self, *args, **kwargs):
    super().__init__(*args, **kwargs)
    ...
    self.map1 = None

def init(self):
    self.map1 = {value.get_id(): value for value in self.loadedData[0]}

def run(self, inputObject1, inputObject2):
    value = self.map1.get(inputObject2.get_id(), None)
    ...
    self.output.append(outputObject)

Creating a pipeline

$ python3 -m hypergol.cli.create_pipeline PipelineName Source1 Task1 Task2 ExampleClass1 ExampleClass2

This will create pipelines/pipeline_name.py and pipeline_name.sh. The shell script has examples of how to pass parameters to the script and also (optionally) disables multithreading on popular numerical packages as these may interfere with parallel execution. The pipeline uses the Python Fire package to handle command-line arguments so just follow the example to add more.

In the Python script, stubs for several functionalities are generated:

  • Repo: the Python interface to the local git repository. Before execution the repo is checked for being “dirty”, namely whether there are any uncommitted changes; if there are, processing is halted unless the --force switch is used. Forcing may result in saving commit information into the datasets that doesn’t allow recovering the actual code that created the dataset, so use it with caution.

  • RepoData: the data class that is saved with each dataset; it contains the commit message, the commit hash, the committer’s email and the branch name.

  • A DatasetFactory: a convenience class that is used when several datasets need to be created for a pipeline. It enables creating datasets by specifying only the type and the name.

  • Several datasets: for each class name specified in the create_pipeline command, a dataset is created in the form exampleClasses = dsf.get(dataType=ExampleClass, name='example_classes').

  • Stubs for several tasks: constructors with dummy dataset parameters to be filled in by the user.

  • Pipeline instance: the tasks included in the same order as in the create_pipeline command.

To finish the pipeline, fill in the location, project and branch names, the input/output datasets of the tasks and any other parameters. Execute the pipeline with the generated shell script. Because the shell script potentially runs for several hours, it is recommended to use a terminal multiplexer like screen or tmux.

Delayed

Sometimes a class that cannot be pickled (e.g. logging.Logger or a database connection) must be passed to a task. For this, the Delayed mechanism is provided.

# This will throw an error if attempted to be executed
exampleTask = ExampleTask(
    value1=CannotPickle(value2=123),
    value2=canPickle
)

# Turn it into this:
exampleTask = ExampleTask(
    value1=Delayed(CannotPickle, value2=123),
    value2=canPickle
)

This delays the creation of the CannotPickle object until the task object is recreated inside the thread. This happens exactly between loading the loadedInputs and calling the init() function, so the object can be used in the latter. The Delayed mechanism is recursive, so non-picklable constructor arguments can be passed on as well.
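
For example (class names hypothetical), a constructor argument of a delayed class can itself be delayed:

exampleTask = ExampleTask(
    value1=Delayed(CannotPickle, connection=Delayed(AlsoCannotPickle, value2=123)),
    value2=canPickle
)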

Creating a Torch model

The structure for creating Torch models is very similar to Tensorflow models. For the create_model_block and create_model commands, use the --torch switch and you will get Torch-specific templates.

Use the @torch.jit.export decorator on functions you want to expose with TorchScript (get_outputs will be decorated by default in the template).

forward() is not used by default; TorchModelManager uses the other functions to operate the model.

produce_metrics should return a {'tag':value, ...} dictionary of scalars to be recorded to TensorBoard.
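
A rough sketch of how these pieces can look in a Torch model; the base class and the exact signatures come from the generated template, so treat the names below as assumptions:

import torch


class ExampleTorchModel(torch.nn.Module):   # the generated template provides the actual base class

    def __init__(self):
        super().__init__()
        self.linear = torch.nn.Linear(8, 1)

    @torch.jit.export
    def get_outputs(self, exampleInput1):
        # exposed in the TorchScript-packaged model
        return self.linear(exampleInput1)

    def produce_metrics(self, targets, exampleInput1):
        # a {'tag': value} dictionary of scalars to be recorded to TensorBoard
        loss = torch.nn.functional.mse_loss(self.get_outputs(exampleInput1), targets)
        return {'loss': loss}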

Otherwise (training, serving), the treatment of the two types of models should be the same. Contact us if you find this is not the case.

Creating a Tensorflow model

Plan the structure of the model before creating a project. Decompose the modelling task into smaller components and describe their responsibilities and their relationships to each other. Both the input and the output data model classes must exist before the create_model command is called.

Create a block

$ python -m hypergol.cli.create_model_block ExampleBlock

This will create models/blocks/example_block.py, it doesn’t contain too much useful code as blocks are primarily used to organise Tensorflow code.

Create the model

All files related to a model (the model itself, the batch processor, the train and the serve scripts) are located in the models/<model_name> directory, so multiple models for the same project separate easily. The starting shell scripts are all in the project’s main directory (which should be the current directory when running them).

$ python -m hypergol.cli.create_model ExampleModel TrainingClass EvaluationClass InputClass OutputClass Block1 Block2

This will create three main components: ExampleModel in example_model.py, the model itself, which also imports the specified blocks; ExampleModelBatchProcessor in example_model_batch_processor.py, which imports the train/evaluation data model classes; and train_example_model.py, a fire script that manages the training process through the TensorflowModelManager class and imports and instantiates all the necessary components. The training can be started with train_example_model.sh in the main project directory.

Each of the abstract functions the model implements (get_loss, produce_metrics and get_outputs) must accept inputs the same way. TensorflowModelManager gets an (inputs, targets) tuple and passes the inputs on to the above functions with the double asterisk (**) operator. This means that the (key, value) items in the dictionary turn into keyword arguments in the model’s respective functions:

# in ExampleModelBatchProcessor.process_training_batch()
inputs = {
    'exampleInput1': tf.constant(value1, dtype=tf.int32),
    'exampleInput2': tf.ragged.constant(values, dtype=tf.string).to_tensor()
}

# in TensorflowModelManager
loss = self.model.get_loss(targets=targets, training=True, **inputs)

# in ExampleModel
def get_loss(self, targets, training, exampleInput1, exampleInput2):
    ...

This ensures that if any one of the four (process_training_batch, get_loss, produce_metrics and get_outputs) is changed, the rest must follow suit to remain consistent.

For the batch processor, two functions must be implemented for training/evaluation. process_training_batch is responsible for feeding the correct data to the model during training and evaluation; it returns an (inputs, targets) tuple where inputs is a dictionary of tensors and targets is a single tensor. process_evaluation_batch is responsible for turning the output of the model from tensors into data model classes that can be saved into datasets.

The other two functions are process_input_batch and process_output_batch, which are used in deployment. process_input_batch produces the input part of process_training_batch’s output to feed the model at inference time. process_output_batch takes the return value of the model’s get_outputs (the output part of the arguments of process_evaluation_batch).

The similarities between these functions enable significant code simplification, given that all four functions are in the same class. These similarities can be reflected in the data model as well (e.g., TrainingClass and EvaluationClass are composite/decorated classes of InputClass and OutputClass).

The produce_metrics function runs in the context of a tf.summary.SummaryWriter, so the implementation just needs to call tf.summary.scalar or any other summary function to record data to TensorBoard. See https://www.tensorflow.org/api_docs/python/tf/summary for further documentation on this.
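
A minimal sketch, assuming the manager sets the default summary step before calling it (the argument names follow the mock example above):

def produce_metrics(self, targets, training, exampleInput1, exampleInput2):
    loss = self.get_loss(targets=targets, training=training,
                         exampleInput1=exampleInput1, exampleInput2=exampleInput2)
    # runs inside the manager's tf.summary.SummaryWriter context
    tf.summary.scalar(name='loss', data=loss)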

The get_outputs function must specify its signature with the @tf.function decorator because this enables the model to be packaged and deployed as a standalone model. Again, this signature must match the inputs dictionary. Following from the previous mock example:

@tf.function(input_signature=[
    tf.TensorSpec(shape=[None], dtype=tf.int32, name='exampleInput1'),
    tf.TensorSpec(shape=[None, None], dtype=tf.string, name='exampleInput2')
])
def get_outputs(self, exampleInput1, exampleInput2):
    ...

This further ties batch management and the various model-related functionalities to deployment, as all of these must stay in sync for end-to-end deployment to be continuously maintained.

The training script uses HypergolProject to find and maintain datasets and also checks the state of the git repository. To ensure data lineage (linking data to one particular version of the code), all relevant code should be committed before training starts. After this, it instantiates the BatchProcessor and the required input and output datasets (through HypergolProject’s DatasetFactory), instantiates the Model and all of its Blocks, then creates a TensorflowModelManager that ties these together and specifies the optimiser, and finally executes the run() function to perform the actual training.

Training can be started with the provided shell script. As with other Hypergol functionalities, parameters can be passed from the command line or the script to the python code through the fire package’s functionalities.

Hypergol TensorFlow models have two identifiers embedded into the packaged models: “longName” and “inputDatasetChkFileChecksum”; these can be accessed with the get_long_name and get_input_dataset_chk_file_checksum functions respectively. Given these are TensorFlow functions, you can extract the actual string with, for example, model.get_long_name().numpy().decode('utf-8') for the long name. The long name has the format <classname>_<training_date>_<git commit hash>, which can be changed in the training script. Out of these, the commit hash uniquely identifies the model; the rest is only there for human readability. The inputDatasetChkFileChecksum is the SHA1 checksum of the training dataset’s .chk file. This can be verified with the sha1sum <dataset>.chk command, or shasum <dataset>.chk on a Mac. Both of these are included for data lineage reasons, so the origins of an eventual output in a client can be traced back to the version of the training code and the data that was used. Because datasets store their entire lineage of dependent datasets, this can be traced back to the source of the data, providing full end-to-end transparency over model decisions.
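
Assuming the packaged model is a TensorFlow SavedModel on disk (the serving section below points MODEL_DIRECTORY at the same location), the identifiers can be read back like this:

import tensorflow as tf

model = tf.saved_model.load('<data_directory>/<project>/<branch>/models/<model_name>/<epoch_number>')
print(model.get_long_name().numpy().decode('utf-8'))
print(model.get_input_dataset_chk_file_checksum().numpy().decode('utf-8'))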

One important issue: datasets must be closed properly to generate their .chk file so they can be opened again. Therefore the entire train/evaluation loop runs in a try/finally block, and the TensorflowModelManager’s finish() function is guaranteed to be called; this, in turn, calls the BaseBatchProcessor’s finish() function, which closes the output dataset so it can be opened at a later stage for evaluation.

The evaluation part is there to debug the model in the early stages of training. Large-scale evaluation should happen in parallel through a dedicated Hypergol pipeline.

Deploying a Tensorflow model

Model serving is done by FastAPI and uvicorn with self-contained generated code. To start the server, run ./serve_<model_name>.sh (port and host can be set in the shell script); this in turn calls models/serve_<model_name>.py. The only detail that needs to be specified is the directory of the model to be served:

MODEL_DIRECTORY = '<data_directory>/<project>/<branch>/models/<model_name>/<epoch_number>'

The / endpoint provides information on the served model. The most important among these is the model’s "long name". (UPDATE: from version 0.0.10 the response header of the /output endpoint has this information as well, see the example below.) This contains the date the training happened and the repo’s commit hash at that point. This is an optional feature set in the training python script, so it can be changed freely to something else. This information enables data lineage if saved in logs with the calculated outputs, which is useful for historical analysis of the various versions of the model. To call it with requests:

import json
import requests
response = json.loads(requests.get('http://0.0.0.0:8000', headers={'accept': 'application/json'}).text)
modelLongName = response['model']

And the result:

{
    "title": "Serve <ModelName>",
    "version": "0.1",
    "description": "FastApi wrapper on <ModelName>, see /docs for API details",
    "model": "<ModelName>_20200101_1108ee9eccb0000dd31d8fda01cc26031dbc3cc6"
}

The autogenerated code creates pydantic types on the fly (even for complex types) from the model’s BatchProcessor’s input/output types (not to be confused with the training/evaluation types in the same class). These pydantic types are then used by FastAPI to enable swagger to create typed endpoints automatically.

The /output endpoint accepts, in JSON form, a list of the same types that the BatchProcessor’s process_input_batch() can (see the generated project’s README.md for a working example as well). The example below assumes the input objects are available in a dataset called ds; the easiest way to achieve this is to create a HypergolProject and call its list_datasets() function with asCode=True enabled:

import sys
sys.path.insert(0, '<project_directory>')
import json
import requests
from itertools import islice

from data_models.example_output import ExampleOutput


with ds.open('r') as dsr:
    values = [value.to_data() for value in islice(dsr, 10)]


response = requests.post(
    'http://0.0.0.0:8000/output',
    headers={
        'accept': 'application/json',
        'Content-Type': 'application/json',
    },
    data=json.dumps(values)
)
outputs = [ExampleOutput.from_data(v) for v in json.loads(response.text)]
print(response.headers['x-model-long-name'])

From version 0.0.10 the response header contains the model’s long name as well, in the x-model-long-name field, so the calling client can record it and use it for end-to-end data lineage.

For large-scale analysis, it is not recommended to use the deployed model, due to multiple factors: the network latency, the conversion back and forth and the single-threadedness of the execution. If a large set of outputs must be calculated, a dedicated pipeline with many threads is the best way forward. Use the implementation of the /output endpoint for reference (without the pydantic conversion).

Jupyter notebooks

Hypergol creates a notebooks directory with an example notebook when creating a project. The example shows how to load data from datasets and how to call a deployed model.

$ cd notebooks
$ jupyter notebook --port=8889

Set the port as you see fit.

Utilities

The following command lists all available datasets with their construction commands so they can be used in interactive environments:

$ python -m hypergol.cli.list_datasets <data_directory>

Use the --pattern="<regex>" feature to filter for a specific (lowercase) dataset directory.

Alternatively, the DatasetFactory can create Dataset objects:

import sys
sys.path.insert(0, '<project_directory>')
from hypergol import HypergolProject
from data_models.<example_class> import ExampleClass
hypergolProject=HypergolProject(
    projectDirectory='<project_directory>',
    dataDirectory='<data_directory>'
)
exampleClasses = hypergolProject.datasetFactory.get(dataType=ExampleClass, name='example_classes')