summaryrefslogtreecommitdiff
path: root/jimbrella/database.py
blob: ab54a283f726ff21bca6550db7c381f5dfb30559 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
import csv
import os
from datetime import datetime, timedelta
from .utils import human_datetime, human_timedelta
from .config import DUE_HOURS


class Database:
    def __init__(self, path):
        """A database of all umbrellas and their current state.

        Currently, the data are represented in a csv file.

        Explanation of data types:
        - id        | a unique numerical identity of unsigned integer type.
        - uint      | whatever fits the regex /[0-9]+/. despite carrying a numerical value, it is
                    | internally represented as a string.
        - string    | any UTF-8 string.
        - date      | an ISO 8601 date of format "YYYY-MM-DDThh:mm:ss". default timezone is UTC+8.

        The status of an umbrella consists of:
        - serial        | (id) unique identifier for the umbrella.
        - alias         | (string) future compatibility. a human readable (preferably cute) name
                        | for a particular umbrella
        - status        | (string) one of ("available", "lent", "withheld", "maintenance",
                        | "withheld", "unknown")
                        | available     : is in service on the stand
                        | lent          : is in temporary possession of a user
                        | maintenance   : is on the stand but not in service
                        | withheld      : is not on the stand but rather in the possession of an
                        |                 administrator
                        | unknown       : none of the above
                        | NOTE: the values above are subject to changes.
        - tenant_name   | (string) the person in temporary possession of the umbrella.
        - tenant_id     | (uint) student or faculty ID.
        - tenant_phone  | (uint) phone number via which to contact tenant when the lease is due.
        - tenant_email  | (string) future compatibility. empty for the time being.
        - lent_at       | (date) if status is "lent", lent_at is the submission time of the user's
                        | jForm answer sheet. is blank otherwise.
        """
        self.path = path

    def _lock(self) -> int:
        """Create a lockfile for database.

        If the database is called "db.csv", the lockfile, located in the same
        directory, would be called "db.csv.lock".

        Returns 1 if the lockfile is successfully created, 0 if the lockfile
        already exists (strong hint of unrecovered crash or potential race
        condition - caller of this method should abort whatever it is doing)
        """
        try:
            f = open(self.path + ".lock", "x")
            f.close()
            return 1
        except FileExistsError:
            return 0

    def _unlock(self) -> None:
        """Remove lockfile created by _lock.

        If there is no lockfile, simply ignore.
        """
        try:
            os.remove(self.path + ".lock")
        except FileNotFoundError:
            pass

    def _read(self) -> list:
        """Deserialize database."""
        # Create database if it does not yet exist
        try:
            f = open(self.path, "x")
            f.close()
        except FileExistsError:
            pass

        with open(self.path) as f:
            reader = csv.reader(f)
            umbrellas = [
                {
                    "serial": int(row[0]),
                    "alias": row[1],
                    "status": row[2],
                    "tenant_name": row[3],
                    "tenant_id": row[4],
                    "tenant_phone": row[5],
                    "tenant_email": row[6],
                    "lent_at": datetime.fromisoformat(row[7]) if row[7] else None,
                }
                for row in reader
            ]
            f.close()

            return umbrellas

    def _write(self, umbrellas: list) -> None:
        """Serialize database. When a failure occurs, abort and recover data."""
        # make backup in memory
        with open(self.path) as f:
            backup = f.read()
            f.close()

        # wait until database is locked for this write
        while not self._lock():
            continue

        f = open(self.path, "w")
        try:
            writer = csv.writer(f)
            writer.writerows(
                [
                    [
                        umb["serial"],
                        umb["alias"],
                        umb["status"],
                        umb["tenant_name"],
                        umb["tenant_id"],
                        umb["tenant_phone"],
                        umb["tenant_email"],
                        umb["lent_at"].isoformat(timespec="seconds")
                        if umb["lent_at"]
                        else "",
                    ]
                    for umb in umbrellas
                ]
            )
        except:
            # failure occurred on write
            # abort write, and write back contents as they were before
            # TODO: pass on exception and keep log
            f.close()
            f = open(self.path, "w")
            f.write(backup)
            f.close()

        self._unlock()

    def _update(self, update: dict) -> list:
        """Update status of one umbrella, and return the entire updated database."""
        umbrellas = self._read()
        for idx, umb in enumerate(umbrellas):
            if umb["serial"] == update["serial"]:
                umbrellas[idx] = update
                self._write(umbrellas)
                return umbrellas

    def _find_by_serial(self, serial: int) -> dict:
        """Given a serial number, returns an umbrella with such serial.
        If there is no such umbrella, returns None.
        """
        umbrellas = self._read()
        for umb in umbrellas:
            if umb["serial"] == serial:
                return umb

    def read(self) -> list:
        """An interface to `_read()` with supplemental data included.

        All exposed methods with a return value should use this method instead of `_read()`.

        Supplemental data:
        - lent_at_str:          string representation for lent_at.
        - lent_time_ago:        time since umbrella was taken away by tenant. if umbrella is not
                                taken away, its value is None.
        - lent_time_ago_str:    string representation for lent_time_ago.
        """
        umbrellas = self._read()
        now = datetime.now()
        for idx, umb in enumerate(umbrellas):
            if umb["status"] == "lent":
                umbrellas[idx]["lent_at_str"] = human_datetime(umb["lent_at"])
                lent_time_ago = now - umb["lent_at"]
                umbrellas[idx]["lent_time_ago"] = lent_time_ago
                umbrellas[idx]["lent_time_ago_str"] = human_timedelta(lent_time_ago)
        return umbrellas

    def group_by_status(umbrellas) -> dict:
        """(static method) Returns umbrellas grouped into a dict by their status."""
        keys = set([umb["status"] for umb in umbrellas])
        statuses = {}
        for key in keys:
            statuses[key] = [umb for umb in umbrellas if umb["status"] == key]
        return statuses

    def find_overdue(umbrellas) -> list:
        """(static method) Returns umbrellas in possession of their tenant for too long."""
        now = datetime.now()
        return [
            umb
            for umb in umbrellas
            if umb["lent_at"] is not None
            and now - umb["lent_at"] > timedelta(hours=DUE_HOURS)
        ]

    def take_away(
        self, serial, tenant_name, tenant_id, tenant_phone="", tenant_email=""
    ) -> None:
        """When a user has borrowed an umbrella."""
        umb = self._find_by_serial(serial)
        if umb is None:
            raise ValueError(f"No umbrella with serial {serial} found.")
        elif umb["status"] != "available":
            raise ValueError(f"Umbrella with serial {serial} is inavailable.")
        umb["status"] = "lent"
        umb["tenant_name"] = tenant_name
        umb["tenant_id"] = tenant_id
        umb["tenant_phone"] = tenant_phone
        umb["tenant_email"] = tenant_email
        umb["lent_at"] = datetime.now()
        self._update(umb)

    def give_back(self, serial) -> None:
        """When a user has returned an umbrella."""
        umb = self._find_by_serial(serial)
        if umb is None:
            raise ValueError(f"No umbrella with serial {serial} found.")
        elif umb["status"] != "lent":
            raise ValueError(f"Umbrella with serial {serial} is not lent out.")
        umb["status"] = "available"
        for key in ["tenant_name", "tenant_id", "tenant_phone", "tenant_email"]:
            umb[key] = ""
        umb["lent_at"] = None
        self._update(umb)