Files

269 lines
8.6 KiB
Python
Raw Permalink Normal View History

from _sx126x import *
from sx126x import SX126X
_SX126X_PA_CONFIG_SX1262 = const(0x00)
class SX1262(SX126X):
TX_DONE = SX126X_IRQ_TX_DONE
RX_DONE = SX126X_IRQ_RX_DONE
ADDR_FILT_OFF = SX126X_GFSK_ADDRESS_FILT_OFF
ADDR_FILT_NODE = SX126X_GFSK_ADDRESS_FILT_NODE
ADDR_FILT_NODE_BROAD = SX126X_GFSK_ADDRESS_FILT_NODE_BROADCAST
PREAMBLE_DETECT_OFF = SX126X_GFSK_PREAMBLE_DETECT_OFF
PREAMBLE_DETECT_8 = SX126X_GFSK_PREAMBLE_DETECT_8
PREAMBLE_DETECT_16 = SX126X_GFSK_PREAMBLE_DETECT_16
PREAMBLE_DETECT_24 = SX126X_GFSK_PREAMBLE_DETECT_24
PREAMBLE_DETECT_32 = SX126X_GFSK_PREAMBLE_DETECT_32
STATUS = ERROR
def __init__(self, spi_bus, clk, mosi, miso, cs, irq, rst, gpio):
super().__init__(spi_bus, clk, mosi, miso, cs, irq, rst, gpio)
self._callbackFunction = self._dummyFunction
def begin(self, freq=434.0, bw=125.0, sf=9, cr=7, syncWord=SX126X_SYNC_WORD_PRIVATE,
power=14, currentLimit=60.0, preambleLength=8, implicit=False, implicitLen=0xFF,
crcOn=True, txIq=False, rxIq=False, tcxoVoltage=1.6, useRegulatorLDO=False,
blocking=True):
state = super().begin(bw, sf, cr, syncWord, currentLimit, preambleLength, tcxoVoltage, useRegulatorLDO, txIq, rxIq)
ASSERT(state)
if not implicit:
state = super().explicitHeader()
else:
state = super().implicitHeader(implicitLen)
ASSERT(state)
state = super().setCRC(crcOn)
ASSERT(state)
state = self.setFrequency(freq)
ASSERT(state)
state = self.setOutputPower(power)
ASSERT(state)
state = super().fixPaClamping()
ASSERT(state)
state = self.setBlockingCallback(blocking)
return state
def beginFSK(self, freq=434.0, br=48.0, freqDev=50.0, rxBw=156.2, power=14, currentLimit=60.0,
preambleLength=16, dataShaping=0.5, syncWord=[0x2D, 0x01], syncBitsLength=16,
addrFilter=SX126X_GFSK_ADDRESS_FILT_OFF, addr=0x00, crcLength=2, crcInitial=0x1D0F, crcPolynomial=0x1021,
crcInverted=True, whiteningOn=True, whiteningInitial=0x0100,
fixedPacketLength=False, packetLength=0xFF, preambleDetectorLength=SX126X_GFSK_PREAMBLE_DETECT_16,
tcxoVoltage=1.6, useRegulatorLDO=False,
blocking=True):
state = super().beginFSK(br, freqDev, rxBw, currentLimit, preambleLength, dataShaping, preambleDetectorLength, tcxoVoltage, useRegulatorLDO)
ASSERT(state)
state = super().setSyncBits(syncWord, syncBitsLength)
ASSERT(state)
if addrFilter == SX126X_GFSK_ADDRESS_FILT_OFF:
state = super().disableAddressFiltering()
elif addrFilter == SX126X_GFSK_ADDRESS_FILT_NODE:
state = super().setNodeAddress(addr)
elif addrFilter == SX126X_GFSK_ADDRESS_FILT_NODE_BROADCAST:
state = super().setBroadcastAddress(addr)
else:
state = ERR_UNKNOWN
ASSERT(state)
state = super().setCRC(crcLength, crcInitial, crcPolynomial, crcInverted)
ASSERT(state)
state = super().setWhitening(whiteningOn, whiteningInitial)
ASSERT(state)
if fixedPacketLength:
state = super().fixedPacketLengthMode(packetLength)
else:
state = super().variablePacketLengthMode(packetLength)
ASSERT(state)
state = self.setFrequency(freq)
ASSERT(state)
state = self.setOutputPower(power)
ASSERT(state)
state = super().fixPaClamping()
ASSERT(state)
state = self.setBlockingCallback(blocking)
return state
def setFrequency(self, freq, calibrate=True):
if freq < 150.0 or freq > 960.0:
return ERR_INVALID_FREQUENCY
state = ERR_NONE
if calibrate:
data = bytearray(2)
if freq > 900.0:
data[0] = SX126X_CAL_IMG_902_MHZ_1
data[1] = SX126X_CAL_IMG_902_MHZ_2
elif freq > 850.0:
data[0] = SX126X_CAL_IMG_863_MHZ_1
data[1] = SX126X_CAL_IMG_863_MHZ_2
elif freq > 770.0:
data[0] = SX126X_CAL_IMG_779_MHZ_1
data[1] = SX126X_CAL_IMG_779_MHZ_2
elif freq > 460.0:
data[0] = SX126X_CAL_IMG_470_MHZ_1
data[1] = SX126X_CAL_IMG_470_MHZ_2
else:
data[0] = SX126X_CAL_IMG_430_MHZ_1
data[1] = SX126X_CAL_IMG_430_MHZ_2
state = super().calibrateImage(data)
ASSERT(state)
return super().setFrequencyRaw(freq)
def setOutputPower(self, power):
if not ((power >= -9) and (power <= 22)):
return ERR_INVALID_OUTPUT_POWER
ocp = bytearray(1)
ocp_mv = memoryview(ocp)
state = super().readRegister(SX126X_REG_OCP_CONFIGURATION, ocp_mv, 1)
ASSERT(state)
state = super().setPaConfig(0x04, _SX126X_PA_CONFIG_SX1262)
ASSERT(state)
state = super().setTxParams(power)
ASSERT(state)
return super().writeRegister(SX126X_REG_OCP_CONFIGURATION, ocp, 1)
def setTxIq(self, txIq):
self._txIq = txIq
def setRxIq(self, rxIq):
self._rxIq = rxIq
if not self.blocking:
ASSERT(super().startReceive())
def setPreambleDetectorLength(self, preambleDetectorLength):
self._preambleDetectorLength = preambleDetectorLength
if not self.blocking:
ASSERT(super().startReceive())
def setBlockingCallback(self, blocking, callback=None):
self.blocking = blocking
if not self.blocking:
state = super().startReceive()
ASSERT(state)
if callback != None:
self._callbackFunction = callback
super().setDio1Action(self._onIRQ)
else:
self._callbackFunction = self._dummyFunction
super().clearDio1Action()
return state
else:
state = super().standby()
ASSERT(state)
self._callbackFunction = self._dummyFunction
super().clearDio1Action()
return state
def recv(self, len=0, timeout_en=False, timeout_ms=0):
if not self.blocking:
return self._readData(len)
else:
return self._receive(len, timeout_en, timeout_ms)
def send(self, data):
if not self.blocking:
return self._startTransmit(data)
else:
return self._transmit(data)
def _events(self):
return super().getIrqStatus()
def _receive(self, len_=0, timeout_en=False, timeout_ms=0):
state = ERR_NONE
length = len_
if len_ == 0:
length = SX126X_MAX_PACKET_LENGTH
data = bytearray(length)
data_mv = memoryview(data)
try:
state = super().receive(data_mv, length, timeout_en, timeout_ms)
except AssertionError as e:
state = list(ERROR.keys())[list(ERROR.values()).index(str(e))]
if state == ERR_NONE or state == ERR_CRC_MISMATCH:
if len_ == 0:
length = super().getPacketLength(False)
data = data[:length]
else:
return b'', state
return bytes(data), state
def _transmit(self, data):
if isinstance(data, bytes) or isinstance(data, bytearray):
pass
else:
return 0, ERR_INVALID_PACKET_TYPE
state = super().transmit(data, len(data))
return len(data), state
def _readData(self, len_=0):
state = ERR_NONE
length = super().getPacketLength()
if len_ < length and len_ != 0:
length = len_
data = bytearray(length)
data_mv = memoryview(data)
try:
state = super().readData(data_mv, length)
except AssertionError as e:
state = list(ERROR.keys())[list(ERROR.values()).index(str(e))]
ASSERT(super().startReceive())
if state == ERR_NONE or state == ERR_CRC_MISMATCH:
return bytes(data), state
else:
return b'', state
def _startTransmit(self, data):
if isinstance(data, bytes) or isinstance(data, bytearray):
pass
else:
return 0, ERR_INVALID_PACKET_TYPE
state = super().startTransmit(data, len(data))
return len(data), state
def _dummyFunction(self, *args):
pass
def _onIRQ(self, callback):
events = self._events()
if events & SX126X_IRQ_TX_DONE:
super().startReceive()
self._callbackFunction(events)