Toggle Navigation
Hatchery
Eggs
Hackalot MQTT
__init__.py
Users
Badges
Login
Register
MCH2022 badge?
go to mch2022.badge.team
__init__.py
raw
Content
# Hackalot MQTT viewer for the badge.
#
# Subscribes to an MQTT broker, shows the most recent messages in a small
# scrolling "terminal" on the e-ink display, shows the value last published
# on the state topic in the header, and lights the badge LEDs on activity.

import badge
import deepsleep
import easydraw
import easywifi
import esp
import gc
import json
import sys
import time
import ugfx
import virtualtimers
from random import randint
from umqtt.simple import MQTTClient

# --- Hardware / display initialisation -------------------------------------
badge.init()
ugfx.init()
badge.leds_init()
badge.leds_enable()
ugfx.set_lut(ugfx.LUT_FASTER)
ugfx.clear(ugfx.WHITE)

# --- Configuration -----------------------------------------------------------
title = b"Hackalot MQTT"
mqttTopic = b"#"                  # topic filter passed to client.subscribe()
stateTopic = "hackalot/state"     # messages on this topic update the header
spacestate = "unknown :("
numLeds = 6
fadeDuration = 5000  # ms per led
maxBrightness = 96   # 0-255
grbw = (0, 0, 0, 0)               # value used for an "off" LED slot
leds = [grbw for _ in range(numLeds)]
font = 'Terminus14'

# Load the terminal font shipped next to this app. The `with` statement
# closes the file on exit, so no explicit close() is needed.
with open('%s/hackalot_mqtt/%s.json' % (sys.path[1], font), 'r') as f:
    ugfx.fonts_load(json.loads(f.read()))
gc.collect()

# --- Terminal state ----------------------------------------------------------
termSize = [37, 6]                # [columns, rows]; only the row count is used below
lineHeight = 16
buf = [" " for _ in range(termSize[1])]   # one string per terminal row
cursor = [0, 0]                   # [col, row] of the last written row
lastFlush = 0
lastReset = 0
mbox = []                         # queue of [topic, message] pairs awaiting display

ugfx.flush(ugfx.LUT_FULL)
badge.eink_busy_wait()
easydraw.msg("Welcome!", title, True)  # Message, title, reset

# --- Network -----------------------------------------------------------------
easywifi.enable(True)
if not easywifi.state:
    easydraw.msg("Unable to connect to network!", "FAILURE")
    easydraw.msg("Waiting 5 seconds to try again!")
    badge.eink_busy_wait()
    # Store the app name in RTC memory before sleeping; presumably the
    # launcher uses it to start this app again on wake-up (TODO confirm).
    esp.rtcmem_write_string('hackalot_mqtt')
    deepsleep.start_sleeping(5000)


def showLeds():
    """Send the current `leds` buffer to the badge.

    LED tuples are serialised last-to-first, 4 bytes per LED.
    """
    byteStream = b""
    for index in range(numLeds - 1, -1, -1):
        byteStream = byteStream + bytes(leds[index])
    badge.leds_send_data(byteStream, numLeds * 4)


def updateLeds():
    """Shift the LED buffer by one slot and push it to the hardware.

    The last entry is dropped and an "off" entry is inserted at the front.

    Returns `fadeDuration`; presumably the delay in ms until virtualtimers
    calls this again (TODO confirm against virtualtimers).
    """
    leds.pop()
    leds.insert(0, grbw)
    showLeds()
    return fadeDuration


def flushDisplay():
    """Wait until the e-ink display is idle, then flush the frame buffer."""
    badge.eink_busy_wait()
    ugfx.flush()


def drawDisplay(flush=True):
    """Redraw the header and every terminal row.

    Even-indexed rows are drawn inverted (white text on a black bar as wide
    as the text); odd-indexed rows are drawn as plain black text.

    flush -- when true, push the result to the display afterwards.
    """
    ugfx.clear(ugfx.WHITE)
    ugfx.string(0, 0, "Hackalot is %s" % spacestate, 'Roboto_Regular22', ugfx.BLACK)
    for index, line in enumerate(buf):
        startHeight = 12 + lineHeight * (1 + index)
        if index % 2:
            ugfx.string(0, startHeight, line, font, ugfx.BLACK)
        else:
            width = ugfx.get_string_width(line, font)
            ugfx.area(0, startHeight, width, lineHeight, ugfx.BLACK)
            ugfx.string(0, startHeight, line, font, ugfx.WHITE)
    if flush:
        flushDisplay()


