mirror of
https://github.com/B00mb0x/LoRaWAN-Stack-for-Micropython.git
synced 2025-08-04 05:00:05 +02:00
345 lines
No EOL
13 KiB
Python
345 lines
No EOL
13 KiB
Python
from cryptolib import aes
|
|
from cmac import CMAC
|
|
import struct
|
|
import uos
|
|
import utime
|
|
import math
|
|
import random
|
|
|
|
class LoRaWAN:
|
|
def __init__(self, transceiver, freqs):
|
|
"""
|
|
Initializes LoRaWAN
|
|
|
|
Parameters
|
|
----------
|
|
transceiver : transceiver object
|
|
Object of a transceiver chip. Should contain settings, send and receive functions.
|
|
freqs : list of floats
|
|
List containing standard frequencies of a region
|
|
"""
|
|
self.freqs = freqs
|
|
self.transceiver = transceiver
|
|
self.joined = False
|
|
if ".config.cfg" in uos.listdir():
|
|
print("Read config")
|
|
self.read_cfg()
|
|
if self.NwkSKey != bytes(16):
|
|
self.joined = True
|
|
else:
|
|
self.NwkSKey = bytes(16)
|
|
self.AppSKey = bytes(16)
|
|
self.DevAddr = bytes(4)
|
|
self.DevEui = bytes(8)
|
|
self.FCntUp = 0
|
|
self.FCntDown = 0
|
|
self.devNonce = 0
|
|
self.rx1_delay = 5
|
|
self.rx1_dr_offset = 0
|
|
self.save_cfg()
|
|
|
|
def setup_abp(self, NwkSKey, AppSKey, DevAddr, DevEui):
|
|
"""
|
|
Sets the device up for ABP, saves the keys and resets counters
|
|
|
|
Parameters
|
|
----------
|
|
NwkSKey : list of 16 bytes
|
|
NwkSKey, big endian
|
|
AppSKey : list of 16 bytes
|
|
AppSKey, big endian
|
|
DevAddr : list of 4 bytes
|
|
DevAddr, big endian
|
|
DevEui : list of 8 bytes
|
|
DevEui, big endian
|
|
"""
|
|
# Reset counters and update keys if keys are new
|
|
if not(self.NwkSKey == bytes(NwkSKey) and
|
|
self.AppSKey == bytes(AppSKey) and
|
|
self.DevAddr == bytes(DevAddr) and
|
|
self.DevEui == bytes(DevEui)):
|
|
self.NwkSKey = bytes(NwkSKey)
|
|
self.AppSKey = bytes(AppSKey)
|
|
self.DevAddr = bytes(DevAddr)
|
|
self.DevEui = bytes(DevEui)
|
|
self.FCntUp = 0
|
|
self.FCntDown = 0
|
|
self.joined = True
|
|
self.save_cfg()
|
|
|
|
def reset(self):
|
|
"""
|
|
Resets stored keys and counters except devNonce.
|
|
"""
|
|
self.NwkSKey = bytes(16)
|
|
self.AppSKey = bytes(16)
|
|
self.DevAddr = bytes(4)
|
|
self.DevEui = bytes(8)
|
|
self.FCntUp = 0
|
|
self.FCntDown = 0
|
|
self.joined = False
|
|
self.save_cfg()
|
|
|
|
def join_otaa(self, AppKey, joinEUI, devEUI, retries=3, sf=7):
|
|
"""
|
|
Sets the device up for ABP, saves the keys and resets counters
|
|
|
|
Parameters
|
|
----------
|
|
AppKey : list of 16 bytes
|
|
AppKey, big endian
|
|
joinEUI : list of 8 bytes
|
|
joinEUI, big endian, can be all zeros
|
|
devEui : list of 8 bytes
|
|
DevEui, big endian
|
|
retries : int
|
|
Number of join attempts.
|
|
sf : int
|
|
Spreading Factor for join attempts can be between 7 and 12. Default: 7
|
|
"""
|
|
AppKey = bytes(AppKey)
|
|
joinEUI = bytes(joinEUI)
|
|
self.DevEui = bytes(devEUI)
|
|
|
|
tries = 0
|
|
while tries < retries:
|
|
tries+=1
|
|
self.transceiver.settings(power=20, sf=sf, bw=125, cr=4/5, syn_word=0x34, inv_iq=False, crc=True, exp_header=True)
|
|
mhdr = bytes([0x00]) #join request(000), rfu (000), major(00)
|
|
join_request = bytes(reversed(joinEUI)) + bytes(reversed(self.DevEui)) + int.to_bytes(self.devNonce,2,'little')
|
|
self.devNonce += 1
|
|
self.save_cfg()
|
|
cmac = CMAC()
|
|
mic = cmac.aes_cmac(AppKey, mhdr + join_request)[0:4]
|
|
msg=mhdr+join_request+mic
|
|
print(msg)
|
|
rx_msg = self.transceiver.send(msg, self.freqs[0])
|
|
self.transceiver.settings(power=20, sf=sf, bw=125, cr=4/5, syn_word=0x34, inv_iq=True, crc=False, exp_header=True)
|
|
rx_msg,snr,rssi=self.transceiver.receive(self.freqs[0],timeout=self.rx1_delay+3) #see regional parameters
|
|
|
|
print(rx_msg)
|
|
if rx_msg != None:
|
|
rx_mhdr = rx_msg[0]
|
|
else:
|
|
rx_mhdr = None
|
|
if not rx_mhdr == 0x20: #check if received message is join accept
|
|
utime.sleep(5)
|
|
continue
|
|
join_accept_payload, mic_correct = self.decrypt_join_accept(AppKey, rx_msg[1:])
|
|
print("JA PAyload",join_accept_payload)
|
|
if not mic_correct:
|
|
self.joined = False
|
|
continue
|
|
else:
|
|
return
|
|
|
|
def send(self,msg,power=20,sf=7,ack=False, adr=False, fport=b'\x01'):
|
|
"""
|
|
Sends message to LoRaWAN network if joined.
|
|
|
|
Parameters
|
|
----------
|
|
msg : str or bytes
|
|
Payload to be transmitted via LoRaWAN
|
|
power : int
|
|
Tx power. Default: 20
|
|
ack : bool
|
|
Request ack. Default: False
|
|
adr : bool
|
|
RFU. Default: False
|
|
fport : byte
|
|
LoRaWAN FPort. Default: 0x01
|
|
"""
|
|
if self.joined:
|
|
self.transceiver.settings(power=power, sf=sf, bw=125, cr=4/5, syn_word=0x34, inv_iq=False, crc=True, exp_header=True)
|
|
utime.sleep_ms(1)
|
|
#mac header
|
|
mhdr = 0x00
|
|
if ack:
|
|
mhdr |= 0x80
|
|
else:
|
|
mhdr |= 0x40
|
|
mhdr = bytes([mhdr])
|
|
|
|
#mac payload
|
|
#frame header
|
|
devaddr = bytes(reversed(self.DevAddr))
|
|
|
|
adr_bit = 0x80 if adr else 0x00
|
|
adr_ack_req = 0x00
|
|
ack_bit = 0x00
|
|
classb = 0x00 #currently no class b support planed
|
|
fopts_len = 0x00 #currently only payload, no mac
|
|
fctrl = bytes([adr_bit + adr_ack_req + ack_bit+ classb + fopts_len]) #ADR,adrackreq,ack,classb,foptslen
|
|
|
|
fcnt = int.to_bytes(self.FCntUp,2,'little')
|
|
|
|
fopts = bytes([])
|
|
|
|
fhdr = devaddr + fctrl + fcnt + fopts
|
|
|
|
#frame payload
|
|
if isinstance(msg,str):
|
|
frmpayload = self.encrypt_payload(msg.encode())
|
|
elif isinstance(msg, bytes):
|
|
frmpayload = self.encrypt_payload(msg)
|
|
print("Encrypted Payload: {}".format(frmpayload))
|
|
|
|
msg_no_mic = mhdr + fhdr + fport + frmpayload
|
|
|
|
B0 = bytes([0x49, 0x00, 0x00, 0x00, 0x00, 0x00]) + devaddr + int.to_bytes(self.FCntUp, 4, 'little') + bytes([0x00, len(msg_no_mic)])
|
|
print(B0)
|
|
|
|
cmac = CMAC()
|
|
mic = cmac.aes_cmac(self.NwkSKey, B0 + msg_no_mic)[0:4]
|
|
|
|
payload = msg_no_mic + mic
|
|
print("Payload: " + str(payload))
|
|
freq=random.choice(self.freqs)
|
|
self.transceiver.send(payload, freq)
|
|
self.FCntUp+=1
|
|
self.save_cfg()
|
|
|
|
#rx1
|
|
ack_received = False
|
|
rx_msg = None
|
|
if ack:
|
|
sf_rx = sf+self.rx1_dr_offset if sf+self.rx1_dr_offset < 12 else 12
|
|
self.transceiver.settings(power=20, sf=sf_rx, bw=125, cr=4/5, syn_word=0x34, inv_iq=True, crc=False, exp_header=True)
|
|
rx_msg,snr,rssi=self.transceiver.receive(freq,timeout=self.rx1_delay+3) #see regional parameters
|
|
if rx_msg != None:
|
|
if len(rx_msg) > 5 and bytes(reversed(rxmsg[1:5])) == self.DevAddr: #verify DevAddr
|
|
if rx_msg[5] & 0x20:
|
|
ack_received = True
|
|
else:
|
|
utime.sleep(self.rx1_delay+1)#wait until rx2 window is over
|
|
return rx_msg, ack_received
|
|
else:
|
|
raise Exception("Not joined to a Network")
|
|
return None
|
|
|
|
def encrypt_payload(self, msg):
|
|
"""
|
|
Encrypts payload for LoRaWAN transmission
|
|
|
|
Parameters
|
|
----------
|
|
msg : bytes
|
|
Payload to be encrypted
|
|
"""
|
|
#Enter Keys in Big Endian, this code changes and transmitts as little endian
|
|
K = self.AppSKey
|
|
k = math.ceil(len(msg)/16)
|
|
DevAddr = bytes(reversed(self.DevAddr))
|
|
cipher = aes(K,1)
|
|
|
|
S = b''
|
|
for i in range(k):
|
|
A = bytes([0x01, 0x00, 0x00, 0x00, 0x00, 0x00]) + DevAddr + int.to_bytes(self.FCntUp, 4, 'little') + bytes([0x00]) + int.to_bytes(i+1, 1, 'little')
|
|
S_tmp = cipher.encrypt(A)
|
|
S+=S_tmp
|
|
|
|
msg_pad = msg + bytes(k*16-len(msg)) #Append 0x00 so that padded length is multiple of 16
|
|
#xor:
|
|
msg_crypt = bytes([a ^ b for a, b in zip(msg_pad, S)])
|
|
return msg_crypt[:len(msg)]
|
|
|
|
def decrypt_join_accept(self, AppKey, join_accept):
|
|
"""
|
|
Decrypts join accept message, calculates keys.
|
|
If CF List of Type 0 is received, it saves the frequencies.
|
|
|
|
Parameters
|
|
----------
|
|
AppKey : list of bytes
|
|
AppKey used to decrypt join accept message.
|
|
join_accept : bytes
|
|
Join accept message to be decrypted
|
|
"""
|
|
K = AppKey
|
|
cipher = aes(K,1)
|
|
if len(join_accept)%16:
|
|
join_accept = join_accept + bytes(16-len(join_accept)%16)
|
|
print(join_accept)
|
|
join_accept_dec = cipher.encrypt(join_accept) #message is encrypted, such that it can be decrypted by encrypt function
|
|
mic_rx = join_accept_dec[-4:]
|
|
cmac = CMAC()
|
|
mic_calc= cmac.aes_cmac(AppKey, bytes([0x20])+join_accept_dec[:-4])[:4]
|
|
print("RX MIC: {}, Calc MIC: {}".format(mic_rx,mic_calc))
|
|
c = join_accept_dec[:6] + int.to_bytes(self.devNonce-1, 2, 'little') #-1 due to increment after transmission
|
|
c_pad = c + bytes(15-len(c))
|
|
self.NwkSKey = cipher.encrypt(bytes([0x01]) + c_pad)
|
|
self.AppSKey = cipher.encrypt(bytes([0x02]) + c_pad)
|
|
self.DevAddr = bytes(reversed(bytes(join_accept_dec[6:10])))
|
|
self.rx1_delay = join_accept_dec[11]
|
|
self.rx1_dr_offset = (join_accept_dec[10] & 0x70) >> 4
|
|
print("NwkSKey: {}, AppSKey: {}, DevAddr: {}, rx1_delay: {}, DR_RX1: {}".format(self.NwkSKey,self.AppSKey,self.DevAddr,self.rx1_delay, self.rx1_dr_offset))
|
|
|
|
cf_list = join_accept_dec[12:-4] #safe as freqs
|
|
if len(cf_list) == 16:
|
|
if cf_list[-1] == 0:
|
|
for i in range(0,15,3):
|
|
print(cf_list[i:i+3])
|
|
tmp = int.from_bytes(cf_list[i:i+3],'little')
|
|
try:
|
|
tmp = tmp/10000
|
|
self.freqs.append(tmp)
|
|
except Exception as exc:
|
|
print(exc)
|
|
if mic_calc == mic_rx:
|
|
self.save_cfg()
|
|
self.joined = True
|
|
|
|
self.send("Joined")
|
|
|
|
return join_accept_dec, mic_calc == mic_rx
|
|
|
|
def save_cfg(self):
|
|
"""
|
|
Saves configuration in hidden file ".config.cfg"
|
|
"""
|
|
f_bytes = bytes(0)
|
|
for freq in self.freqs:
|
|
f_bytes += struct.pack('f',freq)
|
|
cfg = struct.pack(">iiiii16s16s4s8s",self.FCntUp, self.FCntDown, self.devNonce, self.rx1_delay, self.rx1_dr_offset, self.NwkSKey, self.AppSKey, self.DevAddr, self.DevEui)
|
|
with open(".config.cfg","wb") as f:
|
|
f.write(cfg+f_bytes)
|
|
|
|
def read_cfg(self):
|
|
"""
|
|
Reads configuration from hidden file ".config.cfg"
|
|
"""
|
|
with open(".config.cfg","rb") as f:
|
|
cfg = f.read()
|
|
self.FCntUp, self.FCntDown, self.devNonce, self.rx1_delay, self.rx1_dr_offset, self.NwkSKey, self.AppSKey, self.DevAddr, self.DevEui = struct.unpack(">iiiii16s16s4s8s",cfg)
|
|
tmp_freqs = cfg[64:]
|
|
for i in range(0,len(tmp_freqs),4):
|
|
freq = struct.unpack('f',tmp_freqs[i:i+4])[0]
|
|
if not freq in self.freqs:
|
|
self.freqs.append(freq)
|
|
|
|
if __name__ == "__main__":
|
|
import ubinascii
|
|
from config_ABP import *
|
|
from machine import SPI, Pin
|
|
import machine
|
|
import utime
|
|
import EU868
|
|
|
|
from SX1262 import Transceiver
|
|
spi = machine.SoftSPI(baudrate=400000, sck=9, mosi=10, miso=11)
|
|
cs = Pin(8, Pin.OUT, value=1)
|
|
rst = Pin(12, Pin.OUT, value=1)
|
|
busy = Pin(13, Pin.IN)
|
|
dio1 = Pin(14, Pin.IN)
|
|
|
|
sx1262 = Transceiver(spi, cs, rst, busy, dio1)
|
|
|
|
lw = LoRaWAN(sx1262, EU868.FREQS)
|
|
lw.setup_abp(NwkSKey, AppSKey, DevAddr, DevEUI)
|
|
|
|
if not lw.joined:
|
|
print(lw.join_otaa(1))
|
|
else:
|
|
print(lw.send("Hello World"))
|
|
|