Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
long_description = fh.read()

setup(name='sonoff-python',
version='0.2.1',
version='0.2.2',
description='Make use of your sonoff smart switches without flashing them via the cloud APIs',
long_description=long_description,
long_description_content_type="text/markdown",
Expand Down
220 changes: 129 additions & 91 deletions sonoff/sonoff.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# The domain of your component. Should be equal to the name of your component.
import logging, time, hmac, hashlib, random, base64, json, socket, requests, re, uuid
import logging, time, hmac, hashlib, random, base64, socket, requests, re, uuid
from json import dumps, loads
from datetime import timedelta
from websocket import create_connection

SCAN_INTERVAL = timedelta(seconds=60)
HTTP_MOVED_PERMANENTLY, HTTP_BAD_REQUEST, HTTP_UNAUTHORIZED, HTTP_NOT_FOUND = 301,400,401,404
Expand All @@ -13,8 +15,15 @@ def gen_nonce(length=8):
return ''.join([str(random.randint(0, 9)) for i in range(length)])

class Sonoff():
# def __init__(self, hass, email, password, api_region, grace_period):
def __init__(self, username, password, api_region, user_apikey=None, bearer_token=None, grace_period=600):
def __init__(
self,
username,
password,
api_region,
user_apikey=None,
bearer_token=None,
grace_period=600
):

self._username = username
self._password = password
Expand All @@ -36,6 +45,7 @@ def __init__(self, username, password, api_region, user_apikey=None, bearer_toke
self._os = 'iOS'
self._rom_version = '11.1.2'
self._version = '6'
self._imei = str(uuid.uuid4())

if user_apikey and bearer_token:
self.do_reconnect()
Expand All @@ -44,7 +54,7 @@ def __init__(self, username, password, api_region, user_apikey=None, bearer_toke

def do_reconnect(self):
self._headers = {
'Authorization' : 'Bearer ' + self._bearer_token,
'Authorization' : f'Bearer {self._bearer_token}',
'Content-Type' : 'application/json;charset=UTF-8'
}

Expand All @@ -61,14 +71,14 @@ def do_login(self):

# reset the grace period
self._skipped_login = 0

app_details = {
'password' : self._password,
'version' : self._version,
'ts' : int(time.time()),
'nonce' : gen_nonce(15),
'ts' : str(time.time()).replace('.', '')[:10],
'nonce' : gen_nonce(),
'appid' : self._appid,
'imei' : str(uuid.uuid4()),
'imei' : self._imei,
'os' : self._os,
'model' : self._model,
'romVersion': self._rom_version,
Expand All @@ -83,42 +93,48 @@ def do_login(self):
decryptedAppSecret = b'6Nz4n0xA8s8qdxQf2GqurZj2Fs55FUvM'

hex_dig = hmac.new(
decryptedAppSecret,
str.encode(json.dumps(app_details)),
decryptedAppSecret,
str.encode(dumps(app_details)),
digestmod=hashlib.sha256).digest()

sign = base64.b64encode(hex_dig).decode()

self._headers = {
'Authorization' : 'Sign ' + sign,
'Content-Type' : 'application/json;charset=UTF-8'
}

r = requests.post('https://{}-api.coolkit.cc:8080/api/user/login'.format(self._api_region),
headers=self._headers, json=app_details)
r = requests.post(
f'https://{self._api_region}-api.coolkit.cc:8080/api/user/login',
headers=self._headers, json=app_details
)

resp = r.json()

# get a new region to login
if 'error' in resp and 'region' in resp and resp['error'] == HTTP_MOVED_PERMANENTLY:
self._api_region = resp['region']
try:
if 'region' in resp and resp['error'] == HTTP_MOVED_PERMANENTLY:
self._api_region = resp['region']

_LOGGER.warning("found new region: >>> %s <<< (you should change api_region option to this value in configuration.yaml)", self._api_region)
_LOGGER.warning("found new region: >>> %s <<< (you should change api_region option to this value in configuration.yaml)", self._api_region)

# re-login using the new localized endpoint
self.do_login()
return

elif 'error' in resp and resp['error'] in [HTTP_NOT_FOUND, HTTP_BAD_REQUEST]:
# (most likely) login with +86... phone number and region != cn
if '@' not in self._username and self._api_region != 'cn':
self._api_region = 'cn'
# re-login using the new localized endpoint
self.do_login()
return

else:
_LOGGER.error("Couldn't authenticate using the provided credentials!")
elif resp['error'] in [HTTP_NOT_FOUND, HTTP_BAD_REQUEST]:
# (most likely) login with +86... phone number and region != cn
if '@' not in self._username and self._api_region != 'cn':
self._api_region = 'cn'
self.do_login()

return
else:
_LOGGER.error("Couldn't authenticate using the provided credentials!")

return
except KeyError:
# No error key in response
pass

self._bearer_token = resp['at']
self._user_apikey = resp['user']['apikey']
Expand All @@ -128,17 +144,23 @@ def do_login(self):
if not self._wshost:
self.set_wshost()

self.update_devices() # to get the devices list
self.update_devices() # to get the devices list

def set_wshost(self):
r = requests.post('https://%s-disp.coolkit.cc:8080/dispatch/app' % self._api_region, headers=self._headers)
r = requests.post(
f'https://{self._api_region}-disp.coolkit.cc:8080/dispatch/app',
headers=self._headers
)
resp = r.json()

if 'error' in resp and resp['error'] == 0 and 'domain' in resp:
self._wshost = resp['domain']
_LOGGER.info("Found websocket address: %s", self._wshost)
else:
raise Exception('No websocket domain')
try:
if resp['error'] == 0 and 'domain' in resp:
self._wshost = resp['domain']
_LOGGER.info("Found websocket address: %s", self._wshost)
else:
raise Exception('No websocket domain')
except KeyError:
pass

def is_grace_period(self):
grace_time_elapsed = self._skipped_login * int(SCAN_INTERVAL.total_seconds())
Expand All @@ -156,52 +178,63 @@ def update_devices(self):
return []

# we are in the grace period, no updates to the devices
if self._skipped_login and self.is_grace_period():
_LOGGER.info("Grace period active")
if self._skipped_login and self.is_grace_period():
_LOGGER.info("Grace period active")
return self._devices

query_params = {
'lang': 'en',
'apiKey': self.get_user_apikey(),
'getTags': 1,
'version': self._version,
'ts': int(time.time()),
'nonce': gen_nonce(15),
'ts': str(time.time()).replace('.', '')[:10],
'nonce': gen_nonce(),
'appid': self._appid,
'imei': str(uuid.uuid4()),
'imei': self._imei,
'os': self._os,
'model': self._model,
'romVersion': self._rom_version,
'appVersion': self._app_version
}
r = requests.get('https://{}-api.coolkit.cc:8080/api/user/device'.format(self._api_region),
params=query_params,
headers=self._headers)

r = requests.get(
f'https://{self._api_region}-api.coolkit.cc:8080/api/user/device',
params=query_params,
headers=self._headers
)

resp = r.json()
if 'error' in resp and resp['error'] in [HTTP_BAD_REQUEST, HTTP_UNAUTHORIZED]:
# @IMPROVE add maybe a service call / switch to deactivate sonoff component
if self.is_grace_period():
_LOGGER.warning("Grace period activated!")
try:
if resp['error'] in [HTTP_BAD_REQUEST, HTTP_UNAUTHORIZED]:
# @IMPROVE add maybe a service call / switch to deactivate sonoff component
if self.is_grace_period():
_LOGGER.warning("Grace period activated!")

# return the current (and possible old) state of devices
# in this period any change made with the mobile app (on/off) won't be shown in HA
return self._devices
# return the current (and possible old) state of devices
# in this period any change made with the mobile app (on/off) won't be shown in HA
return self._devices

_LOGGER.info("Re-login component")
self.do_login()
_LOGGER.info("Re-login component")
self.do_login()
except KeyError:
pass

self._devices = resp.get('devicelist', [])
return self._devices

def get_devices(self, force_update = False):
if force_update:
if force_update:
return self.update_devices()

return self._devices

def get_device(self, deviceid):
for device in self.get_devices():
if 'deviceid' in device and device['deviceid'] == deviceid:
return device
try:
if device['deviceid'] == deviceid:
return device
except KeyError:
return None

def get_api_region(self):
return self._api_region
Expand All @@ -214,40 +247,44 @@ def get_user_apikey(self):

def _get_ws(self):
"""Check if the websocket is setup and connected."""
try:
create_connection
except:
from websocket import create_connection

if self._ws is None:
if not self._ws:
try:
self._ws = create_connection(('wss://{}:8080/api/ws'.format(self._wshost)), timeout=10)
self._ws = create_connection(
f'wss://{self._wshost}:8080/api/ws',
timeout=10
)

payload = {
'action' : "userOnline",
'userAgent' : 'app',
'version' : 6,
'nonce' : gen_nonce(15),
'apkVesrion': "1.8",
'os' : 'ios',
'nonce' : gen_nonce(),
'apkVersion': "1.8",
'appid': self._appid,
'os' : self._os,
'at' : self.get_bearer_token(),
'apikey' : self.get_user_apikey(),
'ts' : str(int(time.time())),
'model' : 'iPhone10,6',
'romVersion': '11.1.2',
'sequence' : str(time.time()).replace('.','')
'ts' : str(time.time()).replace('.', '')[:10],
'imei' : self._imei,
'model' : self._model,
'romVersion': self._rom_version,
'sequence' : str(time.time()).replace('.', '')[:13]
}

self._ws.send(json.dumps(payload))
wsresp = self._ws.recv()
# _LOGGER.error("open socket: %s", wsresp)
self._ws.send(dumps(payload))
wsresp = loads(self._ws.recv())
try:
if wsresp['error'] != 0:
_LOGGER.error(f'open socket {wsresp}')
except KeyError:
pass

except (socket.timeout, ConnectionRefusedError, ConnectionResetError):
_LOGGER.error('failed to create the websocket')
self._ws = None

return self._ws

def switch(self, new_state, deviceid, outlet=None):
"""Switch on or off."""

Expand All @@ -257,7 +294,6 @@ def switch(self, new_state, deviceid, outlet=None):
return (not new_state)

self._ws = self._get_ws()

if not self._ws:
_LOGGER.warning('invalid websocket, state cannot be changed')
return (not new_state)
Expand All @@ -279,7 +315,7 @@ def switch(self, new_state, deviceid, outlet=None):
return False

# the payload rule is like this:
# normal device (non-shared)
# normal device (non-shared)
# apikey = login apikey (= device apikey too)
#
# shared device
Expand All @@ -299,7 +335,7 @@ def switch(self, new_state, deviceid, outlet=None):
'params' : params,
'apikey' : device['apikey'],
'deviceid' : str(deviceid),
'sequence' : str(time.time()).replace('.',''),
'sequence' : str(time.time()).replace('.', '')[:13],
'controlType' : device['params']['controlType'] if 'controlType' in device['params'] else 4,
'ts' : 0
}
Expand All @@ -308,24 +344,26 @@ def switch(self, new_state, deviceid, outlet=None):
if device['apikey'] != self.get_user_apikey():
payload['selfApikey'] = self.get_user_apikey()

self._ws.send(json.dumps(payload))
wsresp = self._ws.recv()
# _LOGGER.debug("switch socket: %s", wsresp)

self._ws.close() # no need to keep websocket open (for now)
self._ws = None

# set also te pseudo-internal state of the device until the real refresh kicks in
for idx, device in enumerate(self._devices):
if device['deviceid'] == deviceid:
if outlet is not None:
self._devices[idx]['params']['switches'][outlet]['switch'] = new_state
else:
self._devices[idx]['params']['switch'] = new_state
self._ws.send(dumps(payload))
try:
wsresp = loads(self._ws.recv())
if wsresp['error'] != 0:
_LOGGER.error(f'open socket {wsresp}')
else:
for idx, device in enumerate(self._devices):
if device['deviceid'] == deviceid:
if outlet is not None:
self._devices[idx]['params']['switches'][outlet]['switch'] = new_state
else:
self._devices[idx]['params']['switch'] = new_state


# @TODO add some sort of validation here, maybe call the devices status
# only IF MAIN STATUS is done over websocket exclusively
finally:
self._ws.close() # no need to keep websocket open (for now)
self._ws = None

# @TODO add some sort of validation here, maybe call the devices status
# only IF MAIN STATUS is done over websocket exclusively

return new_state