mirror of
https://github.com/B00mb0x/LoRaWAN-Stack-for-Micropython.git
synced 2025-08-03 20:00:04 +02:00
462 lines
No EOL
16 KiB
Python
462 lines
No EOL
16 KiB
Python
"""
|
|
Library for SX1262 chipset
|
|
|
|
This library controls the SX1262 chipset.
|
|
"""
|
|
import utime
|
|
# ---------------------------------------------------------------------------
# SX1262 command opcodes (first byte of every SPI transaction below)
# ---------------------------------------------------------------------------

# Operating mode
_SET_SLEEP = const(0x84)
_SET_STANDBY = const(0x80)
_SET_TX = const(0x83)
_SET_RX = const(0x82)

# Register and data-buffer access
_WRITE_REG = const(0x0D)
_READ_REGISTER = const(0x1D)
_WRITE_BUFFER = const(0x0E)
_READ_BUFFER = const(0x1E)

# IRQ / DIO handling
_SET_DIO_IRQ = const(0x08)
_GET_IRQ_STATUS = const(0x12)
_CLEAR_IRQ = const(0x02)
_SET_DIO2_RFSWITCH = const(0x9D)
_SET_DIO3_TXCO_CTRL = const(0x97)

# RF, modulation and packet configuration
_SET_RF_FREQ = const(0x86)
_SET_PACKET_TYPE = const(0x8A)
_SET_MOD_PARAMS = const(0x8B)
_SET_PKT_PARAMS = const(0x8C)
_SET_TX_PARAMS = const(0x8E)
_SET_BUFFER_BASE_ADDRESS = const(0x8F)
_SET_PA_CONFIG = const(0x95)
_CAL_IMG = const(0x98)

# Status and diagnostics
_GET_STATUS = const(0xC0)
_GET_RX_BUFFER_STATUS = const(0x13)
_GET_PKT_STATUS = const(0x14)
_GET_DEV_ERRS = const(0x17)
_CLEAR_DEV_ERRS = const(0x07)

