Toggle Navigation
Hatchery
Eggs
JuNePager
service.py
Users
Badges
Login
Register
MCH2022 badge?
go to mch2022.badge.team
service.py
raw
Content
import badge
import ugfx
import binascii
import hashlib
import utime

try:
    from umqtt.simple import MQTTClient
    import machine
    import wifi
except Exception as e:
    # These modules are only importable on the badge itself.
    print("[JuNePager] Emulator sux0rs.")

'''
Service construction:
 - check for message on MQTT every serviceTime
 - when message received vibrate / blink leds / get attention and show message on main screen until acked by user
 - user can ack by pressing konami code
 - ACK is sent back to sender -- optional

MQTT topic is june/rules/pager/#RECEIVERID/#SENDERID

BenV:
>>> machine.unique_id()
b'$\n\xc4\x12\xdd\xcc
-> binascii.hexlify(hashlib.sha256(machine.unique_id()).digest())[0:4].decode() -> 6068

Lotjuh:
>>> machine.unique_id()
b'$\n\xc4\x12\xe3\x88'
-> binascii.hexlify(hashlib.sha256(b'$\n\xc4\x12\xe3\x88').digest())[0:4].decode() -> 71eb
'''


def setup():
    """Initialise the pager state from NVS and hook up the clear buttons.

    Populates the module-level ``service*`` globals. When the service is
    disabled (or the derived ID is shorter than 4 characters) it returns
    before attaching any button handlers.
    """
    global mqtt
    global serviceEnabled
    global serviceMessage
    global serviceTime
    global serviceID
    global serviceFont
    global serviceLastCheckTime
    global serviceLastSender

    # Pager ID: first 4 hex characters of sha256(machine.unique_id()).
    serviceID = binascii.hexlify(hashlib.sha256(machine.unique_id()).digest())[0:4].decode()
    serviceFont = badge.nvs_get_str('junepager', 'font', 'Roboto_Regular12')
    serviceMessage = badge.nvs_get_str('junepager', 'lastmessage', None)
    serviceEnabled = badge.nvs_get_u8('junepager', 'service', 1)
    # NOTE(review): this is read as a u8 but the default is 300 -- a stored
    # value presumably cannot exceed 255 seconds; confirm the intended type.
    serviceTime = badge.nvs_get_u8('junepager', 'time', 300)
    serviceLastCheckTime = 0
    serviceLastSender = None

    if (serviceEnabled < 1) or len(serviceID) < 4:
        print("[JuNePager] Disabled! Please enable in the app!")
        return

    print("[JuNePager] Enabled! Checking ID " + serviceID + " every " + str(serviceTime) + " seconds.")
    ugfx.input_init()
    # Both A and B dismiss the current message.
    ugfx.input_attach(ugfx.BTN_A, clearMessage)
    ugfx.input_attach(ugfx.BTN_B, clearMessage)


