# Source code for hypergol.datachunk

import gzip
import json
import hashlib

from hypergol.repr import Repr


class DatasetTypeDoesNotMatchDataTypeException(Exception):
    """Raised by ``DataChunk.append`` when the appended object is not an instance of the dataset's ``dataType``."""


class DataChunkChecksum(Repr):
    """Pairs a chunk with the checksum value computed for it.

    Parameters
    ----------
    chunk : DataChunk
        The chunk the checksum belongs to
    value : str
        The checksum value (``DataChunk.close()`` passes the SHA1 hex digest here)
    """

    def __init__(self, chunk, value):
        # Tuple assignment binds left to right, so ``chunk`` is still set before ``value``.
        self.chunk, self.value = chunk, value


class DataChunk(Repr):
    """This class represents the file that the data is actually stored in.

    When opened for writing it implements the :func:`append()` method and when
    reading the :func:`__iter__` iterator. Upon close, it returns the checksum
    (SHA1 hash) of the content that was written into it.
    """

    def __init__(self, dataset, chunkId, mode):
        """
        Parameters
        ----------
        dataset : Dataset
            The dataset this chunk belongs to
        chunkId : str
            The hexadecimal identifier of this chunk
        mode : str = ('w' or 'r')
            The mode this chunk was created to be opened in, determined by :func:`Dataset.get_data_chunks()`
        """
        self.dataset = dataset
        self.chunkId = chunkId
        self.mode = mode
        self.file = None
        self.hasher = None
        self.checksum = None

    @property
    def fileName(self):
        """Name of the file the data will be stored in"""
        return f'{self.dataset.name}_{self.chunkId}.jsonl.gz'

    def open(self):
        """Opens the chunk according to the mode specified at creation

        The file is opened as gzip-compressed text in ``self.mode`` inside
        ``self.dataset.directory``. A new SHA1 hasher is started, initialised
        with the previously computed checksum string (or the empty string if
        there is none). A reopened chunk's checksum therefore also depends on
        the earlier one.

        Returns
        -------
        DataChunk
            ``self``, so the call can be chained
        """
        fileName = f'{self.dataset.directory}/{self.fileName}'
        # Use UTF-8 explicitly so the bytes on disk match the UTF-8 bytes fed to
        # the hasher in write(), regardless of the platform's locale encoding.
        self.file = gzip.open(fileName, f'{self.mode}t', encoding='utf-8')
        self.hasher = hashlib.sha1((self.checksum or '').encode('utf-8'))
        return self

    def close(self):
        """Closes the file handler, computes the checksum and returns it

        In read mode nothing updates the hasher, so the checksum only reflects
        the value it was initialised with in :func:`open`.

        Returns
        -------
        DataChunkChecksum
            The chunk together with its SHA1 hex digest
        """
        self.file.close()
        self.file = None
        self.checksum = self.hasher.hexdigest()
        self.hasher = None
        return DataChunkChecksum(chunk=self, value=self.checksum)

    def append(self, value):
        """Adds a data model object to the file

        Parameters
        ----------
        value : object
            Data model object matching the type of the Dataset this chunk belongs to

        Raises
        ------
        DatasetTypeDoesNotMatchDataTypeException
            If ``value`` is not an instance of the dataset's ``dataType``
        ValueError
            If the object's hash id maps to a different chunk than this one
        """
        if not isinstance(value, self.dataset.dataType):
            raise DatasetTypeDoesNotMatchDataTypeException(
                f"Trying to append an object of type {value.__class__.__name__} into a dataset of type {self.dataset.dataType.__name__}"
            )
        # get_object_chunk_id() takes the object's hash id, not the object itself;
        # compute both once so the check and the error message agree.
        hashId = value.get_hash_id()
        objectChunkId = self.dataset.get_object_chunk_id(hashId)
        if objectChunkId != self.chunkId:
            raise ValueError(
                f'Incorrect hashId {hashId} (belongs to chunk {objectChunkId}) was inserted into {self.dataset.name} chunk {self.chunkId}.'
            )
        # sort_keys makes the serialised line (and so the checksum) independent of dict key order.
        self.write(data=f'{json.dumps(value.to_data(), sort_keys=True)}\n')

    def write(self, data):
        """Writes into the file and updates the hash, used in multithreaded rechunking in :class:`Task`

        Parameters
        ----------
        data : str
            Text to write verbatim; it is also fed to the hasher as UTF-8 bytes
        """
        self.hasher.update(data.encode('utf-8'))
        self.file.write(data)

    def __iter__(self):
        """Iterator to read all the data from the file

        Each line is parsed as JSON and converted with ``self.dataset.dataType.from_data``.
        """
        for line in self.file:
            yield self.dataset.dataType.from_data(json.loads(line.rstrip()))