This commit is contained in:
tixi
2018-09-24 17:56:47 +02:00
parent 5e1063fe01
commit a1feddacc4
2 changed files with 162 additions and 96 deletions

View File

@@ -4,12 +4,12 @@ A Domoticz plugin to manage Tuya Smart Plug
## ONLY TESTED FOR Raspberry Pi
With Python version 3.5 & Domoticz version 4.9700 (stable) and 4.9999 (beta)
With Python version 3.5 & Domoticz version 4.9999 (beta)
## Prerequisites
This plugin is based on the latest pytuya Python library. For the installation of this library,
follow the Installation guide below since pip will not install the latest version commited few days ago.
follow the Installation guide below.
See [`https://github.com/clach04/python-tuya/`](https://github.com/clach04/python-tuya/) for more information.
For the pytuya Python library, you need pycrypto. pycrypto can be installed with pip:
@@ -68,11 +68,8 @@ sudo /etc/init.d/domoticz.sh restart
| **IP address** | IP of the Smart Plug eg. 192.168.1.231 |
| **DevID** | devID of the Smart Plug |
| **Local Key** | Local Key of the Smart Plug |
| **Replay** | default is Yes |
| **Debug** | default is False |
Replay indicates that a command (on/off) will be replay in case of failure except when the Smart Plug is not connected.
## DevID & Local Key Extraction
All the information can be found here:
@@ -80,5 +77,5 @@ All the information can be found here:
## Acknowledgements
* Special thanks for all the hard work of [codetheweb](https://github.com/codetheweb/), [clach04](https://github.com/clach04), [blackrozes](https://github.com/blackrozes), [jepsonrob](https://github.com/jepsonrob), and all the other contributers on [tuyapi](https://github.com/codetheweb/tuyapi) and [python-tuya](https://github.com/clach04/python-tuya) who have made communicating to Tuya devices possible with open source code.
* Special thanks for all the hard work of [clach04](https://github.com/clach04), [codetheweb](https://github.com/codetheweb/) and all the other contributers on [python-tuya](https://github.com/clach04/python-tuya) and [tuyapi](https://github.com/codetheweb/tuyapi) who have made communicating to Tuya devices possible with open source code.
* Domoticz team

249
plugin.py
View File

@@ -26,17 +26,11 @@
########################################################################################
"""
<plugin key="tixi_tuya_smartplug_plugin" name="Tuya SmartPlug" author="tixi" version="1.0.0" externallink=" https://github.com/tixi/Domoticz-Tuya-SmartPlug-Plugin">
<plugin key="tixi_tuya_smartplug_plugin" name="Tuya SmartPlug" author="tixi" version="2.0.0" externallink=" https://github.com/tixi/Domoticz-Tuya-SmartPlug-Plugin">
<params>
<param field="Address" label="IP address" width="200px" required="true"/>
<param field="Mode1" label="DevID" width="200px" required="true"/>
<param field="Mode2" label="Local Key" width="200px" required="true"/>
<param field="Mode3" label="Replay" width="75px">
<options>
<option label="Yes" value="Yes" default="true"/>
<option label="No" value="No" />
</options>
</param>
<param field="Mode6" label="Debug" width="75px">
<options>
<option label="True" value="Debug"/>
@@ -48,116 +42,175 @@
"""
import Domoticz
import socket #needed for socket.timeout exception
import pytuya
import json
class BasePlugin:
__UNIT = 1
__MINUTE = 6 #heartbeat is called every 10 seconds and executed every 60 seconds
__UNIT = 1
__HB_BASE_FREQ = 2
def __init__(self):
self.__address = None #ip address of the smartplug
self.__devID = None #devID of the smartplug
self.__localKey = None #localKey of the smartplug
self.__device = None #pytuya object of the smartplug
self.__replay = True #replay mode
self.__last_cmd_for_replay = None #last command for replay (None/"On"/"Off")
self.__runAgain = self.__MINUTE #heartbeat frequency
return
#check the current status (on/off) of the smartplug to update the device in Domoticz if needed
def check_status(self):
try:
data = self.__device.status()
if(data['dps']['1']):
UpdateDevice(self.__UNIT, 1, "On")
else:
UpdateDevice(self.__UNIT, 0, "Off")
except (ConnectionRefusedError, ConnectionResetError):
Domoticz.Log("A problem occurs while connecting to the Smart Plug")
Domoticz.Debug("Check if the Tuya app is closed on your smartphone")
#if you observe this message only few times in debug logs it works correctly
self.__runAgain = 0
except (socket.timeout, OSError):
Domoticz.Log("Smart Plug not reachable")
Domoticz.Debug("Check if the Smart Plug is connected to the same wifi network")
#execute a command
def exec_cmd(self,Command):
self.__address = None #ip address of the smartplug
self.__devID = None #devID of the smartplug
self.__localKey = None #localKey of the smartplug
self.__device = None #pytuya object of the smartplug
self.__runAgain = self.__HB_BASE_FREQ #heartbeat frequency (20 seconds)
self.__connection = None #connection to the tuya plug
self.__last_cmd = None #last command (None/"On"/"Off")
return
if(self.__replay):
self.__last_cmd_for_replay=Command
try:
if (Command == 'On'):
self.__device.turn_on()
UpdateDevice(self.__UNIT, 1, "On")
else: #Command=='Off'
self.__device.turn_off()
UpdateDevice(self.__UNIT, 0, "Off")
self.__last_cmd_for_replay = None #if no exception no need for replay
except (ConnectionRefusedError, ConnectionResetError):
Domoticz.Log("A problem occurs while connecting to the Smart Plug")
Domoticz.Debug("Check if the Tuya app is closed on your smartphone")
#if you observe this message only few times in debug logs it works correctly (replay while fix it)
if(self.__replay):
Domoticz.Debug("Command for replay: " + Command)
except (socket.timeout, OSError):
Domoticz.Log("Smart Plug not reachable")
Domoticz.Debug("Check if the Smart Plug is connected to the same wifi network")
self.__last_cmd_for_replay = None #no replay if the smartplug is not connected
#onStart Domoticz function
def onStart(self):
Domoticz.Debug("onStart called")
# Debug mode
if Parameters["Mode6"] == "Debug":
if(Parameters["Mode6"] == "Debug"):
Domoticz.Debugging(1)
Domoticz.Debug("onStart called")
else:
Domoticz.Debugging(0)
#get parameters
self.__address = Parameters["Address"]
self.__devID = Parameters["Mode1"]
self.__address = Parameters["Address"]
self.__devID = Parameters["Mode1"]
self.__localKey = Parameters["Mode2"]
if(Parameters["Mode3"]=="No"):
self.__replay=False
Domoticz.Log("Replay mode not activated")
#initialize the defined device in Domoticz
if (len(Devices) == 0):
Domoticz.Device(Name="Tuya SmartPlug", Unit=self.__UNIT, TypeName="Switch").Create()
Domoticz.Log("Tuya SmartPlug Device created.")
#create the pytuya object to communicate with the smartplug
#create the pytuya object
self.__device = pytuya.OutletDevice(self.__devID, self.__address, self.__localKey)
#set the status
self.check_status()
#start the connection
self.__last_cmd = 'status'
self.__connection = Domoticz.Connection(Name="Tuya", Transport="TCP/IP", Address=self.__address, Port="6668")
self.__connection.Connect()
def onConnect(self, Connection, Status, Description):
if (Connection == self.__connection):
if (Status == 0):
Domoticz.Debug("Connected successfully to: "+Connection.Address+":"+Connection.Port)
if(self.__last_cmd != None):
self.__command_to_execute(self.__last_cmd)
else:
self.__connection.Disconnect()
self.__connection.Connect()
def __extract_status(self, Data):
""" Returns a tuple (bool,bool)
first: set to True if an error occur and False otherwise
second: set to True if the device is on and to False if the device is off
second is irrelevant if first is True
"""
start=Data.find(b'{"devId')
if(start==-1):
return (True,"")
result = Data[start:] #in 2 steps to deal with the case where '}}' is present before {"devId'
end=result.find(b'}}')
if(end==-1):
return (True,"")
end=end+2
result = result[:end]
if not isinstance(result, str):
result = result.decode()
try:
result = json.loads(result)
return (False,result['dps']['1'])
except (JSONError, KeyError) as e:
return (True,"")
def onMessage(self, Connection, Data):
Domoticz.Debug("onMessage called: " + Connection.Address + ":" + Connection.Port +" "+ str(Data))
if (Connection == self.__connection):
if(self.__last_cmd == None):#skip nothing was waiting
return
(error,is_on) = self.__extract_status(Data)
if(error):
self.__command_to_execute(self.__last_cmd)
return
if(self.__last_cmd == 'status'):
self.__last_cmd = None
if(is_on):
UpdateDevice(self.__UNIT, 1, "On")
if(self.__last_cmd == 'On'):
self.__last_cmd = None
else:
UpdateDevice(self.__UNIT, 0, "Off")
if(self.__last_cmd == 'Off'):
self.__last_cmd = None
if(self.__last_cmd != None):
self.__command_to_execute(self.__last_cmd)
def __command_to_execute(self,Command):
if(Command == 'status'):
if(self.__last_cmd == None):
self.__last_cmd = Command
else:#On/Off
self.__last_cmd = Command
if(self.__connection.Connected()):
if(Command == 'On'):
payload = self.__device.generate_payload('set', {'1':True})
self.__connection.Send(payload)
status_request = True
elif(Command == 'Off'):
payload = self.__device.generate_payload('set', {'1':False})
self.__connection.Send(payload)
status_request = True
elif(Command == 'status'):
status_request = True
else:
Domoticz.Error("Unknow Command received")
self.__last_cmd = None
status_request = False
if(status_request):
payload=self.__device.generate_payload('status')
self.__connection.Send(payload)
else:
self.__connection.Connect()
#onCommand Domoticz function
def onCommand(self, Unit, Command, Level, Hue):
Domoticz.Debug("onCommand called for Unit " + str(Unit) + ": Parameter '" + str(Command) + "', Level: " + str(Level))
self.exec_cmd(Command)
Domoticz.Debug("onCommand called for Unit " + str(Unit) + ": Parameter '" + str(Command))
self.__command_to_execute(Command)
#onHeartbeat Domoticz function
def onHeartbeat(self):
Domoticz.Debug("onHeartbeat called")
if(self.__last_cmd_for_replay != None):#replay
Domoticz.Debug("Replay: " + self.__last_cmd_for_replay)
self.exec_cmd(self.__last_cmd_for_replay)
else:#normal case
self.__runAgain -= 1
if self.__runAgain == 0:
self.__runAgain = self.__MINUTE
self.check_status()
def onDisconnect(self, Connection):
Domoticz.Error("Disconnected from: "+Connection.Address+":"+Connection.Port)
#if (Connection == self.__connection):
#self.__connection.Connect()
def onHeartbeat(self):
self.__runAgain -= 1
if(self.__runAgain == 0):
self.__runAgain = self.__HB_BASE_FREQ
self.__command_to_execute('status')
#onStop Domoticz function
def onStop(self):
self.__device = None
self.__last_cmd = None
self.__connection.Disconnect()
self.__connection = None
global _plugin
_plugin = BasePlugin()
@@ -166,10 +219,26 @@ def onStart():
global _plugin
_plugin.onStart()
def onStop():
global _plugin
_plugin.onStop()
def onConnect(Connection, Status, Description):
global _plugin
_plugin.onConnect(Connection, Status, Description)
def onMessage(Connection, Data):
global _plugin
_plugin.onMessage(Connection, Data)
def onCommand(Unit, Command, Level, Hue):
global _plugin
_plugin.onCommand(Unit, Command, Level, Hue)
def onDisconnect(Connection):
global _plugin
_plugin.onDisconnect(Connection)
def onHeartbeat():
global _plugin
_plugin.onHeartbeat()