Toggle Navigation
Hatchery
Eggs
WiFi DMX Player
__init__.py
Users
Badges
Login
Register
MCH2022 badge?
go to mch2022.badge.team
__init__.py
raw
Content
import gc import socket, time, wifi, struct, rgb, uinterface screen = [0x000000FF] * 256 rgb.clear() uinterface.connect_wifi() if not wifi.status(): uinterface.skippabletext('Could not connect to WiFi') MCAST_GRP = '239.255.0.1' MCAST_PORT = 5568 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.bind(('0.0.0.0', MCAST_PORT)) def parse_packet(packet): global screen if len(packet) < 4: print('Received packet does not contain ACN preamble and postamble sizes') return acn_preamble_size, acn_postamble_size = struct.unpack('>HH', packet[0:4]) packet = packet[4:] packet_id_size = (acn_preamble_size + acn_postamble_size - 4) if len(packet) < packet_id_size: print('Received packet contains no ACN packet identifier') return packet_id = packet[packet_id_size] packet = packet[packet_id_size:] if len(packet) < 23: print('Received packet does not contain enough root DMX metadata: %d instead of %d' % (len(packet), 23)) return root_pdu_length, root_pdu_protocol = struct.unpack('>HI', packet[0:6]) root_pdu_length &= 0xFFF # First nibble is some flags root_pdu_cid = packet[6:22] packet = packet[22:] if len(packet) != (root_pdu_length - 22) or len(packet) < 77: print('Received packet does not contain enough streaming DMX data: %d instead of %d' % (len(packet), root_pdu_length - 23)) return if root_pdu_protocol != 0x04: print('Please only use the "Ratified DMX (4)" protocol for encapsulating data in ACN packets') return # print(root_pdu_length, root_pdu_protocol, root_pdu_cid) streaming_pdu_length, streaming_pdu_vector = struct.unpack('>HI', packet[0:6]) streaming_pdu_length &= 0xFFF # First nibble is some flags streaming_pdu_source = packet[6:70] streaming_pdu_priority, reserved, streaming_seq_number, \ streaming_options, streaming_universe = struct.unpack('>BHBBH', packet[70:77]) packet = packet[77:] # print(streaming_pdu_length, streaming_pdu_vector, streaming_pdu_priority, streaming_seq_number, streaming_universe) if streaming_pdu_vector != 0x02: print('Unknown streaming PDU vector (should be "Streaming DMX (2)")') return if len(packet) != (streaming_pdu_length - 77): print('Received packet does not contain enough inner DMX data: %d instead of %d' % (len(packet), streaming_pdu_length - 46)) return inner_pdu_length, inner_pdu_vector, inner_pdu_datatype, \ inner_pdu_first_addr, inner_pdu_increment, inner_pdu_count, \ inner_pdu_start_code = struct.unpack('>HBBHHHB', packet[0:11]) inner_pdu_length &= 0xFFF data = packet[11:] data_len = len(data) if inner_pdu_vector != 0x02: print('Unknown inner PDU vector (should be "Set Property (2)"') return if len(data) != (inner_pdu_length - 11): print('DMX data payload is smaller than specified in headers') return # print('%d %d %d' % (root_pdu_length, streaming_pdu_length, inner_pdu_length)) for i in range(0, (data_len//3)): led_index = ((streaming_universe-1) * 128) + i data_index = i*3 r, g, b = struct.unpack('BBB', data[data_index:data_index+3]) # print('uni', streaming_universe, 'led', led_index, r, g, b, 'data', data_index) screen[led_index] = ((r << 24) + (g << 16) + (b << 8) + 0xFF) try: rgb.frame(screen) except: print('Failed:', screen) rgb.clear() rgb.scrolltext('Send data to %s:5568' % wifi._STA_IF.ifconfig()[0]) data, addr = s.recvfrom(1024) rgb.clear() rgb.disablecomp() rgb.frame(screen) while len(data): parse_packet(data) try: data, addr = s.recvfrom(1024) except MemoryError: gc.collect()