import os
import glob
from pathlib import Path
from hypergol.datachunk import DataChunk
from hypergol.repr import Repr
from hypergol.utils import get_hash
from hypergol.repo_data import RepoData
from hypergol.dataset_chk_file import DataSetChkFile
from hypergol.dataset_def_file import DataSetDefFile
# Supported chunk counts mapped to the number of hexadecimal digits used for a chunk id
# (e.g. 16 chunks -> ids '0'..'f', 256 chunks -> '00'..'ff', 4096 chunks -> '000'..'fff').
VALID_CHUNKS = {
    16: 1,
    256: 2,
    4096: 3,
}
class DatasetDoesNotExistException(Exception):
    """Raised when an operation requires an existing dataset (one whose ``.def`` file is present) but there is none."""
class DatasetAlreadyExistsException(Exception):
    """Raised when a dataset is about to be created for writing but one already exists at that location."""
class Dataset(Repr):
    """
    Dataset class to store BaseData objects that is readable/writable in a parallel manner.

    Files will be stored in: ``location/project/branch/name/name_???.jsonl.gz``
    """

    def __init__(self, dataType, location, project, branch, name, repoData=None, chunkCount=16):
        """
        Parameters
        ----------
        dataType : BaseData
            Type of this dataset, only ``dataType`` objects can be stored in this dataset
        location : str
            path the project is in
        project : str
            project name
        branch : str
            branch name
        name : str
            name of this dataset
        repoData : RepoData = None
            stores the commit information at the creation of the dataset;
            ``RepoData.get_dummy()`` is used when it is not provided
        chunkCount : int = {16 (default), 256, 4096}
            How many files the data will be stored in, sets the granularity of multithreaded processing

        Raises
        ------
        ValueError
            If ``chunkCount`` is not one of the supported values
        """
        # Validate up front: an unsupported count would otherwise only surface much later as a
        # bare KeyError from VALID_CHUNKS when chunk ids are generated or objects are routed.
        if chunkCount not in VALID_CHUNKS:
            raise ValueError(f'Invalid chunkCount: {chunkCount}, must be one of {sorted(VALID_CHUNKS)}')
        self.dataType = dataType
        self.location = location
        self.project = project
        self.branch = branch
        self.name = name
        self.chunkCount = chunkCount
        self.repoData = repoData or RepoData.get_dummy()
        self.chkFile = DataSetChkFile(dataset=self)
        self.defFile = DataSetDefFile(dataset=self)

    def add_dependency(self, dataset):
        """Adds the ``.def`` file of a dataset to the ``.def`` file of this dataset so data lineage can be retraced

        Parameters
        ----------
        dataset : Dataset
            dataset that contributes to the generation of this dataset
        """
        self.defFile.add_dependency(dataset)

    @property
    def directory(self):
        """Full path of the directory this dataset will be in"""
        return Path(self.location, self.project, self.branch, self.name)

    def init(self, mode):
        """Checks the existence of the dataset

        Parameters
        ----------
        mode : str = ('w', 'r')
            The mode the dataset is about to be opened

        Based on the mode:

        - mode=='w' : fails if the dataset already exists otherwise creates the ``.def`` file
        - mode=='r' : fails if the dataset doesn't exist otherwise compares the data in the ``.def`` file to the definition in the class.
        - otherwise : fails due to unknown mode

        Raises
        ------
        DatasetAlreadyExistsException
            mode is 'w' and the dataset already exists
        DatasetDoesNotExistException
            mode is 'r' and the dataset does not exist
        ValueError
            mode is neither 'w' nor 'r'
        """
        if mode == 'w':
            if self.exists():
                raise DatasetAlreadyExistsException(f"Dataset {self.directory} already exist, delete the dataset first with Dataset.delete()")
            self.defFile.make_def_file()
        elif mode == 'r':
            if not self.exists():
                raise DatasetDoesNotExistException(f'Dataset {self.directory} does not exist')
            self.defFile.check_def_file()
        else:
            raise ValueError(f'Invalid mode: {mode} in {self.directory}')

    def open(self, mode):
        """Opens the dataset for reading or writing

        Parameters
        ----------
        mode : str = ('w', 'r')
            The mode the dataset is about to be opened

        Returns a :class:`DatasetWriter` or :class:`DatasetReader` object that handles the reading or writing of the files through the dataset's chunks.

        Raises
        ------
        ValueError
            mode is neither 'w' nor 'r'
        """
        if mode == 'w':
            return DatasetWriter(dataset=self)
        if mode == 'r':
            return DatasetReader(dataset=self)
        raise ValueError(f'Invalid mode: {mode} in {self.name}')

    def get_chunk_ids(self):
        """Returns the list of :term:`chunk id`-s

        Ids are the zero-padded lowercase hex representation of ``0 .. chunkCount - 1``.
        """
        return [f'{k:0{VALID_CHUNKS[self.chunkCount]}x}' for k in range(self.chunkCount)]

    def get_data_chunks(self, mode):
        """Initialises the dataset and creates all the :class:`DataChunk` classes"""
        self.init(mode=mode)
        return [
            DataChunk(dataset=self, chunkId=chunkId, mode=mode)
            for chunkId in self.get_chunk_ids()
        ]

    def get_object_chunk_id(self, objectHashId):
        """Finds out which chunk the object belongs based on the :term:`hash id`

        The chunk id is the prefix of ``get_hash(objectHashId)`` with as many characters as a chunk id has.
        """
        return get_hash(objectHashId)[:VALID_CHUNKS[self.chunkCount]]

    def delete(self):
        """Deletes the files and the directory of the dataset

        Raises
        ------
        DatasetDoesNotExistException
            If the dataset does not exist
        """
        if not self.exists():
            raise DatasetDoesNotExistException(f'Dataset {self.name} does not exist')
        # iterdir() instead of glob: the directory path may contain glob metacharacters
        # ('[', '*', '?'), and glob's '*' skips dot-files; either case would leave files
        # behind and make the final rmdir fail.
        for path in self.directory.iterdir():
            path.unlink()
        self.directory.rmdir()

    def exists(self):
        """True if the dataset's ``.def`` file exists"""
        return os.path.exists(self.defFile.defFilename)
