Toggle Navigation
Hatchery
Eggs
updi_test
physical.py
Users
Badges
Login
Register
MCH2022 badge?
go to mch2022.badge.team
physical.py
raw
Content
""" Serial driver for UPDI stack MODIFIED FOR USE ON THE BADGE.TEAM ESP32 PLATFORM FIRMWARE """ import machine import updi_test.constants as constants class UpdiPhysical(object): """ PDI physical driver using a given COM port at a given baud """ def __init__(self, port, baud=115200): """ Initialise the COM port """ # Inter-byte delay self.ibdly = 0.0001 self.port = port self.baud = baud self.ser = None self.initialise_serial(self.port, self.baud) # send an initial break as handshake self.send([constants.UPDI_BREAK]) def initialise_serial(self, port, baud): """ Standard COM port initialisation """ print("Opening ({},{}) at {} baud".format(port[0], port[1], baud)) self.ser = machine.UART(2, rx=port[0], tx=port[1], baudrate=baud, bits=8, parity=0, stop=2, timeout=2000, buffer_size=1024) def _loginfo(self, msg, data): if data and isinstance(data[0], str): i_data = [ord(x) for x in data] else: i_data = data data_str = "[" + ", ".join([hex(x) for x in i_data]) + "]" print(msg + ' : ' + data_str) def send_double_break(self): """ Sends a double break to reset the UPDI port BREAK is actually just a slower zero frame A double break is guaranteed to push the UPDI state machine into a known state, albeit rather brutally """ print("Sending double break") #txPin = machine.Pin(self.port[1], machine.Pin.OUT) #txPin.value(0) #time.sleep_ms(52) #txPin.value(1) try: self.ser.deinit() except: pass try: self.ser.deinit() except: pass try: self.ser.deinit() except: pass print("Temporary port at ",self.port) self.ser = machine.UART(2, rx=self.port[0], tx=self.port[1], baudrate=300, timeout=2000, buffer_size=1024) self.ser.write(bytes([constants.UPDI_BREAK, constants.UPDI_BREAK])) xx = self.ser.read(2) print("BREAK_RX:",xx) try: self.ser.deinit() except: pass try: self.ser.deinit() except: pass try: self.ser.deinit() except: pass self.initialise_serial(self.port, self.baud) ign = self.ser.read(1) while len(ign) > 0: print("ignored after break: ", ign) ign = self.ser.read(1) def send(self, command): """ Sends a char array to UPDI with inter-byte delay Note that the byte will echo back """ self._loginfo("send", command) ign = self.ser.read(1) while len(ign) > 0: print("ignored: ", ign) ign = self.ser.read(1) self.ser.write(bytes(command)) # it will echo back. #time.sleep(0.1) echo = [] echoCharArray = self.ser.read(1) while len(echo) < len(command) and len(echoCharArray) > 0: echo.append(echoCharArray[0]) print("Received echo ({}): {}".format(len(echo), echo)) echoCharArray = self.ser.read(1) if bytes(echo) != bytes(command): self._loginfo("incorrect echo", echo) print(echo) print(command) raise Exception("Incorrect echo data") def receive(self, size): """ Receives a frame of a known number of chars from UPDI """ response = [] timeout = 1 # For each byte while size and timeout: # Read character = self.ser.read(1) # Anything in? if character: response.append(ord(character)) size -= 1 else: timeout -= 1 self._loginfo("receive", response) return response def sib(self): """ System information block is just a string coming back from a SIB command """ self.send([ constants.UPDI_PHY_SYNC, constants.UPDI_KEY | constants.UPDI_KEY_SIB | constants.UPDI_SIB_16BYTES]) return self.ser.readline() def __del__(self): if self.ser: print("Closing {}".format(self.port)) self.ser.close()