216 lines
		
	
	
		
			9.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			216 lines
		
	
	
		
			9.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| from textual_window import Window
 | |
| from textual.widgets import RichLog, Button
 | |
| from textual import work
 | |
| import textual.widgets
 | |
| from typing import Any
 | |
| from traceback import format_exception
 | |
| 
 | |
| from lupa import lua51
 | |
| 
 | |
| import os, json, asyncio
 | |
| 
 | |
| 
 | |
class PluginLoader(Window):
    """Window that discovers Lua plugins, executes them and logs the progress.

    Plugins are looked up in the sub-folders of the ``plugins`` directory
    next to this file. A plugin folder must contain a ``plugin.json``
    manifest and a ``lua`` folder; every file inside that ``lua`` folder is
    executed in a restricted Lua environment that exposes a ``berry`` table
    (see ``_build_lua_api``) plus a handful of Lua built-ins.
    """

    # Keys every plugin.json manifest has to provide.
    _REQUIRED_MANIFEST_FIELDS = ("name", "version", "author", "dependencies")

    def __init__(self):
        """Create the permanent loader window, opened at the bottom right."""
        super().__init__(
            id="Plugin Loader",
            mode="permanent",
            icon="⚙️",
            starting_horizontal="right",
            starting_vertical="bottom",
            start_open=True,
            allow_maximize=True,
        )

    # region Lua functions

    def fake_notify(self, message: str, title: str | None = None, severity: str = "information"):
        """Show a notification through the app (``berry.ui.notify``)."""
        self.app.notify(message=message, title=title, severity=severity)

    def create_sidebar_button(self, icon: str):
        """Mount a new ``Button`` labelled *icon* into ``#sidebar-buttons``.

        Not registered in the ``berry`` table built by ``_build_lua_api``.
        """
        new_button = Button(icon)
        self.app.query_one("#sidebar-buttons").mount(new_button)

    def set_theme(self, theme_name: str):
        """Switch the app theme to *theme_name*.

        Not registered in the ``berry`` table built by ``_build_lua_api``.
        """
        self.app.theme = theme_name

    def add_bind(self, action_name: str, key: str, description: str, show: bool = True):
        """Bind *key* to *action_name* on the app (``berry.ui.addBind``).

        The binding is inserted with ``priority=True`` directly into the
        app's private ``_bindings`` map, then the bindings are refreshed.
        """
        self.app._bindings.bind(key, action_name, description, show, priority=True)
        self.app.refresh_bindings()

    def fake_run_action(self, action_name: str):
        """Call the app's ``action_<action_name>`` method (``berry.ui.runAction``).

        Raises:
            AttributeError: if the app has no such action method.
        """
        getattr(self.app, f"action_{action_name}")()

    def create_action(self, action_name: str, function):
        """Install *function* as ``action_<action_name>`` on the app.

        The installed wrapper takes no arguments and discards the return
        value of *function* (``berry.ui.createAction``).
        """
        def wrapper():
            function()

        setattr(self.app, f"action_{action_name}", wrapper)

    def create_widget(self, widget_type: str, *args):
        """Instantiate ``textual.widgets.<widget_type>`` with *args*.

        Raises:
            AttributeError: if ``textual.widgets`` has no such name.
        """
        return getattr(textual.widgets, widget_type)(*args)

    def run_on_message(self, event, function):
        """Placeholder for ``berry.ui.onMessage``; always raises.

        Raises:
            NotImplementedError: unconditionally.
        """
        raise NotImplementedError("onMessage is not implemented yet.")

    def create_window(
        self,
        title: str,
        icon: str = "",
        show_title: bool = True,
        can_close: bool = True,
        can_maximize: bool = True,
        can_resize: bool = True,
        start_horizontal: str = "center",
        start_vertical: str = "middle",
    ):
        """Build a new ``Window`` for a plugin (``berry.ui.createWindow``).

        *title* is used as the window id. A closable window is created in
        ``"temporary"`` mode, otherwise in ``"permanent"`` mode. The window
        is returned without being mounted; mounting is left to the caller.
        """
        new_window = Window(
            id=title,
            mode="temporary" if can_close else 'permanent',
            allow_maximize=can_maximize,
            allow_resize=can_resize,
            start_open=True,
            icon=icon,
            show_title=show_title,
            starting_horizontal=start_horizontal,
            starting_vertical=start_vertical,
        )
        return new_window

    def create_setting(self, section_name: str, option_name: str, description: str, option_type: str, default_value: Any, on_changed_func=None):
        """Forward a plugin setting definition to the app's config handler.

        Exposed to Lua as ``berry.config.defineSetting``.
        """
        self.app.config_handler.define_plugin_setting(
            section_name, option_name, description, option_type, default_value, on_changed_func
        )

    # endregion

    def _build_lua_api(self) -> dict:
        """Return the nested table exposed to plugins as the ``berry`` global."""
        return {
            "ui": {
                "notify": self.fake_notify,
                "runAction": self.fake_run_action,
                "addBind": self.add_bind,
                "createAction": self.create_action,
                "createWindow": self.create_window,
                "createWidget": self.create_widget,
                "app": self.app,
                "onMessage": self.run_on_message,
            },
            "config": {
                "get": self.app.config_handler.get,
                "defineSetting": self.create_setting,
            },
        }

    def _inspect_plugin_folder(self, plugin_folder: str, log: RichLog) -> dict | None:
        """Validate the layout of *plugin_folder* and load its manifest.

        Returns the parsed ``plugin.json`` content, or ``None`` after writing
        the reason to *log* when the folder is not a usable plugin.
        """
        if not os.path.isdir(plugin_folder):
            log.write(f"[d]Ignoring {plugin_folder} because it is not a folder.[/]")
            return None

        if not os.path.isdir(os.path.join(plugin_folder, "lua")):
            log.write(f"[d]Ignoring {plugin_folder} because it has no \"lua\" folder.[/]")
            return None

        manifest_path = os.path.join(plugin_folder, "plugin.json")
        if not os.path.isfile(manifest_path):
            log.write(f"[d]Ignoring {plugin_folder} because it has no plugin.json file.[/]")
            return None

        try:
            # Opening happens inside the try so an I/O failure is reported
            # through the generic handler instead of escaping the worker.
            with open(manifest_path, "r", encoding="utf-8") as manifest_file:
                manifest = json.load(manifest_file)
            # Indexing each required key raises KeyError for a missing one.
            for field in self._REQUIRED_MANIFEST_FIELDS:
                manifest[field]
        except UnicodeDecodeError:
            log.write(f"[d]Ignoring {plugin_folder} because its plugin.json file is unreadable.[/]")
            return None
        except json.JSONDecodeError:
            log.write(f"[d]Ignoring {plugin_folder} because its plugin.json file is malformed.[/]")
            return None
        except KeyError as e:
            log.write(f"[d]Ignoring {plugin_folder} because its plugin.json file is missing the field {e}.")
            return None
        except Exception as e:
            log.write(f"[d]Ignoring {plugin_folder} because of error: {e}.[/]")
            return None

        return manifest

    def _run_lua_file(self, lua_file_path: str, lua_globals, lua_api: dict, log: RichLog) -> bool:
        """Execute one Lua file in a fresh sandbox; return True on success.

        *lua_globals* must be the runtime's original globals table, captured
        before any sandbox was installed. The file is expected to return a
        table; if that table has a truthy ``run`` entry it is called.
        Failures are written to *log* and reported with a notification.
        """
        try:
            with open(lua_file_path, "r", encoding="utf-8") as lua_source:
                code = lua_source.read()
        except (OSError, UnicodeDecodeError):
            # Also covers sub-directories inside the "lua" folder.
            log.write(f"[d]Ignoring {lua_file_path} because it could not be read.[/]")
            return False

        sandbox = self.lua_runtime.eval("{}")
        # Lua's print is replaced by this widget's `log` attribute.
        sandbox.print = self.log
        sandbox.math = lua_globals.math
        sandbox.string = lua_globals.string
        sandbox.tostring = lua_globals.tostring
        sandbox.tonumber = lua_globals.tonumber
        sandbox.berry = lua_api

        # setfenv(0, ...) swaps the environment of the running Lua thread, so
        # after this call lookups made through the runtime resolve inside the
        # sandbox. That is why setfenv and the built-ins above are taken from
        # the captured original globals rather than looked up again per file.
        lua_globals.setfenv(0, sandbox)

        # NOTE(review): traceback text is embedded in a markup string without
        # escaping; bracketed fragments may be interpreted as markup tags.
        try:
            executed_code = self.lua_runtime.execute(code)
        except lua51.LuaError as e:
            log.write(f"[b red]Error in {lua_file_path}:\n" + '\n'.join(format_exception(e)) + "[/]")
            self.notify("There was Lua error while loading one your installed plugins. Check the Plugin Loader window for more details.", title="Lua Error", severity="error", timeout=10)
            return False

        try:
            entry_point = executed_code.run
            has_entry_point = bool(entry_point)
        except Exception:
            # The chunk returned something that cannot be indexed like a table.
            log.write(f"[b red]Error in {lua_file_path}: file returned invalid value, you should only return a table.")
            self.notify("A plugin has created an error. Check the log for more details.", title="Lua Error", severity="error", timeout=10)
            return False

        if not has_entry_point:
            return True

        try:
            entry_point()
        except Exception as e:
            log.write(f"[b red]Runtime error in {lua_file_path}:\n" + "\n".join(format_exception(e)) + "[/]")
            self.notify("A plugin has created an error. Check the log for more details.", title="Lua Error", severity="error", timeout=10)
            return False

        return True

    def _run_plugin_scripts(self, plugin_folder: str, lua_globals, lua_api: dict, log: RichLog) -> bool:
        """Run every file in the plugin's ``lua`` folder; True if all succeeded.

        A failing file does not stop the remaining files from running.
        """
        lua_folder = os.path.join(plugin_folder, "lua")
        all_succeeded = True

        for lua_file in os.listdir(lua_folder):
            lua_file_path = os.path.join(lua_folder, lua_file)
            if not self._run_lua_file(lua_file_path, lua_globals, lua_api, log):
                all_succeeded = False

        return all_succeeded

    @work
    async def find_plugins(self):
        """Set up the Lua runtime, then discover and run all plugins.

        Progress and problems are written to this window's ``RichLog``. When
        nothing went wrong and the ``plugins.log_timeout`` setting is not
        ``-1``, the window closes itself after that many seconds.
        """
        log = self.query_one(RichLog)

        no_errors = True

        log.write("[b]Setting up LUA runtime..[/]")
        self.lua_runtime = lua51.LuaRuntime()
        # Captured once, before any sandbox replaces the thread environment.
        lua_globals = self.lua_runtime.globals()
        lua_api = self._build_lua_api()

        log.write("[b]Finding plugins...[/]")

        # Find all plugins (they're just in the plugins folder for now)
        folders = [
            os.path.join(os.path.dirname(__file__), "plugins")
        ]
        # Folders of all correctly formatted plugins; collected here but not
        # read anywhere else in this method.
        plugin_paths = []

        for folder in folders:
            log.write(f"Searching {folder}...")

            if not os.path.isdir(folder):
                # No plugins directory simply means there is nothing to load.
                log.write(f"[d]Skipping {folder} because it does not exist.[/]")
                continue

            for entry in os.listdir(folder):
                plugin_folder = os.path.join(folder, entry)

                plugin_json = self._inspect_plugin_folder(plugin_folder, log)
                if plugin_json is None:
                    no_errors = False
                    continue

                log.write(f"[b green]FOUND[/] {plugin_json['name']} ({plugin_json['version']})")

                if not self._run_plugin_scripts(plugin_folder, lua_globals, lua_api, log):
                    no_errors = False

                plugin_paths.append(plugin_folder)

        log.write("\n[b]Done loading plugins![/]")
        if no_errors:
            raw_timeout = self.app.config_handler.get("plugins", "log_timeout")
            timeout = int(raw_timeout)
            if timeout != -1:
                log.write(f'[d]Window will automatically close in {raw_timeout} seconds.[/]')
                await asyncio.sleep(timeout)
                self.close_window()

    async def on_mount(self):
        """Hide the window when ``plugins.log`` is off, then start loading."""
        if not int(self.app.config_handler.get("plugins", "log")):
            self.display = "none"

        self.find_plugins()

    def compose(self):
        """Yield the ``RichLog`` that receives the loader output."""
        yield RichLog(markup=True, id="plugins-log", wrap=True)