Source code for hypergol.hypergol_project

import os
import re
import json
import stat
import glob
from pathlib import Path

import jinja2
from git import Repo
from git.exc import NoSuchPathError
from git.exc import InvalidGitRepositoryError

import hypergol
from hypergol import DatasetFactory
from hypergol import RepoData
from hypergol.utils import Mode
from hypergol.utils import create_text_file
from hypergol.utils import create_directory
from hypergol.name_string import NameString


# Code snippet printed by ``HypergolProject.list_datasets(asCode=True)``.
# It is filled in with ``str.format`` from the contents of a dataset's ``.def`` file
# plus the project/data directories. The snippet starts with ``import sys`` because
# its next line calls ``sys.path.insert``; without the import the printed code would
# fail with a NameError when run on its own.
DATASET_TEMPLATE = """import sys
sys.path.insert(0, '{projectDirectory}')
from data_models.{dataTypeFile} import {dataType}
from hypergol import Dataset
from hypergol import RepoData
ds=Dataset(
    dataType={dataType},
    location='{location}',
    project='{project}',
    branch='{branch}',
    name='{name}',
    chunkCount={chunkCount},
    repoData=RepoData(
        branchName='{branchName}',
        commitHash='{commitHash}',
        commitMessage='{commitMessage}',
        comitterName='{comitterName}',
        comitterEmail='{comitterEmail}'
    )
)"""


def locate(fname):
    """Return the path of ``fname`` inside the ``cli/templates`` directory of the hypergol package."""
    # ``hypergol.__path__[0]`` is the first directory listed for the imported package
    packageDirectory = hypergol.__path__[0]
    templatePath = Path(packageDirectory, 'cli', 'templates', fname)
    return templatePath


class RepoManager:
    """Thin wrapper around a git repository that exposes the repo details a project needs.

    After a successful construction the instance carries ``commitHash``, ``commitMessage``,
    ``comitterName``, ``comitterEmail`` and ``branchName`` of the current commit. If the
    directory is missing or is not a git repository, ``repoExists`` stays ``False`` and
    none of those attributes are set.
    """

    def __init__(self, repoDirectory=None, raiseIfDirty=True):
        """
        Parameters
        ----------
        repoDirectory : string
            directory where the `.git` directory is located
        raiseIfDirty : bool
            if set and the repo contains uncommitted code, it raises an error

        Raises
        ------
        ValueError
            if the repo is dirty and ``raiseIfDirty`` is set, or if the repo has no commits
        """
        self.repoDirectory = repoDirectory
        self.raiseIfDirty = raiseIfDirty
        self.repoExists = False

        try:
            gitRepo = Repo(path=self.repoDirectory)
        except NoSuchPathError:
            print(f'Directory {self.repoDirectory} does not exist')
            return
        except InvalidGitRepositoryError:
            print(f'No git repository in {self.repoDirectory}')
            return
        else:
            self.repoExists = True

        if gitRepo.is_dirty():
            if self.raiseIfDirty:
                raise ValueError("The current git repo is dirty; please commit your work before you run the pipeline.")
            # Dirty repo tolerated: only warn that the recorded hash will not match the code
            print('Warning! The current git repo is dirty; this will result in incorrect commit hash in datasets.')

        try:
            headCommit = gitRepo.commit()
        except ValueError as ex:
            print('No commits in this repo; please create an initial commit.')
            raise ex

        self.commitHash = headCommit.hexsha
        self.commitMessage = headCommit.message
        self.comitterName = headCommit.committer.name
        self.comitterEmail = headCommit.committer.email

        try:
            currentBranch = gitRepo.active_branch.name
        except TypeError:
            # Reading the active branch failed with TypeError: fall back to a fixed marker name
            currentBranch = 'DETACHED'
        self.branchName = currentBranch
