Toggle Navigation
Hatchery
Eggs
Thermo-MQTT
__init__.py
Users
Badges
Login
Register
MCH2022 badge?
go to mch2022.badge.team
__init__.py
raw
Content
import gc import time, badge, ugfx, wifi, onewire, ds18x20, machine, json, esp gc.collect() #try to maintain targettemp settings=json.loads('{"settings":{"mqtt_broker":"","mqtt_port":"1883","mqtt_user":"","mqtt_password":"","mqtt_clientid":"thermostat","offset":"-0.5","margin":"0.5","loglevel":"0","default_screen":"0","store_data":"0","openweatherkey":""}}') message="Have a nice day!" ctemp=20.0 ttemp=20.0 itemp=0 uptime=0 heater=0 tempsensor=0 # 0=internal,1=DS18B20,2=BME280 pc=0 burn=0 dotw=('','Zondag','Maandag','Dinsdag','Woensdag','Donderdag','Vrijdag','Zaterdag') def store_settings(): global settings for p in settings['settings']: badge.nvs_set_str('thermostat',p,settings['settings'][p]) def read_settings(): global settings settings['settings']['mqtt_clientid']=badge.nvs_get_str('thermostat','mqtt_clientid',settings['settings']['mqtt_clientid']) settings['settings']['mqtt_broker']=badge.nvs_get_str('thermostat','mqtt_broker',settings['settings']['mqtt_broker']) settings['settings']['mqtt_port']=badge.nvs_get_str('thermostat','mqtt_port',settings['settings']['mqtt_port']) settings['settings']['mqtt_user']=badge.nvs_get_str('thermostat','mqtt_user',settings['settings']['mqtt_user']) settings['settings']['mqtt_password']=badge.nvs_get_str('thermostat','mqtt_user',settings['settings']['mqtt_password']) def init_hw(): global ds, roms, uptime,bme,tempsensor badge.init() ugfx.init() #ugfx.input_init() ugfx.clear(ugfx.BLACK) ugfx.flush() ugfx.clear(ugfx.WHITE) ugfx.string(40, 30, "Initializing!" , "DejaVuSans20", ugfx.BLACK) ugfx.string(40, 70, "THERMO-MQTT!" , "DejaVuSans20", ugfx.BLACK) ugfx.flush() wifi.init() #p33 for relay switch #p12 for temp sensor badge.power_sdcard_enable() #p33=machine.Pin(33,machine.Pin.OUT) #p33.value(0) # create the onewire object dat = machine.Pin(4) ds = ds18x20.DS18X20(onewire.OneWire(dat)) # scan for DS18B20 devices on onewire the bus roms = ds.scan() if roms: tempsensor=1 print("Onewire ROM 0 Serial:%s"%( roms[0])) ds.read_temp(roms[0]) time.sleep(0.75) ds.read_temp(roms[0]) uptime=time.time() def wifi_up(): while not wifi.sta_if.isconnected(): time.sleep(0.1) pass print(wifi.sta_if.ifconfig()[0]) return wifi.sta_if.ifconfig()[0] def init_mqtt(): global mqtt port=1883 mqtt=0 user="None" password="None" if len(settings['settings']['mqtt_port'])>0: port=settings['settings']['mqtt_port'] if len(settings['settings']['mqtt_user'])>0 and len(settings['settings']['mqtt_password'])>0: user=settings['settings']['mqtt_password'] password=settings['settings']['mqtt_port'] if len(settings['settings']['mqtt_broker'])>0: from umqtt.simple import MQTTClient mqtt = MQTTClient('badge', settings['settings']['mqtt_broker'],port=port,user=user,password=password) mqtt.set_callback(mqtt_cb) mqtt.connect() if mqtt: print("Connected to MQTT Server as %s!"%(settings['settings']['mqtt_clientid'])) mqtt.subscribe(b"thermostat/#") else: print("Could not connect to MQTT Broker %s:%s with user %s !"%(settings['settings']['mqtt_broker'],port,user)) def mqtt_cb(topic, msg): global ttemp,message,burn #global leds print((topic, msg)) #lets strip all the extra characters mqttmessage = (msg.decode('utf-8')) mqttmessage = mqttmessage.replace('\x08', '') mqttmessage = mqttmessage.replace('\x05', '') if(topic.decode('utf-8')=='thermostat/targettemp'): ttemp = mqttmessage if(topic.decode('utf-8')=='thermostat/message'): message = mqttmessage if(topic.decode('utf-8')=='thermostat/relay'): burn = mqttmessage def mqtt_publish(): global itemp,ctemp,mqtt if mqtt: try: mqtt.publish("badgesens/cpu", itemp) mqtt.publish("badgesens/sensor", ctemp) mqtt.publish("badgesens/type", tempsensor) mqtt.publish("badgesens/uptime", uptime) except: #did we lose our mqtt connection! let's try to connect again print("Something went wrong publsihing values!") #init_mqtt() def main_loop(): global ctemp,ttemp, ds, roms, settings,itemp,bme print("Target temp thread started!") ugfx.input_init() ugfx.flush() while True: itemp=(esp.temperature_sens_read()-32)/1.8 if roms: ds.convert_temp() ctemp=float(ds.read_temp(roms[0]))+float(settings['settings']['offset']) else: ctemp=itemp mqtt_publish() #p33.value(burn) #do screen stuff ugfx.input_attach(ugfx.BTN_START, lambda pushed: ugfx.flush() if pushed else False) ugfx.input_attach(ugfx.BTN_SELECT, lambda pushed: ugfx.flush() if pushed else False) ugfx.input_attach(ugfx.JOY_UP, lambda pushed: new_temp(0.5) if pushed else False) ugfx.input_attach(ugfx.JOY_DOWN, lambda pushed: new_temp(-0.5) if pushed else False) #ugfx.input_attach(ugfx.JOY_LEFT, lambda pushed: switch_screen(-1) if pushed else False) #ugfx.input_attach(ugfx.JOY_RIGHT, lambda pushed: switch_screen(1) if pushed else False) #ugfx.input_attach(ugfx.BTN_A, lambda pushed: get_TIL() if pushed else False) ugfx.flush() show_main_screen() time.sleep(10) gc.collect() def show_main_screen(): global ctemp,ttemp,burn,message,dotw tstatus=('.zZ','!!!') moty=('','Jan','Feb','Mar','Apr','Mei','Jun','Jul','Aug','Sep','Oct','Nov','Dec') now=time.localtime() #0year,1 month,2 day,3 hour,4 minute,5 sec,6 dayoftheweek,7 dayoftheyear if now[4]<10: mytime="%d:0%d"%(now[3],now[4]) else: mytime="%d:%d"%(now[3],now[4]) ugfx.clear(ugfx.WHITE) ugfx.string(4, 0, "Huidige temperatuur is %.1f, Gewenste temperatuur is %.1f" % (ctemp,ttemp) , "Roboto_BlackItalic12", ugfx.BLACK) ugfx.string(4, 28, "Het is %s %d %s %d " % (dotw[now[6]],now[2],moty[now[1]],now[0],) , "Roboto_BlackItalic12", ugfx.BLACK) ugfx.string(30, 62, "%.1f C %s [|||]=%s" %(ctemp,mytime,tstatus[burn]), "Roboto_Black22", ugfx.BLACK) ugfx.string(4, 90, " %s" % (message) , "Roboto_BlackItalic12", ugfx.BLACK) ugfx.flush() def new_temp(delta): global ttemp #print("Temp is %.1f en delta is %.1f"%(ttemp,delta)) ttemp+=float(delta) #ttemp=round(ttemp,1) ugfx.clear(ugfx.WHITE) ugfx.string(80, 35, "%.1f" %(ttemp), "DejaVuSans20", ugfx.BLACK) if ttemp>24.0: if ttemp<26.0: ugfx.string(80, 55, "Een beetje warm???", "DejaVuSans20", ugfx.BLACK) else: ugfx.string(80, 55, "Doe niet zo gek!!", "DejaVuSans20", ugfx.BLACK) ugfx.flush() time.sleep_ms(200) def progress(): global pc ugfx.string(pc,100, '#', "Roboto_BlackItalic12", ugfx.BLACK) ugfx.flush() pc+=10 try: gc.collect() print("Free mem: %d" %(gc.mem_free())) print("Initializing") init_hw() progress()#1 wifi_up() progress()#2 read_settings() progress()#3 init_mqtt() progress()#4 except Exception as e: #probably a bug: ugfx.string(50, 50,str(e), "Roboto_BlackItalic12", ugfx.BLACK) print("Picked up an error inp phase 1 : %s"%(str(e))) ugfx.flush() try: gc.collect() main_loop() print("All systems go!") time.sleep(5) gc.collect() print("Free mem: %d" %(gc.mem_free())) except Exception as e: #probably a bug: ugfx.string(50, 50,str(e), "Roboto_BlackItalic12", ugfx.BLACK) print("Picked up an error phase 3 : %s"%(str(e))) ugfx.flush()