summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrederick Yin <fkfd@fkfd.me>2022-02-01 12:43:25 +0800
committerFrederick Yin <fkfd@fkfd.me>2022-02-01 12:43:25 +0800
commit0e47cf9a8d06e896c5197cb28cb5a2a518d255d1 (patch)
tree22a0267dbfe204038aaac48d8596d436b5d56291
parent444966b2ff4a04374856d3a6759bef9e9f451c03 (diff)
SQLite in favor of CSV database
Deprecate csv database format, move around some methods
-rw-r--r--jimbrella/csv_table.py108
-rw-r--r--jimbrella/database.py252
-rw-r--r--jimbrella/lockfile.py33
-rw-r--r--jimbrella/umbrellas.py189
-rw-r--r--jimbrella/utils.py28
5 files changed, 198 insertions, 412 deletions
diff --git a/jimbrella/csv_table.py b/jimbrella/csv_table.py
deleted file mode 100644
index 8827e5f..0000000
--- a/jimbrella/csv_table.py
+++ /dev/null
@@ -1,108 +0,0 @@
-import csv
-import os
-import logging
-from .lockfile import Lockfile
-from .utils import identity
-
-
-class CsvTable:
- def __init__(self, path, schema: list):
- """A generic CSV table of data, and basic I/O operations.
-
- The first column should be unique. If so, it can be used to index rows, a feature of
- method `_find`.
-
- Arguments:
- - path | file path for table.
- - schema | list of dicts, each for a datum field (column).
- | each dict contains the following keys:
- | "name": what the column will be called in the dict that `_read` returns.
- | "serializer": function to be applied to the datum when being written.
- | "deserializer": same as above except it is when datum is being read.
- | the latter two are optional, and when they are not specified, the default is
- | the identity function, i.e. f such that f(x) = x for any x.
- """
- self.path = path
- for col in schema:
- for func in ("serializer", "deserializer"):
- if func not in col or col[func] is None:
- col[func] = identity
-
- # schema: column number -> name and (de)serialization functions
- # reverse_schema: name -> column number
- self.schema = schema
- self.reverse_schema = {schema[i]["name"]: i for i in range(len(schema))}
- self.lockfile = Lockfile(self.path)
- # Create file if it does not yet exist
- try:
- f = open(self.path, "x")
- f.close()
- except FileExistsError:
- pass
-
- def _read(self) -> list:
- """Deserialize table."""
- with open(self.path) as f:
- reader = csv.reader(f)
- rows = []
- # `rows` is a list of 2-tuples
- for ln, row in enumerate(reader):
- # for each tuple (k, v) in `rows`,
- # it will be unzipped into a dict key-value pair
- try:
- rows.append(
- dict(
- [
- (sch["name"], sch["deserializer"](datum))
- for sch, datum in zip(self.schema, row)
- ]
- )
- )
- except Exception:
- logging.warning("%s:%d cannot be read. Skip.", self.path, ln)
- continue
-
- f.close()
- return rows
-
- def _write(self, rows: list) -> None:
- """Serialize table. When a failure occurs, abort and recover data."""
- # make backup in memory
- with open(self.path) as f:
- backup = f.read()
- f.close()
-
- self.lockfile.lock()
-
- f = open(self.path, "w")
- try:
- writer = csv.writer(f)
- for row in rows:
- writer.writerow([row[col["name"]] for col in self.schema])
- except Exception as e:
- # failure occurred on write
- # abort write, and write back contents as they were before
- f.close()
- f = open(self.path, "w")
- f.write(backup)
- raise e
- finally:
- f.close()
- self.lockfile.unlock()
-
- def _append(self, row) -> list:
- """Append one row, and return the entire updated table."""
- rows = self._read()
- rows.append(row)
- self._write(rows)
- return rows
-
- def _update(self, update: dict) -> list:
- """Update one row, and return the entire updated table."""
- rows = self._read()
- index_column = self.schema[0]["name"]
- for idx, row in enumerate(rows):
- if row[index_column] == update[index_column]:
- rows[idx] = update
- self._write(rows)
- return rows
diff --git a/jimbrella/database.py b/jimbrella/database.py
deleted file mode 100644
index 9b6e51f..0000000
--- a/jimbrella/database.py
+++ /dev/null
@@ -1,252 +0,0 @@
-from datetime import datetime, timedelta
-from .csv_table import CsvTable
-from .utils import human_datetime, human_timedelta
-from .config import DUE_HOURS, ADMIN_LOG_PATH
-from .exceptions import *
-
-STATUSES = ["available", "lent", "overdue", "maintenance", "withheld", "unknown"]
-
-
-class Database(CsvTable):
- def __init__(self, path):
- """A database of all umbrellas and their current state.
-
- Currently, the data are represented in a csv file.
-
- Explanation of data types:
- - id | a unique numerical identity of unsigned integer type.
- - uint | whatever fits the regex /[0-9]+/. despite carrying a numerical value, it is
- | internally represented as a string.
- - string | any UTF-8 string.
- - date | an ISO 8601 date of format "YYYY-MM-DDThh:mm:ss+08:00".
-
- The status of an umbrella consists of:
- - serial | (id) unique identifier for the umbrella.
- - alias | (string) future compatibility. a human readable (preferably cute) name
- | for a particular umbrella
- - status | (string) one of ("available", "lent", "overdue", "maintenance",
- | "withheld", "unknown")
- | available : is in service on the stand
- | lent : is in temporary possession of a user
- | overdue : is in overly prolonged possession of a user
- | maintenance : is on the stand but not in service
- | withheld : is not on the stand but rather in the possession of an
- | administrator
- | unknown : none of the above
- | NOTE: the values above are subject to changes.
- - tenant_name | (string) the person in temporary possession of the umbrella.
- - tenant_id | (uint) student or faculty ID.
- - tenant_phone | (uint) phone number via which to contact tenant when the lease is due.
- - tenant_email | (string) future compatibility. empty for the time being.
- - lent_at | (date) if status is "lent", lent_at is the submission time of the user's
- | jForm answer sheet. is blank otherwise.
- """
- super().__init__(
- path,
- [
- {"name": "serial", "deserializer": int},
- {"name": "alias"},
- {"name": "status"},
- {"name": "tenant_name"},
- {"name": "tenant_id"},
- {"name": "tenant_phone"},
- {"name": "tenant_email"},
- {
- "name": "lent_at",
- "serializer": lambda d: d.isoformat(timespec="seconds")
- if d
- else "",
- "deserializer": lambda d: datetime.fromisoformat(d) if d else None,
- },
- ],
- )
-
- def _find_by_serial(self, serial: int) -> dict:
- """Given a serial number, returns an umbrella with such serial.
- If there is no such umbrella, returns None.
- """
- umbrellas = self._read()
- for umb in umbrellas:
- if umb["serial"] == serial:
- return umb
-
- def read(self) -> list:
- """An interface to `_read()` with supplemental data included.
-
- All exposed methods with a return value should use this method instead of `_read()`.
-
- Supplemental data:
- - lent_at_str: string representation for lent_at.
- - lent_time_ago: time since umbrella was taken away by tenant. if umbrella is not
- taken away, its value is None.
- - lent_time_ago_str: string representation for lent_time_ago.
- """
- umbrellas = self._read()
- now = datetime.now()
- for idx, umb in enumerate(umbrellas):
- if umb["status"] in ("lent", "overdue"):
- umbrellas[idx]["lent_at_str"] = human_datetime(umb["lent_at"])
- lent_time_ago = now - umb["lent_at"]
- umbrellas[idx]["lent_time_ago"] = lent_time_ago
- umbrellas[idx]["lent_time_ago_str"] = human_timedelta(lent_time_ago)
- return umbrellas
-
- def update(self, umb) -> dict:
- """An interface to `_update()` with added convenience and safeguards.
-
- Convenience: Not all fields in an umbrella dict need to be present in `umb`. If an
- optional field is not there, its value is left untouched. Every field other
- than `serial` is optional.
-
- `serial` may be an int or a str that evaluates to the correct serial when
- int() is applied to it.
-
- `tenant_id` and `tenant_phone` may be a str or an int.
-
- `lent_at` in `umb` may either be a datetime.datetime object, or an ISO 8601
- formatted string.
-
- Safeguards: Invalid values are rejected as an Exception, in the default case, an
- UmbrellaValueError.
-
- Both: When a value is set, values of unneeded fields are automatically discarded
- if applicable. For example, when `status` is set to "available", `tenant_*`
- and `lent_at` are no longer needed; in fact, their existence is dangerous
- as it implies the status of the umbrella is "lent" or "overdue", which is
- not the case. Therefore, if `status` is set to "available", the unnecessary
- fields mentioned above are made blank, and these fields supplied in `umb`
- are ignored.
-
- Returns a dict, where the keys are all modified columns of the updated row, and the value
- of each is a 2-tuple (<past value>, <new value>).
- """
- # `serial` must be specified.
- if "serial" not in umb:
- raise UmbrellaValueError("serial")
-
- # check if umbrella #<serial> exists in database
- umb["serial"] = int(umb["serial"])
- umb_in_db = self._find_by_serial(umb["serial"])
- if umb_in_db is None:
- raise UmbrellaNotFoundError(umb["serial"])
-
- status = umb_in_db["status"]
- if "status" in umb and umb["status"]:
- if status not in STATUSES:
- raise UmbrellaValueError("status") # invalid
-
- # admin specifies a (perhaps different) status
- status = umb["status"]
-
- if "lent_at" in umb and umb["lent_at"]:
- # check if `umb` has valid date (`lent_at`) if any
- if isinstance(umb["lent_at"], datetime):
- lent_at = umb["lent_at"]
- elif isinstance(umb["lent_at"], str):
- try:
- umb["lent_at"] = datetime.fromisoformat(umb["lent_at"])
- except ValueError:
- raise UmbrellaValueError("lent_at")
- else:
- raise UmbrellaValueError("lent_at")
-
- # we will now check the validity of `umb` based on `status`
- if status == "available":
- # discard unneeded fields
- for key in ["tenant_name", "tenant_id", "tenant_phone", "tenant_email"]:
- umb[key] = ""
- umb["lent_at"] = None
- elif status in ("lent", "overdue"):
- # copy values in database into unfilled or non-existent fields of `umb`
- for key in [
- "tenant_name",
- "tenant_id",
- "tenant_phone",
- "tenant_email",
- "lent_at",
- ]:
- umb[key] = umb[key] if (key in umb and umb[key]) else umb_in_db[key]
-
- if not umb["lent_at"]:
- raise UmbrellaValueError("lent_at")
-
- # commit update
- self._update(umb)
-
- # find updated columns
- # essentially "diff umb_in_db umb"
- diff = {}
- for key, new in umb.items():
- past = umb_in_db[key]
- if any([new, past]) and new != past:
- # new and past are not both null values, and they are unequal
- diff[key] = (past, new)
-
- return diff
-
- @staticmethod
- def group_by_status(umbrellas) -> dict:
- """(static method) Returns umbrellas grouped into a dict by their status."""
- keys = set([umb["status"] for umb in umbrellas])
- # initiate statuses: each status is []
- statuses = dict.fromkeys(STATUSES, [])
- for key in keys:
- statuses[key] = [umb for umb in umbrellas if umb["status"] == key]
- return statuses
-
- @staticmethod
- def find_overdue(umbrellas) -> list:
- """(static method) Returns umbrellas in possession of their tenant for too long."""
- now = datetime.now()
- return [
- umb
- for umb in umbrellas
- if umb["lent_at"] is not None
- and now - umb["lent_at"] > timedelta(hours=DUE_HOURS)
- ]
-
- def take_away(
- self, serial, date, tenant_name, tenant_id, tenant_phone="", tenant_email=""
- ) -> None:
- """When a user has borrowed an umbrella."""
- umb = self._find_by_serial(serial)
- if umb is None:
- raise UmbrellaNotFoundError(serial)
- if umb["status"] != "available":
- raise UmbrellaStatusError
- umb["status"] = "lent"
- umb["tenant_name"] = tenant_name
- umb["tenant_id"] = tenant_id
- umb["tenant_phone"] = tenant_phone
- umb["tenant_email"] = tenant_email
- umb["lent_at"] = date
- self._update(umb)
-
- def give_back(self, serial, tenant_name, tenant_id) -> None:
- """When a user has returned an umbrella.
-
- `tenant_name` and `tenant_id` are used to verify if the umbrella is returned by the same
- person who borrowed it.
- """
- umb = self._find_by_serial(serial)
- if umb is None:
- raise UmbrellaNotFoundError(serial)
- if umb["status"] not in ("lent", "overdue"):
- raise UmbrellaStatusError
- if umb["tenant_name"] != tenant_name or umb["tenant_id"] != tenant_id:
- raise TenantIdentityError(umb["tenant_name"], tenant_name)
- umb["status"] = "available"
- for key in ["tenant_name", "tenant_id", "tenant_phone", "tenant_email"]:
- umb[key] = ""
- umb["lent_at"] = None
- self._update(umb)
-
- def mark_overdue(self, serial) -> None:
- """When an umbrella is overdue, change its status to "overdue"."""
- umb = self._find_by_serial(serial)
- if umb is None:
- raise UmbrellaNotFoundError(serial)
- elif umb["status"] != "lent":
- raise UmbrellaStatusError
- umb["status"] = "overdue"
- self._update(umb)
diff --git a/jimbrella/lockfile.py b/jimbrella/lockfile.py
deleted file mode 100644
index 987f0e3..0000000
--- a/jimbrella/lockfile.py
+++ /dev/null
@@ -1,33 +0,0 @@
-import os
-
-
-class Lockfile:
- """Prevent unwanted concurrency for file I/O.
-
- For a file named "<file>", create or remove a lockfile named "<file>.lock".
-
- When a process is modifying the file, call `lock(). When the modification
- is done, call `unlock()`.
- """
-
- def __init__(self, filepath):
- self.filepath = str(filepath)
- self.lockpath = self.filepath + ".lock"
-
- def lock(self):
- """Continue attempting to lock until locked."""
- locked = False
- while not locked:
- try:
- f = open(self.lockpath, "x")
- f.close()
- locked = True
- except FileExistsError:
- continue
-
- def unlock(self):
- """Remove lockfile."""
- try:
- os.remove(self.lockpath)
- except FileNotFoundError:
- return
diff --git a/jimbrella/umbrellas.py b/jimbrella/umbrellas.py
new file mode 100644
index 0000000..05b103c
--- /dev/null
+++ b/jimbrella/umbrellas.py
@@ -0,0 +1,189 @@
+import sqlite3
+from datetime import datetime, timedelta
+from .utils import human_datetime, human_timedelta
+from .config import DUE_HOURS, ADMIN_LOG_PATH
+from .exceptions import *
+
+STATUSES = ["available", "lent", "overdue", "maintenance", "unknown"]
+
+
+class Umbrellas:
+ def __init__(self, path):
+ """A database of all umbrellas and their current state.
+
+ Currently, the data are represented in a SQLite database. Only admins have access to the
+ database, and have the power to modify it arbitrarily.
+
+ An SQL row for an umbrella consists of these columns:
+ - id | int. unique identifier for the umbrella.
+ - status | string. possible values:
+ | available : is in service on the stand
+ | lent : is in temporary possession of a user
+ | overdue : is in overly prolonged possession of a user
+ | maintenance : is out of service
+ | unknown : none of the above
+ - tenant_name | string. the person in temporary possession of the umbrella.
+ - tenant_id | string. student or faculty ID.
+ - tenant_phone | string. phone number via which to contact tenant when the lease is due.
+ - tenant_email | string. for future compatibility. always None for the time being.
+ - lent_at | an ISO 8601 date string "YYYY-MM-DDThh:mm:ss+08:00" if status is
+ | "lent" or "overdue. is None otherwise.
+
+ Schema:
+ CREATE TABLE Umbrellas(
+ id INT PRIMARY KEY,
+ status TEXT,
+ tenant_name TEXT,
+ tenant_id TEXT,
+ tenant_phone TEXT,
+ tenant_email TEXT,
+ lent_at TEXT
+ );
+ """
+ self.path = path
+
+ def read(self) -> list:
+ db = sqlite3.connect(path)
+ db.row_factory = sqlite.Row
+ umbrellas = db.execute("SELECT * FROM Umbrellas").fetchall()
+ db.close()
+ return umbrellas
+
+ def update(self, umb) -> None:
+ """Update Umbrella table with new data given in `umb`.
+
+ Not all fields in an umbrella dict need to be present in `umb`. Only `id` is required.
+ If an optional field is not found, its value is left untouched. If an optional field is
+ present but its value is an empty string or None, the old datum will become NULL.
+
+ Invalid values are rejected as an UmbrellaValueError.
+
+ If `status` is not "lent" or "overdue", `tenant_*` and `lent_at` are automatically erased.
+ """
+ # `id` must be specified.
+ try:
+ umb["id"] = int(umb["id"])
+ except (KeyError, ValueError):
+ raise UmbrellaValueError("id")
+
+ db = sqlite3.connect(path)
+ db.row_factory = sqlite.Row
+
+ # check if umbrella #<id> exists in database
+ umbid = umb["id"]
+ umb_in_db = db.execute("SELECT * FROM Umbrellas WHERE id = ?", umbid).fetchone()
+ if umb_in_db is None:
+ raise UmbrellaNotFoundError(umbid)
+
+ status = umb_in_db["status"]
+ if "status" in umb and umb["status"] in STATUSES:
+ status = umb["status"]
+ db.execute("UPDATE Umbrellas SET status = ? WHERE id = ?", status, umbid)
+ else:
+ raise UmbrellaValueError("status")
+
+ if status in ("lent", "overdue"):
+ try:
+ # timezone must be +08:00
+ if (
+ datetime.fromisoformat(umb["lent_at"])
+ .tzinfo.utcoffset(None)
+ .seconds
+ != 28800
+ ):
+ raise UmbrellaValueError("lent_at")
+ except ValueError:
+ raise UmbrellaValueError("lent_at")
+
+ for key in (
+ "tenant_name",
+ "tenant_id",
+ "tenant_phone",
+ "tenant_email",
+ "lent_at",
+ ):
+ if col in umb:
+ db.execute(
+ "UPDATE Umbrellas SET ? = ? WHERE id = ?",
+ col,
+ umb[col] or None,
+ umbid,
+ )
+ else:
+ # discard unneeded fields
+ for col in (
+ "tenant_name",
+ "tenant_id",
+ "tenant_phone",
+ "tenant_email",
+ "lent_at",
+ ):
+ db.execute("UPDATE Umbrellas SET ? = NULL WHERE id = ?", col, umbid)
+
+ # now that new data are validated, commit the SQL transaction
+ db.commit()
+ db.close()
+
+ def take_away(
+ self, umbid, date, tenant_name, tenant_id, tenant_phone="", tenant_email=""
+ ) -> None:
+ """When a user has borrowed an umbrella."""
+ db = sqlite3.connect(path)
+ db.row_factory = sqlite3.Row
+ umb = db.execute("SELECT * FROM Umbrellas WHERE id = ?", umbid)
+ db.close()
+
+ if umb is None:
+ raise UmbrellaNotFoundError(umbid)
+ if umb["status"] != "available":
+ raise UmbrellaStatusError
+
+ self.update(
+ {
+ "id": umbid,
+ "status": "lent",
+ "tenant_name": tenant_name,
+ "tenant_id": tenant_id,
+ "tenant_phone": tenant_phone,
+ "tenant_email": tenant_email,
+ "lent_at": date.isoformat(timespec="milliseconds"),
+ }
+ )
+
+ def give_back(self, umbid, tenant_name, tenant_id) -> None:
+ """When a user has returned an umbrella.
+
+ `tenant_name` and `tenant_id` are used to verify if the umbrella is returned by the same
+ person who borrowed it.
+ """
+ db = sqlite3.connect(path)
+ db.row_factory = sqlite3.Row
+ umb = db.execute("SELECT * FROM Umbrellas WHERE id = ?", umbid)
+ db.close()
+
+ if umb is None:
+ raise UmbrellaNotFoundError(umbid)
+ if umb["status"] not in ("lent", "overdue"):
+ raise UmbrellaStatusError
+ if umb["tenant_name"] != tenant_name or umb["tenant_id"] != tenant_id:
+ raise TenantIdentityError(umb["tenant_name"], tenant_name)
+
+ self.update(
+ {
+ "id": umbid,
+ "status": "available",
+ }
+ )
+
+ def mark_overdue(self, umbid) -> None:
+ """When an umbrella is overdue, change its status to "overdue"."""
+ db = sqlite3.connect(path)
+ db.row_factory = sqlite3.Row
+ umb = db.execute("SELECT * FROM Umbrellas WHERE id = ?", umbid)
+
+ if umb is None:
+ raise UmbrellaNotFoundError(umbid)
+ elif umb["status"] != "lent":
+ raise UmbrellaStatusError
+
+ self.update({"id": umbid, "status": "overdue"})
diff --git a/jimbrella/utils.py b/jimbrella/utils.py
index ce57bf9..31fafbc 100644
--- a/jimbrella/utils.py
+++ b/jimbrella/utils.py
@@ -21,22 +21,12 @@ def human_timedelta(delta: timedelta) -> str:
return days + f"{hours:0>2}:{minutes:0>2}" # zero-pad to two digits
-class UTCPlus8(tzinfo):
- def __init__(self):
- super().__init__()
-
- def utcoffset(self, dt):
- return timedelta(hours=8)
-
- def dst(self, dt):
- return timedelta(0)
-
- def tzname(self, dt):
- return "+8:00"
-
-
-utc_plus_8 = UTCPlus8()
-
-
-def local_now():
- return datetime.now(tz=utc_plus_8)
+def group_by_key(data: list, key: str) -> dict:
+ """Groups a list of dicts by the value of their `key` into a dict of lists of dicts."""
+ keys = set([item[key] for item in data])
+ # initiate a dict with `keys` as keys and [] as values
+ groups = dict.fromkeys(keys, [])
+ for k in keys:
+ groups[k] = [item for item in data if item[key] == k]
+
+ return groups