Toggle Navigation
Hatchery
Eggs
Mqtt amp control
__init__.py
Users
Badges
Login
Register
MCH2022 badge?
go to mch2022.badge.team
__init__.py
raw
Content
"""TkkrLab MQTT amplifier remote control.

Button mapping:
    START      toggles the amplifier power.
    SELECT     toggles between the CD and AUX inputs.
    D-pad up   turns the volume up.
    D-pad down turns the volume down.

Whenever a change is sent, every badge running this script is updated to
the most recent volume and mode values.
"""
import esp
import ugfx
import time
import machine
import easywifi
import easydraw
import virtualtimers
import tasks.powermanagement as pm
from umqtt.simple import MQTTClient

# Logging state: `count` is prefixed to every log line, `debug_enable`
# selects the display (easydraw.msg) instead of print() as the log sink.
count = 0
debug_enable = False

# MQTT topics.
# AMP_RAW is not referenced anywhere in the visible script.
AMP_RAW = b"tkkrlab/amp/raw"
# Published by this script to set a new mode / volume.
AMP_MODE = b"tkkrlab/amp/mode"
AMP_VOLUME = b"tkkrlab/amp/volume"
# Subscribed to; carry the values that were changed by anyone.
AMP_VOLUME_CHANGED = b"tkkrlab/amp/volume_changed"
AMP_MODE_CHANGED = b"tkkrlab/amp/mode_changed"
# Published by this script to query the stored values.
AMP_VOLUME_CURRENT = b"tkkrlab/amp/volume_current"
AMP_MODE_CURRENT = b"tkkrlab/amp/mode_current"

# Bit flags combined into the amp mode value.
AMP_POWER = 1 << 0  # Amp power enable bit
AMP_TAPE1 = 1 << 1  # Tape 1 enable bit
AMP_TAPE2 = 1 << 2  # Tape 2 enable bit
AMP_AUX_CD = 1 << 3  # toggles between AUX (set) and CD (clear)

# Inclusive volume limits enforced by the D-pad handlers.
VOLUME_MAX = 65
VOLUME_MIN = 0

# Local defaults, used until a *_CHANGED message overrides them.
amp_mode = AMP_POWER
amp_volume = 20

# make the app the launcher.
esp.rtcmem_write_string("mqtt_amp_control")
ugfx.input_init()
easywifi.enable()

# query the most recent stored volume and mode.
def mode_update():
    """Query the stored amp mode by publishing on AMP_MODE_CURRENT.

    The payload is only a placeholder. Presumably the answer arrives on
    AMP_MODE_CHANGED -- confirm against the amp-side service.
    """
    client.publish(AMP_MODE_CURRENT, b"value")


def volume_update():
    """Query the stored amp volume by publishing on AMP_VOLUME_CURRENT.

    The payload is only a placeholder. Presumably the answer arrives on
    AMP_VOLUME_CHANGED -- confirm against the amp-side service.
    """
    client.publish(AMP_VOLUME_CURRENT, b"value")


# update the volume and mode when anyone changes them
def sub_callback(topic, msg):
    """Handle an incoming MQTT message and mirror it into local state.

    Args:
        topic: Topic the message arrived on (bytes).
        msg: Payload; expected to be an integer in text form.

    Messages on other topics and payloads that are not valid integers are
    logged and ignored, so a malformed message cannot raise out of
    client.wait_msg() / client.check_msg().
    """
    global amp_mode
    global amp_volume

    print((topic, msg))
    if topic not in (AMP_VOLUME_CHANGED, AMP_MODE_CHANGED):
        return

    try:
        value = int(msg)
    except ValueError:
        print("ignoring non-numeric payload on %s" % (topic,))
        return

    if topic == AMP_VOLUME_CHANGED:
        amp_volume = value
    else:
        amp_mode = value


