Files
mesh-network-project/desktop_app/api/node.py

80 lines
2.4 KiB
Python
Raw Normal View History

2026-03-08 07:40:27 +11:00
from bleak import BleakScanner, BleakClient
import asyncio
import struct
from dataclasses import dataclass
from enum import Enum
2026-03-07 10:38:18 +11:00
NODE_BLUETOOTH_SERVICE_UUID = "E1898FF7-5063-4441-a6eb-526073B00001"
2026-03-08 07:40:27 +11:00
NODE_BLUETOOTH_RX_UUID = "E1898FF7-5063-4441-a6eb-526073B00002"
NODE_BLUETOOTH_TX_UUID = "E1898FF7-5063-4441-a6eb-526073B00003"
2026-03-07 10:38:18 +11:00
class BluetoothPacketType(Enum):
SEND_MESSAGE = 1
GET_INFO = 2
@dataclass
class BluetoothPacket:
packet_type: BluetoothPacketType
data: dict
2026-03-07 10:38:18 +11:00
class MeshNode:
def __init__(self, client: BleakClient, app):
2026-03-07 14:22:06 +11:00
self.client = client
self.app = app
self.name = "None"
self.rx_queue = asyncio.Queue()
async def get_client_info(self):
await self.transmit_bluetooth_packet(BluetoothPacket(
BluetoothPacketType.GET_INFO,
None
))
def serialize_msg(self, packet: BluetoothPacket):
final = bytes()
final += struct.pack("<B", packet.packet_type.value)
final += repr(packet.data).encode()
return final
async def transmit_bluetooth_packet(self, packet: BluetoothPacket):
await self.client.write_gatt_char(NODE_BLUETOOTH_TX_UUID, self.serialize_msg(packet))
2026-03-07 10:38:18 +11:00
async def receive(self):
return await self.rx_queue.get()
async def ping(self):
await self.client.write_gatt_char(NODE_BLUETOOTH_TX_UUID, b"ping")
def notification_received(self, sender, data):
self.app.log(f"Receive: {data}")
self.rx_queue.put_nowait(data)
async def discover(app):
2026-03-07 10:38:18 +11:00
"""Find a mesh node via Bluetooth
"""
# find a nearby device that is a node
2026-03-07 14:22:06 +11:00
devices = await BleakScanner.discover(service_uuids=[NODE_BLUETOOTH_SERVICE_UUID], timeout=5)
2026-03-07 10:38:18 +11:00
# no device was found
if len(devices) == 0:
2026-03-07 14:22:06 +11:00
return None
# get the device using its address
2026-03-07 14:22:06 +11:00
device = await BleakScanner.find_device_by_address(devices[0].address, timeout=5)
client = BleakClient(device)
# connect to the device
await client.connect()
# make it so we can receive data
new_node = MeshNode(client, app)
await client.start_notify(NODE_BLUETOOTH_RX_UUID, new_node.notification_received)
# request info about the client
await new_node.get_client_info()
return new_node