1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
|
import csv
import os
from datetime import datetime, timedelta
from .utils import human_datetime, human_timedelta, local_now
from .config import DUE_HOURS
from .exceptions import *
class Database:
def __init__(self, path):
"""A database of all umbrellas and their current state.
Currently, the data are represented in a csv file.
Explanation of data types:
- id | a unique numerical identity of unsigned integer type.
- uint | whatever fits the regex /[0-9]+/. despite carrying a numerical value, it is
| internally represented as a string.
- string | any UTF-8 string.
- date | an ISO 8601 date of format "YYYY-MM-DDThh:mm:ss". default timezone is UTC+8.
The status of an umbrella consists of:
- serial | (id) unique identifier for the umbrella.
- alias | (string) future compatibility. a human readable (preferably cute) name
| for a particular umbrella
- status | (string) one of ("available", "lent", "withheld", "maintenance",
| "withheld", "unknown")
| available : is in service on the stand
| lent : is in temporary possession of a user
| overdue : is in overly prolonged possession of a user
| maintenance : is on the stand but not in service
| withheld : is not on the stand but rather in the possession of an
| administrator
| unknown : none of the above
| NOTE: the values above are subject to changes.
- tenant_name | (string) the person in temporary possession of the umbrella.
- tenant_id | (uint) student or faculty ID.
- tenant_phone | (uint) phone number via which to contact tenant when the lease is due.
- tenant_email | (string) future compatibility. empty for the time being.
- lent_at | (date) if status is "lent", lent_at is the submission time of the user's
| jForm answer sheet. is blank otherwise.
"""
self.path = path
def _lock(self) -> int:
"""Create a lockfile for database.
If the database is called "db.csv", the lockfile, located in the same
directory, would be called "db.csv.lock".
Returns 1 if the lockfile is successfully created, 0 if the lockfile
already exists (strong hint of unrecovered crash or potential race
condition - caller of this method should abort whatever it is doing)
"""
try:
f = open(self.path + ".lock", "x")
f.close()
return 1
except FileExistsError:
return 0
def _unlock(self) -> None:
"""Remove lockfile created by _lock.
If there is no lockfile, simply ignore.
"""
try:
os.remove(self.path + ".lock")
except FileNotFoundError:
pass
def _read(self) -> list:
"""Deserialize database."""
# Create database if it does not yet exist
try:
f = open(self.path, "x")
f.close()
except FileExistsError:
pass
with open(self.path) as f:
reader = csv.reader(f)
umbrellas = [
{
"serial": int(row[0]),
"alias": row[1],
"status": row[2],
"tenant_name": row[3],
"tenant_id": row[4],
"tenant_phone": row[5],
"tenant_email": row[6],
"lent_at": datetime.fromisoformat(row[7]) if row[7] else None,
}
for row in reader
]
f.close()
return umbrellas
def _write(self, umbrellas: list) -> None:
"""Serialize database. When a failure occurs, abort and recover data."""
# make backup in memory
with open(self.path) as f:
backup = f.read()
f.close()
# wait until database is locked for this write
while not self._lock():
continue
f = open(self.path, "w")
try:
writer = csv.writer(f)
writer.writerows(
[
[
umb["serial"],
umb["alias"],
umb["status"],
umb["tenant_name"],
umb["tenant_id"],
umb["tenant_phone"],
umb["tenant_email"],
umb["lent_at"].isoformat(timespec="seconds")
if umb["lent_at"]
else "",
]
for umb in umbrellas
]
)
except:
# failure occurred on write
# abort write, and write back contents as they were before
# TODO: pass on exception and keep log
f.close()
f = open(self.path, "w")
f.write(backup)
f.close()
self._unlock()
def _update(self, update: dict) -> list:
"""Update status of one umbrella, and return the entire updated database."""
umbrellas = self._read()
for idx, umb in enumerate(umbrellas):
if umb["serial"] == update["serial"]:
umbrellas[idx] = update
self._write(umbrellas)
return umbrellas
def _find_by_serial(self, serial: int) -> dict:
"""Given a serial number, returns an umbrella with such serial.
If there is no such umbrella, returns None.
"""
umbrellas = self._read()
for umb in umbrellas:
if umb["serial"] == serial:
return umb
def read(self) -> list:
"""An interface to `_read()` with supplemental data included.
All exposed methods with a return value should use this method instead of `_read()`.
Supplemental data:
- lent_at_str: string representation for lent_at.
- lent_time_ago: time since umbrella was taken away by tenant. if umbrella is not
taken away, its value is None.
- lent_time_ago_str: string representation for lent_time_ago.
"""
umbrellas = self._read()
now = local_now()
for idx, umb in enumerate(umbrellas):
if umb["status"] in ("lent", "overdue"):
umbrellas[idx]["lent_at_str"] = human_datetime(umb["lent_at"])
lent_time_ago = now - umb["lent_at"]
umbrellas[idx]["lent_time_ago"] = lent_time_ago
umbrellas[idx]["lent_time_ago_str"] = human_timedelta(lent_time_ago)
return umbrellas
def group_by_status(umbrellas) -> dict:
"""(static method) Returns umbrellas grouped into a dict by their status."""
keys = set([umb["status"] for umb in umbrellas])
statuses = {}
for key in keys:
statuses[key] = [umb for umb in umbrellas if umb["status"] == key]
return statuses
def find_overdue(umbrellas) -> list:
"""(static method) Returns umbrellas in possession of their tenant for too long."""
now = local_now()
return [
umb
for umb in umbrellas
if umb["lent_at"] is not None
and now - umb["lent_at"] > timedelta(hours=DUE_HOURS)
]
def take_away(
self, serial, date, tenant_name, tenant_id, tenant_phone="", tenant_email=""
) -> None:
"""When a user has borrowed an umbrella."""
umb = self._find_by_serial(serial)
if umb is None:
raise UmbrellaNotFoundError
elif umb["status"] != "available":
raise UmbrellaStatusError
umb["status"] = "lent"
umb["tenant_name"] = tenant_name
umb["tenant_id"] = tenant_id
umb["tenant_phone"] = tenant_phone
umb["tenant_email"] = tenant_email
umb["lent_at"] = date
self._update(umb)
def give_back(self, serial, date) -> None:
"""When a user has returned an umbrella."""
umb = self._find_by_serial(serial)
if umb is None:
raise UmbrellaNotFoundError
elif umb["status"] not in ("lent", "overdue"):
raise UmbrellaStatusError
umb["status"] = "available"
for key in ["tenant_name", "tenant_id", "tenant_phone", "tenant_email"]:
umb[key] = ""
umb["lent_at"] = None
self._update(umb)
def mark_overdue(self, serial, date) -> None:
"""When an umbrella is overdue, change its status to "overdue"."""
umb = self._find_by_serial(serial)
if umb is None:
raise UmbrellaNotFoundError
elif umb["status"] != "lent":
raise UmbrellaStatusError
umb["status"] = "overdue"
self._update(umb)
|