# ---------------------------------------------------------------------------
# Error codes, compared against (status byte & 0x0E) by the driver
# ---------------------------------------------------------------------------
_ERR_CMD_TIMEOUT = const(0x06)
_ERR_CMD_PROCESSING = const(0x08)
_ERR_FAIL_EXEC_CMD = const(0x0A)
|
|
|
|
class Transceiver:
    """Driver for an SX1262 LoRa transceiver attached via SPI.

    All radio functionality is implemented here: configuration
    (``settings``, ``set_freq``), transmission (``send``), reception
    (``receive``, ``get_meta``) and the raw SPI helpers (``write``, ``read``).
    Every public operation leaves the radio in sleep mode when it finishes,
    except a non-blocking ``send`` (see there).
    """

    # (lowest MHz, highest MHz, CalibrateImage parameter 1, parameter 2)
    _IMAGE_CAL_BANDS = (
        (430, 440, 0x6B, 0x6F),
        (470, 510, 0x75, 0x81),
        (779, 787, 0xC1, 0xC5),
        (863, 870, 0xD7, 0xDB),
        (902, 928, 0xE1, 0xE9),
    )

    # (bandwidth in kHz, bandwidth code used in SetModulationParams)
    _BW_CODES = (
        (7.8, 0x00),
        (10.4, 0x08),
        (15.6, 0x01),
        (20.8, 0x09),
        (31.25, 0x02),
        (41.7, 0x0A),
        (62.5, 0x03),
        (125, 0x04),
        (250, 0x05),
        (500, 0x06),
    )

    def __init__(self, spi, cs, rst, busy, dio1):
        """
        Initializes SX1262

        Resets the chip, selects LoRa packet mode, enables the TCXO supply on
        DIO3 and the antenna switch on DIO2, applies the default ``settings``
        and finally puts the chip to sleep.

        Parameters
        ----------
        spi : spi object
            Micropython SPI object for communication with SX1262
        cs : pin object
            Micropython Pin object for chip select of SX1262
        rst : pin object
            Micropython Pin object for resetting SX1262
        busy : pin object
            Micropython Pin object connected to busy
        dio1 : pin object
            Micropython Pin object connected to DIO1; polled to detect the
            end of a transmission or reception
        """
        self.spi = spi
        self.cs = cs
        self.rst = rst
        self.busy = busy
        self.dio1 = dio1

        # Idle levels: reset released, chip deselected
        self.rst.on()
        self.cs.on()

        # Reset transceiver with a 1 ms low pulse
        self.rst.off()
        utime.sleep_ms(1)
        self.rst.on()

        # Enter Standby_RC
        self.write(bytes([_SET_STANDBY, 0x00]))

        # Set LoRa mode
        self.write(bytes([_SET_PACKET_TYPE, 0x01]))

        # 1.8 V for TCXO, start-up delay: 640 * 15.625 us = 10 ms
        self.write(bytes([_SET_DIO3_TXCO_CTRL, 0x02, 0x00, 0x02, 0x80]))
        # Let DIO2 drive the antenna switch
        self.write(bytes([_SET_DIO2_RFSWITCH, 0x01]))

        self.settings()
        # Put to sleep mode for saving energy
        self.write(bytes([_SET_SLEEP, 0x04]))

    def _modify_register(self, addr_hi, addr_lo, set_mask=0x00, clear_mask=0x00):
        """Read one register, clear ``clear_mask`` bits, set ``set_mask`` bits, write it back."""
        # ReadRegister answers with the register value in the fifth byte
        val = self.read(bytes([_READ_REGISTER, addr_hi, addr_lo, 0x00, 0x00]))[4]
        val = (val & ~clear_mask & 0xFF) | set_mask
        self.write(bytes([_WRITE_REG, addr_hi, addr_lo, val]))

    def settings(self, power=17, sf=7, bw=125, cr=4/5, syn_word=0x12, inv_iq=False, crc=True, exp_header=True):
        """
        Configure the radio. Invalid values silently fall back to the default
        of the respective parameter. The radio is put to sleep afterwards.

        Parameters
        ----------
        power : int
            Transmission power in dBm, range: -9dBm to 22dBm. Default: 17
        sf : int
            Spreading factor, range 7 to 12. Default: 7
        bw : int, float
            Bandwidth of LoRa signal in kHz. Default: 125kHz. Valid values: 7.8, 10.4, 15.6, 20.8, 31.25, 41.7, 62.5, 125, 250, 500
        cr : float
            Code rate of LoRa signal. Default: 4/5. Valid values: 4/5, 4/6, 4/7, 4/8
        syn_word : int
            LoRa sync word. One or two byte values possible, for one byte, a 4
            is appended after each hex-digit. Default: 0x12 (written to the
            chip as 0x1424). For LoRaWAN use 0x3444
        inv_iq : bool
            Specify if I and Q should be inverted or not. Default: False
        crc : bool
            Enable CRC. Default: True
        exp_header : bool
            Explicit Header mode, if False Implicit Header mode is used.
        """
        self.write(bytes([_SET_STANDBY, 0x00]))
        utime.sleep_ms(1)

        # Remembered for the SetPacketParams calls in send() and receive()
        self.crc = 0x01 if crc else 0x00
        self.exp_header = 0x00 if exp_header else 0x01

        # Power amplifier configuration
        self.write(bytes([_SET_PA_CONFIG, 0x04, 0x07, 0x00, 0x01]))

        # TX power; the type is checked first so that a non-numeric value
        # falls back to the default instead of raising on the comparison
        if not (isinstance(power, int) and -9 <= power <= 22):
            power = 17
        if power < 0:
            power += 0x100  # two's complement in one byte
        self.write(bytes([_SET_TX_PARAMS, power, 0x04]))  # ramp time 200us

        # Define payload storage: TX and RX base address both 0
        self.write(bytes([_SET_BUFFER_BASE_ADDRESS, 0x00, 0x00]))

        # Workaround PA clamping
        self._modify_register(0x08, 0xD8, set_mask=0x1E)

        # Optimize inverted IQ: bit 2 of 0x0736 is cleared for inverted IQ
        # and set for standard IQ
        if inv_iq:
            self._modify_register(0x07, 0x36, clear_mask=0x04)
            self.inv_iq = 0x01
        else:
            self._modify_register(0x07, 0x36, set_mask=0x04)
            self.inv_iq = 0x00

        # Modulation parameters
        if not (isinstance(sf, int) and 7 <= sf <= 12):
            sf = 7

        bw_code = None
        for khz, code in self._BW_CODES:
            if bw == khz:
                bw_code = code
                break
        if bw_code is None:
            bw, bw_code = 125, 0x04  # fall back to 125 kHz

        # Improve modulation quality: bit 2 of 0x0889 is cleared for 500 kHz
        # and set for every other bandwidth
        if bw == 500:
            self._modify_register(0x08, 0x89, clear_mask=0x04)
        else:
            self._modify_register(0x08, 0x89, set_mask=0x04)

        valid_cr = [4/5, 4/6, 4/7, 4/8]
        cr = valid_cr.index(cr) + 1 if cr in valid_cr else 0x01  # default 4/5

        # Low data rate optimization is switched on for symbol times of
        # 16.38 ms and above (bw is in kHz, so t_sym is in ms)
        t_sym = 2**sf / bw
        ldro = 0x00 if t_sym < 16.38 else 0x01
        self.write(bytes([_SET_MOD_PARAMS, sf, bw_code, cr, ldro]))

        # Sync word
        if not isinstance(syn_word, int) or syn_word > 0xFFFF:
            sw = bytes([0x14, 0x24])
        elif syn_word <= 0xFF:
            # One byte given: append a 4 after each hex digit (0x12 -> 0x14 0x24)
            sw = bytes([(syn_word & 0xF0) + 4, ((syn_word & 0x0F) << 4) + 4])
        else:
            sw = int.to_bytes(syn_word, 2, 'big')
        self.write(bytes([_WRITE_REG, 0x07, 0x40]) + sw)

        # Put to sleep mode for saving energy
        self.write(bytes([_SET_SLEEP, 0x04]))

    def set_freq(self, freq):
        """
        Run the image calibration for the band containing ``freq`` and tune
        the radio to ``freq``. The radio is left in standby.

        Parameters
        ----------
        freq : float, int
            Transmission frequency in MHz. Must lie in one of the bands
            430-440, 470-510, 779-787, 863-870 or 902-928 MHz (band edges
            included).

        Raises
        ------
        ValueError
            If ``freq`` is outside all supported bands.
        """
        self.write(bytes([_SET_STANDBY, 0x00]))
        utime.sleep_ms(1)

        # Calibrate
        for low, high, cal1, cal2 in self._IMAGE_CAL_BANDS:
            if low <= freq <= high:
                self.write(bytes([_CAL_IMG, cal1, cal2]))
                break
        else:
            raise ValueError("Invalid Frequency")

        # Set freq: (868.1*2**25)/32 = 910268826 = 0x3641999A
        rf_freq = int(freq * (2**25) / 32)
        self.write(bytes([_SET_RF_FREQ]) + int.to_bytes(rf_freq, 4, 'big'))
        utime.sleep_ms(1)

    def send(self, msg, freq=868.1, blocking=True):
        """
        Transmit one LoRa packet.

        Nothing is sent (and nothing is raised) when ``msg`` is neither str
        nor bytes or ``freq`` is neither float nor int.

        Parameters
        ----------
        msg : str,bytes
            Message to be sent via LoRa. A str is encoded first; the
            resulting payload is cut to 255 bytes.
        freq : float, int
            Transmission frequency in MHz. Default: 868.1 MHz
        blocking : bool
            If True, waits for DIO1 (TxDone or chip timeout), clears the Irq
            and puts the radio to sleep. If False, returns while the radio
            may still be transmitting; the Irq is not cleared and the radio
            is not put to sleep, because doing so would abort a transmission
            that is still running. Default: True
        """
        if not (isinstance(msg, (str, bytes)) and isinstance(freq, (float, int))):
            return

        self.set_freq(freq)

        self.write(bytes([_CLEAR_IRQ, 0x03, 0xFF]))  # clear possibly remaining IRQ flags
        self.write(bytes([_CLEAR_DEV_ERRS, 0x00, 0x00]))

        # Encode before truncating: the limit applies to bytes on air, and
        # len(msg) has to fit into one byte of SetPacketParams below
        if isinstance(msg, str):
            msg = msg.encode()
        if len(msg) > 255:
            msg = msg[:255]

        # Write payload at buffer offset 0
        self.write(bytes([_WRITE_BUFFER, 0x00]) + msg)
        utime.sleep_ms(1)

        # Frame format: preamble length 0x0008, header type, payload length,
        # CRC on/off, IQ inversion
        self.write(bytes([_SET_PKT_PARAMS, 0x00, 0x08, self.exp_header, len(msg), self.crc, self.inv_iq]))

        # IRQ Mask, DIO1 Mask, DIO2 Mask, DIO3 Mask
        self.write(bytes([_SET_DIO_IRQ, 0x02, 0x01, 0x02, 0x01, 0x00, 0x00, 0x00, 0x00]))

        # Transmit; chip timeout 0x629800 steps of 15.625 us (about 101 s)
        utime.sleep_ms(10)
        self.write(bytes([_SET_TX, 0x62, 0x98, 0x00]))
        utime.sleep_ms(100)

        if not blocking:
            return

        while self.dio1.value() == 0:
            utime.sleep_us(100)
            status = self.read(bytes([_GET_STATUS, 0x00]))[1]
            if status & 0x0E in (_ERR_CMD_TIMEOUT, _ERR_CMD_PROCESSING, _ERR_FAIL_EXEC_CMD):
                self.write(bytes([_SET_STANDBY, 0x00]))
                utime.sleep_ms(1)
                print("TX Error")
                err = self.read(bytes([_GET_DEV_ERRS, 0x00, 0x00, 0x00]))[-1]
                if err & 0x20:
                    print("XOSC Start Error")
                if err & 0x40:
                    print("PLL Lock Error")
                self.write(bytes([_CLEAR_DEV_ERRS, 0x00, 0x00]))
                break

        self.write(bytes([_CLEAR_IRQ, 0x02, 0x01]))
        utime.sleep_ms(1)
        # Put to sleep mode for saving energy
        self.write(bytes([_SET_SLEEP, 0x04]))

    def receive(self, freq=868.1, timeout=0):
        """
        Receive LoRa message

        Blocks until DIO1 signals the end of the reception or until
        ``timeout`` has elapsed. The radio is put to sleep afterwards.

        Parameters
        ----------
        freq : float
            Frequency to listen for new messages in MHz. Default: 868.1 MHz
        timeout : int
            Timeout in seconds, put 0 to disable timeout. Default: 0

        Returns
        -----
        payload : bytes
            Received message, or None on timeout or CRC error
        snr : float
            SNR in dB, or None on timeout or CRC error
        rssi : float
            RSSI in dBm, or None on timeout or CRC error
        """
        self.write(bytes([_SET_STANDBY, 0x00]))
        utime.sleep_ms(1)
        # Clear possibly remaining RXDone, TXDone, CRC error or Timeout flags
        self.write(bytes([_CLEAR_IRQ, 0x02, 0x43]))
        self.set_freq(freq)

        # Packet params: preamble length 0x0008, header type, max. payload
        # length, CRC on/off, IQ inversion
        self.write(bytes([_SET_PKT_PARAMS, 0x00, 0x08, self.exp_header, 0xFF, self.crc, self.inv_iq]))

        # IRQ Mask, DIO1 Mask, DIO2 Mask, DIO3 Mask. The IRQ mask includes
        # the CRC error flag (0x40) so that the check below can see it;
        # DIO1 still only signals RxDone and Timeout.
        self.write(bytes([_SET_DIO_IRQ, 0x02, 0x42, 0x02, 0x02, 0x00, 0x00, 0x00, 0x00]))

        # Set RX without chip-side timeout; the timeout is handled below
        self.write(bytes([_SET_RX, 0x00, 0x00, 0x00]))
        t_rx = utime.ticks_ms()
        print("Receiving...")

        # Wait for DIO1
        rx_err = False
        while self.dio1.value() == 0:
            utime.sleep_us(100)
            if timeout > 0 and utime.ticks_diff(utime.ticks_ms(), t_rx) / 1000 > timeout:
                print("Start: " + str(t_rx) + " End: " + str(utime.ticks_ms()))
                rx_err = True
                break

        # Fetch the IRQ flags to check CRC and timeout
        utime.sleep_ms(1)
        irq_status = self.read([_GET_IRQ_STATUS, 0x00, 0x00, 0x00])
        print(self.read(bytes([_GET_STATUS, 0x00])))
        print("IRQ: ", irq_status)

        payload = None
        snr = None
        rssi = None
        crc_failed = irq_status[3] & 0x40
        timed_out = irq_status[2] & 0x02
        if not crc_failed and not timed_out and not rx_err:
            # GetRxBufferStatus: payload length and start offset in the buffer
            buffer_status = self.read(bytes([_GET_RX_BUFFER_STATUS, 0x00, 0x00, 0x00]))
            payload_len = buffer_status[2]
            buf_start = buffer_status[3]
            # Read packet; the first three response bytes are not payload
            payload = self.read(bytes([_READ_BUFFER, buf_start, 0x00]) + bytes(payload_len))[3:]
            snr, rssi = self.get_meta()

        # Clear irq
        self.write(bytes([_CLEAR_IRQ, 0x02, 0x43]))
        self.write(bytes([_SET_STANDBY, 0x00]))
        utime.sleep_ms(1)
        # Put to sleep mode for saving energy
        self.write(bytes([_SET_SLEEP, 0x04]))
        return payload, snr, rssi

    def get_meta(self):
        """
        Get metadata of last received message.

        Returns
        -----
        snr : float
            SNR in dB
        signal_power : float
            RSSI in dBm
        """
        pkt_status = self.read(bytes([_GET_PKT_STATUS, 0x00, 0x00, 0x00, 0x00]))
        signal_power = pkt_status[2] / -2  # dBm
        snr_raw = pkt_status[3]
        if snr_raw & 0x80:
            snr_raw -= 0x100  # two's complement, one byte
        snr = snr_raw / 4  # dB
        return snr, signal_power

    def write(self, msg, retries=3):
        """
        Send one command to the chip and retry while the chip reports an error.

        Parameters
        ----------
        msg : bytes
            Command in bytes to be written via SPI
        retries : int
            Maximum number of attempts. Default: 3
        """
        rx_buf = bytearray(len(msg))
        for attempt in range(retries):  # in case of error try again
            utime.sleep_ms(1)
            self.cs.off()
            utime.sleep_us(150)
            # Chip select is pulled low first, then busy is awaited
            while True:
                utime.sleep_us(100)
                if self.busy.value() == 0:
                    break
            # Full-duplex transfer: the status is taken from the bytes coming
            # back while the command is sent. No extra byte is clocked out,
            # which would otherwise be taken as additional data by
            # WriteRegister and WriteBuffer.
            self.spi.write_readinto(msg, rx_buf)
            utime.sleep_ms(1)
            self.cs.on()
            utime.sleep_us(600)
            status = rx_buf[1] if len(rx_buf) > 1 else 0x00
            if (status & 0x0E) not in (_ERR_CMD_TIMEOUT, _ERR_CMD_PROCESSING, _ERR_FAIL_EXEC_CMD):
                break
            print(str(attempt) + ": writing failed, trying again")

    def read(self, buf, retries=3):
        """
        Send a command and return everything the chip clocks back.

        Parameters
        ----------
        buf : bytes
            Bytes to send while reading.
        retries : int
            Accepted for symmetry with ``write``; currently unused, a failed
            read is only reported via print. Default: 3

        Returns
        -------
        rx_buf : bytes
            Bytes read, same length as ``buf``
        """
        tx_buf = bytearray(buf)
        rx_buf = bytearray(len(tx_buf))
        self.cs.off()
        utime.sleep_us(150)
        while True:
            utime.sleep_us(100)
            if self.busy.value() == 0:
                break
        self.spi.write_readinto(tx_buf, rx_buf)
        status = rx_buf[1] if len(rx_buf) > 1 else 0x00
        utime.sleep_ms(1)
        self.cs.on()
        utime.sleep_us(600)
        if (status & 0x0E) in (_ERR_CMD_TIMEOUT, _ERR_CMD_PROCESSING, _ERR_FAIL_EXEC_CMD):
            print("reading failed, buf: " + str(buf) + " rx: " + str(rx_buf))
        return bytes(rx_buf)
|
|
|
|
if __name__ == "__main__":
    # Demo: configure the radio, send one greeting, then listen forever and
    # print every receive() result.
    import random
    import utime
    import machine
    from machine import SPI, Pin

    # Software SPI bus and the four control lines of the transceiver
    spi = machine.SoftSPI(baudrate=400000, sck=9, mosi=10, miso=11)
    cs = Pin(8, Pin.OUT, value=1)
    rst = Pin(12, Pin.OUT, value=1)
    busy = Pin(13, Pin.IN)
    dio1 = Pin(14, Pin.IN)

    sx1262 = Transceiver(spi, cs, rst, busy, dio1)

    # Same values as the defaults of settings(), spelled out for reference
    radio_config = dict(
        power=17,
        sf=7,
        bw=125,
        cr=4/5,
        syn_word=0x12,
        inv_iq=False,
        crc=True,
        exp_header=True,
    )
    sx1262.settings(**radio_config)

    sx1262.send("Hello World!")

    while True:
        received = sx1262.receive()
        print(received)