Compare commits

...

2 Commits

7 changed files with 158 additions and 24 deletions

View File

@@ -1,3 +1,5 @@
class Channel: class Channel:
def __init__(self, name: str, key: str): def __init__(self, name: str, key: str, sf: int = 9):
self.name = name self.name = name
self.key = key
self.sf = sf

View File

@@ -1,23 +1,80 @@
from bleak import BleakScanner, BleakClient from bleak import BleakScanner, BleakClient
import asyncio
import struct
from dataclasses import dataclass
from enum import Enum
NODE_BLUETOOTH_SERVICE_UUID = "E1898FF7-5063-4441-a6eb-526073B00001" NODE_BLUETOOTH_SERVICE_UUID = "E1898FF7-5063-4441-a6eb-526073B00001"
NODE_BLUETOOTH_RX_UUID = "E1898FF7-5063-4441-a6eb-526073B00002" NODE_BLUETOOTH_RX_UUID = "E1898FF7-5063-4441-a6eb-526073B00002"
NODE_BLUETOOTH_TX_UUID = "E1898FF7-5063-4441-a6eb-526073B00003" NODE_BLUETOOTH_TX_UUID = "E1898FF7-5063-4441-a6eb-526073B00003"
class BluetoothPacketType(Enum):
SEND_MESSAGE = 1
GET_INFO = 2
@dataclass
class BluetoothPacket:
packet_type: BluetoothPacketType
data: dict
class MeshNode: class MeshNode:
def __init__(self, client: BleakClient): def __init__(self, client: BleakClient, app):
self.client = client self.client = client
self.client.pair() self.app = app
self.name = "None"
async def discover(): self.rx_queue = asyncio.Queue()
async def get_client_info(self):
await self.transmit_bluetooth_packet(BluetoothPacket(
BluetoothPacketType.GET_INFO,
None
))
def serialize_msg(self, packet: BluetoothPacket):
final = bytes()
final += struct.pack("<B", packet.packet_type.value)
final += repr(packet.data).encode()
return final
async def transmit_bluetooth_packet(self, packet: BluetoothPacket):
await self.client.write_gatt_char(NODE_BLUETOOTH_TX_UUID, self.serialize_msg(packet))
async def receive(self):
return await self.rx_queue.get()
async def ping(self):
await self.client.write_gatt_char(NODE_BLUETOOTH_TX_UUID, b"ping")
def notification_received(self, sender, data):
self.app.log(f"Receive: {data}")
self.rx_queue.put_nowait(data)
async def discover(app):
"""Find a mesh node via Bluetooth """Find a mesh node via Bluetooth
""" """
# find a nearby device that is a node
devices = await BleakScanner.discover(service_uuids=[NODE_BLUETOOTH_SERVICE_UUID], timeout=5) devices = await BleakScanner.discover(service_uuids=[NODE_BLUETOOTH_SERVICE_UUID], timeout=5)
# no device was found # no device was found
if len(devices) == 0: if len(devices) == 0:
return None return None
# get the device using its address
device = await BleakScanner.find_device_by_address(devices[0].address, timeout=5) device = await BleakScanner.find_device_by_address(devices[0].address, timeout=5)
return MeshNode(BleakClient(device)) client = BleakClient(device)
# connect to the device
await client.connect()
# make it so we can receive data
new_node = MeshNode(client, app)
await client.start_notify(NODE_BLUETOOTH_RX_UUID, new_node.notification_received)
# request info about the client
await new_node.get_client_info()
return new_node

View File

@@ -1,6 +1,7 @@
from textual.app import App from textual.app import App
from ui.screens.pair_screen import PairScreen from ui.screens.pair_screen import PairScreen
from api.node import MeshNode from api.node import MeshNode
from api.channel import Channel
class mesh(App): class mesh(App):
@@ -9,6 +10,9 @@ class mesh(App):
def __init__(self, driver_class = None, css_path = None, watch_css = False, ansi_color = False): def __init__(self, driver_class = None, css_path = None, watch_css = False, ansi_color = False):
super().__init__(driver_class, css_path, watch_css, ansi_color) super().__init__(driver_class, css_path, watch_css, ansi_color)
self.mesh_node: MeshNode = None self.mesh_node: MeshNode = None
# key = channel name
# value = channel
self.channels: dict[str, Channel]
def on_ready(self): def on_ready(self):
self.push_screen(PairScreen()) self.push_screen(PairScreen())

View File

@@ -3,6 +3,7 @@ from textual.widgets import Header, Footer, ContentSwitcher
from ui.widgets.home_sidebar import HomeSidebar from ui.widgets.home_sidebar import HomeSidebar
from ui.widgets.home_info import HomeInfo from ui.widgets.home_info import HomeInfo
from ui.widgets.channels_list import ChannelsList from ui.widgets.channels_list import ChannelsList
from ui.widgets.chat_window import ChatWindow
class MainScreen(Screen): class MainScreen(Screen):
@@ -13,5 +14,6 @@ class MainScreen(Screen):
with ContentSwitcher(initial="home-info"): with ContentSwitcher(initial="home-info"):
yield HomeInfo(id="home-info") yield HomeInfo(id="home-info")
yield ChannelsList(id="channels-list") yield ChannelsList(id="channels-list")
yield ChatWindow(id="chat-window")
yield Footer() yield Footer()

