Toggle Navigation
Hatchery
Eggs
Tkkrlab Mpd Control
__init__.py
Users
Badges
Login
Register
MCH2022 badge?
go to mch2022.badge.team
__init__.py
raw
Content
# Author: Duality/Robert # Github: Duality4y # Mail: robertvdtuuk@gmail.com # Maintainer: Who # Brief: # This code is for controling the Mpd setup in Tkkrlab # Through the touch interface on the hackerhotel2019 badge. # (Or the sha badge) import esp import ugfx import json import badge # import easywifi import wifi import easydraw import virtualtimers import tasks.powermanagement as pm import orientation import version from umqtt.simple import MQTTClient badge.init() ugfx.init() orientation.landscape() ugfx.input_init() # easywifi.enable() wifi.connect() class MpdSong(object): def __init__(self, **entries): self.file = entries.get("file", None) self.LastModified = entries.get("Last-Modified", None) self.Time = entries.get("Time", None) self.duration = entries.get("duration", None) self.Pos = entries.get("Pos", None) self.Id = entries.get("Id", None) class MpdStatus(object): def __init__(self, **entries): self.volume = entries.get('volume', None) self.repeat = entries.get('repeat', None) self.random = entries.get('random', None) self.single = entries.get('single', None) self.consume = entries.get('consume', None) self.playlist = entries.get('playlist', None) self.playlistlength = entries.get('playlistlength', None) self.mixrampdb = entries.get('mixrampdb', None) self.state = entries.get('state', None) self.song = entries.get('song', None) self.songid = entries.get('songid', None) self.time = entries.get('time', None) self.elapsed = entries.get('elapsed', None) self.bitrate = entries.get('bitrate', None) self.duration = entries.get('duration', None) self.audio = entries.get('audio', None) self.nextsong = entries.get('nextsong', None) self.nextsongid = entries.get('nextsongid', None) class TkkrlabMpd(object): def __init__(self): self.mpdTopic = b"tkkrlab/mpd" self.volumeTopic = b"tkkrlab/mpd/volume" self.volumeSetTopic = b"tkkrlab/mpd/volume/set" self.commandTopic = b"tkkrlab/mpd/cmd" self.statusTopic = b"tkkrlab/mpd/status" self.playCmd = b"play" self.pauseCmd = b"pause" self.nextCmd = b"next" self.previousCmd = b"previous" self.server = b"10.42.1.2" self.client = MQTTClient(b"TkkrlabMpd-01", self.server) self.client.set_callback(self.subCallback) self.client.connect() self.client.subscribe(self.mpdTopic) self.client.subscribe(self.volumeTopic) self.client.subscribe(self.statusTopic) virtualtimers.new(100, self.process, True) virtualtimers.activate(100) # volume information self.volume = 0 self.volumeMinimum = 0 self.volumeMaximum = 100 # set button callbacks ugfx.input_attach(ugfx.JOY_UP, self.incVolumeButton) ugfx.input_attach(ugfx.JOY_DOWN, self.decVolumeButton) ugfx.input_attach(ugfx.BTN_START, self.pauseButton) ugfx.input_attach(ugfx.JOY_LEFT, self.previousButton) ugfx.input_attach(ugfx.JOY_RIGHT, self.nextButton) # current mpd state information self.stateLoaded = False self.mpdState = None self.song = None self.status = None self.updateInfo = False self.fontHeight = 19 self.outputLines = [] def drawLine(self, x, y, text): ugfx.string(0, y * self.fontHeight, text, version.font_default, ugfx.BLACK) def drawOutput(self): for i, line in enumerate(self.outputLines): self.drawLine(0, i, line) ugfx.flush(ugfx.LUT_FASTER) def subCallback(self, topic, msg): if(topic == self.statusTopic): self.mpdState = json.loads(msg) self.song = MpdSong(**self.mpdState['song']) self.status = MpdStatus(**self.mpdState['status']) # print(self.mpdState['status']) # print() # print(self.mpdState['song']) # print() self.stateLoaded = True self.updateInfo = True else: print((topic, msg)) def drawMpdInfo(self): self.outputLines = [] ugfx.clear(ugfx.WHITE) if self.song.file: lines = easydraw.lineSplit("Song: %s" % self.song.file) for line in lines: self.outputLines.append(line) self.outputLines.append("Left/Right = Previous/Next") self.outputLines.append("Up/Down = Volume Up/Down") self.outputLines.append("Start = Play/Pause") if self.status.volume: self.volume = int(self.status.volume) self.outputLines.append("Volume: %d%%" % self.volume) self.drawOutput() def process(self): self.client.check_msg() if self.updateInfo: self.updateInfo = False self.drawMpdInfo() return 100 def play(self): if(self.stateLoaded == False): return self.client.publish(self.commandTopic, self.playCmd) def pause(self): if(self.stateLoaded == False): return self.client.publish(self.commandTopic, self.pauseCmd) def next(self): if(self.stateLoaded == False): return self.client.publish(self.commandTopic, self.nextCmd) def previous(self): if(self.stateLoaded == False): return self.client.publish(self.commandTopic, self.previousCmd) def setVolume(self, value): if(self.stateLoaded == False): return if(value > self.volumeMaximum): return if(value < self.volumeMinimum): return self.volume = value def incVolume(self): if(self.stateLoaded == False): return self.volume += 2 if(self.volume >= self.volumeMaximum): self.volume = self.volumeMaximum self.client.publish(self.volumeSetTopic, str(self.volume)) def decVolume(self): if(self.stateLoaded == False): return self.volume -= 2 if(self.volume <= self.volumeMinimum): self.volume = self.volumeMinimum self.client.publish(self.volumeSetTopic, str(self.volume)) def playButton(self, pressed): pm.feed() if pressed: self.play() def pauseButton(self, pressed): pm.feed() if pressed: self.pause() def previousButton(self, pressed): pm.feed() if pressed: self.previous() def nextButton(self, pressed): pm.feed() if pressed: self.next() def incVolumeButton(self, pressed): pm.feed() if pressed: self.incVolume() def decVolumeButton(self, pressed): pm.feed() if pressed: self.decVolume() def gotoSleep(idletime): print("Going to sleep!") easydraw("", "Going to sleep!", True) ugfx.flush(ugfx.GREYSCALE) time.sleep(0.1) if wifi.wait(duration=20, showStatus=True): tkkrlabmpd = TkkrlabMpd() else: easydraw.msg("Failed to connect to wifi.") esp.rtcmem_write_string("TkkrlabMpd") pm.feed() pm.callback(gotoSleep) """ ugfx.clear(ugfx.WHITE) displaytext = "Hello, World!" ugfx.string(0, 0, displaytext, "PermanentMarker36", ugfx.BLACK) def buttonPressed(pressed): if(pressed): ugfx.string(0, 40, "Button Pressed.", "PermanentMarker36", ugfx.BLACK) ugfx.flush(); ugfx.input_attach(ugfx.BTN_A, lambda pressed: buttonPressed(pressed)) ugfx.flush() """