"""Serve views of a local git repository over the Gemini protocol.

Renders summary, commit log, refs, trees, blobs and diffs as text/gemini
responses, with a simple timestamped file cache per repository.
"""

from datetime import datetime, timedelta
from pathlib import Path
import mimetypes
import os
import shutil

import dateutil.parser
from hurry.filesize import alternative, size
from pygit2 import (
    GIT_SORT_TIME,
    Blob,
    Commit,
    GitError,
    Repository,
    Tag,
    Tree,
)

from const import *
from config import *

# Register Gemini source files so raw-blob responses get the right MIME type.
mimetypes.add_type("text/gemini", ".gmi")
mimetypes.add_type("text/gemini", ".gemini")


def convert_filesize(num_bytes: int) -> str:
    """Convert a size in bytes to a human-friendly string (e.g. '4 KB')."""
    return size(num_bytes, system=alternative)


class GitGmiRepo:
    """A git repository exposed through Gemini-formatted views."""

    def __init__(self, name: str, path: str):
        """Open the repo at *path*; raise FileNotFoundError if absent.

        name: display name, also used as the cache subdirectory.
        path: filesystem path to the git repository.
        """
        self.name = name
        self.path = path
        self.cache_dir = Path(CACHE_DIR) / name
        self._init_cache()
        try:
            self.repo = Repository(path)
        except GitError:
            raise FileNotFoundError(f"Error: no such repo: {name}")

    def _init_cache(self):
        """Ensure the per-repo cache directory exists."""
        # makedirs also creates CACHE_DIR itself if missing; the original
        # os.mkdir would raise FileNotFoundError in that case.
        os.makedirs(self.cache_dir, exist_ok=True)

    def _read_cache(self, req: list) -> str:
        """Return the cached response for *req*, or None on miss/expiry.

        req is the request path after the repo name, e.g.
        ["tree", "master", "src"], which maps to tree_master_src.gmi.
        Cache file layout:
            20 text/gemini
            [body - page content]
            cached at:
            [iso timestamp]
        """
        fn = "_".join(req) + ".gmi"
        try:
            with open(self.cache_dir / fn) as f:
                response = f.read()
            # Last line is the ISO timestamp appended by _write_cache.
            created_at = dateutil.parser.isoparse(response.splitlines()[-1])
            if datetime.now() - created_at < timedelta(seconds=CACHE_TTL):
                # Cache valid; the returned response includes the timestamp.
                return response
        except FileNotFoundError:
            pass
        except (IndexError, ValueError):
            # Empty or corrupted cache file: treat it as a cache miss
            # instead of crashing the request.
            pass
        return None

    def _write_cache(self, req: list, resp: str):
        """Store *resp* in the cache, appended with the current timestamp."""
        fn = "_".join(req) + ".gmi"
        # "w" creates or truncates in one step; the original opened with "x",
        # fell back to "w", and leaked the file handle.
        with open(self.cache_dir / fn, "w") as f:
            f.write(resp + "\ncached at:\n" + datetime.now().isoformat())

    def _flush_cache(self):
        """Delete this repo's entire cache directory, if present."""
        try:
            shutil.rmtree(self.cache_dir)
        except FileNotFoundError:
            pass

    def _generate_header(self):
        """Return the global navigation header shown above all views
        (except raw files)."""
        header = (
            f"# {self.name}\n"
            f"=> {CGI_PATH} {GIT_GMI_SITE_TITLE}\n"
            f"=> {CGI_PATH}{self.name}/summary summary\n"
            f"=> {CGI_PATH}{self.name}/tree/{MAIN_BRANCH}/ tree\n"
            f"=> {CGI_PATH}{self.name}/log log\n"
            f"=> {CGI_PATH}{self.name}/refs refs\n\n"
        )
        return header

    def view_summary(self) -> str:
        """Gemini response: 3 most recent commits plus the readme, if any."""
        cached = self._read_cache(["summary"])
        if cached is not None:
            return cached
        response = f"{STATUS_SUCCESS} {META_GEMINI}\r\n" + self._generate_header()
        # Show the 3 most recent commits.
        recent_commits = self._get_commit_log()[:3]
        for cmt in recent_commits:
            time = str(datetime.utcfromtimestamp(cmt["time"])) + " UTC"
            response += (
                f"### {cmt['short_id']} - {cmt['author']} - {time}\n"
                f"{cmt['msg'].splitlines()[0]}\n\n"
            )
            # TODO: link to commit view
        # Find and display the first readme(.*) in the main branch root.
        tree = self._get_tree(MAIN_BRANCH)
        trls = self._list_tree(tree)
        found_readme = False
        for item in trls:
            if (
                item["type"] == "file"
                and item["name"].lower().split(".")[0] == "readme"
            ):
                found_readme = True
                response += (
                    f"## {item['name']} | {convert_filesize(item['size'])}\n"
                    f"{item['blob'].data.decode('utf-8')}"
                )
                break  # only the first readme is shown
        if not found_readme:
            response += "## No readme found."
        self._write_cache(["summary"], response)
        return response

    def _get_commit_log(self) -> list:
        """Return useful info from the commit log, newest first."""
        repo = self.repo
        commits = list(repo.walk(repo[repo.head.target].id, GIT_SORT_TIME))
        log = [
            {
                "id": str(cmt.id),  # hex SHA-1 hash
                "short_id": str(cmt.short_id),  # short version of the above
                "author": cmt.author.name,  # author's display name
                "time": cmt.commit_time,  # unix timestamp
                "msg": cmt.message,  # full commit message
            }
            for cmt in commits
        ]
        return log  # reverse chronological order

    def view_log(self) -> str:
        """Gemini response: the full commit log with per-commit links."""
        cached = self._read_cache(["log"])
        if cached is not None:
            return cached
        response = f"{STATUS_SUCCESS} {META_GEMINI}\r\n" + self._generate_header()
        log = self._get_commit_log()
        for cmt in log:
            # Looks like "2020-06-06 04:51:21 UTC".
            time = str(datetime.utcfromtimestamp(cmt["time"])) + " UTC"
            response += (
                f"## {cmt['short_id']} - {cmt['author']} - {time}\n"
                f"=> commit/{cmt['id']} view diff\n"
                f"=> tree/{cmt['id']}/ view tree\n"
                f"{cmt['msg']}\n\n"
            )
        self._write_cache(["log"], response)
        return response

    def _get_commit(self, commit_str) -> dict:
        """Resolve *commit_str* and return its metadata plus a text patch.

        Raises FileNotFoundError if the revision cannot be resolved.
        """
        try:
            commit = self.repo.revparse_single(commit_str)
            if commit.parents:
                diff = self.repo.diff(commit.parents[0], commit)
            else:
                # Root commit has no parent: diff against the empty tree.
                # swap=True orients the diff as empty -> commit.
                diff = commit.tree.diff_to_tree(swap=True)
            return {
                "id": commit.id,
                "author": commit.author.name,
                "time": commit.commit_time,
                "msg": commit.message,
                "patch": diff.patch,
            }
        except (ValueError, KeyError):
            # pygit2 raises KeyError for unknown revisions and ValueError
            # for malformed revision strings.
            raise FileNotFoundError(f"Error: no such commit: {commit_str}")

    def view_commit(self, commit_str) -> str:
        """Gemini response: one commit's metadata and its diff."""
        cached = self._read_cache(["commit", commit_str])
        if cached is not None:
            return cached
        commit = self._get_commit(commit_str)
        response = (
            f"{STATUS_SUCCESS} {META_GEMINI}\r\n"
            + self._generate_header()
            + f"{commit['id']} - {commit['author']} - {commit['time']}\n"
            + commit["msg"]
            + "\n"
            + f"=> {CGI_PATH}{self.name}/tree/{commit['id']}/ view tree\n"
            + f"=> {commit_str}?raw view raw\n"
            + "\n```\n"
            + commit["patch"]
            + "\n```"
        )
        self._write_cache(["commit", commit_str], response)
        return response

    def view_raw_commit(self, commit_str) -> str:
        """Gemini response: the commit's patch as plain text."""
        # Fix: the original called self.get_commit, which does not exist.
        commit = self._get_commit(commit_str)
        response = f"{STATUS_SUCCESS} {META_PLAINTEXT}\r\n" + commit["patch"]
        return response

    def _get_refs(self) -> list:
        """Return name/shorthand/target/type for every reference."""
        refs = self.repo.listall_reference_objects()
        return [
            {
                "name": ref.name,
                "shorthand": ref.shorthand,
                "target": ref.target,
                "type": ref.type,
            }
            for ref in refs
        ]

    def view_refs(self) -> str:
        """Gemini response: local branches and tags with tree links."""
        cached = self._read_cache(["refs"])
        if cached is not None:
            return cached
        response = f"{STATUS_SUCCESS} {META_GEMINI}\r\n" + self._generate_header()
        refs = self._get_refs()
        for ref in refs:
            # HACK: filter out refs with slashes as remote branches
            if ref["shorthand"].find("/") == -1:
                response += (
                    f"## {ref['shorthand']}\n=> tree/{ref['shorthand']}/ view tree\n\n"
                )
        self._write_cache(["refs"], response)
        return response

    @classmethod
    def _parse_recursive_tree(cls, tree: Tree) -> list:
        """Recursively replace each sub-Tree with a (name, contents) tuple.

        e.g. [('src', [blob0, blob1]), otherblob].
        """
        tree_list = list(tree)
        for idx, item in enumerate(tree_list):
            if isinstance(item, Tree):
                tree_list[idx] = (item.name, cls._parse_recursive_tree(tree_list[idx]))
        return tree_list

    def _get_tree(self, revision_str: str) -> list:
        """Return the recursive blob list for a commit-ish or tag revision.

        Raises FileNotFoundError when the revision cannot be resolved or
        does not point at a tree.
        """
        try:
            revision = self.repo.revparse_single(revision_str)
            if isinstance(revision, Commit):
                # Top-level tree; may contain sub-trees.
                return self._parse_recursive_tree(revision.tree)
            elif isinstance(revision, Tag):
                return self._parse_recursive_tree(revision.get_object().tree)
        except (ValueError, KeyError):
            # pygit2 raises KeyError for unknown revisions and ValueError
            # for malformed revision strings.
            raise FileNotFoundError(f"Error: no such tree: {revision_str}")
        # Revision resolved to neither a commit nor a tag (e.g. a blob):
        # the original silently returned None here, crashing callers later.
        raise FileNotFoundError(f"Error: no such tree: {revision_str}")

    @staticmethod
    def _list_tree(tree_list: list, location=None) -> list:
        """Describe the directory at *location* inside *tree_list*.

        tree_list: output of _parse_recursive_tree().
        location: the dir being viewed, path-like as a list,
            e.g. ['src', 'static', 'css'] => 'src/static/css',
            which this method cds into and lists for the visitor.
        Raises FileNotFoundError when there is no such dir.
        """
        location = location or []  # avoid the mutable-default pitfall
        trls = tree_list
        for loc in location:
            found = False
            for item in trls:
                if isinstance(item, tuple) and item[0] == loc:
                    trls = item[1]
                    found = True
                    break
            if not found:
                raise FileNotFoundError(
                    f"Error: no such directory: {'/'.join(location)}"
                )
        contents = []
        for item in trls:
            if isinstance(item, tuple):
                # Was originally a Tree; structure: ('dir_name', [list_of_blobs]).
                contents.append(
                    {
                        "type": "dir",
                        "name": item[0],
                        "items": len(item[1]),  # number of objects in dir
                    }
                )
            elif isinstance(item, Blob):
                contents.append(
                    {
                        "type": "file",
                        "name": item.name,
                        "blob": item,
                        "size": item.size,  # size in bytes
                    }
                )
        return contents

    def view_tree(self, branch: str, location=None) -> str:
        """Gemini response: directory listing with sizes and links."""
        location = location or []
        cached = self._read_cache(["tree", branch] + location)
        if cached is not None:
            return cached
        tree = self._get_tree(branch)
        contents = self._list_tree(tree, location)
        items = len(contents)
        response = (
            f"{STATUS_SUCCESS} {META_GEMINI}\r\n"
            # Fix: the original called self.generate_header (AttributeError).
            + self._generate_header()
            + f"## {self.name}{'/' if location else ''}{'/'.join(location)}/"
            # Fix: != 1 so an empty dir reads "0 items", not "0 item".
            f" | {items} {'items' if items != 1 else 'item'}\n\n"
        )
        for item in contents:
            if item["type"] == "dir":
                response += (
                    f"=> {item['name']}/ {item['name']}/ | {item['items']} items\n"
                )
            elif item["type"] == "file":
                response += f"=> {item['name']} {item['name']} | {convert_filesize(item['size'])}\n"
        self._write_cache(["tree", branch] + location, response)
        return response

    def _get_blob(self, commit_str: str, location=None) -> Blob:
        """Return the Blob at *location* under revision *commit_str*.

        location: like _list_tree's, but the last element is the filename.
        Raises FileNotFoundError for a missing tree or file.
        """
        location = location or []
        try:
            tree = self._get_tree(commit_str)
            trls = self._list_tree(tree, location[:-1])
        except FileNotFoundError:
            raise FileNotFoundError(f"Error: No such tree: {'/'.join(location[:-1])}")
        # Fix: the file search is outside the try block so that the
        # "no such file" error is not swallowed and re-raised as
        # "No such tree" like in the original.
        for item in trls:
            if item["type"] == "file" and item["name"] == location[-1]:
                return item["blob"]
        raise FileNotFoundError(f"Error: no such file: {'/'.join(location)}")

    def view_blob(self, branch: str, location=None) -> str:
        """Gemini response: a file rendered inside a preformatted block."""
        location = location or []
        cached = self._read_cache(["tree", branch] + location)
        if cached is not None:
            return cached
        blob = self._get_blob(branch, location)
        response = (
            f"{STATUS_SUCCESS} {META_GEMINI}\r\n"
            # Fix: the original called self.generate_header (AttributeError).
            + self._generate_header()
            + f"## {self.name}/{'/'.join(location)} | {convert_filesize(blob.size)}\n\n"
            f"=> {blob.name}?raw view raw\n\n"
            f"```\n"
        )
        response += blob.data.decode("utf-8") + "\n```"
        self._write_cache(["tree", branch] + location, response)
        return response

    def view_raw_blob(self, branch: str, location=None) -> str:
        """Gemini response: the file's raw content with a guessed MIME type."""
        location = location or []
        blob = self._get_blob(branch, location)
        # If mimetypes can't make out the type, fall back to plaintext.
        guessed_mimetype = mimetypes.guess_type(blob.name)[0] or META_PLAINTEXT
        response = f"{STATUS_SUCCESS} {guessed_mimetype}\r\n"
        response += blob.data.decode("utf-8")
        return response