Tutorial¶
This guide helps you start working with hypergol. The tutorial assumes you use GitHub for source control, but using other sites should be straightforward as well. Further examples of CLI usage can be found in the run_project_tests.sh script.
Creating a project¶
After installing hypergol into your virtual environment, you are ready to create your first project. Before that, create an empty repository on GitHub without README.md and .gitignore; both will be generated by hypergol. Once the repository is created, follow the commands below from git init onwards.
Make sure you are in the directory you intend to create the project in:
$ python3 -m hypergol.cli.create_project <ProjectName>
The project name must be camel-case; the command will create a snake-case directory. See the (insert link here) documentation for creating a project from a Python interactive shell or Jupyter notebooks.
The example assumes that you created an empty repository (no README.md and .gitignore) on GitHub named project_name. Once this is done, the next step is to create the project's own virtual environment. This lets you encapsulate all the dependencies your project relies on. To do this, execute the following steps (don't forget to deactivate your current environment first):
$ deactivate
$ cd <project_name>
$ git init
$ git add .
$ git commit -m "first commit"
$ git remote add origin git@github.com:<your_user_name>/<project_name>.git
$ git push -u origin master
$ ./make_venv.sh
$ source .venv/bin/activate
If you have dependencies that you will use in the future (e.g. numpy), add them to requirements.txt and call:
$ pip3 install -r requirements.txt
Creating data model classes¶
The data model is the description of the data your project's code operates on. Instead of raw numpy arrays or pandas dataframes, hypergol stores all data in these classes. This enables creating hierarchical structures and storing them in files recursively, so you don't need to worry about loading and reloading complex data structures. It also helps you reason about and iterate on your code in case you need to change one of the objects and update a pipeline accordingly. Again, see the (insert link here) documentation on how to create classes from interactive shells.
Creating a simple class¶
$ python3 -m hypergol.cli.create_data_model ExampleClass classId:int:id name:str creation:datetime
This will create the following class (use the --dryrun switch to display the code instead of writing it into data_models/example_class.py):
from datetime import datetime

from hypergol import BaseData


class ExampleClass(BaseData):

    def __init__(self, classId: int, name: str, creation: datetime):
        self.classId = classId
        self.name = name
        self.creation = creation

    def get_id(self):
        return (self.classId, )

    def to_data(self):
        data = self.__dict__.copy()
        data['creation'] = data['creation'].isoformat()
        return data

    @classmethod
    def from_data(cls, data):
        data['creation'] = datetime.fromisoformat(data['creation'])
        return cls(**data)
As you can see, hypergol generated the class with the necessary imports (datetime) and the correct serialisation functions into a format that can be JSON serialised, saved to disk and loaded back.
Also, the id flag in the classId:int:id argument made this field the class's id. It is assumed that the id uniquely identifies instances of the class, so comparison happens based on this field (or these fields). Multiple fields can be marked as id, in which case their tuple becomes the id of the class. You do not necessarily need to specify an id field, but only classes with ids can be the type of a dataset (see later, insert link here) and therefore be stored in files; other classes can only be saved if they are part of another id'd class (also known as weak entities). Only int and str fields can be marked as id.
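A quick sketch of how the generated class can be used interactively (values are illustrative):
from datetime import datetime

from data_models.example_class import ExampleClass

example = ExampleClass(classId=1, name='example', creation=datetime.now())
print(example.get_id())           # (1, )
data = example.to_data()          # JSON-serialisable dictionary
restored = ExampleClass.from_data(data)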
Creating classes that depend on other classes¶
Let's look at a more complicated example with hierarchical structures as well.
Again, use the --dryrun switch to display the code instead of writing it out. Because this example depends on another data model class, hypergol will check whether that class exists and fail if it does not, so rerun the previous example if you used the --dryrun switch there.
$ python3 -m hypergol.cli.create_data_model OtherExample classId:int:id name:str "values:List[ExampleClass]" "dates:List[date]"
Don't forget the double quotes, or your shell will fail to process the square brackets correctly. This will result in the following code in data_models/other_example.py:
from typing import List
from datetime import date

from hypergol import BaseData
from data_models.example_class import ExampleClass


class OtherExample(BaseData):

    def __init__(self, classId: int, name: str, values: List[ExampleClass], dates: List[date]):
        self.classId = classId
        self.name = name
        self.values = values
        self.dates = dates

    def get_id(self):
        return (self.classId, )

    def to_data(self):
        data = self.__dict__.copy()
        data['values'] = [v.to_data() for v in data['values']]
        data['dates'] = [v.isoformat() for v in data['dates']]
        return data

    @classmethod
    def from_data(cls, data):
        data['values'] = [ExampleClass.from_data(v) for v in data['values']]
        data['dates'] = [date.fromisoformat(v) for v in data['dates']]
        return cls(**data)
As you can see, it imports the correct typing class (List), the correct temporal dependency (date) and the correct data model dependency (data_models.example_class.ExampleClass). Serialisation and deserialisation of the lists are handled correctly as well.
You can see that files were created in the tests/ directory for each class. Hypergol automatically creates unit tests for these classes and enables you to add further ones. You can run the tests with ./run_tests.sh. It uses the nosetests framework and by default verifies the integrity of saving and loading the class, which is relevant if you edit the generated code. If your class uses other data model classes as well, the tests will initially fail; replace the None arguments with proper class initialisations to make the tests pass. Run the tests after each change to the data model classes to ensure the changes are correct. Also, write new tests if you add other member functions.
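The generated tests are the authoritative reference; as a rough sketch (file, class and field names are illustrative), a save/load integrity check may look like this:
from unittest import TestCase
from datetime import datetime

from data_models.example_class import ExampleClass


class TestExampleClass(TestCase):

    def setUp(self):
        super().setUp()
        self.exampleClass = ExampleClass(classId=1, name='example', creation=datetime.now())

    def test_example_class_serialisation(self):
        # round-trip through to_data()/from_data() and compare the serialised form
        roundTripped = ExampleClass.from_data(self.exampleClass.to_data())
        self.assertEqual(roundTripped.to_data(), self.exampleClass.to_data())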
Once the code is generated, you can edit it freely and hypergol will not touch it again.
Creating datasets¶
Let's head over to a Jupyter notebook and investigate how we can store data in Datasets.
First set things up:
import sys
sys.path.insert(0, '<full_path>/<project_name>')
from hypergol import Dataset
from data_models.example_class import ExampleClass
from data_models.other_example import OtherExample
from datetime import date
from datetime import datetime
Create a dataset and specify where you want to store the data. Hypergol enables a "git-like" workflow for large datasets where data lineage is tracked through a series of commits that record the state of the codebase at the time of the dataset's creation. First we deal with the general syntax and get back to version control later, hence repoData is None.
Defining datasets¶
First define the dataset itself:
dataset = Dataset(
    dataType=OtherExample,
    location='<full_path>/<data_location>',
    project='<project_name>',
    branch='<test_branch>',
    name='otherExamples',
    chunkCount=16,
    repoData=None
)
Each dataset consists of a set of chunks, the number of which is determined by chunkCount (valid values are 16, 256 and 4096). This determines how granularly the dataset can be processed in parallel by the pipelines later on. Each chunk is identified by a "chunkId", which is a 1, 2 or 3 digit hexadecimal number, depending on chunkCount. Each object determines a hash_id, which is by default the object's id, but this can be changed by overriding the get_hash_id() function in the class. This hash_id is hashed with the SHA1 algorithm (the same that git uses for commits) into a 40 digit hexadecimal number. All objects whose hash starts with the same 1, 2 or 3 digits end up in the same chunk. This distributes objects evenly, which will be important in parallel processing. Each chunk is a gzip-ed text file in which each line is a separate JSON string ending with a single newline.
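As a conceptual illustration of the chunking logic above (not Hypergol's actual internal code; the exact string that gets hashed may differ), the chunk of an object can be thought of as the first hex digits of the SHA1 hash of its hash_id:
import hashlib

# illustrative hash_id: the object's id tuple turned into a string
hashId = str((42, ))
sha1 = hashlib.sha1(hashId.encode('utf-8')).hexdigest()   # 40 digit hexadecimal number
# 1 digit selects among 16 chunks, 2 digits among 256, 3 digits among 4096
chunkId = sha1[:2]                                         # e.g. chunkCount=256
print(sha1, chunkId)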
Writing datasets¶
with dataset.open('w') as datasetWriter:
    for k in range(100):
        datasetWriter.append(OtherExample(
            classId=k,
            name=str(k),
            values=[
                ExampleClass(classId=k, name=str(k), creation=datetime.now()),
                ExampleClass(classId=k, name=str(k), creation=datetime.now())
            ],
            dates=[date.today(), date.today()]
        ))
A dataset can be opened for writing with open('w'); this returns a datasetWriter object to which you can append the objects you want to store in the dataset. If you try to insert an object of a class other than the dataset's type, an exception is raised. The recommended way to write into a dataset is with context managers, in the manner shown above. Datasets can be opened directly as well with:
datasetWriter = dataset.open('w')
...
datasetWriter.append(otherExample)
...
datasetWriter.close()
It is your responsibility to call close(); if you don't, the dataset will be corrupted and cannot be used. When a dataset is opened for writing, it creates a .def file with all the information that is relevant at creation time: dependencies on other datasets, the current commit of the repository and the creation time. When a dataset is closed after writing, it creates a .chk file with the hashes of the content of each chunk file and the hash of the .def file. Dependencies between datasets can be created by calling add_dependency(otherDataset), which adds the hash of otherDataset's .chk file to the current dataset (this happens automatically in pipelines). This makes it possible to consistently retrace all the information required to recreate the dataset in question.
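Outside of pipelines, where this happens automatically, a dependency can be recorded manually; a minimal sketch, reusing the dataset defined above and a hypothetical derived dataset (names are illustrative):
derivedExamples = Dataset(
    dataType=OtherExample,
    location='<full_path>/<data_location>',
    project='<project_name>',
    branch='<test_branch>',
    name='derivedExamples',
    chunkCount=16,
    repoData=None
)
# record the source dataset as a dependency before writing
derivedExamples.add_dependency(dataset)
with derivedExamples.open('w') as datasetWriter:
    with dataset.open('r') as datasetReader:
        for value in datasetReader:
            datasetWriter.append(value)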
Reading datasets¶
with dataset.open('r') as datasetReader:
    for data in datasetReader:
        print(data)
A dataset can be opened for reading with open('r'); this returns a datasetReader object which you can iterate over to access the objects. The order of the objects is undefined. The dataset can be opened without a context manager, and no close() needs to be called if the entire set is loaded:
allData = list(dataset.open('r'))
Creating a task class¶
Task classes are computational elements that run on data model classes or create data model classes (in Hypergol terminology a "source"); the pipeline takes care of handling the datasets and the multiple threads where the processing happens. You only write the code that needs to run.
There are three types of tasks:
Source¶
This is the entry point into a processing pipeline; it doesn't have an input dataset. Its purpose is to format arbitrary data into a dataset that can be processed in parallel by later tasks.
$ python3 -m hypergol.cli.create_task LoadData OtherExample --source
This will create tasks/load_data.py with a skeleton of the code. You need to implement the three functions whose signatures are in the code:
def get_jobs(self):
    return [Job(id, total, parameters) for k, parameter in enumerate(...)]

def source_iterator(self, parameters):
    # read
    yield (data, )

def run(self, data):
    self.output.append(OtherExample(...))
The pipeline will generate the jobs by calling get_jobs(), create a Task class instance for each job and pass the job to the instance's execute function. For example, if your source data is in multiple files, create one job for each file and pass the file's path in the parameters argument. The pipeline then calls the iterator with the parameters; the iterator yields data, which is passed to the run() function, which processes it and creates a data model object. This object is appended to the task's self.output member, which ensures that it is saved into the task's output dataset.
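A hypothetical filled-in version is sketched below. It assumes the generated tasks/load_data.py defines a LoadData class deriving from hypergol's Source, that Job is constructed as in the skeleton above (Job(id, total, parameters)), and that the raw data is a set of CSV-like text files; the constructor signature, file format and field values are illustrative only:
from hypergol import Job
from hypergol import Source

from data_models.other_example import OtherExample


class LoadData(Source):

    def __init__(self, inputFilePaths, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.inputFilePaths = inputFilePaths

    def get_jobs(self):
        # one job per input file; the file path travels in the job's parameters
        return [
            Job(k, len(self.inputFilePaths), {'filePath': filePath})
            for k, filePath in enumerate(self.inputFilePaths)
        ]

    def source_iterator(self, parameters):
        # read the raw data and yield it one record at a time
        with open(parameters['filePath'], 'rt') as inputFile:
            for line in inputFile:
                yield (line.strip(), )

    def run(self, line):
        classId, name = line.split(',')
        self.output.append(OtherExample(classId=int(classId), name=name, values=[], dates=[]))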
Source classes are derived from the Task class as well, and their mechanism is the same apart from requiring more functions to be implemented. See the detailed workings of the Task class below:
Task¶
If a task's inputs are Hypergol datasets, then most of the above functionality is taken care of automatically; you only need to focus on the run() function. To structure your code well, it's worth splitting long operations into shorter ones, especially if there is a logical separation between the parts. Tasks can have multiple inputs, but only one output dataset, and all the inputs must have the same number of chunks.
For example, if you process text with NLP, you can have domain classes such as RawText, CleanText, ProcessedText and LabelledText. For these, you can create the datasets rawTexts, cleanTexts, processedTexts and labelledTexts, and the tasks Source, CleanTextCreatorTask, TextProcessorTask and LabellerTask. This lets you logically encapsulate each step in its own task, so maintenance will be easier.
To create a task class, execute the following in the terminal (list the data model classes to import after the task name):
$ python3 -m hypergol.cli.create_task ExampleTask OtherExample
This will create tasks/example_task.py with stubs for two functions: init() and run(). The task can have multiple inputs, but only one output dataset, and all the inputs must have the same number of chunks. It can also have a list of "loaded" datasets that are loaded entirely before any run() calls.
One of the main constraints of Python parallelisation is that all classes that are passed to a thread must be pickle-able. This means that certain classes cannot be member variables and must be constructed in a delayed manner. Hypergol's Task class takes care of this; see the Delayed section.
The pipeline will execute the task in the following way:
- At construction:
  - Adds the input datasets to the output dataset as dependencies.
  - Creates a dataset factory for the temporary datasets.
- Calls the task's get_jobs() function that will:
  - Create a job for each chunk of the input dataset.
  - Add the right input chunks to each job.
- Calls the task's execute() function with each job; a copy of the task object is created in a thread.
- Initialises the task:
  - Constructs any Delayed input.
  - Calls the user defined init() function (load any large variables here, e.g., spacy models).
- Opens the input chunks for reading:
  - "Loaded" ones are entirely loaded and available as a list (more on this later).
- Creates a temporary dataset for the output:
  - Because the hashes might not match, all chunks are opened for writing at once.
  - The temporary dataset will be the task's output member variable; in the run function you need to append to this.
- Creates the sourceIterator; this yields the input data that, together with the loaded data, will be passed to the run() function.
- Calls the iterator in a loop and calls the run() function with the yielded data.
- Closes the input chunks.
- Calls the user defined finish_job() function.
- At this point the thread stops and execution returns to the main thread to "finalise" the output dataset:
  - Finalise copies data from all temporary datasets in a multithreaded way.
  - Calculates the chunks' hashes.
  - Creates the output dataset (with .def and .chk files as well).
  - Deletes all temporary datasets.
- Calls the user defined finish_task() function.
To facilitate this, you need to implement just two functions:
def init(self):
    # TODO: initialise members that are NOT "Delayed" here (e.g. load a spacy model)
    pass

def run(self, exampleInputObject1, exampleInputObject2):
    raise NotImplementedError(f'{self.__class__.__name__} must implement run()')
    self.output.append(data)
If you don't have any heavy-duty initialisation, you can delete the init() function. The run function gets a tuple of objects, one from each input dataset per iteration (the order must match the order of the input datasets in the instantiation of the task in the pipeline), and appends any output values to self.output, which will be saved into the output dataset.
Loaded Inputs¶
Having the same id on the inputs and the output is a strong restriction. To circumvent this limitation, each task can define a set of datasets in loadedInputs. The objects from these are loaded into lists in the task's self.loadedData field and are available both in the init() and in the run() functions. The init() function is an ideal place to convert these into a map, for example:
def __init__(self):
    ...
    self.map1 = None

def init(self):
    self.map1 = {value.get_id(): value for value in self.loadedData[0]}

def run(self, inputObject1, inputObject2):
    value = self.map1.get(inputObject2.get_id(), None)
    return outputObject
Creating a pipeline¶
$ python3 -m hypergol.cli.create_pipeline PipelineName Source1 Task1 Task2 ExampleClass1 ExampleClass2
This will create pipelines/pipeline_name.py and pipeline_name.sh. The shell script has examples of how to pass parameters to the script and also (optionally) disables multithreading in popular numerical packages, as these may interfere with parallel execution. The pipeline uses the Python Fire package to handle command-line arguments, so just follow the example to add more.
In the Python script, stubs for several components are generated:
- Repo: the Python interface to the local git repository. Before execution the repo is checked for being "dirty", namely whether there are any uncommitted changes; if there are, processing is halted unless the --force switch is used. Using --force may result in saving commit information into the datasets that does not allow recovering the actual code that created them, so use it with caution.
- RepoData: the data class that is saved with each dataset; it contains the commit message, the commit hash, the committer's email and the branch name.
- A DatasetFactory: a convenience construct used when several datasets need to be created for a pipeline. It enables creating datasets by specifying only the type and the name.
- Several datasets: for each class name specified in the create_pipeline command, a dataset is created in the exampleClasses = dsf.get(dataType=ExampleClass, name='example_classes') manner.
- Stubs for several tasks: constructors with dummy dataset parameters to be filled in by the user.
- A Pipeline instance: tasks included in the same order as in the create_pipeline command.
To finish the pipeline, fill in the location, project and branch names, the input/output datasets of the tasks and any other parameters. Execute the pipeline with the generated shell script. Because the shell script potentially runs for several hours, it is recommended to run it in a window manager like screen or tmux, as shown below.
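For example, with screen the run can be detached from and re-attached to later (the session name is illustrative):
$ screen -S pipeline_run
$ ./pipeline_name.sh
Detach with Ctrl-a d; re-attach later with:
$ screen -r pipeline_run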
Delayed¶
Sometimes a class that cannot be pickled (e.g. a logging.Logger or a database connection) must be passed to a task. For this, the Delayed mechanism is provided.
# This will throw an error if attempted to be executed
exampleTask = ExampleTask(
    value1=CannotPickle(value1=123),
    value2=canPickle
)

# Turn it into this:
exampleTask = ExampleTask(
    value1=Delayed(CannotPickle, value1=123),
    value2=canPickle
)
This delays the creation of the CannotPickle object until the task object is recreated inside the thread. This happens exactly between the loading of loadedInputs and the init() function, so the object can be used in the latter. The Delayed mechanism is recursive, so further non-pickleable classes can be passed on as arguments.
Creating a Torch model¶
The structure for creating Torch models is very similar to that of Tensorflow models. For the create_model_block and create_model commands, use the --torch switch and you will get Torch-dependent templates.
Use the @torch.jit.export decorator on functions you want to expose with TorchScript (get_outputs will be decorated by default in the template).
forward() is not used by default; TorchModelManager uses these dedicated functions to operate the model.
produce_metrics should return a {'tag': value, ...} dictionary of scalars to be recorded to TensorBoard.
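A hedged sketch of what this could look like, assuming the Torch template passes the same keyword arguments to produce_metrics as to get_loss (all names and fields are illustrative):
def produce_metrics(self, targets, exampleInput1, exampleInput2):
    # the scalars in the returned dictionary are written to TensorBoard by the manager
    loss = self.get_loss(targets=targets, exampleInput1=exampleInput1, exampleInput2=exampleInput2)
    return {'loss': loss.item()}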
Otherwise (training, serving) the treatment of the two types of models should be the same. Contact us if you find this otherwise.
Creating a Tensorflow model¶
Plan the structure of the model before creating a project. Decompose the modelling task into smaller components and describe their responsibilities and their relationships to each other. Both the input and output data model classes must exist before the create_model command is called.
Create a block¶
$ python -m hypergol.cli.create_model_block ExampleBlock
This will create models/blocks/example_block.py. It doesn't contain much useful code, as blocks are primarily used to organise Tensorflow code.
Create the model¶
All files related to a model (the model itself, the batch processor, the train and serve scripts) are located in the models/model_name directory, so multiple models for the same project can be kept separate easily. The starting shell scripts are all in the project's main directory (which should be the current directory when running them).
$ python -m hypergol.cli.create_model ExampleModel TrainingClass EvaluationClass InputClass OutputClass Block1 Block2
This will create three main components: ExampleModel in example_model.py, which is the model itself and also imports the specified blocks; ExampleModelBatchProcessor in example_model_batch_processor.py, which also imports the train/evaluation data model classes; and train_example_model.py, a fire script that manages the training process through the TensorflowModelManager class and imports and instantiates all the necessary components. The training can be started with train_example_model.sh in the main project directory.
Each of the abstract functions the model implements (get_loss, produce_metrics and get_outputs) must accept inputs the same way. TensorflowModelManager gets an (inputs, targets) tuple and passes the inputs on to the above functions with the double-asterisk (**) operator. This means that the (key, value) items of the dictionary turn into keyword arguments of the model's respective functions:
# in ExampleModelBatchProcessor.process_training_batch()
inputs = {
    'exampleInput1': tf.constant(value1, dtype=tf.int32),
    'exampleInput2': tf.ragged.constant(values, dtype=tf.string).to_tensor()
}

# in TensorflowModelManager
loss = self.model.get_loss(targets=targets, training=True, **inputs)

# in ExampleModel
def get_loss(self, targets, training, exampleInput1, exampleInput2):
    ...
This ensures that if any one of the four functions (process_training_batch, get_loss, produce_metrics and get_outputs) is changed, the rest must follow suit to stay consistent.
For the batch processor, two functions must be implemented for training/evaluation. process_training_batch is responsible for feeding the correct data to the model during training and evaluation; it returns an (inputs, targets) tuple where inputs is a dictionary of tensors and targets is a single tensor. process_evaluation_batch is responsible for turning the output of the model from tensors into data model classes that can be saved into datasets.
The other two functions, process_input_batch and process_output_batch, are used in deployment. process_input_batch performs the input part of process_training_batch to feed the model at inference time. process_output_batch takes the return value of the model's get_outputs (the output part of the arguments of process_evaluation_batch).
The similarities between these functions enable significant code simplification, given that all four are in the same class. These similarities can be reflected in the data model as well (e.g., TrainingClass and EvaluationClass can be composite/decorated classes of InputClass and OutputClass).
The produce_metrics function runs in the context of a tf.summary.SummaryWriter, so the implementation just needs to call tf.summary.scalar or any other summary function to record data to TensorBoard. See https://www.tensorflow.org/api_docs/python/tf/summary for further documentation.
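A minimal sketch of such an implementation, assuming produce_metrics receives the same arguments as get_loss above, that tensorflow is imported as tf in the model's module, and that the manager sets the default summary step before calling it (all names are illustrative):
# inside ExampleModel
def produce_metrics(self, targets, training, exampleInput1, exampleInput2):
    # recorded to TensorBoard because the call happens inside the manager's SummaryWriter context
    loss = self.get_loss(targets=targets, training=training,
                         exampleInput1=exampleInput1, exampleInput2=exampleInput2)
    tf.summary.scalar(name='Loss', data=loss)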
The get_outputs function must specify its signature with the @tf.function decorator, because this enables the model to be packaged and deployed as a standalone model. Again, this signature must match the inputs dictionary. Following the previous mock example:
@tf.function(input_signature=[
    tf.TensorSpec(shape=[None], dtype=tf.int32, name='exampleInput1'),
    tf.TensorSpec(shape=[None, None], dtype=tf.string, name='exampleInput2')
])
def get_outputs(self, exampleInput1, exampleInput2):
    ...
This further ties batch management and the various model-related functionalities to deployment, as all of these must stay in sync if end-to-end deployment is to be continuously maintained.
The training script uses HypergolProject to find and maintain Datasets and also to check the state of the git repository. To ensure data lineage (linking data to one particular version of the code), all relevant code should be committed before training starts. After this, the script instantiates the BatchProcessor and the required input and output datasets (through HypergolProject's DatasetFactory), instantiates the Model and all of its Blocks, then creates a TensorflowModelManager to tie these together and specify the optimiser, and finally executes its run() function to perform the actual training.
Training can be started with the provided shell script. As with other Hypergol functionalities, parameters can be passed from the command line or the script to the Python code through the fire package.
Hypergol TensorFlow models have two identifiers embedded into the packaged models, "longName" and "inputDatasetChkFileChecksum"; these can be accessed with the get_long_name and get_input_dataset_chk_file_checksum functions respectively. Given these are TensorFlow functions, you can extract the actual string with, for example, model.get_long_name().numpy().decode('utf-8') for the long name. The long name has the format <classname>_<train_date>_<git commit hash>, which can be changed in the training script. Out of these, the commit hash uniquely identifies the model; the rest is there only for human readability. The inputDatasetChkFileChecksum is the SHA1 checksum of the training dataset's .chk file. This can be verified with the sha1sum <dataset>.chk command (or shasum <dataset>.chk on a Mac). Both of these are included for data lineage reasons, so the origin of an eventual output in a client can be traced back to the version of the training code and the data that was used. Because datasets store their entire lineage of dependent datasets, this can be traced back to the source of the data, providing full end-to-end transparency over model decisions.
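As a sketch, assuming the packaged model is a standard TensorFlow SavedModel stored under the serving directory convention shown below, the identifiers can be read back like this:
import tensorflow as tf

# hypothetical path, following the MODEL_DIRECTORY convention used for serving
modelDirectory = '<data_directory>/<project>/<branch>/models/<model_name>/<epoch_number>'
model = tf.saved_model.load(modelDirectory)
longName = model.get_long_name().numpy().decode('utf-8')
checksum = model.get_input_dataset_chk_file_checksum().numpy().decode('utf-8')
print(longName, checksum)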
One important issue: datasets must be closed properly to generate their .chk file, otherwise they cannot be opened again. Therefore the entire train/evaluation loop runs in a try/finally block, and the TensorflowModelManager's finish() function is guaranteed to be called; this, in turn, calls the BaseBatchProcessor's finish() function, which closes the output dataset so it can be opened at a later stage for evaluation.
The evaluation part is there to debug the model at the early stages of training. Large-scale evaluation should happen in parallel through a dedicated Hypergol pipeline.
Deploying a Tensorflow model¶
Model serving is done by FastAPI and uvicorn, with self-contained code generated. To start the server, run ./serve_<model_name>.sh (port and host can be set in the shell script); this in turn calls models/serve_<model_name>.py. The only detail that needs to be specified is the directory of the model to be served:
MODEL_DIRECTORY = '<data_directory>/<project>/<branch>/models/<model_name>/<epoch_number>'
The / endpoint provides information on the served model. The most important item among these is the model's "long name". (UPDATE: from version 0.0.10 the response header of the /output endpoint has this information as well, see the example below.) It contains the date the training happened and the repo's commit hash at that point. This is an optional feature set in the training Python script, so it can be changed freely to something else. This information enables data lineage if it is saved in logs together with the calculated outputs, which is useful for historical analysis of the various versions of the model. To call it with requests:
import json
import requests

response = json.loads(requests.get('http://0.0.0.0:8000', headers={'accept': 'application/json'}).text)
modelLongName = response['model']
And the result:
{
    "title": "Serve <ModelName>",
    "version": "0.1",
    "description": "FastApi wrapper on <ModelName>, see /docs for API details",
    "model": "<ModelName>_20200101_1108ee9eccb0000dd31d8fda01cc26031dbc3cc6"
}
The autogenerated code creates pydantic types on the fly (even for complex types) from the model's BatchProcessor's input/output types (not to be confused with the training/evaluation types in the same class). These pydantic types are then used by FastAPI to let Swagger create typed endpoints automatically.
The /output endpoint accepts, in JSON form, a list of the same types that BatchProcessor.process_inputs() can (see the generated project's README.md for a working example as well). The example below assumes the input objects are available in a dataset called ds; this is most easily achieved by creating a HypergolProject and calling its list_datasets() function with asCode=True enabled:
import sys
sys.path.insert(0, '<project_directory>')

import json
import requests
from itertools import islice

from data_models.example_output import ExampleOutput

with ds.open('r') as dsr:
    values = [value.to_data() for value in islice(dsr, 10)]

response = requests.post(
    'http://0.0.0.0:8000/output',
    headers={
        'accept': 'application/json',
        'Content-Type': 'application/json',
    },
    data=json.dumps(values)
)
outputs = [ExampleOutput.from_data(v) for v in json.loads(response.text)]
print(response.headers['x-model-long-name'])
From version 0.0.10, the response header also contains the model's long name in the x-model-long-name field, so the calling client can record it and use it for end-to-end data lineage.
For large-scale analysis, it is not recommended to use the deployed model, due to multiple factors: the network latency, the conversion back and forth, and the single-threadedness of the execution. If a large set of outputs must be calculated, a dedicated pipeline with many threads is the best way forward. Use the implementation of the /output endpoint for reference (without the pydantic conversion).
Jupyter notebooks¶
Hypergol creates a notebook directory with an example notebook when creating a project. The example shows how to load data from datasets and how to call a deployed model.
$ cd notebooks
$ jupyter notebook --port=8889
Set the port as you see fit.
Utilities¶
The following command lists all available datasets with their construction commands so they can be used in interactive environments:
$ python -m hypergol.cli.list_datasets <data_directory>
Use the --pattern="<regex>" option to filter for a specific (lowercase) dataset directory.
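The same listing is available from an interactive Python session as well; a minimal sketch, using the HypergolProject constructor shown below and the list_datasets() call mentioned in the deployment section:
import sys
sys.path.insert(0, '<project_directory>')

from hypergol import HypergolProject

hypergolProject = HypergolProject(
    projectDirectory='<project_directory>',
    dataDirectory='<data_directory>'
)
# asCode=True prints the construction commands so they can be pasted into a notebook
hypergolProject.list_datasets(asCode=True)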
Alternatively, the DatasetFactory can create the dataset classes:
import sys
sys.path.insert(0, '<project_directory>')

from hypergol import HypergolProject
from data_models.<example_class> import ExampleClass

hypergolProject = HypergolProject(
    projectDirectory='<project_directory>',
    dataDirectory='<data_directory>'
)
exampleClasses = hypergolProject.datasetFactory.get(dataType=ExampleClass, name='example_classes')