From 89d16bb05a61936658ae2939d57e2c3e5f726a6b Mon Sep 17 00:00:00 2001 From: Frederick Yin Date: Sat, 13 Jun 2020 22:51:08 +0800 Subject: Initial commit: MVP --- git-gmi/cgi.py | 77 +++++++++++++++++++++++++++ git-gmi/const.py | 5 ++ git-gmi/git.py | 157 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 239 insertions(+) create mode 100755 git-gmi/cgi.py create mode 100644 git-gmi/const.py create mode 100644 git-gmi/git.py (limited to 'git-gmi') diff --git a/git-gmi/cgi.py b/git-gmi/cgi.py new file mode 100755 index 0000000..c243ccd --- /dev/null +++ b/git-gmi/cgi.py @@ -0,0 +1,77 @@ +#!/home/fakefred/p/git.gmi/venv/bin/python3.8 +# gotta change the executable path before running +from git import * +from const import * +from os import environ, listdir + +# be careful when using print(); stdout is passed to the client. +# this cgi uses \n as newline. + + +def generate_navigation(repo_name: str): + pass # TODO + + +def handle_cgi_request(path: str): + # intended to work with Jetforce. + # url: gemini://git.gemini.site/cgi-bin/cgi.py/repo/src/static/css/[index.css] + # path: /repo/src/static/css/[index.css] + # path_trace = ['repo', 'src', 'static', 'css', 'index.css'] + path_trace = path[1:].split("/") + if path_trace == [""]: # empty path + print(f"{STATUS_SUCCESS} {META_GEMINI}") # welcome page + print("Welcome to the git.gmi demo") + print("Available repositories:") + print("\n".join([f"=> {dir}/summary" for dir in listdir(GIT_CATALOG)])) + return + + try: + repo = GitGmiRepo(path_trace[0], f"{GIT_CATALOG}/{path_trace[0]}") + except FileNotFoundError: + print(STATUS_NOT_FOUND) + return + + if len(path_trace) > 1: + view = path_trace[1] # e.g. summary, tree, log + else: + pass # TODO: summary + + if view == "summary": + try: + print(repo.view_summary()) + return + except: + print(STATUS_TEMPORARY_FAILURE) + return + + elif view == "tree": + if len(path_trace) == 2: + print("31 master/") + return + + if len(path_trace) > 2: + branch = path_trace[2] + + if len(path_trace) == 3: + location = [] + else: + location = path_trace[3:] + + try: # is dir + print(repo.view_tree(branch, location)) + except FileNotFoundError: # is file + try: + print(repo.view_blob(branch, location)) + except FileNotFoundError: + print("50 Error locating content") + + elif view == "log": + try: + print(repo.view_log()) + return + except: + print(STATUS_TEMPORARY_FAILURE) + return + + +handle_cgi_request(environ.get("PATH_INFO")) diff --git a/git-gmi/const.py b/git-gmi/const.py new file mode 100644 index 0000000..9a1bddd --- /dev/null +++ b/git-gmi/const.py @@ -0,0 +1,5 @@ +GIT_CATALOG = "/home/fakefred/p/gemini/git/" +STATUS_SUCCESS = "20 SUCCESS" +STATUS_NOT_FOUND = "51 NOT FOUND" +STATUS_TEMPORARY_FAILURE = "40 TEMPORARY FAILURE" +META_GEMINI = "text/gemini" diff --git a/git-gmi/git.py b/git-gmi/git.py new file mode 100644 index 0000000..9a0ab0d --- /dev/null +++ b/git-gmi/git.py @@ -0,0 +1,157 @@ +from pygit2 import * +import mimetypes +from const import * + + +class GitGmiRepo: + def __init__(self, name: str, path: str): + self.name = name + self.path = path + try: + self.repo = Repository(path) + except FileNotFoundError: + print(f"Error: repository {path} not found") + + def view_summary(self) -> str: + tree = self.get_tree("master") + trls = self.list_tree(tree) + for item in trls: + if item["type"] == "file" and item["name"].lower().split(".")[0] == ( + "readme" + ): + # mimetypes.guess_type() returns tuple (type, encoding) + # only the first one of which we care about + response = ( + f"{STATUS_SUCCESS} {mimetypes.guess_type(item['name'])[0]}\n" + "=> tree/master/ tree\n" + "=> log/ log\n\n" + ) + response += item["blob"].data.decode("utf-8") + break + return response + + def get_commit_log(self) -> list: + # returns the commit log in a human-readable way. + repo = self.repo + commits = list(repo.walk(repo[repo.head.target].id, GIT_SORT_TIME)) + log = [ + { + "id": str(cmt.id), # hex SHA-1 hash + "short_id": str(cmt.short_id), # short version of the above + "author": cmt.author.name, # author's display name + "time": cmt.commit_time, # unix timestamp + "msg": cmt.message, # full commit message + } + for cmt in commits + ] + + return log # reverse chronical order + + def view_log(self) -> str: + response = f"{STATUS_SUCCESS} {META_GEMINI}\n" + log = self.get_commit_log() + for cmt in log: + response += f"## {cmt['short_id']} - {cmt['author']}\n{cmt['msg']}\n\n" + return response + + @classmethod + def parse_recursive_tree(cls, tree: Tree) -> list: + # recursively replace all Trees with a list of Blobs inside it, + # bundled with the Tree's name as a tuple, + # e.g. [('src', [blob0, blob1]), otherblob]. + tree_list = list(tree) + for idx, item in enumerate(tree_list): + if isinstance(item, Tree): + tree_list[idx] = (item.name, cls.parse_recursive_tree(tree_list[idx])) + + return tree_list + + def get_tree(self, commit_str: str) -> list: + # returns a recursive list of Blob objects + try: + commit = self.repo.revparse_single(commit_str) + # top level tree; may contain sub-trees + return self.parse_recursive_tree(commit.tree) + except ValueError: + raise FileNotFoundError(f"Error: no such tree: {commit_str}") + return None + + @staticmethod + def list_tree(tree_list: list, location=[]) -> list: + # tree_list is the output of parse_recursive_tree(); + # location is which dir you are viewing, represented path-like + # in a list, e.g. ['src', 'static', 'css'] => 'src/static/css', + # which this method will cd into and display to the visitor. + # when there is no such dir, raises FileNotFoundError. + trls = tree_list + for loc in location: + found = False + for item in trls: + if isinstance(item, tuple) and item[0] == loc: + trls = item[1] + found = True + break + if not found: + raise FileNotFoundError( + f"Error: no such directory: {'/'.join(location)}" + ) + + contents = [] + for item in trls: + if isinstance(item, tuple): + # was originally a Tree; structure: ('dir_name', [list_of_blobs]) + contents.append( + { + "type": "dir", + "name": item[0], + "size": len(item[1]), # number of objects in dir + } + ) + + elif isinstance(item, Blob): + contents.append( + { + "type": "file", + "name": item.name, + "blob": item, + "size": item.size, # size in bytes + } + ) + + return contents + + def view_tree(self, branch: str, location=[]) -> str: + # actual Gemini response + # consists of a header and a body + response = f"{STATUS_SUCCESS} {META_GEMINI}\n" + tree = self.get_tree(branch) + contents = self.list_tree(tree, location) + for item in contents: + if item["type"] == "dir": + response += ( + f"=> {item['name']}/ {item['name']}/ | {item['size']} items\n" + ) + elif item["type"] == "file": + response += f"=> {item['name']} {item['name']} | {item['size']} bytes\n" + return response + + def get_blob(self, commit_str: str, location=[]) -> Blob: + # returns a specific Blob object + # location: just like that of list_tree, but the last element + # is the filename + try: + tree = self.get_tree(commit_str) + trls = self.list_tree(tree, location[:-1]) + for item in trls: + if item["type"] == "file" and item["name"] == location[-1]: + return item["blob"] + raise FileNotFoundError(f"Error: no such file: {'/'.join(location)}") + except FileNotFoundError: + raise FileNotFoundError(f"Error: No such tree: {'/'.join(location[:-1])}") + + def view_blob(self, branch: str, location=[]) -> str: + blob = self.get_blob(branch, location) + guessed_mimetype = mimetypes.guess_type(blob.name)[0] or "text/plain" + response = f"{STATUS_SUCCESS} {guessed_mimetype}\n" + response += blob.data.decode("utf-8") + return response -- cgit v1.2.3