def loop():
    """Poll MQTT for pager messages, at most once every ``serviceTime`` seconds.

    Returns the delay in milliseconds before the next call is wanted.
    NOTE(review): the early-exit paths return ``[ms, 0]`` while the normal
    path returns a bare int; both shapes are kept as they were -- confirm
    which one the service framework expects.
    """
    global serviceTime
    global serviceEnabled
    global serviceMessage
    global serviceID
    global serviceLastCheckTime
    global mqtt

    if not serviceEnabled:
        return [9999999, 0]

    # Check last time we checked. We don't want to start wifi every single time...
    delta = utime.ticks_diff(utime.ticks_ms(), serviceLastCheckTime)
    if serviceLastCheckTime != 0 and delta < (serviceTime*1000):
        print("[JuNePager] Not yet time to check. Save battery. Delta {} - {} is {} while we wait for {}.".format(utime.ticks_ms(), serviceLastCheckTime, str(delta/1000),serviceTime))
        # Ask to be woken when the interval is over: the REMAINING time,
        # not the time that has already elapsed.
        return [serviceTime*1000 - delta, 0]

    # Wait for WiFi connection, otherwise MQTT doesn't like it :/
    print("[JuNePager] Get wifi up.")
    try:
        wifi.sta_if.isconnected()
    except Exception as e:
        # Probing the interface failed, so (re)initialise wifi.
        wifi.init()

    try:
        count = 0
        while not wifi.sta_if.isconnected():
            count = count + 1
            utime.sleep_ms(50)
            if (count % 20) == 0:
                print("[JuNePager] Still trying to get wifi up.")
            if count > 100:
                # Roughly 5 seconds without a connection: retry in a minute.
                print("[JuNePager] Fuck this. Fix your wifi.")
                return [60000, 0]
        print("[JuNePager] Wifi up.")
    except Exception as e:
        print("[JuNePager] Error: " + str(e))

    mqtt = MQTTClient("JunePager" + serviceID, "mqtt.sha2017.org")
    mqtt.set_callback(mqtt_cb)

    # Check for messages
    print("[JuNePager] Connect MQTT")
    mqtt.connect(clean_session=True)
    # One filter for messages that carry a sender id, one for anonymous ones.
    topics = ["june/rules/pager/" + serviceID + '/+', "june/rules/pager/" + serviceID + '/']
    print("[JuNePager] Subscribe to topics {} MQTT".format(", ".join(topics)))
    for topic in topics:
        mqtt.subscribe(topic=topic)

    print("[JuNePager] Check messages on MQTT")
    try:
        # One non-blocking poll per subscribed topic.
        mqtt.check_msg()
        mqtt.check_msg()
    except Exception as e:
        # Best effort: a failed poll is simply retried on the next interval.
        print("[JuNePager] MQTT failed - /care")

    print("[JuNePager] MQTT disconnect")
    mqtt.disconnect()
    serviceLastCheckTime = utime.ticks_ms()
    print("[JuNePager] Will check again in {} seconds".format(str(serviceTime)))
    return serviceTime*1000


def mqtt_cb(topic, msg):
    """MQTT callback: validate, display, acknowledge and clear a message.

    ``topic`` and ``msg`` arrive as bytes. Oversized messages and messages
    that cannot be decoded are deleted from the broker without being shown;
    empty messages are ignored.
    """
    print("[JuNePager] MQTT Callback start")
    global serviceID
    global serviceMessage
    global serviceLastSender
    global mqtt

    # Sanity checking
    if len(topic) > len("june/rules/pager/" + serviceID + "/1234") or len(msg) > 512 or len(msg) == 0:
        if len(msg) > 0:
            print("[JuNePager] MQTT callback with too large topic/msg ({}/{} size) -- fsck sender.".format(len(topic), len(msg)))
            sendMessage(topic, None)
        print("[JuNePager] Returning")
        return

    try:
        msg = msg.decode('utf-8')
        topic = topic.decode('utf-8')
    except Exception as e:
        print("[JuNePager] Can't decode msg, delete: " + str(e))
        if len(msg):
            sendMessage(topic, None)
        return

    print("[JuNePager] MQTT callback on topic {} with msg[{}]".format(topic, msg))

    # Check if topic has sender: everything after "june/rules/pager/<id>/".
    ackid = ''
    try:
        ackid = topic[len("june/rules/pager/" + serviceID + "/"):]
        # Todo: serviceLastSender = lookupNick(ackid)
        # An empty sender id is stored as None so the header shows "Anonymous".
        serviceLastSender = ackid if ackid else None
    except Exception as e:
        print("[JuNePager] No sender? Fine, no ack.")

    # Store/show the message and possibly ack it.
    showMessage(msg)
    if len(ackid) == 4:
        ackMessage(ackid)

    print("[JuNePager] Clearing message from persistent mqtt.")
    sendMessage(topic, None)


