from textual.screen import Screen from textual.app import ComposeResult from textual.widgets import Input, Select, TextArea, LoadingIndicator, Static, DataTable, ContentSwitcher, Tabs, Tab, Button, Markdown from textual.containers import VerticalGroup, Vertical, HorizontalGroup, Right from textual import work from widgets import Navbar from datetime import datetime from human_readable import time_delta import requests, asyncio class RepoViewScreen(Screen): CSS_PATH = "../assets/repo_view_screen.tcss" def __init__(self, owner_name: str, repo_name: str): super().__init__() self.owner_name = owner_name self.repo_name = repo_name self.current_dir = "." self.readme_loaded = False def get_icon_from_name_and_type(self, file_name: str, is_folder: bool): # nerd font icons my beloved if not is_folder: match file_name: case "Makefile": return "\ue673" case "Dockerfile": return "\ue7b0" case "requirements.txt": return "\ue73c" case "LICENSE": return "\uf1f9" case "Cargo.lock" | "Cargo.toml": return "\ue7a8" extension = file_name[file_name.index(".")+1:] match extension: case 'c' | 'h': return "\ue61e" case 'cpp': return "\ue61d" case 'py': return "\ue73c" case 'js': return "\ue781" case 'json': return "\ueb0f" case 'gitignore' | 'gitmodules': return "\ue702" case 'html' | 'htm': return "\ue736" case 'css' | 'tcss': return "\ue749" case 'svg': return "\ue698" case 'ico': return "\ue623" case 'go': return "\ue65e" case 'rs': return "\ue7a8" case 'grnd': return "\uf44f" case 'md': return "\ueb1d" case 'fish': return "\uee41" case 'sh': return "\ue760" case 'bat': return "\ue70f" case 'png' | 'jpg' | 'jpeg' | 'avif': return "\uf03e" case 'lua': return "\ue620" case 'zip' | 'tar' | 'gz' | "7z": return "\ue6aa" case _: return "\uf15b" else: return "\ue5ff" @work(thread=False, exclusive=True) async def action_view_file(self, path: str, type: str): self.notify(f"{path, type}") if type == "dir": self.current_dir = path self.show_directory(path) elif type == "file": self.show_file(path) def show_file(self, path: str): files: DataTable = self.query_one("#files") loading = self.query_one(LoadingIndicator) open_file: TextArea = self.query_one("#open-file") self.query_one("#readme").display = False self.query_one("#repo-info").display = False files.display = False open_file.display = False loading.display = True content = requests.get( self.app.GITEA_HOST + f"api/v1/repos/{self.owner_name}/{self.repo_name}/raw/{path}" ).text languages = { "py": "python", "md": "markdown", "h": "c", "cpp": "c++" } extension = path[path.rfind(".")+1:] open_file.text = content try: open_file.language = languages.get(extension, extension) except: open_file.language = None self.notify(f"{open_file._languages}") loading.display = False open_file.display = True def show_directory(self, path: str): files: DataTable = self.query_one("#files") self.query_one("#open-file").display = False files.clear(columns=True) loading = self.query_one(LoadingIndicator) self.query_one("#repo-info").display = path == "." files.display = False loading.display = True readme: Markdown = self.query_one("#readme") readme.display = path == "." # get files in dir files_response = requests.get( self.app.GITEA_HOST + f"api/v1/repos/{self.owner_name}/{self.repo_name}/contents/{path}" ) if not files_response.ok: self.notify(files_response.text, title="Failed to get files:", severity="error") return print("Getting most recent commit...") # get most recent commit commits_response = requests.get( self.app.GITEA_HOST + f"api/v1/repos/{self.owner_name}/{self.repo_name}/commits", params={ "limit": 1, "path": self.current_dir } ) if not commits_response.ok: self.notify(commits_response.text, title="Failed to get most recent commit:", severity="error") return most_recent_commit = commits_response.json()[0] files.add_columns( f"[b]{most_recent_commit["commit"]["author"]["name"]}[/]", f"[r]{most_recent_commit["sha"][:10]}[/]", f"[d]{most_recent_commit["commit"]["message"]}" ) print("Getting root commits...") rows = [] commit_page_number = 1 page_size = 50 commit_data = {} # loop over all the root commits until we get em' all while True: commits = requests.get( self.app.GITEA_HOST + f"api/v1/repos/{self.owner_name}/{self.repo_name}/commits", params={ "path": path, "verification": False, "files": False, "stat": False, "page": commit_page_number, "limit": page_size, } ).json() commit_data.update({commit["sha"]: commit for commit in commits}) if len(commits) == page_size: # we reached end of the page commit_page_number += 1 print("Next page...") else: break found_readme = False # add a ".." folder if we're not in the root directory if path != ".": previous_dir = self.current_dir[:self.current_dir.rfind("/")] rows.append(( f"[cyan]{self.get_icon_from_name_and_type("..", True)}[/] [@click=screen.view_file('{previous_dir}','dir')]..[/]", "", "" )) for file in files_response.json(): commit = commit_data[file["last_commit_sha"]] commit_created_at = datetime.fromisoformat(commit["created"]).replace(tzinfo=None) rows.append(( f"[cyan]{self.get_icon_from_name_and_type(file["name"], file["type"] != "file")}[/] [@click=screen.view_file('{self.current_dir}/{file["name"]}','{file["type"]}')]{file["name"]}[/]", f"[d]{commit["commit"]["message"]}[/]", f"[d]Updated {time_delta(datetime.now() - commit_created_at)} ago[/]" )) if not self.readme_loaded and path == "." and file["name"].lower() == "readme.md": print("Getting README...") found_readme = True self.readme_loaded = True readme.border_title = f"\uf405 {file["name"]}" readme_content = requests.get( self.app.GITEA_HOST + f"api/v1/repos/{self.owner_name}/{self.repo_name}/raw/{file["name"]}" ).text self.app.call_from_thread(readme.update, readme_content or "Empty README") print("Adding rows...") files.add_rows(rows) if not self.readme_loaded and not found_readme and path == ".": self.readme_loaded = True self.app.call_from_thread(readme.update, "This repository has no `README.md` file.") files.display = True loading.display = False @work(thread=True, exclusive=True) def on_mount(self): print("Getting files...") self.show_directory(self.current_dir) def compose(self) -> ComposeResult: # get repo data via a request response = requests.get( url=self.app.GITEA_HOST + f"api/v1/repos/{self.owner_name}/{self.repo_name}" ) if not response.ok: self.notify(response.text, title="Failed to get repo:", severity="error") yield Navbar() return data = response.json() yield Navbar() with VerticalGroup(id="top"): with HorizontalGroup(): yield Static(f" {self.owner_name}/[b]{self.repo_name}[/]", id="title") with Right(id="buttons"): yield Button(f"\uf005 Star [d]({data["stars_count"]})", variant="warning", flat=True) yield Button(f"\uf418 Fork [d]({data["forks_count"]})", variant="primary", flat=True) yield Button(f"\uea70 Watch [d]({data["watchers_count"]})", flat=True) yield Tabs( "\uf44f Code", "\uebf8 Issues", "\ue726 Pull Requests", "\uf500 Actions", "\ueb29 Packages", "\uf502 Projects", "\uf412 Releases", "\ueaa4 Wiki", "\uf21e Activity" ) with ContentSwitcher(initial="Code"): with HorizontalGroup(id="Code"): with VerticalGroup(id="commits"): with HorizontalGroup(id="branch-info"): yield Static(f"[@click='']\uf4b6 1 Commits[/]") yield Static(f"[@click='']\uf418 1 Branch[/]") yield Static(f"[@click='']\uf02b 0 Tags[/]") with HorizontalGroup(id="branch-options"): yield Select.from_values([ "main" ], allow_blank=False, id="branch") yield Button("\ue726", flat=True, id="new-pull-request", tooltip="New Pull Request") yield Button("Go to file", flat=True) table = DataTable(id="files", show_cursor=False) #table.add_columns("SpookyDervish [r]9b32c417e9[/]", "switched from tabs to spaces", "51 minutes ago") #table.add_row("\ue5ff screens", "[d]switched from tabs to spaces", "[d]51 minutes ago") yield table yield TextArea.code_editor(theme="css", placeholder="Empty file", read_only=True, id="open-file") yield LoadingIndicator() readme = Markdown(markdown="Loading...", id="readme") readme.border_title = "\uf405 README.md" yield readme with Vertical(id="repo-info"): with HorizontalGroup(): yield Input(placeholder="Search code...", id="search-query") yield Button("\uf002", flat=True, id="search-btn") yield Static("\n[b] Description") yield Static("[d]" + data["description"], id="description")