Toggle Navigation
Hatchery
Eggs
MQTT button
__init__.py
Users
Badges
Login
Register
MCH2022 badge?
go to mch2022.badge.team
__init__.py
raw
Content
# MQTT button app: publishes keypad presses over MQTT and lets MQTT messages
# set the colour of individual pixels on the display.
import touchpads
import keypad
import display
import wifi
import audio
import time
import system
import appconfig
from umqtt.simple import MQTTClient

# Keys are mapped onto display pixels as a grid this many columns wide.
KEYS_PER_ROW = 4

# Get the settings from the settings menu (defaults are used when unset).
settings = appconfig.get('mqtt_button', {'MQTT_server_ip': "Your IP", 'MQTT_topic': "/test"})
SERVER_IP = settings['MQTT_server_ip']
# Every topic this app uses lives below "<configured topic>/".
TOPIC = settings['MQTT_topic'] + "/"

# Clear the screen
display.drawFill(0x000000)
display.flush()


def on_key(key_index, pressed):
    """Key press handler: publish the key state and mirror it on the display.

    Publishes "on"/"off" to ``TOPIC + str(key_index)`` and lights the pixel
    belonging to the key green while it is pressed, black when released.

    key_index -- index of the key that changed
    pressed   -- truthy when the key went down, falsy when it was released
    """
    x, y = key_index % KEYS_PER_ROW, key_index // KEYS_PER_ROW
    print('key event', key_index)
    if pressed:
        c.publish(TOPIC + str(key_index), "on")
        display.drawPixel(x, y, 0x00FF00)
    else:
        c.publish(TOPIC + str(key_index), "off")
        display.drawPixel(x, y, 0x000000)
    display.flush()


def on_home(is_pressed):
    """Home key handler: disconnect everything and return to the launcher."""
    print('home button: ' + str(is_pressed))
    # NOTE(review): the handler only reacts to the value 512; presumably that
    # is what the touchpads driver passes for a HOME press -- confirm.
    if is_pressed == 512:
        c.disconnect()
        wifi.disconnect()
        audio.play('/apps/mqtt_button/wifi_disconnect.mp3')
        # Presumably gives the sound time to finish before leaving the app.
        time.sleep(6)
        system.launcher()


def sub_cb(topic, msg):
    """MQTT subscribe handler: set one pixel to the colour given in a message.

    The last level of the topic is the number of the key/pixel, the payload
    is the colour as a hexadecimal number. Malformed messages are reported on
    the console and ignored, so a stray message cannot stop the main loop.

    topic -- topic the message arrived on (bytes)
    msg   -- message payload (bytes)
    """
    print((topic, msg))
    # Split the topic into a list, the last item in the list is the number
    # of the key. The "led/#" subscription also matches topics whose last
    # level is not a number, so the conversion has to be guarded.
    try:
        key_index = int(topic.decode('utf-8').split('/')[-1])
    except ValueError:
        print("Not a key index")
        return
    if key_index < 0:
        print("Not a key index")
        return
    x, y = key_index % KEYS_PER_ROW, key_index // KEYS_PER_ROW
    try:
        msg_hex = int(msg.decode('utf-8'), 16)
        display.drawPixel(x, y, msg_hex)
        display.flush()
    except Exception:
        # Best effort, as before: a bad payload must not end the app.
        print("Not a Hex number")


# Connect to Wi-Fi
if not wifi.status():
    audio.play('/cache/system/wifi_connecting.mp3')
    wifi.connect()
    wifi.wait()
    if not wifi.status():
        audio.play('/cache/system/wifi_failed.mp3')
        time.sleep(6)
        system.launcher()

# Setup MQTT and connect to it
c = MQTTClient("umqtt_client", SERVER_IP)
c.set_callback(sub_cb)
# Set a last-will
c.set_last_will(TOPIC + "available", "offline", retain=True)

# Connect to MQTT server, fail if not possible and return.
audio.play('/apps/mqtt_button/connect_mqtt.mp3')
try:
    c.connect()
    # Make sure the device is "online"
    # NOTE(review): the last-will "offline" is retained but this "online" is
    # not -- confirm whether the availability message should be retained too.
    c.publish(TOPIC + "available", "online")
    # Subscribe to the "led" topic
    c.subscribe(TOPIC + "led/#")
except Exception as err:
    print('MQTT connect failed:', err)
    audio.play('/apps/mqtt_button/fail_connect_mqtt.mp3')
    time.sleep(6)
    system.launcher()

# Configure the key press handler
keypad.add_handler(on_key)
touchpads.on(touchpads.HOME, on_home)

# Poll for incoming MQTT messages until the app is stopped; sub_cb is called
# for every message received.
try:
    while True:
        c.check_msg()
finally:
    c.disconnect()