class HypergolProject:
    """Owner of all information about the project

    CLI functions define what needs to be created, and this class creates them.
    It also consistently handles the mode flags (normal/dryrun/force)

    It also verifies if a requested class exists in the respective directory (data_models, tasks)
    and identifies its type, e.g.: for ``HelloWorld`` it checks if ``data_models/hello_world.py``
    or ``tasks/hello_world.py`` exists and assumes its role from that.
    Used in :func:`.create_data_model` and :func:`.create_pipeline`
    """

    def __init__(self, projectDirectory=None, dataDirectory='.', chunkCount=16, dryrun=None, force=None, repoManager=None):
        """
        Parameters
        ----------
        projectDirectory : string
            location of the project: e.g.: ``~/repo_name``, models will be in ``~/repo_name/models``;
            defaults to the current working directory
        dataDirectory : string
            location of the data for the project: e.g.: ``~/data``, files will be stored in ``~/data/repo_name``
        chunkCount : int (default=16)
            forwarded to the ``DatasetFactory`` created for the project
        dryrun : bool (default=None)
            If set to ``True`` it returns the generated code as a string
        force : bool (default=None)
            If set to ``True`` it overwrites the target file; it also makes the default
            ``RepoManager`` tolerate a dirty repo (``raiseIfDirty=not force``)
        repoManager : RepoManager (default=None)
            repo information to use; if ``None`` one is created for ``projectDirectory``

        Raises
        ------
        ValueError
            if both ``force`` and ``dryrun`` are set
        """
        if force and dryrun:
            raise ValueError('Both force and dryrun are set')
        if projectDirectory is None:
            projectDirectory = os.getcwd()
        # Drop a single trailing slash so that ``os.path.basename`` below yields the directory name
        if projectDirectory.endswith('/'):
            projectDirectory = projectDirectory[:-1]
        if dataDirectory.endswith('/'):
            dataDirectory = dataDirectory[:-1]
        if repoManager is None:
            repoManager = RepoManager(repoDirectory=projectDirectory, raiseIfDirty=not force)
        self.repoManager = repoManager
        self.projectName = NameString(os.path.basename(projectDirectory))
        self.projectDirectory = projectDirectory
        self.dataDirectory = dataDirectory
        self.dataModelsPath = Path(projectDirectory, 'data_models')
        self.tasksPath = Path(projectDirectory, 'tasks')
        self.pipelinesPath = Path(projectDirectory, 'pipelines')
        self.modelsPath = Path(projectDirectory, 'models')
        self.blocksPath = Path(projectDirectory, 'models', 'blocks')
        self.testsPath = Path(projectDirectory, 'tests')
        self._init_known_class_lists()
        self.templateEnvironment = jinja2.Environment(
            loader=jinja2.FileSystemLoader(
                searchpath=Path(hypergol.__path__[0], 'cli', 'templates')
            )
        )
        self.mode = Mode.DRY_RUN if dryrun else Mode.FORCE if force else Mode.NORMAL

        if not self.repoManager.repoExists:
            # Without repo information no dataset/model location can be derived
            self.datasetFactory = None
            self.tensorboardPath = None
            self.modelDataPath = None
            print('Repo does not exist, data related functionality disabled.')
            return

        self.datasetFactory = DatasetFactory(
            location=self.dataDirectory,
            project=self.projectName.asSnake,
            branch=self.repoManager.branchName,
            chunkCount=chunkCount,
            repoData=RepoData(
                branchName=self.repoManager.branchName,
                commitHash=self.repoManager.commitHash,
                commitMessage=self.repoManager.commitMessage,
                comitterName=self.repoManager.comitterName,
                comitterEmail=self.repoManager.comitterEmail
            )
        )
        self.tensorboardPath = Path(dataDirectory, self.projectName.asSnake, 'tensorboard', self.repoManager.branchName)
        self.modelDataPath = Path(dataDirectory, self.projectName.asSnake, self.repoManager.branchName, 'models')

    @staticmethod
    def _find_class_names(directory):
        """Returns a ``NameString`` for each ``.py`` file in ``directory`` (empty list if it does not exist)

        Only files with no underscore in either of the first two characters of their name
        are matched, which skips e.g. ``__init__.py``.
        """
        if not os.path.exists(directory):
            return []
        filePaths = glob.glob(str(Path(directory, '[!_][!_]*.py')))
        # ``[:-3]`` strips the ``.py`` extension from the file name
        return [NameString(os.path.split(filePath)[1][:-3]) for filePath in filePaths]

    def _init_known_class_lists(self):
        """(Re)builds the lists of known data model, task and model block classes from the files on disk"""
        self._dataModelClasses = self._find_class_names(self.dataModelsPath)
        self._taskClasses = self._find_class_names(self.tasksPath)
        self._modelBlockClasses = self._find_class_names(self.blocksPath)

    @property
    def isDryRun(self):
        """``True`` if the project was created with ``dryrun`` set"""
        return self.mode == Mode.DRY_RUN

    @property
    def modeMessage(self):
        """Suffix for printed messages: empty in normal mode, otherwise names the mode"""
        if self.mode == Mode.NORMAL:
            return ''
        return f' - Mode: {self.mode}'

    def cli_final_message(self, creationType, name, content):
        """Prints where the object was created and returns ``content`` in dry run mode

        Parameters
        ----------
        creationType : string
            one of ``Model``, ``Class``, ``ModelBlock``, ``PipeLine``, ``Project``, ``Source``, ``Task``
        name : string
            name of the created object, only used in the message
        content : object
            returned unchanged if the project is in dry run mode

        Returns
        -------
        ``content`` in dry run mode, ``None`` otherwise

        Raises
        ------
        ValueError
            if ``creationType`` is not one of the known types
        """
        creationPath = None
        if creationType == 'Model':
            creationPath = self.modelsPath
        elif creationType == 'Class':
            creationPath = self.dataModelsPath
        elif creationType == 'ModelBlock':
            # Blocks live in ``models/blocks`` (see ``create_blocks_directory``), not in ``models``
            creationPath = self.blocksPath
        elif creationType == 'PipeLine':
            creationPath = self.pipelinesPath
        elif creationType == 'Project':
            creationPath = self.projectDirectory
        elif str(creationType) in ['Source', 'Task']:
            creationPath = self.tasksPath
        if creationPath is None:
            raise ValueError(f'{creationType} is an unknown type')
        print('')
        print(f'{creationType} {name} was created in directory {creationPath}.{self.modeMessage}')
        print('')
        if self.isDryRun:
            return content
        return None

    def create_model_directory(self, modelName):
        """Creates ``models/<model_name>`` for the given model (``modelName.asSnake``)"""
        create_directory(path=Path(self.modelsPath, modelName.asSnake), mode=self.mode)

    def create_project_directory(self):
        """Creates the project directory"""
        create_directory(path=self.projectDirectory, mode=self.mode)

    def create_data_models_directory(self):
        """Creates the ``data_models`` directory"""
        create_directory(path=self.dataModelsPath, mode=self.mode)

    def create_tasks_directory(self):
        """Creates the ``tasks`` directory"""
        create_directory(path=self.tasksPath, mode=self.mode)

    def create_pipelines_directory(self):
        """Creates the ``pipelines`` directory"""
        create_directory(path=self.pipelinesPath, mode=self.mode)

    def create_blocks_directory(self):
        """Creates the ``models/blocks`` directory"""
        create_directory(path=self.blocksPath, mode=self.mode)

    def create_models_directory(self):
        """Creates the ``models`` directory"""
        create_directory(path=self.modelsPath, mode=self.mode)

    def create_tests_directory(self):
        """Creates the ``tests`` directory"""
        create_directory(path=self.testsPath, mode=self.mode)

    def is_data_model_class(self, value: NameString):
        """Checks if a name is a data_model class (based on if the snakecase .py file exists)"""
        return value in self._dataModelClasses

    def is_task_class(self, value: NameString):
        """Checks if a name is in tasks class (based on if the snakecase .py file exists)"""
        return value in self._taskClasses

    def is_model_block_class(self, value: NameString):
        """Checks if a name is in blocks class (based on if the snakecase .py file exists)"""
        return value in self._modelBlockClasses

    def check_dependencies(self, dependencies):
        """Raises an error if any dependency is unknown"""
        # Build the combined list once instead of on every iteration
        knownClasses = self._dataModelClasses + self._taskClasses + self._modelBlockClasses
        for dependency in dependencies:
            if dependency not in knownClasses:
                raise ValueError(f'Unknown dependency {dependency}')

    def create_text_file(self, filePath, content):
        """Passes ``content`` to :func:`create_text_file` for ``filePath`` using the project's mode"""
        create_text_file(filePath=filePath, content=content, mode=self.mode)

    def render(self, templateName, templateData, filePath):
        """Creates a file from a template using jinja2

        Parameters
        ----------
        templateName : string
            filename of the template
        templateData : dict
            data to fill the template with
        filePath : Path
            full path of the destination file; it is handed to ``create_text_file``
            together with the project's mode

        Returns
        -------
        string
            the rendered content (with a trailing newline if it is not empty)
        """
        content = self.templateEnvironment.get_template(templateName).render(templateData)
        if len(content) > 0 and content[-1] != '\n':
            content += '\n'
        self.create_text_file(filePath=filePath, content=content)
        return content

    def make_file_executable(self, filePath):
        """Adds the owner-execute bit to ``filePath`` (no change in dry run mode)

        The permission is only changed if the current user owns the file.
        Raises ``ValueError`` if the file does not exist, except in dry run mode.
        """
        print(f'Making file {filePath} executable.{self.modeMessage}')
        self._test_existence(path=filePath, objectName='File')
        if self.mode != Mode.DRY_RUN:
            fileStat = os.stat(filePath)
            if os.getuid() == fileStat.st_uid:
                os.chmod(filePath, fileStat.st_mode | stat.S_IXUSR)

    def _test_existence(self, path, objectName):
        """Reports a missing ``path``: prints a message in dry run mode, raises ``ValueError`` otherwise"""
        if not os.path.exists(path):
            if self.mode == Mode.DRY_RUN:
                print(f'{objectName} {path} does not exist.{self.modeMessage}')
            else:
                raise ValueError(f'{objectName} {path} does not exist.{self.modeMessage}')

    def render_executable(self, templateName, templateData, filePath):
        """Same as :func:`render` and then makes the target file executable; returns the rendered content"""
        content = self.render(templateName=templateName, templateData=templateData, filePath=filePath)
        self.make_file_executable(filePath=filePath)
        return content

    def render_simple(self, templateName, filePath):
        """Same as :func:`render` with the project name as the only template data (``name``)"""
        return self.render(templateName=templateName, templateData={'name': self.projectName}, filePath=filePath)

    def list_datasets(self, pattern=None, asCode=False):
        """Convenience function to list datasets for a project

        Returns a list of data loaded from the ``.def`` files in the directory

        Parameters
        ----------
        pattern : string (None)
            Regex pattern to filter on dataset names, if unspecified, defaults to ``.*``
        asCode : bool (False)
            If True prints a code snippet that allows the dataset to be loaded (with imports and path updates)
        """
        if pattern is None:
            pattern = '.*'
        dataPath = Path(self.dataDirectory, self.projectName.asSnake)
        result = []
        for pathName, _, fileNames in os.walk(dataPath):
            for fileName in fileNames:
                # ``fileName[:-4]`` is the dataset name without the ``.def`` extension
                if fileName.endswith('.def') and re.match(pattern, fileName[:-4]) is not None:
                    # Context manager so the file handle is closed right after parsing
                    with open(Path(pathName, fileName), 'rt') as defFile:
                        data = json.load(defFile)
                    result.append(data)
                    if asCode:
                        values = {**data, **data['repo']}
                        values['location'] = self.dataDirectory
                        # Keep multi-line commit messages on one line of the generated snippet
                        values['commitMessage'] = values['commitMessage'].replace('\n', '\\n')
                        values['dataTypeFile'] = NameString(name=values['dataType']).asSnake
                        values['projectDirectory'] = self.projectDirectory
                        print(DATASET_TEMPLATE.format(**values))
        return result

    def diff_data_model(self, commit, *args):
        """Convenience function to compare old data model class definitions to the current one

        Prints the diffs from the specified commit to the current commit

        Parameters
        ----------
        commit : string
            The git commit from where the comparison starts
        *args : List[string]
            List of class names to compare, if empty it compares all
        """
        if len(args) == 0:
            names = self._dataModelClasses
        else:
            names = [NameString(name) for name in args]
        repo = Repo(self.projectDirectory)
        if repo.is_dirty():
            print('Warning! Current git repo is dirty, this will result in incorrect diff')
        currentCommit = repo.commit().hexsha
        for name in names:
            print(f'------ data_models/{name.asSnake}.py ------')
            print(repo.git.diff(commit, currentCommit, f'data_models/{name.asSnake}.py'))

    def create_old_data_model(self, commit, *args):
        """Convenience function to generate data model classes at an old commit to be able to load datasets created then

        Full commit hash required.

        ``project.create_old_data_model('fbd8110b7194425e2323f68ef54dac15bb01ee7b', 'OneClass', 'TwoClass')``

        Will create ``data_models/one_class_fbd8110.py`` and ``data_models/two_class_fbd8110.py`` and
        replaces all occurrences of ``OneClass`` and ``TwoClass`` to ``OneClassFBD8110`` and ``TwoClassFBD8110`` in each file.

        Parameters
        ----------
        commit : string
            git commit to retrieve classes from
        args : List[string]
            List of class names to generate, if empty it generates all

        Returns
        -------
        List[string]
            the generated file contents in dry run mode, an empty list otherwise
        """
        if len(args) == 0:
            names = self._dataModelClasses
        else:
            names = [NameString(name) for name in args]
        result = []
        repo = Repo(self.projectDirectory)
        if repo.is_dirty():
            print('Warning! The current git repo is dirty; this will result in incorrect data_model_files created.')
        for name in names:
            content = repo.git.show(f'{commit}:data_models/{name.asSnake}.py')
            # Rename every requested class (and its module) so the files refer to each other's old versions
            for oldName in names:
                content = content.replace(oldName.asClass, f'{oldName.asClass}{commit[:7].upper()}')
                content = content.replace(f'data_models.{oldName.asSnake}', f'data_models.{oldName.asSnake}_{commit[:7]}')
            if self.isDryRun:
                result.append(content)
                print(f'DRYRUN - Creating class {name.asClass}{commit[:7].upper()} in {name.asSnake}_{commit[:7]}.py')
                print(content+'\n')
            else:
                print(f'Creating class {name.asClass}{commit[:7].upper()} in {name.asSnake}_{commit[:7]}.py')
                with open(Path(self.dataModelsPath, f'{name.asSnake}_{commit[:7]}.py'), 'wt') as outFile:
                    outFile.write(content+'\n')
        # Pick up the files that were just written
        self._init_known_class_lists()
        return result