LoRaWAN-Stack-for-Micropython/source/lib/SX1262.py
2025-06-24 21:54:38 +02:00

462 lines
No EOL
16 KiB
Python

"""
Library for SX1262 chipset
This library controls the SX1262chipset.
"""
import utime
# SX1262 commands
_CLEAR_IRQ = const(0x02)
_CLEAR_DEV_ERRS = const(0x07)
_SET_DIO_IRQ = const(0x08)
_WRITE_REG = const(0x0D)
_WRITE_BUFFER = const(0x0E)
_GET_IRQ_STATUS = const(0x12)
_GET_RX_BUFFER_STATUS = const(0x13)
_GET_PKT_STATUS = const(0x14)
_GET_DEV_ERRS = const(0x17)
_READ_REGISTER = const(0x1D)
_READ_BUFFER = const(0x1E)
_SET_STANDBY = const(0x80)
_SET_RX = const(0x82)
_SET_TX = const(0x83)
_SET_SLEEP = const(0x84)
_SET_RF_FREQ = const(0x86)
_SET_PACKET_TYPE = const(0x8A)
_SET_MOD_PARAMS = const(0x8B)
_SET_PKT_PARAMS = const(0x8C)
_SET_TX_PARAMS = const(0x8E)
_SET_BUFFER_BASE_ADDRESS = const(0x8F)
_SET_PA_CONFIG = const(0x95)
_SET_DIO3_TXCO_CTRL = const(0x97)
_CAL_IMG = const(0x98)
_SET_DIO2_RFSWITCH = const(0x9D)
_GET_STATUS = const(0xC0)
#Errors
_ERR_CMD_TIMEOUT = const(0x06)
_ERR_CMD_PROCESSING = const(0x08)
_ERR_FAIL_EXEC_CMD = const(0x0A)
class Transceiver:
"""Transceiver class
Here all functions regarding the transceiver are implemented
"""
def __init__(self,spi,cs,rst,busy,dio1):
"""
Initializes SX1262
Parameters
----------
spi : spi object
Micropython SPI object for communication with SX1276
cs : pin object
Micropython Pin object for chip select of SX1276
rst : pin object
Micropython Pin object for resetting SX1276
busy : pin object
Micropython Pin object connected to busy
"""
#variables
self.spi = spi
self.cs = cs
self.rst = rst
self.busy = busy
self.dio1 = dio1
#set pin states
self.rst.on()
self.cs.on()
#reset transceiver
self.rst.off()
utime.sleep_ms(1)
self.rst.on()
#Enter Standby_RC
self.write(bytes([_SET_STANDBY, 0x00]))
#Set LoRa mode
self.write(bytes([_SET_PACKET_TYPE, 0x01]))
self.write(bytes([_SET_DIO3_TXCO_CTRL, 0x02, 0x0, 0x02, 0x80])) #1.8 V for TXCO, delay: 640*15.625us=10ms
self.write(bytes([_SET_DIO2_RFSWITCH, 0x01]))#Enable ANT_SW
#_SET_REGULATOR_MODE = const(0x96)
#self.write(bytes([_SET_REGULATOR_MODE, 0x01])) #doesnt help
self.settings()
self.write(bytes([_SET_SLEEP, 0x04]))#put to sleep mode for saving energy
def settings(self, power=17, sf=7, bw=125, cr=4/5, syn_word=0x12, inv_iq=False, crc=True, exp_header=True):
"""
Parameters
----------
power : int
Transmission power in dBm, range: -9dBm to 22dBm. Default: 17
sf : int
Spreading factor, range 7 to 12. Default: 7
bw : int, float
Bandwidth of LoRa signal in kHz. Default: 125kHz. Valid values: 7.8, 10.4, 15.6, 20.8, 31.25, 41.7, 62.5, 125, 250, 500
cr : float
Code rate of LoRa signal. Default: 4/5. Valid values: 4/5, 4/6, 4/7, 4/8
syn_word : int
LoRa sync word. One or two byte values possible, for one byte, a 4 is appended after each hex-digit. Default: 0x1424. For LoRaWAN use 0x3444
inv_iq : bool
Specify if I and Q should be inverted or not. Default: False
crc : bool
Enable CRC. Default: True
exp_header : bool
Explicit Header mode, if False Implicit Header mode is used.
"""
self.write(bytes([_SET_STANDBY, 0x00]))
utime.sleep_ms(1)
if crc:
self.crc = 0x01
else:
self.crc = 0x00
if exp_header:
self.exp_header = 0x00
else:
self.exp_header = 0x01
#power mode
self.write(bytes([_SET_PA_CONFIG,0x04,0x07,0x00,0x01])) #see p.79
#set tx params
if not (power >= -9 and power <= 22 and isinstance(power, int)):
power = 17
if power < 0:
power = power+0x100 #2s complement
self.write(bytes([_SET_TX_PARAMS, power, 0x04])) #ramp time 200us
#define payload storage
self.write(bytes([_SET_BUFFER_BASE_ADDRESS, 0x00, 0x00]))
#Workaround PA clamping
val = self.read(bytes([_READ_REGISTER, 0x08, 0xD8, 0x00,0x00]))[4]
self.write(bytes([_WRITE_REG, 0x08, 0xD8, val|0x1E]))
#optimize Inverted IQ
val = self.read(bytes([_READ_REGISTER, 0x07, 0x36, 0x00, 0x00]))[4]
print(val)
if inv_iq:
self.write(bytes([_WRITE_REG, 0x07, 0x36, val&0xFB]))
self.inv_iq = 0x01
else:
self.write(bytes([_WRITE_REG, 0x07, 0x36, val|0x04]))
self.inv_iq = 0x00
#set mod parameters
if sf >= 7 and sf <=12 and isinstance(sf,int):
sf = sf
else:
sf = 7
if bw in [7.8, 10.4, 15.6, 20.8, 31.25, 41.7, 62.5, 125, 250, 500]:
if bw == 500: #improve modulation quality
val = self.read(bytes([_READ_REGISTER, 0x08, 0x89, 0x00,0x00]))[4]
self.write(bytes([_WRITE_REG, 0x08, 0x89, val&0xFB]))
else:
val = self.read(bytes([_READ_REGISTER, 0x08, 0x89, 0x00,0x00]))[4]
self.write(bytes([_WRITE_REG, 0x08, 0x89, val|0x04]))
t_sym = 2**sf/bw
bw = [7.8,15.6,31.25,62.5,125,250,500,float("nan"),10.4,20.8,41.7].index(bw)
else:
val = self.read(bytes([_READ_REGISTER, 0x08, 0x89, 0x00,0x00]))[4]
self.write(bytes([_WRITE_REG, 0x08, 0x89, val|0x04]))
t_sym = 2**sf/125
bw = 0x04 #125kHz
if cr in [4/5, 4/6, 4/7, 4/8]:
cr = [4/5, 4/6, 4/7, 4/8].index(cr) + 1
else:
cr = 0x01 #4/5
if t_sym < 16.38: #low datarate optimization
self.write(bytes([_SET_MOD_PARAMS, sf, bw, cr,0x00]))
else:
self.write(bytes([_SET_MOD_PARAMS, sf, bw, cr,0x01]))
#set sync word
if isinstance(syn_word,int):
if syn_word <= 0xFF:
sw = bytes([(syn_word&0xF0)+4,((syn_word&0x0F)<<4)+4]) #append 4s according to
elif syn_word <= 0xFFFF:
sw = int.to_bytes(syn_word,2,'big')
else:
sw = bytes([0x14, 0x24])
self.write(bytes([_WRITE_REG, 0x07, 0x40]) + sw)
else:
self.write(bytes([_WRITE_REG, 0x07, 0x40, 0x14, 0x24]))
self.write(bytes([_SET_SLEEP, 0x04]))#put to sleep mode for saving energy
def set_freq(self, freq):
"""
Parameters
----------
freq : float, int
Transmission frequency in MHz.
"""
self.write(bytes([_SET_STANDBY, 0x00]))
utime.sleep_ms(1)
#calibrate
if freq > 430 and freq < 440:
self.write(bytes([_CAL_IMG, 0x6B,0x6F]))
elif freq > 470 and freq < 510:
self.write(bytes([_CAL_IMG, 0x75,0x81]))
elif freq > 779 and freq < 787:
self.write(bytes([_CAL_IMG, 0xC1,0xC5]))
elif freq > 863 and freq < 870:
self.write(bytes([_CAL_IMG, 0xD7,0xDB]))
elif freq > 902 and freq < 928:
self.write(bytes([_CAL_IMG, 0xE1,0xE9]))
else:
raise ValueError("Invalid Frequency")
#Set freq
rf_freq = int(freq*(2**25)/32)
self.write(bytes([_SET_RF_FREQ]) + int.to_bytes(rf_freq,4,'big')) #Datasheet p.85 (868.1*2**25)/32=910268826=0x3641999A
utime.sleep_ms(1)
def send(self,msg,freq=868.1, blocking=True):
"""
Parameters
----------
msg : str,bytes
Message to be sent via LoRa.
freq : float, int
Transmission frequency in MHz. Default: 868.1 MHz
blocking : bool
Waits for TxDone flag and clears Irq if True. Default: True
"""
if (isinstance(msg,str) or isinstance(msg,bytes)) and (isinstance(freq,float) or isinstance(freq,int)):
self.set_freq(freq)
self.write(bytes([_CLEAR_IRQ, 0x03, 0xFF])) #clear possibly remaining IRQ flags
self.write(bytes([_CLEAR_DEV_ERRS, 0x00, 0x00]))
if len(msg) > 255:
msg = msg[:255]
if isinstance(msg,str):
msg = msg.encode()
#write payload
self.write(bytes([_WRITE_BUFFER, 0x00]) + msg)
utime.sleep_ms(1)
#define frame format
self.write(bytes([_SET_PKT_PARAMS, 0x00,0x08,self.exp_header,len(msg),self.crc,self.inv_iq])) #preamble length 0x00 0x08,explicit header 0x00, payload length, crc off 0x00, iq invert false 0x00
#config dio
self.write(bytes([_SET_DIO_IRQ, 0x02, 0x01, 0x02, 0x01, 0x00, 0x00, 0x00, 0x00])) #IRQ Mask, DIO1 Mask, DIO2 Mask, DIO3 Mask
#transmit
utime.sleep_ms(10)
self.write(bytes([_SET_TX, 0x62, 0x98, 0x00])) #3.840.000 for 1 min timeout
utime.sleep_ms(100)
tx_err = False
if blocking:
while self.dio1.value() == 0:
utime.sleep_us(100)
if self.read(bytes([_GET_STATUS,0x00]))[1]&0x0E in (_ERR_CMD_TIMEOUT,_ERR_CMD_PROCESSING,_ERR_FAIL_EXEC_CMD):
self.write(bytes([_SET_STANDBY, 0x00]))
utime.sleep_ms(1)
tx_err = True
print("TX Error")
err=self.read(bytes([_GET_DEV_ERRS, 0x00, 0x00, 0x00]))[-1]
if err&0x20:
print("XOSC Start Error")
if err&0x40:
print("PLL Lock Error")
self.write(bytes([_CLEAR_DEV_ERRS, 0x00, 0x00]))
break
self.write(bytes([_CLEAR_IRQ, 0x02, 0x01]))
utime.sleep_ms(1)
self.write(bytes([_SET_SLEEP, 0x04]))#put to sleep mode for saving energy
def receive(self, freq=868.1, timeout=0):
"""
Receive LoRa message
Parameters
----------
freq : float
Frequency to listen for new messages in MHz. Default: 868.1 MHz
timeout : int
Timeout in seconds, put 0 to disable timeout. Default: 0
Returns
-----
payload : bytes
Received messageer(SX126X_REG_RTC_EVENT, rtcEvent_mv, 1)
rtcEvent_mv[0] |= 0x02
self.writeRegister(SX126X_REG_RTC_EVENT, rtcEvent, 1)
snr : float
SNR in dB
rssi : float
RSSI in dBm
"""
self.write(bytes([_SET_STANDBY, 0x00]))
utime.sleep_ms(1)
self.write(bytes([_CLEAR_IRQ, 0x02, 0x43])) #clear possibly remaining RXDone, TXDone or Timeout flags
self.set_freq(freq)
#packet params
self.write(bytes([_SET_PKT_PARAMS, 0x00, 0x08, self.exp_header, 0xFF, self.crc, self.inv_iq])) #preamble length 0x00 0x08,explicit header 0x00, max. payload length, crc off 0x00, iq invert false 0x00
#set dio irq params
self.write(bytes([_SET_DIO_IRQ, 0x02, 0x02, 0x02, 0x02, 0x00, 0x00, 0x00, 0x00])) #IRQ Mask, DIO1 Mask, DIO2 Mask, DIO3 Mask
#set rx
to_factor = 0x000000 #No timeout
self.write(bytes([_SET_RX, 0x00, 0x00, 0x00]))
t_rx = utime.ticks_ms()
print("Receiving...")
#wait for rxdone
rx_err=False
while self.dio1.value() == 0:
utime.sleep_us(100)
#print(utime.ticks_diff(utime.ticks_ms(),t_rx)/1000)
if utime.ticks_diff(utime.ticks_ms(),t_rx)/1000 > timeout and timeout > 0:
print("Start: " + str(t_rx) + " End: " + str(utime.ticks_ms()))
rx_err = True
break
#if self.read(bytes([_GET_STATUS,0x00]))[1]&0x0E in (_ERR_CMD_TIMEOUT,_ERR_CMD_PROCESSING,_ERR_FAIL_EXEC_CMD):
# rx_err = True
# print("Error while receiving")
# break
#if rx done, getirqstatus to check crc
utime.sleep_ms(1)
irq_status = self.read([_GET_IRQ_STATUS, 0x00, 0x00, 0x00])
print(self.read(bytes([_GET_STATUS,0x00])))
print("IRQ: ",irq_status)
if not (irq_status[3] & 0x40) and not(irq_status[2] & 0x02) and not rx_err: #if crc correct and no timeout
#GetRxBufferStatus
buffer_status = self.read(bytes([_GET_RX_BUFFER_STATUS, 0x00, 0x00, 0x00]))
payload_len = buffer_status[2]
buf_start = buffer_status[3]
#read packet
payload = self.read(bytes([_READ_BUFFER,buf_start,0x00]) + bytes(payload_len))[3:]
snr,rssi = self.get_meta()
else:
payload = None
snr = None
rssi = None
#RX Workaround
#self.write(bytes([_WRITE_REG, 0x09, 0x20, 0x00]))
#val = self.read(bytes([_READ_REGISTER, 0x09, 0x44, 0x00,0x00]))[4]
#self.write(bytes([_WRITE_REG, 0x09, 0x44, val|0x02]))
#clear irq
self.write(bytes([_CLEAR_IRQ, 0x02, 0x43]))
self.write(bytes([_SET_STANDBY, 0x00]))
utime.sleep_ms(1)
self.write(bytes([_SET_SLEEP, 0x04]))#put to sleep mode for saving energy
return payload, snr, rssi
def get_meta(self):
"""
Get metadata of last received message.
Returns
-----
snr : float
SNR in dB
signal_power : float
RSSI in dBm
"""
pkt_status = self.read(bytes([_GET_PKT_STATUS, 0x00, 0x00, 0x00, 0x00]))
signal_power = pkt_status[2]/-2 #dBm
snr_twos = pkt_status[3]
if snr_twos & 0x80:
snr_twos -= 0x100
snr = snr_twos/4 #dB
rssi = pkt_status[4]/-2 #dB
return snr, signal_power
def write(self,msg, retries=3):
"""
Parameters
----------
msg : bytes
Command in bytes to be written via SPI
retries : int
number of retries after failed write attempt. Default: 3
"""
for i in range(retries): #in case of error try again
utime.sleep_ms(1)
self.cs.off()
utime.sleep_us(150)
while True:
utime.sleep_us(100)
if self.busy.value() == 0:
break
self.spi.write(msg)
utime.sleep_ms(1)
status = self.spi.read(1)
utime.sleep_ms(1)
self.cs.on()
utime.sleep_us(600)
if (status[0] & 0x0E) not in (_ERR_CMD_TIMEOUT,_ERR_CMD_PROCESSING,_ERR_FAIL_EXEC_CMD):
break
else:
print(str(i) + ": writing failed, trying again")
def read(self,buf, retries=3):
"""
Parameters
----------
buf : bytes
Bytes to send while reading.
retries : int
number of retries after failed write attempt. Default: 3
Returns
-------
rx_buf : bytes
Bytes read
"""
tx_buf = bytearray(buf)
rx_buf = bytearray(len(tx_buf))
self.cs.off()
utime.sleep_us(150)
while True:
utime.sleep_us(100)
if self.busy.value() == 0:
break
self.spi.write_readinto(tx_buf,rx_buf)
status = rx_buf[1]
utime.sleep_ms(1)
self.cs.on()
utime.sleep_us(600)
if (status & 0x0E) in (_ERR_CMD_TIMEOUT,_ERR_CMD_PROCESSING,_ERR_FAIL_EXEC_CMD):
print("reading failed, buf: " + str(buf) + " rx: " + str(rx_buf))
return bytes(rx_buf)
if __name__ == "__main__":
from machine import SPI, Pin
import machine
import utime
import random
spi = machine.SoftSPI(baudrate=400000, sck=9, mosi=10, miso=11)
cs = Pin(8, Pin.OUT, value=1)
rst = Pin(12, Pin.OUT, value=1)
busy = Pin(13, Pin.IN)
dio1 = Pin(14, Pin.IN)
sx1262 = Transceiver(spi, cs, rst, busy, dio1)
sx1262.settings(power=17, sf=7, bw=125, cr=4/5, syn_word=0x12, inv_iq=False, crc=True, exp_header=True)
sx1262.send("Hello World!")
while True:
print(sx1262.receive())