import csv import os from datetime import datetime class Database: def __init__(self, path): """A database of all umbrellas and their current state. Currently, the data are represented in a csv file. Explanation of data types: - id | a unique numerical identity of unsigned integer type. - uint | whatever fits the regex /[0-9]+/. despite carrying a numerical value, it is | internally represented as a string. - string | any UTF-8 string. - date | an ISO 8601 date of format "YYYY-MM-DDThh:mm:ss". default timezone is UTC+8. The status of an umbrella consists of: - serial | (id) unique identifier for the umbrella. - alias | (string) future compatibility. a human readable (preferably cute) name | for a particular umbrella - status | (string) one of ("available", "lent", "withheld", "maintenance", | "withheld", "unknown") | available : is in service on the stand | lent : is in temporary possession of a user | maintenance : is on the stand but not in service | withheld : is not on the stand but rather in the possession of an | administrator | unknown : none of the above | NOTE: the values above are subject to changes. - tenant_name | (string) the person in temporary possession of the umbrella. - tenant_id | (uint) student or faculty ID. - tenant_phone | (uint) phone number via which to contact tenant when the lease is due. - tenant_email | (string) future compatibility. empty for the time being. - lent_at | (date) if status is "lent", lent_at is the submission time of the user's | jForm answer sheet. is blank otherwise. """ self.path = path def _lock(self) -> int: """Create a lockfile for database. If the database is called "db.csv", the lockfile, located in the same directory, would be called "db.csv.lock". Returns 1 if the lockfile is successfully created, 0 if the lockfile already exists (strong hint of unrecovered crash or potential race condition - caller of this method should abort whatever it is doing) """ try: f = open(self.path + ".lock", "x") f.close() return 1 except FileExistsError: return 0 def _unlock(self) -> None: """Remove lockfile created by _lock. If there is no lockfile, simply ignore. """ try: os.remove(self.path + ".lock") except FileNotFoundError: pass def _read(self) -> list: """Deserialize database.""" # Create database if it does not yet exist try: f = open(self.path, "x") f.close() except FileExistsError: pass with open(self.path) as f: reader = csv.reader(f) umbrellas = [ { "serial": int(row[0]), "alias": row[1], "status": row[2], "tenant_name": row[3], "tenant_id": row[4], "tenant_phone": row[5], "tenant_email": row[6], "lent_at": datetime.fromisoformat(row[7]) if row[7] else None, } for row in reader ] f.close() return umbrellas def _write(self, umbrellas: list) -> None: """Serialize database. When a failure occurs, abort and recover data.""" # make backup in memory with open(self.path) as f: backup = f.read() f.close() # wait until database is locked for this write while not self._lock(): continue f = open(self.path, "w") try: writer = csv.writer(f) writer.writerows( [ [ umb["serial"], umb["alias"], umb["status"], umb["tenant_name"], umb["tenant_id"], umb["tenant_phone"], umb["tenant_email"], umb["lent_at"].isoformat(timespec="seconds") if umb["lent_at"] else "", ] for umb in umbrellas ] ) except: # failure occurred on write # abort write, and write back contents as they were before # TODO: pass on exception and keep log f.close() f = open(self.path, "w") f.write(backup) f.close() self._unlock() def _update(self, update: dict) -> list: """Update status of one umbrella, and return the entire updated database.""" umbrellas = self._read() for idx, umb in enumerate(umbrellas): if umb["serial"] == update["serial"]: umbrellas[idx] = update self._write(umbrellas) return umbrellas def _find_by_serial(self, serial: int) -> dict: """Given a serial number, returns an umbrella with such serial. If there is no such umbrella, returns None. """ umbrellas = self._read() for umb in umbrellas: if umb["serial"] == serial: return umb def read(self) -> list: """Currently an alias for `_read()`.""" return self._read() def take_away( self, serial, tenant_name, tenant_id, tenant_phone="", tenant_email="" ) -> None: """When a user has borrowed an umbrella.""" umb = self._find_by_serial(serial) if umb is None: raise ValueError(f"No umbrella with serial {serial} found.") elif umb["status"] != "available": raise ValueError(f"Umbrella with serial {serial} is inavailable.") umb["status"] = "lent" umb["tenant_name"] = tenant_name umb["tenant_id"] = tenant_id umb["tenant_phone"] = tenant_phone umb["tenant_email"] = tenant_email umb["lent_at"] = datetime.now() self._update(umb) def give_back(self, serial) -> None: """When a user has returned an umbrella.""" umb = self._find_by_serial(serial) if umb is None: raise ValueError(f"No umbrella with serial {serial} found.") elif umb["status"] != "lent": raise ValueError(f"Umbrella with serial {serial} is not lent out.") umb["status"] = "available" for key in ["tenant_name", "tenant_id", "tenant_phone", "tenant_email"]: umb[key] = "" umb["lent_at"] = None self._update(umb)