import csv
import logging
import os
import time
from datetime import datetime, timedelta

from .utils import human_datetime, human_timedelta, local_now
from .config import DUE_HOURS
from .exceptions import *

_logger = logging.getLogger(__name__)


class Database:
    """CSV-backed store of all umbrellas and their current lending state."""

    # Seconds to sleep between attempts to acquire the lockfile in `_write`,
    # so that waiting for another writer does not hammer the filesystem.
    _LOCK_POLL_SECONDS = 0.01

    def __init__(self, path):
        """A database of all umbrellas and their current state.

        Currently, the data are represented in a csv file.

        Explanation of data types:
        - id     | a unique numerical identity of unsigned integer type.
        - uint   | whatever fits the regex /[0-9]+/. despite carrying a numerical
                 | value, it is internally represented as a string.
        - string | any UTF-8 string.
        - date   | an ISO 8601 date of format "YYYY-MM-DDThh:mm:ss". default
                 | timezone is UTC+8.

        The status of an umbrella consists of:
        - serial       | (id) unique identifier for the umbrella.
        - alias        | (string) future compatibility. a human readable
                       | (preferably cute) name for a particular umbrella
        - status       | (string) one of ("available", "lent", "overdue",
                       | "maintenance", "withheld", "unknown")
                       |   available   : is in service on the stand
                       |   lent        : is in temporary possession of a user
                       |   overdue     : is in overly prolonged possession of a user
                       |   maintenance : is on the stand but not in service
                       |   withheld    : is not on the stand but rather in the
                       |                 possession of an administrator
                       |   unknown     : none of the above
                       | NOTE: the values above are subject to changes.
        - tenant_name  | (string) the person in temporary possession of the umbrella.
        - tenant_id    | (uint) student or faculty ID.
        - tenant_phone | (uint) phone number via which to contact tenant when the
                       | lease is due.
        - tenant_email | (string) future compatibility. empty for the time being.
        - lent_at      | (date) if status is "lent" or "overdue", lent_at is the
                       | submission time of the user's jForm answer sheet. is
                       | blank otherwise.

        Args:
            path: location of the csv file backing this database.
        """
        self.path = path

    def _lock(self) -> int:
        """Create a lockfile for database.

        If the database is called "db.csv", the lockfile, located in the same
        directory, would be called "db.csv.lock".

        Returns 1 if the lockfile is successfully created, 0 if the lockfile
        already exists (strong hint of unrecovered crash or potential race
        condition - caller of this method should abort whatever it is doing).
        """
        try:
            # Mode "x" fails if the file already exists, so creating the
            # lockfile doubles as the test for whether it was already held.
            with open(self.path + ".lock", "x"):
                pass
        except FileExistsError:
            return 0
        return 1

    def _unlock(self) -> None:
        """Remove lockfile created by _lock. If there is no lockfile, simply ignore."""
        try:
            os.remove(self.path + ".lock")
        except FileNotFoundError:
            pass

    def _read(self) -> list:
        """Deserialize database.

        Creates an empty database file first if none exists yet.

        Returns:
            A list with one dict per umbrella, keyed by "serial" (int), "alias",
            "status", "tenant_name", "tenant_id", "tenant_phone", "tenant_email"
            (all str) and "lent_at" (datetime, or None when the column is empty).
        """
        # Create database if it does not yet exist
        try:
            with open(self.path, "x"):
                pass
        except FileExistsError:
            pass

        # newline="" is what the csv module expects of the files it is given.
        with open(self.path, newline="") as f:
            return [
                {
                    "serial": int(row[0]),
                    "alias": row[1],
                    "status": row[2],
                    "tenant_name": row[3],
                    "tenant_id": row[4],
                    "tenant_phone": row[5],
                    "tenant_email": row[6],
                    "lent_at": datetime.fromisoformat(row[7]) if row[7] else None,
                }
                for row in csv.reader(f)
                if row  # csv.reader yields [] for blank lines; skip them
            ]

    def _write(self, umbrellas: list) -> None:
        """Serialize database. When a failure occurs, abort and recover data.

        Blocks until the lockfile can be acquired, then replaces the contents
        of the database file with `umbrellas`. If serializing or writing
        fails, the previous contents are written back and the failure is
        logged; the exception is not propagated to the caller. The lockfile
        is always removed before returning or raising.

        Args:
            umbrellas: list of umbrella dicts in the shape produced by `_read`.

        Raises:
            FileNotFoundError: if the database file does not exist.
        """
        # wait until database is locked for this write
        while not self._lock():
            time.sleep(self._LOCK_POLL_SECONDS)

        try:
            # make backup in memory; taken under the lock so that it cannot
            # miss a write completed by another writer in the meantime
            with open(self.path, newline="") as f:
                backup = f.read()

            try:
                # Build every row before truncating the file, so a malformed
                # record is detected while the old contents are still on disk.
                rows = [
                    [
                        umb["serial"],
                        umb["alias"],
                        umb["status"],
                        umb["tenant_name"],
                        umb["tenant_id"],
                        umb["tenant_phone"],
                        umb["tenant_email"],
                        umb["lent_at"].isoformat(timespec="seconds")
                        if umb["lent_at"]
                        else "",
                    ]
                    for umb in umbrellas
                ]
                with open(self.path, "w", newline="") as f:
                    csv.writer(f).writerows(rows)
            except Exception:
                # failure occurred on write
                # abort write, and write back contents as they were before
                # TODO: pass on exception
                _logger.exception(
                    "Writing database %s failed; previous contents restored",
                    self.path,
                )
                with open(self.path, "w", newline="") as f:
                    f.write(backup)
        finally:
            # Release the lock even if reading the backup or the recovery
            # itself raised; a leftover lockfile would block all later writes.
            self._unlock()

    def _update(self, update: dict) -> list:
        """Update status of one umbrella, and return the entire updated database.

        The record whose "serial" equals `update["serial"]` is replaced by
        `update`. The database is written back even when no record matches.
        """
        umbrellas = self._read()
        for idx, umb in enumerate(umbrellas):
            if umb["serial"] == update["serial"]:
                umbrellas[idx] = update
        self._write(umbrellas)
        return umbrellas

    def _find_by_serial(self, serial: int) -> dict:
        """Given a serial number, returns an umbrella with such serial.

        If there is no such umbrella, returns None.
        """
        for umb in self._read():
            if umb["serial"] == serial:
                return umb
        return None

    def read(self) -> list:
        """An interface to `_read()` with supplemental data included.

        All exposed methods with a return value should use this method instead
        of `_read()`.

        Supplemental data (added only to umbrellas whose status is "lent" or
        "overdue"):
        - lent_at_str: string representation for lent_at.
        - lent_time_ago: time since umbrella was taken away by tenant.
        - lent_time_ago_str: string representation for lent_time_ago.
        """
        umbrellas = self._read()
        now = local_now()
        for umb in umbrellas:
            if umb["status"] in ("lent", "overdue"):
                lent_time_ago = now - umb["lent_at"]
                umb["lent_at_str"] = human_datetime(umb["lent_at"])
                umb["lent_time_ago"] = lent_time_ago
                umb["lent_time_ago_str"] = human_timedelta(lent_time_ago)
        return umbrellas

    @staticmethod
    def group_by_status(umbrellas) -> dict:
        """(static method) Returns umbrellas grouped into a dict by their status.

        Keys are the status strings present in `umbrellas`; each value is the
        list of umbrellas with that status, in their original order.
        """
        statuses = {}
        for umb in umbrellas:
            statuses.setdefault(umb["status"], []).append(umb)
        return statuses

    @staticmethod
    def find_overdue(umbrellas) -> list:
        """(static method) Returns umbrellas in possession of their tenant for too long.

        An umbrella qualifies when it has a lent_at time that lies more than
        DUE_HOURS hours before the current local time.
        """
        now = local_now()
        due_after = timedelta(hours=DUE_HOURS)
        return [
            umb
            for umb in umbrellas
            if umb["lent_at"] is not None and now - umb["lent_at"] > due_after
        ]

    def take_away(
        self, serial, date, tenant_name, tenant_id, tenant_phone="", tenant_email=""
    ) -> None:
        """When a user has borrowed an umbrella.

        Marks the umbrella as "lent", records the tenant's details and stores
        `date` as its lent_at time.

        Raises:
            UmbrellaNotFoundError: if no umbrella has the given serial.
            UmbrellaStatusError: if the umbrella is not "available".
        """
        umb = self._find_by_serial(serial)
        if umb is None:
            raise UmbrellaNotFoundError
        if umb["status"] != "available":
            raise UmbrellaStatusError
        umb["status"] = "lent"
        umb["tenant_name"] = tenant_name
        umb["tenant_id"] = tenant_id
        umb["tenant_phone"] = tenant_phone
        umb["tenant_email"] = tenant_email
        umb["lent_at"] = date
        self._update(umb)

    def give_back(self, serial) -> None:
        """When a user has returned an umbrella.

        Marks the umbrella as "available" and clears the tenant's details and
        the lent_at time.

        Raises:
            UmbrellaNotFoundError: if no umbrella has the given serial.
            UmbrellaStatusError: if the umbrella is neither "lent" nor "overdue".
        """
        umb = self._find_by_serial(serial)
        if umb is None:
            raise UmbrellaNotFoundError
        if umb["status"] not in ("lent", "overdue"):
            raise UmbrellaStatusError
        umb["status"] = "available"
        for key in ["tenant_name", "tenant_id", "tenant_phone", "tenant_email"]:
            umb[key] = ""
        umb["lent_at"] = None
        self._update(umb)

    def mark_overdue(self, serial) -> None:
        """When an umbrella is overdue, change its status to "overdue".

        Raises:
            UmbrellaNotFoundError: if no umbrella has the given serial.
            UmbrellaStatusError: if the umbrella is not "lent".
        """
        umb = self._find_by_serial(serial)
        if umb is None:
            raise UmbrellaNotFoundError
        if umb["status"] != "lent":
            raise UmbrellaStatusError
        umb["status"] = "overdue"
        self._update(umb)