class DatasetReader(Repr):
    """Class to read from a dataset

    Implements context manager and iterator. It doesn't open any file until any reading actually happens and then opens each chunk one by one.
    """

    def __init__(self, dataset):
        """
        Parameters
        ----------
        dataset : Dataset
            Dataset to be read; ``get_data_chunks(mode='r')`` raises if it does not exist.
        """
        self.dataset = dataset
        self.dataChunks = self.dataset.get_data_chunks(mode='r')

    def __enter__(self):
        return self

    def __exit__(self, *args):
        # Nothing to release here: each chunk is opened and closed inside __iter__.
        pass

    def __iter__(self):
        """Yields every element of every chunk, opening and closing the chunks one at a time"""
        for chunk in self.dataChunks:
            # Open outside the try: if open() itself fails there is nothing to close, and
            # calling close() on a chunk that never opened could mask the original error.
            chunk.open()
            try:
                yield from chunk
            finally:
                chunk.close()
class DatasetWriter(Repr):
    """Class to write into a dataset

    Implements context manager for proper file open/close.
    """

    def __init__(self, dataset):
        """Opens all chunks at once and puts them in a dictionary for easy lookup

        Parameters
        ----------
        dataset : Dataset
            Dataset to be written into; ``get_data_chunks(mode='w')`` raises if it already exists.
        """
        self.dataset = dataset
        self.dataChunks = {}
        try:
            for dataChunk in self.dataset.get_data_chunks(mode='w'):
                self.dataChunks[dataChunk.chunkId] = dataChunk.open()
        except BaseException:
            # If any chunk fails to open, close the ones already opened so their files are
            # not leaked, then let the original error propagate.
            for openedChunk in self.dataChunks.values():
                openedChunk.close()
            raise

    def append(self, elem):
        """Writes a single object into the right chunk, chosen from the object's hash id"""
        chunkHash = self.dataset.get_object_chunk_id(elem.get_hash_id())
        self.dataChunks[chunkHash].append(elem)

    def close(self):
        """Closes the files and writes the ``.chk`` file

        The values returned by each chunk's ``close()`` are passed on as ``checksums``
        to ``dataset.chkFile.make_chk_file``.
        """
        checksums = [chunk.close() for chunk in self.dataChunks.values()]
        self.dataset.chkFile.make_chk_file(checksums=checksums)

    def __enter__(self):
        return self

    def __exit__(self, *args):
        # NOTE(review): this closes the chunks and writes the .chk file even when the
        # with-block raised; confirm that is intended for partially written datasets.
        self.close()