View File

@@ -15,7 +15,7 @@ class PairScreen(Screen):
async def connect_to_node(self, is_retry = False): async def connect_to_node(self, is_retry = False):
if not is_retry: if not is_retry:
self.notify("This may take a moment...", title="Discovering nearby nodes...") self.notify("This may take a moment...", title="Discovering nearby nodes...")
self.app.mesh_node = await MeshNode.discover() self.app.mesh_node = await MeshNode.discover(self.app)
if self.app.mesh_node == None: if self.app.mesh_node == None:
self.notify("Check your node is powered on and nearby.\nRetrying...", title="Failed to find a nearby node!", severity="warning") self.notify("Check your node is powered on and nearby.\nRetrying...", title="Failed to find a nearby node!", severity="warning")

View File

@@ -1,5 +1,5 @@
from textual.containers import VerticalScroll, Vertical, HorizontalGroup from textual.containers import VerticalScroll, Vertical, HorizontalGroup
from textual.widgets import Static, Button, Rule from textual.widgets import Static, Button, Rule, ContentSwitcher
from api.channel import Channel from api.channel import Channel
@@ -37,16 +37,19 @@ class ChannelsList(Vertical):
} }
""" """
def on_button_pressed(self, event: ChannelView.Pressed):
self.screen.query_one(ContentSwitcher).current = "chat-window"
def compose(self): def compose(self):
with VerticalScroll(): with VerticalScroll():
yield Static("channels", classes="banner") yield Static("channels", classes="banner")
yield ChannelView(Channel("test channel", "AQ==")) yield ChannelView(Channel("test channel 1", "AQ=="))
yield ChannelView(Channel("test channel", "AQ==")) yield ChannelView(Channel("test channel 2", "AQ=="))
yield ChannelView(Channel("test channel", "AQ==")) yield ChannelView(Channel("test channel 3", "AQ=="))
yield ChannelView(Channel("test channel", "AQ==")) yield ChannelView(Channel("test channel 4", "AQ=="))
yield ChannelView(Channel("test channel", "AQ==")) yield ChannelView(Channel("test channel 5", "AQ=="))
yield ChannelView(Channel("test channel", "AQ==")) yield ChannelView(Channel("test channel 6", "AQ=="))
yield ChannelView(Channel("test channel", "AQ==")) yield ChannelView(Channel("test channel 7", "AQ=="))
yield Rule() yield Rule()

View File

@@ -1,16 +1,82 @@
from textual.containers import Vertical, VerticalScroll, HorizontalGroup from textual.containers import Vertical, VerticalScroll, VerticalGroup, HorizontalGroup
from textual.widgets import Input, Button, Static from textual.widgets import Input, Button, Static
from api.message import Message
from api.node import MeshNode
class Message(Static): class MessageView(VerticalGroup):
def __init__(self, message): DEFAULT_CSS = """
MessageView {
margin-bottom: 1;
#message-text {
background: $surface;
padding: 1;
width: auto;
max-width: 25;
}
#triangle {
color: $surface;
width: auto;
}
&.right {
align-horizontal: right;
}
}
"""
def __init__(self, message: Message):
super().__init__() super().__init__()
self.message = message
if self.message.sender != self.app.mesh_node:
self.notify("right side")
self.add_class("right")
def compose(self):
user_name = self.message.sender.name
if self.message.sender == self.app.mesh_node:
user_name += " (You)"
yield Static(f"[b u cyan]{user_name}[/]\n{self.message.content}", id="message-text")
yield Static("", id="triangle")
class ChatWindow(Vertical): class ChatWindow(Vertical):
def compose(self): DEFAULT_CSS = """
with VerticalScroll(): ChatWindow {
pass #message-history {
padding: 1;
}
with HorizontalGroup(): #message-box {
yield Input(placeholder="Send a message") margin-right: 2;
yield Button("", flat=True) align: left middle;
#message-input {
margin: 1;
width: 90%;
}
#send-btn {
max-width: 10%;
margin: 1 0;
}
}
}
"""
def compose(self):
with VerticalScroll(id="message-history"):
fake = MeshNode(None, self.app)
fake.name = "billy"
yield MessageView(Message("hi!!!", fake))
yield MessageView(Message("hi!!!", fake))
yield MessageView(Message("hi!!!", self.app.mesh_node))
with HorizontalGroup(id="message-box"):
yield Input(placeholder="Send a message", id="message-input")
yield Button("", flat=True, id="send-btn")