def sendMessage(topic, msg):
    """Publish ``msg`` (retained, QoS 1) on ``topic``.

    If ``msg`` is None an empty payload is published instead, which deletes
    a persistent (retained) message. When no connection is available one is
    opened for the publish and closed again afterwards.
    """
    global mqtt
    print("[JuNePager] Sending message to topic {} with length {}".format(topic, len(msg) if msg is not None else 0))

    justConnected = False
    try:
        print("[JuNePager] See if connection available....")
        mqtt.ping()
    except Exception as e:
        print("[JuNePager] Nope, connecting....")
        # connect() returns the broker's session-present flag, which is 0 for
        # a clean session, so it must not be used as a success test in a
        # retry loop. Failure is reported by an exception instead.
        mqtt.connect(clean_session=True)
        justConnected = True

    print("[JuNePager] Connected, publish message....")
    if msg is None:
        msg = ''
    mqtt.publish(topic, msg, retain=True, qos=1)

    if justConnected:
        print("[JuNePager] Disconnect again.")
        mqtt.disconnect()


def ackMessage(sender):
    """Send an acknowledgement for a received message to ``sender``."""
    global serviceID
    # NOTE(review): this topic has no trailing "/<id>" segment, so it matches
    # neither of the filters subscribed in loop() -- confirm where senders
    # are expected to listen for ACKs.
    topic = "june/rules/pager/" + sender
    msg = "ACK: " + serviceID + " received your message."
    sendMessage(topic, msg)


def clearMessage(really):
    """Button handler: clear the stored message when the key is pressed.

    ``really`` is the pressed state handed over by ugfx; falsy means the
    key was released, which is only logged.
    """
    global serviceMessage
    if really:
        print("[JuNePager] Clearing message on user request.")
        # Clear both the in-memory copy and the persisted one.
        serviceMessage = None
        badge.nvs_set_str('junepager', 'lastmessage', '')
    else:
        print("[JuNePager] Key release?.")


def _drawMessageBox():
    """Draw the pager box with sender header, message text and clear hint."""
    ugfx.fill_rounded_box(20, 40, 250, 80, 10, ugfx.BLACK)
    header = "PAGER\n{} says:".format(serviceLastSender if serviceLastSender is not None else "Anonymous")
    ugfx.string_box(25, 50, 240, 72, header + serviceMessage, serviceFont, ugfx.WHITE, ugfx.justifyCenter)
    ugfx.string_box(25, 70, 240, 72, "Hit A or B to clear", serviceFont, ugfx.WHITE, ugfx.justifyCenter)


def showMessage(msg):
    """Persist ``msg``, draw it immediately and grab the user's attention.

    Flashes all LEDs and triggers the vibrator for about 250 ms.
    """
    global serviceMessage
    global serviceFont
    global serviceLastSender

    badge.nvs_set_str('junepager', 'lastmessage', msg)
    serviceMessage = msg
    _drawMessageBox()
    ugfx.flush()

    badge.leds_init()
    badge.leds_enable()
    badge.vibrator_init()
    badge.leds_send_data(bytes([255] * 24), 24)
    badge.vibrator_activate(7)
    utime.sleep_ms(250)
    # TODO: oneshot timer to kill these leds.
    badge.leds_send_data(bytes([0] * 24), 24)
    badge.leds_disable()


def draw(y, sleep = 2):
    """Draw the pager ID and any pending message.

    Returns ``[ms_until_next_draw, height_used]``; the height is 36 when a
    message was drawn and 0 otherwise.
    """
    global serviceEnabled
    global serviceMessage
    global serviceLastSender
    global serviceFont
    global serviceTime
    global serviceID

    if (serviceEnabled < 1):
        print("[JuNePager] Disabled. Jammer.")
        return [999999999, 0]

    # NOTE(review): box and text are both ugfx.WHITE, so the ID is presumably
    # invisible -- confirm the intended colours.
    ugfx.fill_rounded_box(240, 0, 40, 15, 2, ugfx.WHITE)
    ugfx.string(245, 2, serviceID, "Roboto_Regular12", ugfx.WHITE)

    # Re-read from NVS so a message cleared by the button handler disappears.
    serviceMessage = badge.nvs_get_str('junepager', 'lastmessage', None)
    if not serviceMessage:
        return [serviceTime*1000, 0]

    print("[JuNePager] " + " Y: " + str(y) + " Text: " + serviceMessage)
    _drawMessageBox()
    # No flush here; showMessage() is the only place that flushes explicitly.
    return [serviceTime*1000, 36]