import csv
import os

from .lockfile import Lockfile
from .utils import identity


class CsvTable:
    """A CSV file on disk exposed as a list of dict rows."""

    def __init__(self, path, schema: list):
        """A generic CSV table of data, and basic I/O operations.

        The first column should be unique. If so, it can be used to index rows,
        a feature of method `_find`.

        Arguments:
        - path   | file path for table.
        - schema | list of dicts, each for a datum field (column).
                 | each dict contains the following keys:
                 |   "name": what the column will be called in the dict that
                 |           `_read` returns.
                 |   "serializer": function to be applied to the datum when
                 |                 being written.
                 |   "deserializer": same as above except it is when datum is
                 |                   being read.
                 | the latter two are optional, and when they are not specified
                 | (or are None), the default is the identity function,
                 | i.e. f such that f(x) = x for any x.
        """
        self.path = path

        # Work on shallow copies so that filling in the default
        # (de)serializers does not modify the caller's dicts.
        normalized = []
        for col in schema:
            col = dict(col)
            for func in ("serializer", "deserializer"):
                if col.get(func) is None:
                    col[func] = identity
            normalized.append(col)

        # schema: column number -> name and (de)serialization functions
        # reverse_schema: name -> column number
        self.schema = normalized
        self.reverse_schema = {
            col["name"]: idx for idx, col in enumerate(normalized)
        }

        self.lockfile = Lockfile(self.path)

        # Create file if it does not yet exist ("x" mode fails if it does).
        try:
            with open(self.path, "x"):
                pass
        except FileExistsError:
            pass

    def _read(self) -> list:
        """Deserialize table.

        Returns a list with one dict per CSV row, mapping each column name to
        the datum passed through that column's deserializer. Columns are paired
        with data positionally via `zip`, so a row shorter than the schema
        yields a dict with only the leading columns.
        """
        # newline="" lets the csv module do its own newline handling, as the
        # csv documentation requires (keeps quoted embedded newlines intact).
        with open(self.path, newline="") as f:
            return [
                {
                    col["name"]: col["deserializer"](datum)
                    for col, datum in zip(self.schema, row)
                }
                for row in csv.reader(f)
            ]

    def _write(self, rows: list) -> None:
        """Serialize table.

        Each datum is passed through its column's serializer before being
        written. When a failure occurs while writing, abort and write back the
        file contents as they were before.

        Raises KeyError if a row lacks one of the schema's columns; in that
        case the file is not touched at all.
        """
        # Serialize everything up front so that a bad row or a failing
        # serializer is detected before the file is truncated.
        serialized = [
            [col["serializer"](row[col["name"]]) for col in self.schema]
            for row in rows
        ]

        # make backup in memory (newline="" keeps line endings untranslated
        # so the backup can be restored exactly)
        with open(self.path, newline="") as f:
            backup = f.read()

        self.lockfile.lock()
        try:
            try:
                with open(self.path, "w", newline="") as f:
                    csv.writer(f).writerows(serialized)
            except Exception:
                # failure occurred on write
                # abort write, and write back contents as they were before
                # TODO: keep log
                with open(self.path, "w", newline="") as f:
                    f.write(backup)
                raise
        finally:
            # Always release the lock, including when opening the file fails.
            self.lockfile.unlock()

    def _append(self, row) -> list:
        """Append one row, and return the entire updated table."""
        rows = self._read()
        rows.append(row)
        self._write(rows)
        return rows

    def _update(self, update: dict) -> list:
        """Update one row, and return the entire updated table.

        Every row whose first-column value equals that of `update` is replaced
        by `update` (normally one row, since the first column should be
        unique). The table is written back even when no row matched.
        """
        rows = self._read()
        index_column = self.schema[0]["name"]
        key = update[index_column] if rows else None
        rows = [
            update if row[index_column] == key else row
            for row in rows
        ]
        self._write(rows)
        return rows