diff options
author | Frederick Yin <fkfd@fkfd.me> | 2022-02-01 12:43:25 +0800 |
---|---|---|
committer | Frederick Yin <fkfd@fkfd.me> | 2022-02-01 12:43:25 +0800 |
commit | 0e47cf9a8d06e896c5197cb28cb5a2a518d255d1 (patch) | |
tree | 22a0267dbfe204038aaac48d8596d436b5d56291 | |
parent | 444966b2ff4a04374856d3a6759bef9e9f451c03 (diff) |
SQLite in favor of CSV database
Deprecate csv database format, move around some methods
-rw-r--r-- | jimbrella/csv_table.py | 108 | ||||
-rw-r--r-- | jimbrella/database.py | 252 | ||||
-rw-r--r-- | jimbrella/lockfile.py | 33 | ||||
-rw-r--r-- | jimbrella/umbrellas.py | 189 | ||||
-rw-r--r-- | jimbrella/utils.py | 28 |
5 files changed, 198 insertions, 412 deletions
diff --git a/jimbrella/csv_table.py b/jimbrella/csv_table.py deleted file mode 100644 index 8827e5f..0000000 --- a/jimbrella/csv_table.py +++ /dev/null @@ -1,108 +0,0 @@ -import csv -import os -import logging -from .lockfile import Lockfile -from .utils import identity - - -class CsvTable: - def __init__(self, path, schema: list): - """A generic CSV table of data, and basic I/O operations. - - The first column should be unique. If so, it can be used to index rows, a feature of - method `_find`. - - Arguments: - - path | file path for table. - - schema | list of dicts, each for a datum field (column). - | each dict contains the following keys: - | "name": what the column will be called in the dict that `_read` returns. - | "serializer": function to be applied to the datum when being written. - | "deserializer": same as above except it is when datum is being read. - | the latter two are optional, and when they are not specified, the default is - | the identity function, i.e. f such that f(x) = x for any x. - """ - self.path = path - for col in schema: - for func in ("serializer", "deserializer"): - if func not in col or col[func] is None: - col[func] = identity - - # schema: column number -> name and (de)serialization functions - # reverse_schema: name -> column number - self.schema = schema - self.reverse_schema = {schema[i]["name"]: i for i in range(len(schema))} - self.lockfile = Lockfile(self.path) - # Create file if it does not yet exist - try: - f = open(self.path, "x") - f.close() - except FileExistsError: - pass - - def _read(self) -> list: - """Deserialize table.""" - with open(self.path) as f: - reader = csv.reader(f) - rows = [] - # `rows` is a list of 2-tuples - for ln, row in enumerate(reader): - # for each tuple (k, v) in `rows`, - # it will be unzipped into a dict key-value pair - try: - rows.append( - dict( - [ - (sch["name"], sch["deserializer"](datum)) - for sch, datum in zip(self.schema, row) - ] - ) - ) - except Exception: - logging.warning("%s:%d cannot be read. Skip.", self.path, ln) - continue - - f.close() - return rows - - def _write(self, rows: list) -> None: - """Serialize table. When a failure occurs, abort and recover data.""" - # make backup in memory - with open(self.path) as f: - backup = f.read() - f.close() - - self.lockfile.lock() - - f = open(self.path, "w") - try: - writer = csv.writer(f) - for row in rows: - writer.writerow([row[col["name"]] for col in self.schema]) - except Exception as e: - # failure occurred on write - # abort write, and write back contents as they were before - f.close() - f = open(self.path, "w") - f.write(backup) - raise e - finally: - f.close() - self.lockfile.unlock() - - def _append(self, row) -> list: - """Append one row, and return the entire updated table.""" - rows = self._read() - rows.append(row) - self._write(rows) - return rows - - def _update(self, update: dict) -> list: - """Update one row, and return the entire updated table.""" - rows = self._read() - index_column = self.schema[0]["name"] - for idx, row in enumerate(rows): - if row[index_column] == update[index_column]: - rows[idx] = update - self._write(rows) - return rows diff --git a/jimbrella/database.py b/jimbrella/database.py deleted file mode 100644 index 9b6e51f..0000000 --- a/jimbrella/database.py +++ /dev/null @@ -1,252 +0,0 @@ -from datetime import datetime, timedelta -from .csv_table import CsvTable -from .utils import human_datetime, human_timedelta -from .config import DUE_HOURS, ADMIN_LOG_PATH -from .exceptions import * - -STATUSES = ["available", "lent", "overdue", "maintenance", "withheld", "unknown"] - - -class Database(CsvTable): - def __init__(self, path): - """A database of all umbrellas and their current state. - - Currently, the data are represented in a csv file. - - Explanation of data types: - - id | a unique numerical identity of unsigned integer type. - - uint | whatever fits the regex /[0-9]+/. despite carrying a numerical value, it is - | internally represented as a string. - - string | any UTF-8 string. - - date | an ISO 8601 date of format "YYYY-MM-DDThh:mm:ss+08:00". - - The status of an umbrella consists of: - - serial | (id) unique identifier for the umbrella. - - alias | (string) future compatibility. a human readable (preferably cute) name - | for a particular umbrella - - status | (string) one of ("available", "lent", "overdue", "maintenance", - | "withheld", "unknown") - | available : is in service on the stand - | lent : is in temporary possession of a user - | overdue : is in overly prolonged possession of a user - | maintenance : is on the stand but not in service - | withheld : is not on the stand but rather in the possession of an - | administrator - | unknown : none of the above - | NOTE: the values above are subject to changes. - - tenant_name | (string) the person in temporary possession of the umbrella. - - tenant_id | (uint) student or faculty ID. - - tenant_phone | (uint) phone number via which to contact tenant when the lease is due. - - tenant_email | (string) future compatibility. empty for the time being. - - lent_at | (date) if status is "lent", lent_at is the submission time of the user's - | jForm answer sheet. is blank otherwise. - """ - super().__init__( - path, - [ - {"name": "serial", "deserializer": int}, - {"name": "alias"}, - {"name": "status"}, - {"name": "tenant_name"}, - {"name": "tenant_id"}, - {"name": "tenant_phone"}, - {"name": "tenant_email"}, - { - "name": "lent_at", - "serializer": lambda d: d.isoformat(timespec="seconds") - if d - else "", - "deserializer": lambda d: datetime.fromisoformat(d) if d else None, - }, - ], - ) - - def _find_by_serial(self, serial: int) -> dict: - """Given a serial number, returns an umbrella with such serial. - If there is no such umbrella, returns None. - """ - umbrellas = self._read() - for umb in umbrellas: - if umb["serial"] == serial: - return umb - - def read(self) -> list: - """An interface to `_read()` with supplemental data included. - - All exposed methods with a return value should use this method instead of `_read()`. - - Supplemental data: - - lent_at_str: string representation for lent_at. - - lent_time_ago: time since umbrella was taken away by tenant. if umbrella is not - taken away, its value is None. - - lent_time_ago_str: string representation for lent_time_ago. - """ - umbrellas = self._read() - now = datetime.now() - for idx, umb in enumerate(umbrellas): - if umb["status"] in ("lent", "overdue"): - umbrellas[idx]["lent_at_str"] = human_datetime(umb["lent_at"]) - lent_time_ago = now - umb["lent_at"] - umbrellas[idx]["lent_time_ago"] = lent_time_ago - umbrellas[idx]["lent_time_ago_str"] = human_timedelta(lent_time_ago) - return umbrellas - - def update(self, umb) -> dict: - """An interface to `_update()` with added convenience and safeguards. - - Convenience: Not all fields in an umbrella dict need to be present in `umb`. If an - optional field is not there, its value is left untouched. Every field other - than `serial` is optional. - - `serial` may be an int or a str that evaluates to the correct serial when - int() is applied to it. - - `tenant_id` and `tenant_phone` may be a str or an int. - - `lent_at` in `umb` may either be a datetime.datetime object, or an ISO 8601 - formatted string. - - Safeguards: Invalid values are rejected as an Exception, in the default case, an - UmbrellaValueError. - - Both: When a value is set, values of unneeded fields are automatically discarded - if applicable. For example, when `status` is set to "available", `tenant_*` - and `lent_at` are no longer needed; in fact, their existence is dangerous - as it implies the status of the umbrella is "lent" or "overdue", which is - not the case. Therefore, if `status` is set to "available", the unnecessary - fields mentioned above are made blank, and these fields supplied in `umb` - are ignored. - - Returns a dict, where the keys are all modified columns of the updated row, and the value - of each is a 2-tuple (<past value>, <new value>). - """ - # `serial` must be specified. - if "serial" not in umb: - raise UmbrellaValueError("serial") - - # check if umbrella #<serial> exists in database - umb["serial"] = int(umb["serial"]) - umb_in_db = self._find_by_serial(umb["serial"]) - if umb_in_db is None: - raise UmbrellaNotFoundError(umb["serial"]) - - status = umb_in_db["status"] - if "status" in umb and umb["status"]: - if status not in STATUSES: - raise UmbrellaValueError("status") # invalid - - # admin specifies a (perhaps different) status - status = umb["status"] - - if "lent_at" in umb and umb["lent_at"]: - # check if `umb` has valid date (`lent_at`) if any - if isinstance(umb["lent_at"], datetime): - lent_at = umb["lent_at"] - elif isinstance(umb["lent_at"], str): - try: - umb["lent_at"] = datetime.fromisoformat(umb["lent_at"]) - except ValueError: - raise UmbrellaValueError("lent_at") - else: - raise UmbrellaValueError("lent_at") - - # we will now check the validity of `umb` based on `status` - if status == "available": - # discard unneeded fields - for key in ["tenant_name", "tenant_id", "tenant_phone", "tenant_email"]: - umb[key] = "" - umb["lent_at"] = None - elif status in ("lent", "overdue"): - # copy values in database into unfilled or non-existent fields of `umb` - for key in [ - "tenant_name", - "tenant_id", - "tenant_phone", - "tenant_email", - "lent_at", - ]: - umb[key] = umb[key] if (key in umb and umb[key]) else umb_in_db[key] - - if not umb["lent_at"]: - raise UmbrellaValueError("lent_at") - - # commit update - self._update(umb) - - # find updated columns - # essentially "diff umb_in_db umb" - diff = {} - for key, new in umb.items(): - past = umb_in_db[key] - if any([new, past]) and new != past: - # new and past are not both null values, and they are unequal - diff[key] = (past, new) - - return diff - - @staticmethod - def group_by_status(umbrellas) -> dict: - """(static method) Returns umbrellas grouped into a dict by their status.""" - keys = set([umb["status"] for umb in umbrellas]) - # initiate statuses: each status is [] - statuses = dict.fromkeys(STATUSES, []) - for key in keys: - statuses[key] = [umb for umb in umbrellas if umb["status"] == key] - return statuses - - @staticmethod - def find_overdue(umbrellas) -> list: - """(static method) Returns umbrellas in possession of their tenant for too long.""" - now = datetime.now() - return [ - umb - for umb in umbrellas - if umb["lent_at"] is not None - and now - umb["lent_at"] > timedelta(hours=DUE_HOURS) - ] - - def take_away( - self, serial, date, tenant_name, tenant_id, tenant_phone="", tenant_email="" - ) -> None: - """When a user has borrowed an umbrella.""" - umb = self._find_by_serial(serial) - if umb is None: - raise UmbrellaNotFoundError(serial) - if umb["status"] != "available": - raise UmbrellaStatusError - umb["status"] = "lent" - umb["tenant_name"] = tenant_name - umb["tenant_id"] = tenant_id - umb["tenant_phone"] = tenant_phone - umb["tenant_email"] = tenant_email - umb["lent_at"] = date - self._update(umb) - - def give_back(self, serial, tenant_name, tenant_id) -> None: - """When a user has returned an umbrella. - - `tenant_name` and `tenant_id` are used to verify if the umbrella is returned by the same - person who borrowed it. - """ - umb = self._find_by_serial(serial) - if umb is None: - raise UmbrellaNotFoundError(serial) - if umb["status"] not in ("lent", "overdue"): - raise UmbrellaStatusError - if umb["tenant_name"] != tenant_name or umb["tenant_id"] != tenant_id: - raise TenantIdentityError(umb["tenant_name"], tenant_name) - umb["status"] = "available" - for key in ["tenant_name", "tenant_id", "tenant_phone", "tenant_email"]: - umb[key] = "" - umb["lent_at"] = None - self._update(umb) - - def mark_overdue(self, serial) -> None: - """When an umbrella is overdue, change its status to "overdue".""" - umb = self._find_by_serial(serial) - if umb is None: - raise UmbrellaNotFoundError(serial) - elif umb["status"] != "lent": - raise UmbrellaStatusError - umb["status"] = "overdue" - self._update(umb) diff --git a/jimbrella/lockfile.py b/jimbrella/lockfile.py deleted file mode 100644 index 987f0e3..0000000 --- a/jimbrella/lockfile.py +++ /dev/null @@ -1,33 +0,0 @@ -import os - - -class Lockfile: - """Prevent unwanted concurrency for file I/O. - - For a file named "<file>", create or remove a lockfile named "<file>.lock". - - When a process is modifying the file, call `lock(). When the modification - is done, call `unlock()`. - """ - - def __init__(self, filepath): - self.filepath = str(filepath) - self.lockpath = self.filepath + ".lock" - - def lock(self): - """Continue attempting to lock until locked.""" - locked = False - while not locked: - try: - f = open(self.lockpath, "x") - f.close() - locked = True - except FileExistsError: - continue - - def unlock(self): - """Remove lockfile.""" - try: - os.remove(self.lockpath) - except FileNotFoundError: - return diff --git a/jimbrella/umbrellas.py b/jimbrella/umbrellas.py new file mode 100644 index 0000000..05b103c --- /dev/null +++ b/jimbrella/umbrellas.py @@ -0,0 +1,189 @@ +import sqlite3 +from datetime import datetime, timedelta +from .utils import human_datetime, human_timedelta +from .config import DUE_HOURS, ADMIN_LOG_PATH +from .exceptions import * + +STATUSES = ["available", "lent", "overdue", "maintenance", "unknown"] + + +class Umbrellas: + def __init__(self, path): + """A database of all umbrellas and their current state. + + Currently, the data are represented in a SQLite database. Only admins have access to the + database, and have the power to modify it arbitrarily. + + An SQL row for an umbrella consists of these columns: + - id | int. unique identifier for the umbrella. + - status | string. possible values: + | available : is in service on the stand + | lent : is in temporary possession of a user + | overdue : is in overly prolonged possession of a user + | maintenance : is out of service + | unknown : none of the above + - tenant_name | string. the person in temporary possession of the umbrella. + - tenant_id | string. student or faculty ID. + - tenant_phone | string. phone number via which to contact tenant when the lease is due. + - tenant_email | string. for future compatibility. always None for the time being. + - lent_at | an ISO 8601 date string "YYYY-MM-DDThh:mm:ss+08:00" if status is + | "lent" or "overdue. is None otherwise. + + Schema: + CREATE TABLE Umbrellas( + id INT PRIMARY KEY, + status TEXT, + tenant_name TEXT, + tenant_id TEXT, + tenant_phone TEXT, + tenant_email TEXT, + lent_at TEXT + ); + """ + self.path = path + + def read(self) -> list: + db = sqlite3.connect(path) + db.row_factory = sqlite.Row + umbrellas = db.execute("SELECT * FROM Umbrellas").fetchall() + db.close() + return umbrellas + + def update(self, umb) -> None: + """Update Umbrella table with new data given in `umb`. + + Not all fields in an umbrella dict need to be present in `umb`. Only `id` is required. + If an optional field is not found, its value is left untouched. If an optional field is + present but its value is an empty string or None, the old datum will become NULL. + + Invalid values are rejected as an UmbrellaValueError. + + If `status` is not "lent" or "overdue", `tenant_*` and `lent_at` are automatically erased. + """ + # `id` must be specified. + try: + umb["id"] = int(umb["id"]) + except (KeyError, ValueError): + raise UmbrellaValueError("id") + + db = sqlite3.connect(path) + db.row_factory = sqlite.Row + + # check if umbrella #<id> exists in database + umbid = umb["id"] + umb_in_db = db.execute("SELECT * FROM Umbrellas WHERE id = ?", umbid).fetchone() + if umb_in_db is None: + raise UmbrellaNotFoundError(umbid) + + status = umb_in_db["status"] + if "status" in umb and umb["status"] in STATUSES: + status = umb["status"] + db.execute("UPDATE Umbrellas SET status = ? WHERE id = ?", status, umbid) + else: + raise UmbrellaValueError("status") + + if status in ("lent", "overdue"): + try: + # timezone must be +08:00 + if ( + datetime.fromisoformat(umb["lent_at"]) + .tzinfo.utcoffset(None) + .seconds + != 28800 + ): + raise UmbrellaValueError("lent_at") + except ValueError: + raise UmbrellaValueError("lent_at") + + for key in ( + "tenant_name", + "tenant_id", + "tenant_phone", + "tenant_email", + "lent_at", + ): + if col in umb: + db.execute( + "UPDATE Umbrellas SET ? = ? WHERE id = ?", + col, + umb[col] or None, + umbid, + ) + else: + # discard unneeded fields + for col in ( + "tenant_name", + "tenant_id", + "tenant_phone", + "tenant_email", + "lent_at", + ): + db.execute("UPDATE Umbrellas SET ? = NULL WHERE id = ?", col, umbid) + + # now that new data are validated, commit the SQL transaction + db.commit() + db.close() + + def take_away( + self, umbid, date, tenant_name, tenant_id, tenant_phone="", tenant_email="" + ) -> None: + """When a user has borrowed an umbrella.""" + db = sqlite3.connect(path) + db.row_factory = sqlite3.Row + umb = db.execute("SELECT * FROM Umbrellas WHERE id = ?", umbid) + db.close() + + if umb is None: + raise UmbrellaNotFoundError(umbid) + if umb["status"] != "available": + raise UmbrellaStatusError + + self.update( + { + "id": umbid, + "status": "lent", + "tenant_name": tenant_name, + "tenant_id": tenant_id, + "tenant_phone": tenant_phone, + "tenant_email": tenant_email, + "lent_at": date.isoformat(timespec="milliseconds"), + } + ) + + def give_back(self, umbid, tenant_name, tenant_id) -> None: + """When a user has returned an umbrella. + + `tenant_name` and `tenant_id` are used to verify if the umbrella is returned by the same + person who borrowed it. + """ + db = sqlite3.connect(path) + db.row_factory = sqlite3.Row + umb = db.execute("SELECT * FROM Umbrellas WHERE id = ?", umbid) + db.close() + + if umb is None: + raise UmbrellaNotFoundError(umbid) + if umb["status"] not in ("lent", "overdue"): + raise UmbrellaStatusError + if umb["tenant_name"] != tenant_name or umb["tenant_id"] != tenant_id: + raise TenantIdentityError(umb["tenant_name"], tenant_name) + + self.update( + { + "id": umbid, + "status": "available", + } + ) + + def mark_overdue(self, umbid) -> None: + """When an umbrella is overdue, change its status to "overdue".""" + db = sqlite3.connect(path) + db.row_factory = sqlite3.Row + umb = db.execute("SELECT * FROM Umbrellas WHERE id = ?", umbid) + + if umb is None: + raise UmbrellaNotFoundError(umbid) + elif umb["status"] != "lent": + raise UmbrellaStatusError + + self.update({"id": umbid, "status": "overdue"}) diff --git a/jimbrella/utils.py b/jimbrella/utils.py index ce57bf9..31fafbc 100644 --- a/jimbrella/utils.py +++ b/jimbrella/utils.py @@ -21,22 +21,12 @@ def human_timedelta(delta: timedelta) -> str: return days + f"{hours:0>2}:{minutes:0>2}" # zero-pad to two digits -class UTCPlus8(tzinfo): - def __init__(self): - super().__init__() - - def utcoffset(self, dt): - return timedelta(hours=8) - - def dst(self, dt): - return timedelta(0) - - def tzname(self, dt): - return "+8:00" - - -utc_plus_8 = UTCPlus8() - - -def local_now(): - return datetime.now(tz=utc_plus_8) +def group_by_key(data: list, key: str) -> dict: + """Groups a list of dicts by the value of their `key` into a dict of lists of dicts.""" + keys = set([item[key] for item in data]) + # initiate a dict with `keys` as keys and [] as values + groups = dict.fromkeys(keys, []) + for k in keys: + groups[k] = [item for item in data if item[key] == k] + + return groups |