server = b"10.42.1.2"
client_id = machine.unique_id()
# Same client id as before ("AmpControl[<repr of unique id>]"), built
# without the redundant str()/bytes() wrappers.
client = MQTTClient(("AmpControl[%s]" % (client_id,)).encode("ascii"), server)
client.set_callback(sub_callback)
client.connect()
client.subscribe(AMP_VOLUME_CHANGED)
client.subscribe(AMP_MODE_CHANGED)
mode_update()
volume_update()
# Blocks until one message has been processed; anything that arrives later
# has to be picked up by a subsequent client.check_msg() call.
client.wait_msg()


def info(string):
    """Log one line, prefixed with the global log counter.

    Goes to the display via easydraw.msg when debug_enable is set,
    otherwise to print().
    """
    line = "[%d][info] %s" % (count, string)
    if debug_enable:
        easydraw.msg(line)
    else:
        print(line)


def debug_state():
    """Log the current volume, raw mode bits, power state and input."""
    info("")
    info("volume: %d" % amp_volume)
    info("amp_mode: %s" % bin(amp_mode))
    info("amp on." if amp_mode & AMP_POWER else "amp off.")
    info("amp input: AUX" if amp_mode & AMP_AUX_CD else "amp input: CD")


def update_volume():
    """Publish the local volume on AMP_VOLUME and log the state."""
    # Encode explicitly so the payload is bytes like the topic.
    client.publish(AMP_VOLUME, str(amp_volume).encode("ascii"))
    debug_state()


def update_mode():
    """Publish the local mode bits on AMP_MODE and log the state."""
    # Encode explicitly so the payload is bytes like the topic.
    client.publish(AMP_MODE, str(amp_mode).encode("ascii"))
    debug_state()


def button_select(pushed):
    """SELECT: flip the AUX/CD bit on press, publish the mode on release."""
    global amp_mode
    if pushed:
        amp_mode ^= AMP_AUX_CD
    else:
        update_mode()


def button_start(pushed):
    """START: flip the power bit on press, publish the mode on release."""
    global amp_mode
    if pushed:
        amp_mode ^= AMP_POWER
    else:
        update_mode()


# Volume change applied per D-pad press.
VOLUME_STEP = 5


def button_up(pushed):
    """D-pad up: raise the volume one step (capped at VOLUME_MAX).

    Publishes once per press; the release event is ignored.
    """
    global amp_volume
    if pushed:
        amp_volume = min(amp_volume + VOLUME_STEP, VOLUME_MAX)
        update_volume()


def button_down(pushed):
    """D-pad down: lower the volume one step (floored at VOLUME_MIN).

    Publishes once per press; the release event is ignored.
    """
    global amp_volume
    if pushed:
        amp_volume = max(amp_volume - VOLUME_STEP, VOLUME_MIN)
        update_volume()


ugfx.input_attach(ugfx.BTN_SELECT, button_select)
ugfx.input_attach(ugfx.BTN_START, button_start)
ugfx.input_attach(ugfx.JOY_UP, button_up)
ugfx.input_attach(ugfx.JOY_DOWN, button_down)

# LED setup, currently disabled:
# badge.leds_init()
# badge.leds_enable()
# badge.leds_send_data(bytes([0x01] * (6 * 4)))

# Used both as the initial timer target and as the value check_messages
# returns. NOTE(review): presumably milliseconds until the next call --
# confirm against the virtualtimers documentation.
CHECK_INTERVAL = 1000


def check_messages():
    """Timer callback: process pending MQTT messages, bump the log counter.

    Returns:
        CHECK_INTERVAL, handed back to the virtualtimers scheduler.
    """
    global count
    client.check_msg()
    # counting for log.
    count += 1
    return CHECK_INTERVAL


def goto_sleep(idletime):
    """Callback registered with the power manager via pm.callback().

    Shows a "Going To sleep!" message, flushes the display, pauses briefly
    and then calls pm.feed(). `idletime` is accepted but not used.
    NOTE(review): pm.feed() presumably resets the idle timer, which would
    keep the badge awake -- confirm that this is intended.
    """
    easydraw.msg("", "Going To sleep!", True)
    ugfx.flush(ugfx.GREYSCALE)
    time.sleep(0.1)
    pm.feed()


pm.callback(goto_sleep)

virtualtimers.new(CHECK_INTERVAL, check_messages, True)
virtualtimers.activate(100)