def rowDown():
    """Advance the cursor to the start of the next row, scrolling if needed.

    When the cursor would move past the last row, the oldest row is removed
    from `buf`, a blank row is appended, and the cursor stays on the last row.

    Returns the new cursor as [col, row].
    """
    global cursor
    row = cursor[1] + 1
    col = 0
    if row >= termSize[1]:
        # scroll down
        row = termSize[1] - 1
        buf.pop(0)
        buf.append(" ")
    cursor = [col, row]
    return cursor


def newPayload(topic, message):
    """Append a topic row and a message row to the terminal and redraw."""
    global cursor
    cursor[0] = 0
    [col, row] = rowDown()
    if row == 1:
        # Row 1 is only reached here when the cursor was still on row 0,
        # so place this topic on row 0 instead of leaving that row blank.
        row = 0
    buf[row] = "[%s]" % topic
    cursor = [col, row]
    [col, row] = rowDown()
    buf[row] = " %s" % message
    cursor = [col, row]
    drawDisplay()


def callback(topic, msg):
    """MQTT message callback: queue a decoded [topic, message] pair.

    Messages longer than 255 bytes are ignored, as is everything received
    while more than 50 messages are already queued. Messages that are not
    valid UTF-8 are dropped instead of raising, because an exception raised
    here propagates out of client.check_msg() and makes mqtt() restart the app.
    """
    if len(msg) > 255:
        return
    if len(mbox) > 50:
        return
    gc.collect()
    try:
        entry = [topic.decode('UTF-8'), msg.decode('UTF-8')]
    except UnicodeError:
        return
    mbox.append(entry)


def update_display():
    """Timer callback: show at most one queued message per call.

    For each displayed message the first LED is either lit with a random dim
    colour (if it was off) or doubled in brightness up to `maxBrightness`.

    Returns 100; presumably the delay in ms until the next call (TODO
    confirm against virtualtimers).
    """
    global spacestate
    if not mbox:
        return 100

    [topic, msg] = mbox.pop(0)
    gc.collect()

    current = leds[0]
    if current == grbw:
        newLed = (randint(0, 1) * 2, randint(0, 1) * 2, randint(0, 1) * 2, 0)
    else:
        newLed = (min(current[0] * 2, maxBrightness),
                  min(current[1] * 2, maxBrightness),
                  min(current[2] * 2, maxBrightness),
                  0)
    leds[0] = newLed
    showLeds()

    if topic == stateTopic:
        spacestate = msg
    newPayload(topic, msg)
    print("[%s] %s" % (topic, msg))
    return 100


def restartApp():
    """Store this app's name in RTC memory and reboot the badge."""
    esp.rtcmem_write_string('hackalot_mqtt')
    deepsleep.reboot()


def mqtt():
    """Timer callback: poll the MQTT client for a pending message.

    Any error raised while polling restarts the app. `Exception` is caught
    rather than using a bare `except` so that KeyboardInterrupt and
    SystemExit are not swallowed.

    Returns 100; presumably the delay in ms until the next call (TODO
    confirm against virtualtimers).
    """
    try:
        client.check_msg()
    except Exception:
        restartApp()
    return 100


def process():
    """Timer callback: periodically refresh the e-ink display.

    Does a LUT_FULL flush once at least 30 s have passed since the previous
    one; otherwise does a GREYSCALE flush once at least 120 s have passed
    since the previous one. At most one flush happens per call.

    Returns 100; presumably the delay in ms until the next call (TODO
    confirm against virtualtimers).
    """
    global lastReset, lastFlush
    now = time.ticks_ms()
    if time.ticks_diff(now, lastFlush) >= 30000:
        badge.eink_busy_wait()
        ugfx.flush(ugfx.LUT_FULL)
        lastFlush = time.ticks_ms()
    elif time.ticks_diff(now, lastReset) >= 120000:
        badge.eink_busy_wait()
        ugfx.flush(ugfx.GREYSCALE)
        lastReset = time.ticks_ms()
    return 100


# Initialize MQTT
server = b"mqtt.space.hackalot.nl"
client = MQTTClient(b"mqttBadge", server)
client.set_callback(callback)
client.connect()
client.subscribe(mqttTopic)

# Register the periodic tasks and start the timer loop.
virtualtimers.new(100, update_display, True)
virtualtimers.new(100, process, True)
virtualtimers.new(100, mqtt, True)
virtualtimers.new(fadeDuration, updateLeds, True)
virtualtimers.activate(100)