import sqlite3
from contextlib import closing
from datetime import datetime, timedelta

from .utils import human_datetime, human_timedelta
from .config import DUE_HOURS, ADMIN_LOG_PATH
from .exceptions import *

STATUSES = ["available", "lent", "overdue", "maintenance", "unknown"]

# Columns that only carry meaning while an umbrella is "lent" or "overdue".
# Column names cannot be bound as SQL parameters, so they are interpolated
# into statements from this fixed whitelist only -- never from caller input.
_TENANT_COLUMNS = ("tenant_name", "tenant_id", "tenant_phone", "tenant_email")

# `lent_at` timestamps must carry a UTC+08:00 offset.
_REQUIRED_UTC_OFFSET = timedelta(hours=8)


class Umbrellas:
    """A database of all umbrellas and their current state.

    Currently, the data are represented in a SQLite database. Only admins
    have access to the database, and have the power to modify it arbitrarily.

    An SQL row for an umbrella consists of these columns:

    - id           | int. unique identifier for the umbrella.
    - status       | string. possible values:
                   |   available   : is in service on the stand
                   |   lent        : is in temporary possession of a user
                   |   overdue     : is in overly prolonged possession of a user
                   |   maintenance : is out of service
                   |   unknown     : none of the above
    - tenant_name  | string. the person in temporary possession of the umbrella.
    - tenant_id    | string. student or faculty ID.
    - tenant_phone | string. phone number via which to contact the tenant when
                   | the lease is due.
    - tenant_email | string. for future compatibility. always None for the
                   | time being.
    - lent_at      | an ISO 8601 date string "YYYY-MM-DDThh:mm:ss.mmm+08:00" if
                   | status is "lent" or "overdue". is None otherwise.

    Schema:

        CREATE TABLE Umbrellas(
            id INT PRIMARY KEY,
            status TEXT,
            tenant_name TEXT,
            tenant_id TEXT,
            tenant_phone TEXT,
            tenant_email TEXT,
            lent_at TEXT
        );
    """

    def __init__(self, path):
        """Remember `path`, the location of the SQLite database file.

        No connection is opened here; every operation opens and closes its
        own short-lived connection.
        """
        self.path = path

    def _connect(self):
        """Open a new connection whose rows can be indexed by column name."""
        db = sqlite3.connect(self.path)
        db.row_factory = sqlite3.Row
        return db

    def _fetch(self, umbid):
        """Return the row of umbrella `umbid`.

        Raises UmbrellaNotFoundError if no such umbrella exists.
        """
        with closing(self._connect()) as db:
            row = db.execute(
                "SELECT * FROM Umbrellas WHERE id = ?", (umbid,)
            ).fetchone()
        if row is None:
            raise UmbrellaNotFoundError(umbid)
        return row

    @staticmethod
    def _normalize_lent_at(value):
        """Validate `value` and return it in the stored string format.

        `value` may be an ISO 8601 string or a datetime.datetime object.
        An empty string or None yields None (stored as NULL). Anything that
        cannot be parsed, or whose UTC offset is not +08:00 (including naive
        datetimes), raises UmbrellaValueError("lent_at").
        """
        if value is None or value == "":
            return None
        if isinstance(value, datetime):
            lent_at = value
        else:
            try:
                lent_at = datetime.fromisoformat(value)
            except (TypeError, ValueError):
                raise UmbrellaValueError("lent_at")
        # utcoffset() is None for naive datetimes, so they are rejected too.
        # Comparing whole timedeltas (instead of `.seconds`) keeps offsets
        # such as -16:00 from being mistaken for +08:00.
        if lent_at.utcoffset() != _REQUIRED_UTC_OFFSET:
            raise UmbrellaValueError("lent_at")
        return lent_at.isoformat(timespec="milliseconds")

    def read(self) -> list:
        """Return every row of the Umbrellas table as a list of sqlite3.Row."""
        with closing(self._connect()) as db:
            return db.execute("SELECT * FROM Umbrellas").fetchall()

    def update(self, umb) -> dict:
        """Update Umbrella table with new data given in `umb`.

        Not all fields in an umbrella dict need to be present in `umb`. Only
        `id` is required. If an optional field is not found, its value is
        left untouched. If an optional field is present but its value is an
        empty string or None, the old datum will become NULL. Invalid values
        are rejected as an UmbrellaValueError.

        If `status` is not "lent" or "overdue", `tenant_*` and `lent_at` are
        automatically erased.

        `lent_at` may be either an ISO 8601 string or a datetime.datetime
        object. Must be UTC+8.

        As a side effect, `umb["id"]` is replaced by its integer value.

        Returns a dict mapping each changed field name to a tuple
        `(old value, new value)`, unless the field's erasure can be inferred
        from the status change. For AdminLog.

        Raises:
            UmbrellaValueError: `id`, `status` or `lent_at` is invalid.
            UmbrellaNotFoundError: no umbrella with the given `id` exists.
        """
        # `id` must be specified.
        try:
            umbid = umb["id"] = int(umb["id"])
        except (KeyError, TypeError, ValueError):
            raise UmbrellaValueError("id")

        # `closing` guarantees the connection is released on every path.
        # Leaving the block through an exception closes it without a commit,
        # so a rejected update never leaves partial changes behind.
        with closing(self._connect()) as db:
            stored = db.execute(
                "SELECT * FROM Umbrellas WHERE id = ?", (umbid,)
            ).fetchone()
            if stored is None:
                raise UmbrellaNotFoundError(umbid)

            diff = {}
            status = stored["status"]

            if "status" in umb:
                if umb["status"] not in STATUSES:
                    raise UmbrellaValueError("status")
                status = umb["status"]
                if stored["status"] != status:
                    diff["status"] = (stored["status"], status)
                db.execute(
                    "UPDATE Umbrellas SET status = ? WHERE id = ?",
                    (status, umbid),
                )

            if status in ("lent", "overdue"):
                for col in _TENANT_COLUMNS:
                    if col not in umb:
                        continue
                    # Empty string and None both mean "erase this datum".
                    value = None if umb[col] in ("", None) else umb[col]
                    if stored[col] != value:
                        diff[col] = (stored[col], value)
                    db.execute(
                        f"UPDATE Umbrellas SET {col} = ? WHERE id = ?",
                        (value, umbid),
                    )

                if "lent_at" in umb:
                    lent_at = self._normalize_lent_at(umb["lent_at"])
                    if stored["lent_at"] != lent_at:
                        diff["lent_at"] = (stored["lent_at"], lent_at)
                    db.execute(
                        "UPDATE Umbrellas SET lent_at = ? WHERE id = ?",
                        (lent_at, umbid),
                    )
            else:
                # discard unneeded fields
                for col in _TENANT_COLUMNS + ("lent_at",):
                    db.execute(
                        f"UPDATE Umbrellas SET {col} = NULL WHERE id = ?",
                        (umbid,),
                    )

            # now that new data are validated, commit the SQL transaction
            db.commit()

        return diff

    def take_away(
        self, umbid, date, tenant_name, tenant_id, tenant_phone="", tenant_email=""
    ) -> None:
        """When a user has borrowed an umbrella.

        `date` is the moment of lending; it is stored as `lent_at` via its
        `isoformat(timespec="milliseconds")` representation.

        Raises:
            UmbrellaNotFoundError: no umbrella with id `umbid` exists.
            UmbrellaStatusError: the umbrella is not "available".
        """
        umb = self._fetch(umbid)
        if umb["status"] != "available":
            raise UmbrellaStatusError
        self.update(
            {
                "id": umbid,
                "status": "lent",
                "tenant_name": tenant_name,
                "tenant_id": tenant_id,
                "tenant_phone": tenant_phone,
                "tenant_email": tenant_email,
                "lent_at": date.isoformat(timespec="milliseconds"),
            }
        )

    def give_back(self, umbid, tenant_name, tenant_id) -> None:
        """When a user has returned an umbrella.

        `tenant_name` and `tenant_id` are used to verify if the umbrella is
        returned by the same person who borrowed it.

        Raises:
            UmbrellaNotFoundError: no umbrella with id `umbid` exists.
            UmbrellaStatusError: the umbrella is neither "lent" nor "overdue".
            TenantIdentityError: the name or ID differs from the stored tenant.
        """
        umb = self._fetch(umbid)
        if umb["status"] not in ("lent", "overdue"):
            raise UmbrellaStatusError
        if umb["tenant_name"] != tenant_name or umb["tenant_id"] != tenant_id:
            raise TenantIdentityError(umb["tenant_name"], tenant_name)
        self.update(
            {
                "id": umbid,
                "status": "available",
            }
        )

    def mark_overdue(self, umbid) -> None:
        """When an umbrella is overdue, change its status to "overdue".

        Raises:
            UmbrellaNotFoundError: no umbrella with id `umbid` exists.
            UmbrellaStatusError: the umbrella is not currently "lent".
        """
        umb = self._fetch(umbid)
        if umb["status"] != "lent":
            raise UmbrellaStatusError
        self.update({"id": umbid, "status": "overdue"})