diff options
Diffstat (limited to 'jimbrella/csv_table.py')
-rw-r--r-- | jimbrella/csv_table.py | 108 |
1 file changed, 0 insertions, 108 deletions
import csv
import os
import logging
from .lockfile import Lockfile
from .utils import identity


class CsvTable:
    def __init__(self, path, schema: list):
        """A generic CSV table of data, and basic I/O operations.

        The first column should be unique. If so, it can be used to index rows, a feature of
        method `_find`.

        Arguments:
        - path   | file path for table.
        - schema | list of dicts, each for a datum field (column).
                 | each dict contains the following keys:
                 | "name": what the column will be called in the dict that `_read` returns.
                 | "serializer": function to be applied to the datum when being written.
                 | "deserializer": same as above except it is when datum is being read.
                 | the latter two are optional, and when they are not specified, the default is
                 | the identity function, i.e. f such that f(x) = x for any x.
        """
        self.path = path
        # Fill in the identity function for any absent/None (de)serializer so
        # that _read/_write can call them unconditionally.
        for col in schema:
            for func in ("serializer", "deserializer"):
                if func not in col or col[func] is None:
                    col[func] = identity

        # schema: column number -> name and (de)serialization functions
        # reverse_schema: name -> column number
        self.schema = schema
        self.reverse_schema = {col["name"]: i for i, col in enumerate(schema)}
        self.lockfile = Lockfile(self.path)
        # Create the backing file if it does not yet exist ("x" mode raises
        # FileExistsError rather than truncating an existing table).
        try:
            open(self.path, "x").close()
        except FileExistsError:
            pass

    def _read(self) -> list:
        """Deserialize the table.

        Returns a list of dicts, one per row, keyed by column "name" with each
        datum passed through its column's "deserializer". Rows that fail to
        deserialize are skipped with a warning rather than aborting the read.
        """
        rows = []
        with open(self.path) as f:
            # start=1 so warnings report human-friendly 1-based line numbers.
            for ln, row in enumerate(csv.reader(f), start=1):
                try:
                    rows.append(
                        {
                            sch["name"]: sch["deserializer"](datum)
                            for sch, datum in zip(self.schema, row)
                        }
                    )
                except Exception:
                    logging.warning("%s:%d cannot be read. Skip.", self.path, ln)
        return rows

    def _write(self, rows: list) -> None:
        """Serialize the table.

        When a failure occurs mid-write, the previous file contents are
        restored and the exception is re-raised. The lockfile is held for the
        whole backup/write/restore sequence (previously the backup was read
        before locking, a race window).
        """
        self.lockfile.lock()
        try:
            # Back up current contents in memory so a partial write can be
            # rolled back.
            with open(self.path) as f:
                backup = f.read()
            try:
                with open(self.path, "w") as f:
                    writer = csv.writer(f)
                    for row in rows:
                        # Bug fix: apply each column's "serializer" as promised
                        # by the schema contract (raw values were written before).
                        writer.writerow(
                            [col["serializer"](row[col["name"]]) for col in self.schema]
                        )
            except Exception:
                # Failure occurred on write: restore contents as they were.
                with open(self.path, "w") as f:
                    f.write(backup)
                raise
        finally:
            self.lockfile.unlock()

    def _append(self, row) -> list:
        """Append one row, and return the entire updated table."""
        rows = self._read()
        rows.append(row)
        self._write(rows)
        return rows

    def _update(self, update: dict) -> list:
        """Update one row (matched on the first, unique column), and return
        the entire updated table."""
        rows = self._read()
        index_column = self.schema[0]["name"]
        for idx, row in enumerate(rows):
            if row[index_column] == update[index_column]:
                rows[idx] = update
        self._write(rows)
        return rows