diff options
author | Frederick Yin <fkfd@fkfd.me> | 2021-10-27 20:50:47 +0800 |
---|---|---|
committer | Frederick Yin <fkfd@fkfd.me> | 2021-10-27 20:50:47 +0800 |
commit | 328e7891bbdcb4c869112fa44438d8da5ea1ce5e (patch) | |
tree | c459af04a16f7677836cff2afd18127e1f817bb3 /jimbrella | |
parent | 00e4e6ecc80f8e3654bd176ec7067ab35b55d6f6 (diff) |
Refactor: separate CSV table into new module
Database is now subclass of CsvTable
Diffstat (limited to 'jimbrella')
-rw-r--r-- | jimbrella/csv_table.py | 97 | ||||
-rw-r--r-- | jimbrella/database.py | 100 | ||||
-rw-r--r-- | jimbrella/utils.py | 3 |
3 files changed, 119 insertions, 81 deletions
diff --git a/jimbrella/csv_table.py b/jimbrella/csv_table.py new file mode 100644 index 0000000..6f431a7 --- /dev/null +++ b/jimbrella/csv_table.py @@ -0,0 +1,97 @@ +import csv +import os +from .lockfile import Lockfile +from .utils import identity + + +class CsvTable: + def __init__(self, path, schema: list[dict]): + """A generic CSV table of data, and basic I/O operations. + + The first column should be unique. If so, it can be used to index rows, a feature of + method `_find`. + + Arguments: + - path | file path for table. + - schema | list of dicts, each for a datum field (column). + | each dict contains the following keys: + | "name": what the column will be called in the dict that `_read` returns. + | "serializer": function to be applied to the datum when being written. + | "deserializer": same as above except it is when datum is being read. + | the latter two are optional, and when they are not specified, the default is + | the identity function, i.e. f such that f(x) = x for any x. + """ + self.path = path + for col in schema: + for func in ("serializer", "deserializer"): + if func not in col or col[func] is None: + col[func] = identity + + # schema: column number -> name and (de)serialization functions + # reverse_schema: name -> column number + self.schema = schema + self.reverse_schema = {schema[i]["name"]: i for i in range(len(schema))} + self.lockfile = Lockfile(self.path) + # Create file if it does not yet exist + try: + f = open(self.path, "x") + f.close() + except FileExistsError: + pass + + def _read(self) -> list: + """Deserialize table.""" + with open(self.path) as f: + reader = csv.reader(f) + rows = [] + # `rows` is a list of 2-tuples + for row in reader: + # for each tuple (k, v) in `rows`, + # it will be unzipped into a dict key-value pair + rows.append( + dict( + [ + (sch["name"], sch["deserializer"](datum)) + for sch, datum in zip(self.schema, row) + ] + ) + ) + f.close() + return rows + + def _write(self, rows: list) -> None: + """Serialize table. When a failure occurs, abort and recover data.""" + # make backup in memory + with open(self.path) as f: + backup = f.read() + f.close() + + self.lockfile.lock() + + f = open(self.path, "w") + try: + writer = csv.writer(f) + for row in rows: + writer.writerow([row[col["name"]] for col in self.schema]) + except Exception as e: + # failure occurred on write + # abort write, and write back contents as they were before + # TODO: keep log + f.close() + f = open(self.path, "w") + f.write(backup) + raise e + finally: + f.close() + + self.lockfile.unlock() + + def _update(self, update: dict) -> list: + """Update status of one row, and return the entire updated table.""" + rows = self._read() + index_column = self.schema[0]["name"] + for idx, row in enumerate(rows): + if row[index_column] == update[index_column]: + rows[idx] = update + self._write(rows) + return rows diff --git a/jimbrella/database.py b/jimbrella/database.py index e43b8f2..07ff6e0 100644 --- a/jimbrella/database.py +++ b/jimbrella/database.py @@ -1,15 +1,16 @@ import csv import os from datetime import datetime, timedelta +from .csv_table import CsvTable from .lockfile import Lockfile -from .utils import human_datetime, human_timedelta +from .utils import identity, human_datetime, human_timedelta from .config import DUE_HOURS from .exceptions import * STATUSES = ["available", "lent", "overdue", "maintenance", "withheld", "unknown"] -class Database: +class Database(CsvTable): def __init__(self, path): """A database of all umbrellas and their current state. @@ -43,86 +44,23 @@ class Database: - lent_at | (date) if status is "lent", lent_at is the submission time of the user's | jForm answer sheet. is blank otherwise. """ - self.path = path - self.lockfile = Lockfile(self.path) - - def _read(self) -> list: - """Deserialize database.""" - # Create database if it does not yet exist - try: - f = open(self.path, "x") - f.close() - except FileExistsError: - pass - - with open(self.path) as f: - reader = csv.reader(f) - umbrellas = [ + super().__init__( + path, + [ + {"name": "serial", "deserializer": int}, + {"name": "alias"}, + {"name": "status"}, + {"name": "tenant_name"}, + {"name": "tenant_id"}, + {"name": "tenant_phone"}, + {"name": "tenant_email"}, { - "serial": int(row[0]), - "alias": row[1], - "status": row[2], - "tenant_name": row[3], - "tenant_id": row[4], - "tenant_phone": row[5], - "tenant_email": row[6], - "lent_at": datetime.fromisoformat(row[7]) if row[7] else None, - } - for row in reader - ] - f.close() - - return umbrellas - - def _write(self, umbrellas: list) -> None: - """Serialize database. When a failure occurs, abort and recover data.""" - # make backup in memory - with open(self.path) as f: - backup = f.read() - f.close() - - self.lockfile.lock() - - f = open(self.path, "w") - try: - writer = csv.writer(f) - writer.writerows( - [ - [ - umb["serial"], - umb["alias"], - umb["status"], - umb["tenant_name"], - umb["tenant_id"], - umb["tenant_phone"], - umb["tenant_email"], - umb["lent_at"].isoformat(timespec="seconds") - if umb["lent_at"] - else "", - ] - for umb in umbrellas - ] - ) - except Exception as e: - # failure occurred on write - # abort write, and write back contents as they were before - # TODO: pass on exception and keep log - f.close() - f = open(self.path, "w") - f.write(backup) - f.close() - raise e - - self.lockfile.unlock() - - def _update(self, update: dict) -> list: - """Update status of one umbrella, and return the entire updated database.""" - umbrellas = self._read() - for idx, umb in enumerate(umbrellas): - if umb["serial"] == update["serial"]: - umbrellas[idx] = update - self._write(umbrellas) - return umbrellas + "name": "lent_at", + "serializer": lambda d: d.isoformat(timespec="seconds") if d else "", + "deserializer": lambda d: datetime.fromisoformat(d) if d else None, + }, + ], + ) def _find_by_serial(self, serial: int) -> dict: """Given a serial number, returns an umbrella with such serial. diff --git a/jimbrella/utils.py b/jimbrella/utils.py index cac8bc9..ce57bf9 100644 --- a/jimbrella/utils.py +++ b/jimbrella/utils.py @@ -1,5 +1,8 @@ from datetime import datetime, timedelta, tzinfo +# identity function +identity = lambda x: x + def human_datetime(time: datetime) -> str: return "{:%Y-%m-%d %H:%M:%S}".format(time) |