fix new cloud verification process

This commit is contained in:
jneilliii 2024-12-07 00:17:57 -05:00
parent 8950778146
commit 383f0c3cb3
3 changed files with 35 additions and 22 deletions

View File

@ -130,9 +130,7 @@ class BambuPrintPlugin(
and "auth_token" in data
):
self._logger.info(f"Registering user {data['email']}")
self._bambu_cloud = BambuCloud(
data["region"], data["email"], data["password"], data["auth_token"]
)
self._bambu_cloud = BambuCloud(data["region"], data["email"], data["password"], data["auth_token"])
auth_response = self._bambu_cloud.login(data["region"], data["email"], data["password"])
return flask.jsonify(
{

View File

@ -66,7 +66,7 @@ class CurlUnavailableError(Exception):
@dataclass
class BambuCloud:
def __init__(self, region: str, email: str, username: str, auth_token: str):
self._region = region
self._email = email
@ -164,7 +164,7 @@ class BambuCloud:
raise NotImplementedError()
self._test_response(response, return400)
return response
def _get_authentication_token(self) -> str:
@ -184,7 +184,7 @@ class BambuCloud:
if accessToken != '':
# We were provided the accessToken directly.
return accessToken
loginType = auth_json.get("loginType", None)
if loginType is None:
LOGGER.error(f"loginType not present")
@ -192,17 +192,19 @@ class BambuCloud:
return ValueError(0) # FIXME
elif loginType == 'verifyCode':
LOGGER.debug(f"Received verifyCode response")
raise EmailCodeRequiredError()
# raise EmailCodeRequiredError()
return loginType
elif loginType == 'tfa':
# Store the tfaKey for later use
LOGGER.debug(f"Received tfa response")
self._tfaKey = auth_json.get("tfaKey")
raise TfaCodeRequiredError()
# raise TfaCodeRequiredError()
return loginType
else:
LOGGER.debug(f"Did not understand json. loginType = '{loginType}'")
LOGGER.error(f"Response not understood: '{response.text}'")
return ValueError(1) # FIXME
def _get_email_verification_code(self):
# Send the verification code request
data = {
@ -228,7 +230,7 @@ class BambuCloud:
LOGGER.debug("Authentication successful.")
LOGGER.debug(f"Response = '{response.json()}'")
elif status_code == 400:
LOGGER.debug(f"Received response: {response.json()}")
LOGGER.debug(f"Received response: {response.json()}")
if response.json()['code'] == 1:
# Code has expired. Request a new one.
self._get_email_verification_code()
@ -241,7 +243,7 @@ class BambuCloud:
raise ValueError(response.json()['code'])
return response.json()['accessToken']
def _get_authentication_token_with_2fa_code(self, code: str) -> dict:
LOGGER.debug("Attempting to connect with provided 2FA code.")
@ -261,7 +263,7 @@ class BambuCloud:
#LOGGER.debug(f"token_from_tfa: {token_from_tfa}")
return token_from_tfa
def _get_username_from_authentication_token(self) -> str:
LOGGER.debug("Trying to get username from authentication token.")
# User name is in 2nd portion of the auth token (delimited with periods)
@ -301,7 +303,7 @@ class BambuCloud:
LOGGER.debug(f"Unable to decode authToken to retrieve username. AuthToken = {self._auth_token}")
return username
# Retrieves json description of devices in the form:
# {
# 'message': 'success',
@ -340,7 +342,7 @@ class BambuCloud:
# }
# ]
# }
def test_authentication(self, region: str, email: str, username: str, auth_token: str) -> bool:
self._region = region
self._email = email
@ -358,18 +360,31 @@ class BambuCloud:
self._password = password
result = self._get_authentication_token()
self._auth_token = result
self._username = self._get_username_from_authentication_token()
if result is None:
LOGGER.error("Unable to authenticate.")
return None
elif len(result) < 20:
return result
else:
self._auth_token = result
self._username = self._get_username_from_authentication_token()
return 'success'
# self._auth_token = result
# self._username = self._get_username_from_authentication_token()
def login_with_verification_code(self, code: str):
result = self._get_authentication_token_with_verification_code(code)
self._auth_token = result
self._username = self._get_username_from_authentication_token()
if self._auth_token != "" and self._username != "" and self._auth_token != None and self._username != None:
return "success"
def login_with_2fa_code(self, code: str):
result = self._get_authentication_token_with_2fa_code(code)
self._auth_token = result
self._username = self._get_username_from_authentication_token()
if self._auth_token != "" and self._username != "" and self._auth_token != None and self._username != None:
return "success"
def get_device_list(self) -> dict:
LOGGER.debug("Getting device list from Bambu Cloud")
@ -454,7 +469,7 @@ class BambuCloud:
return None
LOGGER.debug("Succeeded")
return response.json()
# The task list is of the following form with a 'hits' array with typical 20 entries.
#
# "total": 531,
@ -569,15 +584,15 @@ class BambuCloud:
@property
def username(self):
return self._username
@property
def auth_token(self):
return self._auth_token
@property
def bambu_connected(self) -> bool:
return self._auth_token != "" and self._auth_token != None
@property
def cloud_mqtt_host(self):
return "cn.mqtt.bambulab.com" if self._region == "China" else "us.mqtt.bambulab.com"

View File

@ -14,7 +14,7 @@ plugin_package = "octoprint_bambu_printer"
plugin_name = "OctoPrint-BambuPrinter"
# The plugin's version. Can be overwritten within OctoPrint's internal data via __plugin_version__ in the plugin module
plugin_version = "0.1.8rc13"
plugin_version = "0.1.8rc14"
# The plugin's description. Can be overwritten within OctoPrint's internal data via __plugin_description__ in the plugin
# module