import sqlite3
from contextlib import closing
from datetime import datetime, timezone, timedelta
from typing import Union

from dateutil.parser import isoparse

from .admin_log import AdminLog
from .utils import human_datetime, human_timedelta, CST
from .exceptions import *

STATUSES = ["available", "lent", "overdue", "maintenance", "unknown"]


class Umbrellas:
    # Columns that describe the current tenant of an umbrella.
    _TENANT_COLUMNS = ("tenant_name", "tenant_id", "tenant_phone", "tenant_email")
    # Columns that only carry meaning while an umbrella is lent or overdue.
    _LEASE_COLUMNS = _TENANT_COLUMNS + ("lent_at",)

    def __init__(self, path):
        """A database of all umbrellas and their current state.

        Currently, the data are represented in a SQLite database. Only admins
        have access to the database, and have the power to modify it
        arbitrarily. Each time a modification is made, AdminLog keeps a log.

        An SQL row for an umbrella consists of these columns:

        - id           | int. unique identifier for the umbrella.
        - status       | string. possible values:
                       |   available   : is in service on the stand
                       |   lent        : is in temporary possession of a user
                       |   overdue     : is in overly prolonged possession of a user
                       |   maintenance : is out of service
                       |   unknown     : none of the above
        - tenant_name  | string. the person in temporary possession of the
                       | umbrella.
        - tenant_id    | string. student or faculty ID.
        - tenant_phone | string. phone number via which to contact tenant when
                       | the lease is due.
        - tenant_email | string. for future compatibility. always None for the
                       | time being.
        - lent_at      | an ISO 8601 datetime string if status is "lent" or
                       | "overdue", empty string otherwise.

        Schema:

            CREATE TABLE Umbrellas(
                id INT PRIMARY KEY,
                status TEXT,
                tenant_name TEXT,
                tenant_id TEXT,
                tenant_phone TEXT,
                tenant_email TEXT,
                lent_at TEXT
            );

        `path` is the path of the SQLite database file. It is also handed to
        AdminLog.
        """
        self.path = path
        self.admin_log = AdminLog(path)

    def read(self, umbid=None) -> Union[sqlite3.Row, list, None]:
        """Read umbrella data from database.

        If `umbid` is given, returns the row (a dict-like `sqlite3.Row`) of the
        umbrella with that id, or None if not found.
        If `umbid` is None, returns a list of rows for all umbrellas.
        """
        # `closing` guarantees the connection is released even if a query fails.
        with closing(sqlite3.connect(self.path)) as db:
            db.row_factory = sqlite3.Row
            if umbid is None:
                return db.execute("SELECT * FROM Umbrellas").fetchall()
            return db.execute(
                "SELECT * FROM Umbrellas WHERE id = ?", (umbid,)
            ).fetchone()

    @staticmethod
    def _normalize_lent_at(value) -> str:
        """Validate `value` and return it as an ISO 8601 string (ms precision).

        `value` may be an ISO 8601 string or a datetime.datetime. A value
        without timezone information is interpreted as CST. Anything else is
        rejected with UmbrellaValueError("lent_at").
        """
        if isinstance(value, datetime):
            lent_at = value
        else:
            try:
                lent_at = isoparse(value)
            except (TypeError, ValueError):
                # Neither a parseable string nor a datetime.
                raise UmbrellaValueError("lent_at") from None
        if lent_at.tzinfo is None:
            lent_at = lent_at.replace(tzinfo=CST)
        return lent_at.isoformat(timespec="milliseconds")

    def update(self, umb) -> None:
        """Update Umbrella table with new data given in `umb`.

        Not all fields in an umbrella dict need to be present in `umb`. Only
        `id` is required. If an optional field is not found, its value is left
        untouched. If a `tenant_*` field is present but its value is an empty
        string, it will be erased. Invalid values are rejected as an
        UmbrellaValueError.

        If the resulting status (the new one if `status` is given, the stored
        one otherwise) is not "lent" or "overdue", `tenant_*` and `lent_at`
        are automatically erased and any such values in `umb` are ignored.

        `lent_at` may be either an ISO 8601 string or a datetime.datetime
        object. If no timezone is supplied, UTC+8 is assumed.

        All values are validated before anything is written, and the changes
        are applied in one statement, so a rejected update leaves the row
        untouched.

        Note that `umb["id"]` is converted to int in place.

        Raises:
            UmbrellaValueError: `id`, `status` or `lent_at` is missing/invalid.
            UmbrellaNotFoundError: no umbrella with the given id exists.
        """
        # `id` must be specified and must be convertible to an integer.
        try:
            umb["id"] = int(umb["id"])
        except (KeyError, TypeError, ValueError):
            raise UmbrellaValueError("id") from None
        umbid = umb["id"]

        # `closing` releases the connection (and with it any lock on the
        # database file) even when validation below raises.
        with closing(sqlite3.connect(self.path)) as db:
            db.row_factory = sqlite3.Row

            # check if the umbrella exists in database
            current = db.execute(
                "SELECT * FROM Umbrellas WHERE id = ?", (umbid,)
            ).fetchone()
            if current is None:
                raise UmbrellaNotFoundError(umbid)

            # column name -> new value, collected first and written at the end
            changes = {}

            status = current["status"]
            if "status" in umb:
                if umb["status"] not in STATUSES:
                    raise UmbrellaValueError("status")
                status = umb["status"]
                changes["status"] = status

            if status in ("lent", "overdue"):
                for col in self._TENANT_COLUMNS:
                    if col in umb:
                        changes[col] = umb[col]
                if "lent_at" in umb:
                    changes["lent_at"] = self._normalize_lent_at(umb["lent_at"])
            else:
                # discard fields that are meaningless without a tenant
                for col in self._LEASE_COLUMNS:
                    changes[col] = ""

            if changes:
                # Column names come only from the fixed tuples/literals above,
                # never from user input, so interpolating them is safe.
                assignments = ", ".join(f"{col} = ?" for col in changes)
                db.execute(
                    f"UPDATE Umbrellas SET {assignments} WHERE id = ?",
                    (*changes.values(), umbid),
                )
                db.commit()

    def admin_modify(self, actor: str, umb: dict, note="") -> None:
        """Update db with `umb`, and also keep a log with AdminLog.

        A log entry is written only if the stored row actually changed.
        Similar to AdminLog.log, this method does no authentication either.

        Raises:
            UmbrellaValueError: `umb` has no `id` (or `update` rejects a value).
            UmbrellaNotFoundError: no umbrella with the given id exists.
        """
        try:
            umbid = umb["id"]
        except KeyError:
            raise UmbrellaValueError("id") from None

        # Check for a missing row *before* converting to dict: dict(None)
        # would raise TypeError and hide the real problem.
        row_before = self.read(umbid)
        if row_before is None:
            raise UmbrellaNotFoundError(umbid)
        umb_old = dict(row_before)

        self.update(umb)
        umb_new = dict(self.read(umbid))

        if umb_old != umb_new:
            self.admin_log.log(
                datetime.now(tz=CST).isoformat(timespec="milliseconds"),
                actor,
                umbid,
                umb_old,
                umb_new,
                note,
            )

    def take_away(
        self, umbid, date, tenant_name, tenant_id, tenant_phone="", tenant_email=""
    ) -> None:
        """When a user has borrowed an umbrella.

        `date` is the moment the umbrella was lent: a datetime.datetime (or an
        ISO 8601 string). It is validated and normalized by `update`.

        Raises:
            UmbrellaNotFoundError: no umbrella with the given id exists.
            UmbrellaStatusError: the umbrella is not "available".
        """
        umb = self.read(umbid)
        if umb is None:
            raise UmbrellaNotFoundError(umbid)
        if umb["status"] != "available":
            raise UmbrellaStatusError
        self.update(
            {
                "id": umbid,
                "status": "lent",
                "tenant_name": tenant_name,
                "tenant_id": tenant_id,
                "tenant_phone": tenant_phone,
                "tenant_email": tenant_email,
                # `update` formats this to ISO 8601 with millisecond precision.
                "lent_at": date,
            }
        )

    def give_back(self, umbid, tenant_name, tenant_id) -> None:
        """When a user has returned an umbrella.

        `tenant_name` and `tenant_id` are used to verify if the umbrella is
        returned by the same person who borrowed it.

        Raises:
            UmbrellaNotFoundError: no umbrella with the given id exists.
            UmbrellaStatusError: the umbrella is neither "lent" nor "overdue".
            TenantIdentityError: name or ID does not match the stored tenant.
        """
        umb = self.read(umbid)
        if umb is None:
            raise UmbrellaNotFoundError(umbid)
        if umb["status"] not in ("lent", "overdue"):
            raise UmbrellaStatusError
        if umb["tenant_name"] != tenant_name or umb["tenant_id"] != tenant_id:
            raise TenantIdentityError(umb["tenant_name"], tenant_name)
        # Switching to "available" makes `update` erase tenant data and lent_at.
        self.update(
            {
                "id": umbid,
                "status": "available",
            }
        )

    def mark_overdue(self, umbid) -> None:
        """When an umbrella is overdue, change its status to "overdue".

        Raises:
            UmbrellaNotFoundError: no umbrella with the given id exists.
            UmbrellaStatusError: the umbrella is not currently "lent".
        """
        umb = self.read(umbid)
        if umb is None:
            raise UmbrellaNotFoundError(umbid)
        if umb["status"] != "lent":
            raise UmbrellaStatusError
        self.update({"id": umbid, "status": "overdue"})