|
|
|
@ -66,7 +66,7 @@ class CurlUnavailableError(Exception):
|
|
|
|
|
|
|
|
|
|
@dataclass
|
|
|
|
|
class BambuCloud:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def __init__(self, region: str, email: str, username: str, auth_token: str):
|
|
|
|
|
self._region = region
|
|
|
|
|
self._email = email
|
|
|
|
@ -164,7 +164,7 @@ class BambuCloud:
|
|
|
|
|
raise NotImplementedError()
|
|
|
|
|
|
|
|
|
|
self._test_response(response, return400)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return response
|
|
|
|
|
|
|
|
|
|
def _get_authentication_token(self) -> str:
|
|
|
|
@ -184,7 +184,7 @@ class BambuCloud:
|
|
|
|
|
if accessToken != '':
|
|
|
|
|
# We were provided the accessToken directly.
|
|
|
|
|
return accessToken
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
loginType = auth_json.get("loginType", None)
|
|
|
|
|
if loginType is None:
|
|
|
|
|
LOGGER.error(f"loginType not present")
|
|
|
|
@ -192,17 +192,19 @@ class BambuCloud:
|
|
|
|
|
return ValueError(0) # FIXME
|
|
|
|
|
elif loginType == 'verifyCode':
|
|
|
|
|
LOGGER.debug(f"Received verifyCode response")
|
|
|
|
|
raise EmailCodeRequiredError()
|
|
|
|
|
# raise EmailCodeRequiredError()
|
|
|
|
|
return loginType
|
|
|
|
|
elif loginType == 'tfa':
|
|
|
|
|
# Store the tfaKey for later use
|
|
|
|
|
LOGGER.debug(f"Received tfa response")
|
|
|
|
|
self._tfaKey = auth_json.get("tfaKey")
|
|
|
|
|
raise TfaCodeRequiredError()
|
|
|
|
|
# raise TfaCodeRequiredError()
|
|
|
|
|
return loginType
|
|
|
|
|
else:
|
|
|
|
|
LOGGER.debug(f"Did not understand json. loginType = '{loginType}'")
|
|
|
|
|
LOGGER.error(f"Response not understood: '{response.text}'")
|
|
|
|
|
return ValueError(1) # FIXME
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _get_email_verification_code(self):
|
|
|
|
|
# Send the verification code request
|
|
|
|
|
data = {
|
|
|
|
@ -228,7 +230,7 @@ class BambuCloud:
|
|
|
|
|
LOGGER.debug("Authentication successful.")
|
|
|
|
|
LOGGER.debug(f"Response = '{response.json()}'")
|
|
|
|
|
elif status_code == 400:
|
|
|
|
|
LOGGER.debug(f"Received response: {response.json()}")
|
|
|
|
|
LOGGER.debug(f"Received response: {response.json()}")
|
|
|
|
|
if response.json()['code'] == 1:
|
|
|
|
|
# Code has expired. Request a new one.
|
|
|
|
|
self._get_email_verification_code()
|
|
|
|
@ -241,7 +243,7 @@ class BambuCloud:
|
|
|
|
|
raise ValueError(response.json()['code'])
|
|
|
|
|
|
|
|
|
|
return response.json()['accessToken']
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _get_authentication_token_with_2fa_code(self, code: str) -> dict:
|
|
|
|
|
LOGGER.debug("Attempting to connect with provided 2FA code.")
|
|
|
|
|
|
|
|
|
@ -261,7 +263,7 @@ class BambuCloud:
|
|
|
|
|
#LOGGER.debug(f"token_from_tfa: {token_from_tfa}")
|
|
|
|
|
|
|
|
|
|
return token_from_tfa
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _get_username_from_authentication_token(self) -> str:
|
|
|
|
|
LOGGER.debug("Trying to get username from authentication token.")
|
|
|
|
|
# User name is in 2nd portion of the auth token (delimited with periods)
|
|
|
|
@ -301,7 +303,7 @@ class BambuCloud:
|
|
|
|
|
LOGGER.debug(f"Unable to decode authToken to retrieve username. AuthToken = {self._auth_token}")
|
|
|
|
|
|
|
|
|
|
return username
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Retrieves json description of devices in the form:
|
|
|
|
|
# {
|
|
|
|
|
# 'message': 'success',
|
|
|
|
@ -340,7 +342,7 @@ class BambuCloud:
|
|
|
|
|
# }
|
|
|
|
|
# ]
|
|
|
|
|
# }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_authentication(self, region: str, email: str, username: str, auth_token: str) -> bool:
|
|
|
|
|
self._region = region
|
|
|
|
|
self._email = email
|
|
|
|
@ -358,18 +360,31 @@ class BambuCloud:
|
|
|
|
|
self._password = password
|
|
|
|
|
|
|
|
|
|
result = self._get_authentication_token()
|
|
|
|
|
self._auth_token = result
|
|
|
|
|
self._username = self._get_username_from_authentication_token()
|
|
|
|
|
|
|
|
|
|
if result is None:
|
|
|
|
|
LOGGER.error("Unable to authenticate.")
|
|
|
|
|
return None
|
|
|
|
|
elif len(result) < 20:
|
|
|
|
|
return result
|
|
|
|
|
else:
|
|
|
|
|
self._auth_token = result
|
|
|
|
|
self._username = self._get_username_from_authentication_token()
|
|
|
|
|
return 'success'
|
|
|
|
|
# self._auth_token = result
|
|
|
|
|
# self._username = self._get_username_from_authentication_token()
|
|
|
|
|
|
|
|
|
|
def login_with_verification_code(self, code: str):
|
|
|
|
|
result = self._get_authentication_token_with_verification_code(code)
|
|
|
|
|
self._auth_token = result
|
|
|
|
|
self._username = self._get_username_from_authentication_token()
|
|
|
|
|
if self._auth_token != "" and self._username != "" and self._auth_token != None and self._username != None:
|
|
|
|
|
return "success"
|
|
|
|
|
|
|
|
|
|
def login_with_2fa_code(self, code: str):
|
|
|
|
|
result = self._get_authentication_token_with_2fa_code(code)
|
|
|
|
|
self._auth_token = result
|
|
|
|
|
self._username = self._get_username_from_authentication_token()
|
|
|
|
|
if self._auth_token != "" and self._username != "" and self._auth_token != None and self._username != None:
|
|
|
|
|
return "success"
|
|
|
|
|
|
|
|
|
|
def get_device_list(self) -> dict:
|
|
|
|
|
LOGGER.debug("Getting device list from Bambu Cloud")
|
|
|
|
@ -454,7 +469,7 @@ class BambuCloud:
|
|
|
|
|
return None
|
|
|
|
|
LOGGER.debug("Succeeded")
|
|
|
|
|
return response.json()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# The task list is of the following form with a 'hits' array with typical 20 entries.
|
|
|
|
|
#
|
|
|
|
|
# "total": 531,
|
|
|
|
@ -569,15 +584,15 @@ class BambuCloud:
|
|
|
|
|
@property
|
|
|
|
|
def username(self):
|
|
|
|
|
return self._username
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def auth_token(self):
|
|
|
|
|
return self._auth_token
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def bambu_connected(self) -> bool:
|
|
|
|
|
return self._auth_token != "" and self._auth_token != None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def cloud_mqtt_host(self):
|
|
|
|
|
return "cn.mqtt.bambulab.com" if self._region == "China" else "us.mqtt.bambulab.com"
|
|
|
|
|