LoRaWAN-Stack-for-Micropython/source/lib/LoRaWAN.py
2025-06-24 21:54:38 +02:00

345 lines
No EOL
13 KiB
Python

from cryptolib import aes
from cmac import CMAC
import struct
import uos
import utime
import math
import random
class LoRaWAN:
def __init__(self, transceiver, freqs):
"""
Initializes LoRaWAN
Parameters
----------
transceiver : transceiver object
Object of a transceiver chip. Should contain settings, send and receive functions.
freqs : list of floats
List containing standard frequencies of a region
"""
self.freqs = freqs
self.transceiver = transceiver
self.joined = False
if ".config.cfg" in uos.listdir():
print("Read config")
self.read_cfg()
if self.NwkSKey != bytes(16):
self.joined = True
else:
self.NwkSKey = bytes(16)
self.AppSKey = bytes(16)
self.DevAddr = bytes(4)
self.DevEui = bytes(8)
self.FCntUp = 0
self.FCntDown = 0
self.devNonce = 0
self.rx1_delay = 5
self.rx1_dr_offset = 0
self.save_cfg()
def setup_abp(self, NwkSKey, AppSKey, DevAddr, DevEui):
"""
Sets the device up for ABP, saves the keys and resets counters
Parameters
----------
NwkSKey : list of 16 bytes
NwkSKey, big endian
AppSKey : list of 16 bytes
AppSKey, big endian
DevAddr : list of 4 bytes
DevAddr, big endian
DevEui : list of 8 bytes
DevEui, big endian
"""
# Reset counters and update keys if keys are new
if not(self.NwkSKey == bytes(NwkSKey) and
self.AppSKey == bytes(AppSKey) and
self.DevAddr == bytes(DevAddr) and
self.DevEui == bytes(DevEui)):
self.NwkSKey = bytes(NwkSKey)
self.AppSKey = bytes(AppSKey)
self.DevAddr = bytes(DevAddr)
self.DevEui = bytes(DevEui)
self.FCntUp = 0
self.FCntDown = 0
self.joined = True
self.save_cfg()
def reset(self):
"""
Resets stored keys and counters except devNonce.
"""
self.NwkSKey = bytes(16)
self.AppSKey = bytes(16)
self.DevAddr = bytes(4)
self.DevEui = bytes(8)
self.FCntUp = 0
self.FCntDown = 0
self.joined = False
self.save_cfg()
def join_otaa(self, AppKey, joinEUI, devEUI, retries=3, sf=7):
"""
Sets the device up for ABP, saves the keys and resets counters
Parameters
----------
AppKey : list of 16 bytes
AppKey, big endian
joinEUI : list of 8 bytes
joinEUI, big endian, can be all zeros
devEui : list of 8 bytes
DevEui, big endian
retries : int
Number of join attempts.
sf : int
Spreading Factor for join attempts can be between 7 and 12. Default: 7
"""
AppKey = bytes(AppKey)
joinEUI = bytes(joinEUI)
self.DevEui = bytes(devEUI)
tries = 0
while tries < retries:
tries+=1
self.transceiver.settings(power=20, sf=sf, bw=125, cr=4/5, syn_word=0x34, inv_iq=False, crc=True, exp_header=True)
mhdr = bytes([0x00]) #join request(000), rfu (000), major(00)
join_request = bytes(reversed(joinEUI)) + bytes(reversed(self.DevEui)) + int.to_bytes(self.devNonce,2,'little')
self.devNonce += 1
self.save_cfg()
cmac = CMAC()
mic = cmac.aes_cmac(AppKey, mhdr + join_request)[0:4]
msg=mhdr+join_request+mic
print(msg)
rx_msg = self.transceiver.send(msg, self.freqs[0])
self.transceiver.settings(power=20, sf=sf, bw=125, cr=4/5, syn_word=0x34, inv_iq=True, crc=False, exp_header=True)
rx_msg,snr,rssi=self.transceiver.receive(self.freqs[0],timeout=self.rx1_delay+3) #see regional parameters
print(rx_msg)
if rx_msg != None:
rx_mhdr = rx_msg[0]
else:
rx_mhdr = None
if not rx_mhdr == 0x20: #check if received message is join accept
utime.sleep(5)
continue
join_accept_payload, mic_correct = self.decrypt_join_accept(AppKey, rx_msg[1:])
print("JA PAyload",join_accept_payload)
if not mic_correct:
self.joined = False
continue
else:
return
def send(self,msg,power=20,sf=7,ack=False, adr=False, fport=b'\x01'):
"""
Sends message to LoRaWAN network if joined.
Parameters
----------
msg : str or bytes
Payload to be transmitted via LoRaWAN
power : int
Tx power. Default: 20
ack : bool
Request ack. Default: False
adr : bool
RFU. Default: False
fport : byte
LoRaWAN FPort. Default: 0x01
"""
if self.joined:
self.transceiver.settings(power=power, sf=sf, bw=125, cr=4/5, syn_word=0x34, inv_iq=False, crc=True, exp_header=True)
utime.sleep_ms(1)
#mac header
mhdr = 0x00
if ack:
mhdr |= 0x80
else:
mhdr |= 0x40
mhdr = bytes([mhdr])
#mac payload
#frame header
devaddr = bytes(reversed(self.DevAddr))
adr_bit = 0x80 if adr else 0x00
adr_ack_req = 0x00
ack_bit = 0x00
classb = 0x00 #currently no class b support planed
fopts_len = 0x00 #currently only payload, no mac
fctrl = bytes([adr_bit + adr_ack_req + ack_bit+ classb + fopts_len]) #ADR,adrackreq,ack,classb,foptslen
fcnt = int.to_bytes(self.FCntUp,2,'little')
fopts = bytes([])
fhdr = devaddr + fctrl + fcnt + fopts
#frame payload
if isinstance(msg,str):
frmpayload = self.encrypt_payload(msg.encode())
elif isinstance(msg, bytes):
frmpayload = self.encrypt_payload(msg)
print("Encrypted Payload: {}".format(frmpayload))
msg_no_mic = mhdr + fhdr + fport + frmpayload
B0 = bytes([0x49, 0x00, 0x00, 0x00, 0x00, 0x00]) + devaddr + int.to_bytes(self.FCntUp, 4, 'little') + bytes([0x00, len(msg_no_mic)])
print(B0)
cmac = CMAC()
mic = cmac.aes_cmac(self.NwkSKey, B0 + msg_no_mic)[0:4]
payload = msg_no_mic + mic
print("Payload: " + str(payload))
freq=random.choice(self.freqs)
self.transceiver.send(payload, freq)
self.FCntUp+=1
self.save_cfg()
#rx1
ack_received = False
rx_msg = None
if ack:
sf_rx = sf+self.rx1_dr_offset if sf+self.rx1_dr_offset < 12 else 12
self.transceiver.settings(power=20, sf=sf_rx, bw=125, cr=4/5, syn_word=0x34, inv_iq=True, crc=False, exp_header=True)
rx_msg,snr,rssi=self.transceiver.receive(freq,timeout=self.rx1_delay+3) #see regional parameters
if rx_msg != None:
if len(rx_msg) > 5 and bytes(reversed(rxmsg[1:5])) == self.DevAddr: #verify DevAddr
if rx_msg[5] & 0x20:
ack_received = True
else:
utime.sleep(self.rx1_delay+1)#wait until rx2 window is over
return rx_msg, ack_received
else:
raise Exception("Not joined to a Network")
return None
def encrypt_payload(self, msg):
"""
Encrypts payload for LoRaWAN transmission
Parameters
----------
msg : bytes
Payload to be encrypted
"""
#Enter Keys in Big Endian, this code changes and transmitts as little endian
K = self.AppSKey
k = math.ceil(len(msg)/16)
DevAddr = bytes(reversed(self.DevAddr))
cipher = aes(K,1)
S = b''
for i in range(k):
A = bytes([0x01, 0x00, 0x00, 0x00, 0x00, 0x00]) + DevAddr + int.to_bytes(self.FCntUp, 4, 'little') + bytes([0x00]) + int.to_bytes(i+1, 1, 'little')
S_tmp = cipher.encrypt(A)
S+=S_tmp
msg_pad = msg + bytes(k*16-len(msg)) #Append 0x00 so that padded length is multiple of 16
#xor:
msg_crypt = bytes([a ^ b for a, b in zip(msg_pad, S)])
return msg_crypt[:len(msg)]
def decrypt_join_accept(self, AppKey, join_accept):
"""
Decrypts join accept message, calculates keys.
If CF List of Type 0 is received, it saves the frequencies.
Parameters
----------
AppKey : list of bytes
AppKey used to decrypt join accept message.
join_accept : bytes
Join accept message to be decrypted
"""
K = AppKey
cipher = aes(K,1)
if len(join_accept)%16:
join_accept = join_accept + bytes(16-len(join_accept)%16)
print(join_accept)
join_accept_dec = cipher.encrypt(join_accept) #message is encrypted, such that it can be decrypted by encrypt function
mic_rx = join_accept_dec[-4:]
cmac = CMAC()
mic_calc= cmac.aes_cmac(AppKey, bytes([0x20])+join_accept_dec[:-4])[:4]
print("RX MIC: {}, Calc MIC: {}".format(mic_rx,mic_calc))
c = join_accept_dec[:6] + int.to_bytes(self.devNonce-1, 2, 'little') #-1 due to increment after transmission
c_pad = c + bytes(15-len(c))
self.NwkSKey = cipher.encrypt(bytes([0x01]) + c_pad)
self.AppSKey = cipher.encrypt(bytes([0x02]) + c_pad)
self.DevAddr = bytes(reversed(bytes(join_accept_dec[6:10])))
self.rx1_delay = join_accept_dec[11]
self.rx1_dr_offset = (join_accept_dec[10] & 0x70) >> 4
print("NwkSKey: {}, AppSKey: {}, DevAddr: {}, rx1_delay: {}, DR_RX1: {}".format(self.NwkSKey,self.AppSKey,self.DevAddr,self.rx1_delay, self.rx1_dr_offset))
cf_list = join_accept_dec[12:-4] #safe as freqs
if len(cf_list) == 16:
if cf_list[-1] == 0:
for i in range(0,15,3):
print(cf_list[i:i+3])
tmp = int.from_bytes(cf_list[i:i+3],'little')
try:
tmp = tmp/10000
self.freqs.append(tmp)
except Exception as exc:
print(exc)
if mic_calc == mic_rx:
self.save_cfg()
self.joined = True
self.send("Joined")
return join_accept_dec, mic_calc == mic_rx
def save_cfg(self):
"""
Saves configuration in hidden file ".config.cfg"
"""
f_bytes = bytes(0)
for freq in self.freqs:
f_bytes += struct.pack('f',freq)
cfg = struct.pack(">iiiii16s16s4s8s",self.FCntUp, self.FCntDown, self.devNonce, self.rx1_delay, self.rx1_dr_offset, self.NwkSKey, self.AppSKey, self.DevAddr, self.DevEui)
with open(".config.cfg","wb") as f:
f.write(cfg+f_bytes)
def read_cfg(self):
"""
Reads configuration from hidden file ".config.cfg"
"""
with open(".config.cfg","rb") as f:
cfg = f.read()
self.FCntUp, self.FCntDown, self.devNonce, self.rx1_delay, self.rx1_dr_offset, self.NwkSKey, self.AppSKey, self.DevAddr, self.DevEui = struct.unpack(">iiiii16s16s4s8s",cfg)
tmp_freqs = cfg[64:]
for i in range(0,len(tmp_freqs),4):
freq = struct.unpack('f',tmp_freqs[i:i+4])[0]
if not freq in self.freqs:
self.freqs.append(freq)
if __name__ == "__main__":
import ubinascii
from config_ABP import *
from machine import SPI, Pin
import machine
import utime
import EU868
from SX1262 import Transceiver
spi = machine.SoftSPI(baudrate=400000, sck=9, mosi=10, miso=11)
cs = Pin(8, Pin.OUT, value=1)
rst = Pin(12, Pin.OUT, value=1)
busy = Pin(13, Pin.IN)
dio1 = Pin(14, Pin.IN)
sx1262 = Transceiver(spi, cs, rst, busy, dio1)
lw = LoRaWAN(sx1262, EU868.FREQS)
lw.setup_abp(NwkSKey, AppSKey, DevAddr, DevEUI)
if not lw.joined:
print(lw.join_otaa(1))
else:
print(lw.send("Hello World"))