1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
|
import csv
import os
import time
from datetime import datetime, timedelta

from .utils import human_datetime, human_timedelta
from .config import DUE_HOURS
from .exceptions import *
# Every status an umbrella record may carry; see the Database docstring.
STATUSES = ["available", "lent", "overdue", "maintenance", "withheld", "unknown"]


class Database:
    """A database of all umbrellas and their current state.

    Currently, the data are represented in a csv file, one umbrella per row.

    Explanation of data types:
        - id     | a unique numerical identity of unsigned integer type.
        - uint   | whatever fits the regex /[0-9]+/. despite carrying a
                 | numerical value, it is internally represented as a string.
        - string | any UTF-8 string.
        - date   | an ISO 8601 date of format "YYYY-MM-DDThh:mm:ss+08:00".

    The status of an umbrella consists of (in column order):
        - serial       | (id) unique identifier for the umbrella.
        - alias        | (string) future compatibility. a human readable
                       | (preferably cute) name for a particular umbrella.
        - status       | (string) one of STATUSES:
                       |   available   : is in service on the stand
                       |   lent        : is in temporary possession of a user
                       |   overdue     : is in overly prolonged possession of
                       |                 a user
                       |   maintenance : is on the stand but not in service
                       |   withheld    : is not on the stand but rather in the
                       |                 possession of an administrator
                       |   unknown     : none of the above
                       | NOTE: the values above are subject to changes.
        - tenant_name  | (string) the person in temporary possession of the
                       | umbrella.
        - tenant_id    | (uint) student or faculty ID.
        - tenant_phone | (uint) phone number via which to contact tenant when
                       | the lease is due.
        - tenant_email | (string) future compatibility. empty for the time
                       | being.
        - lent_at      | (date) if status is "lent", lent_at is the submission
                       | time of the user's jForm answer sheet. is blank
                       | otherwise.
    """

    # Fields describing the current tenant; blanked when an umbrella is free.
    _TENANT_FIELDS = ("tenant_name", "tenant_id", "tenant_phone", "tenant_email")

    # Pause between attempts to acquire the lockfile, so waiting does not
    # monopolise a CPU core.
    _LOCK_POLL_SECONDS = 0.01

    def __init__(self, path):
        """Remember `path`, the location of the csv file backing the database.

        The file itself is not touched until the first read.
        """
        self.path = path

    def _lock(self) -> int:
        """Create a lockfile for database.

        If the database is called "db.csv", the lockfile, located in the same
        directory, would be called "db.csv.lock".

        Returns 1 if the lockfile is successfully created, 0 if the lockfile
        already exists (strong hint of unrecovered crash or potential race
        condition - caller of this method should abort whatever it is doing).
        """
        try:
            # mode "x" fails if the file already exists
            with open(self.path + ".lock", "x"):
                pass
        except FileExistsError:
            return 0
        return 1

    def _unlock(self) -> None:
        """Remove lockfile created by `_lock()`.

        If there is no lockfile, simply ignore.
        """
        try:
            os.remove(self.path + ".lock")
        except FileNotFoundError:
            pass

    def _read(self) -> list:
        """Deserialize database into a list of umbrella dicts.

        The csv file is created (empty) if it does not yet exist. `serial` is
        converted to int and a non-empty `lent_at` to a datetime (None when
        the column is empty); every other column stays a string. Blank rows
        are skipped.
        """
        # Create database if it does not yet exist
        try:
            with open(self.path, "x"):
                pass
        except FileExistsError:
            pass

        # newline="" lets the csv module do its own line-ending handling
        with open(self.path, newline="") as f:
            return [
                {
                    "serial": int(row[0]),
                    "alias": row[1],
                    "status": row[2],
                    "tenant_name": row[3],
                    "tenant_id": row[4],
                    "tenant_phone": row[5],
                    "tenant_email": row[6],
                    "lent_at": datetime.fromisoformat(row[7]) if row[7] else None,
                }
                for row in csv.reader(f)
                if row  # csv.reader yields [] for a blank line
            ]

    def _write(self, umbrellas: list) -> None:
        """Serialize database. When a failure occurs, abort and recover data.

        Blocks until the lockfile can be created, and always removes it again
        before returning or raising. Whatever exception interrupted the write
        is re-raised after the previous file contents have been written back.
        """
        # Serialize before touching the file, so that a malformed record
        # raises while the data on disk are still intact.
        rows = [
            [
                umb["serial"],
                umb["alias"],
                umb["status"],
                umb["tenant_name"],
                umb["tenant_id"],
                umb["tenant_phone"],
                umb["tenant_email"],
                umb["lent_at"].isoformat(timespec="seconds")
                if umb["lent_at"]
                else "",
            ]
            for umb in umbrellas
        ]

        # wait until database is locked for this write
        while not self._lock():
            time.sleep(self._LOCK_POLL_SECONDS)
        try:
            # make backup in memory; taken under the lock so that it cannot
            # capture another writer's half-written file
            with open(self.path, newline="") as f:
                backup = f.read()
            try:
                with open(self.path, "w", newline="") as f:
                    csv.writer(f).writerows(rows)
            except Exception:
                # failure occurred on write
                # abort write, and write back contents as they were before
                # TODO: keep log
                with open(self.path, "w", newline="") as f:
                    f.write(backup)
                raise
        finally:
            # the file is closed (hence flushed) by now in every code path
            self._unlock()

    def _update(self, update: dict) -> list:
        """Update status of one umbrella, and return the entire updated database.

        The stored record whose serial equals `update["serial"]` is replaced
        wholesale by `update`, so `update` must carry every field `_write()`
        serializes. The database is rewritten even when no serial matches.
        """
        umbrellas = self._read()
        for idx, umb in enumerate(umbrellas):
            if umb["serial"] == update["serial"]:
                umbrellas[idx] = update
        self._write(umbrellas)
        return umbrellas

    def _find_by_serial(self, serial: int) -> dict:
        """Given a serial number, returns an umbrella with such serial.

        If there is no such umbrella, returns None.
        """
        for umb in self._read():
            if umb["serial"] == serial:
                return umb
        return None

    @staticmethod
    def _elapsed_since(moment: datetime) -> timedelta:
        """Return the time elapsed between `moment` and now.

        Works for naive and timezone-aware `moment` alike: a naive value is
        compared against naive local time, an aware value against aware local
        time (mixing the two kinds in a subtraction raises TypeError).
        """
        now = datetime.now()
        if moment.utcoffset() is not None:
            now = now.astimezone()
        return now - moment

    def read(self) -> list:
        """An interface to `_read()` with supplemental data included.

        All exposed methods with a return value should use this method instead
        of `_read()`.

        Supplemental data, added only to umbrellas whose status is "lent" or
        "overdue" and whose `lent_at` is set:
            - lent_at_str: `lent_at` as rendered by `human_datetime()`.
            - lent_time_ago: timedelta since umbrella was taken away by tenant.
            - lent_time_ago_str: `lent_time_ago` as rendered by
                                 `human_timedelta()`.
        """
        umbrellas = self._read()
        for umb in umbrellas:
            lent_at = umb["lent_at"]
            if umb["status"] not in ("lent", "overdue") or lent_at is None:
                continue
            lent_time_ago = self._elapsed_since(lent_at)
            umb["lent_at_str"] = human_datetime(lent_at)
            umb["lent_time_ago"] = lent_time_ago
            umb["lent_time_ago_str"] = human_timedelta(lent_time_ago)
        return umbrellas

    def update(self, umb) -> list:
        """An interface to `_update()` with added convenience and safeguards.

        Convenience: Not all fields in an umbrella dict need to be present in
                     `umb`. If an optional field is not there, its value is
                     left untouched. Every field other than `serial` is
                     optional.
                     `serial` may be an int or a str that evaluates to the
                     correct serial when int() is applied to it.
                     `tenant_id` and `tenant_phone` may be a str or an int.
                     `lent_at` in `umb` may either be a datetime.datetime
                     object, or an ISO 8601 formatted string.
        Safeguards:  A missing `serial`, a `status` outside STATUSES, or a
                     `lent_at` that is neither a datetime nor a parsable ISO
                     8601 string raises UmbrellaValueError. A serial absent
                     from the database raises UmbrellaNotFoundError.
        Both:        When a value is set, values of unneeded fields are
                     automatically discarded if applicable. For example, when
                     `status` is set to "available", `tenant_*` and `lent_at`
                     are no longer needed; in fact, their existence is
                     dangerous as it implies the status of the umbrella is
                     "lent" or "overdue", which is not the case. Therefore, if
                     `status` is set to "available", the unnecessary fields
                     mentioned above are made blank, and these fields supplied
                     in `umb` are ignored.
                     When the resulting status is "lent" or "overdue", an
                     empty tenant field or `lent_at` in `umb` falls back to
                     the value stored in the database.

        `umb` itself is not modified.

        Returns updated database (same as `_update()`).
        """
        # `serial` must be specified.
        if "serial" not in umb:
            raise UmbrellaValueError

        # check if umbrella #<serial> exists in database
        serial = int(umb["serial"])
        umb_in_db = self._find_by_serial(serial)
        if umb_in_db is None:
            raise UmbrellaNotFoundError

        # Start from the stored record so that every field missing from `umb`
        # keeps its current value.
        merged = {**umb_in_db, **umb, "serial": serial}

        # admin may have specified a (perhaps different) status
        status = merged["status"]
        if "status" in umb and status not in STATUSES:
            raise UmbrellaValueError  # invalid

        if "lent_at" in umb:
            # check if `umb` has valid date (`lent_at`)
            lent_at = umb["lent_at"]
            if isinstance(lent_at, str):
                try:
                    merged["lent_at"] = datetime.fromisoformat(lent_at)
                except ValueError as err:
                    raise UmbrellaValueError from err
            elif not isinstance(lent_at, datetime):
                raise UmbrellaValueError

        # we will now adjust `merged` based on `status`
        if status == "available":
            # discard unneeded fields
            for key in self._TENANT_FIELDS:
                merged[key] = ""
            merged["lent_at"] = None
        elif status in ("lent", "overdue"):
            # copy values in database into unfilled fields
            for key in (*self._TENANT_FIELDS, "lent_at"):
                if not merged[key]:
                    merged[key] = umb_in_db[key]

        return self._update(merged)

    @staticmethod
    def group_by_status(umbrellas) -> dict:
        """(static method) Returns umbrellas grouped into a dict by their status.

        Every status in STATUSES is present as a key, mapped to its own
        (possibly empty) list. A status outside STATUSES gets a key as well.
        """
        # a dict comprehension gives each status a distinct list;
        # dict.fromkeys(STATUSES, []) would share one list between all keys
        statuses = {status: [] for status in STATUSES}
        for umb in umbrellas:
            statuses.setdefault(umb["status"], []).append(umb)
        return statuses

    @staticmethod
    def find_overdue(umbrellas) -> list:
        """(static method) Returns umbrellas in possession of their tenant for too long.

        An umbrella qualifies when its `lent_at` is set and lies more than
        DUE_HOURS hours in the past; its `status` is not consulted.
        """
        due_after = timedelta(hours=DUE_HOURS)
        return [
            umb
            for umb in umbrellas
            if umb["lent_at"] is not None
            and Database._elapsed_since(umb["lent_at"]) > due_after
        ]

    def take_away(
        self, serial, date, tenant_name, tenant_id, tenant_phone="", tenant_email=""
    ) -> None:
        """When a user has borrowed an umbrella.

        Marks umbrella #`serial` as "lent" to the given tenant since `date`.

        Raises UmbrellaNotFoundError if there is no such umbrella, and
        UmbrellaStatusError if it is not currently "available".
        """
        umb = self._find_by_serial(serial)
        if umb is None:
            raise UmbrellaNotFoundError
        if umb["status"] != "available":
            raise UmbrellaStatusError

        umb.update(
            status="lent",
            tenant_name=tenant_name,
            tenant_id=tenant_id,
            tenant_phone=tenant_phone,
            tenant_email=tenant_email,
            lent_at=date,
        )
        self._update(umb)

    def give_back(self, serial) -> None:
        """When a user has returned an umbrella.

        Marks umbrella #`serial` as "available" and blanks its tenant fields
        and `lent_at`.

        Raises UmbrellaNotFoundError if there is no such umbrella, and
        UmbrellaStatusError if it is neither "lent" nor "overdue".
        """
        umb = self._find_by_serial(serial)
        if umb is None:
            raise UmbrellaNotFoundError
        if umb["status"] not in ("lent", "overdue"):
            raise UmbrellaStatusError

        umb["status"] = "available"
        for key in self._TENANT_FIELDS:
            umb[key] = ""
        umb["lent_at"] = None
        self._update(umb)

    def mark_overdue(self, serial) -> None:
        """When an umbrella is overdue, change its status to "overdue".

        Raises UmbrellaNotFoundError if there is no such umbrella, and
        UmbrellaStatusError if it is not currently "lent".
        """
        umb = self._find_by_serial(serial)
        if umb is None:
            raise UmbrellaNotFoundError
        if umb["status"] != "lent":
            raise UmbrellaStatusError

        umb["status"] = "overdue"
        self._update(umb)
|