0.1.8rc12

update pybambu module from upstream HA project
This commit is contained in:
jneilliii 2024-12-01 16:30:12 -05:00
parent 98ab94b371
commit 52ba3ff214
8 changed files with 457 additions and 1317 deletions

View File

@ -259,19 +259,20 @@ class BambuVirtualPrinter:
f"connecting via local mqtt: {self._settings.get_boolean(['local_mqtt'])}"
)
bambu_client = BambuClient(
device_type=self._settings.get(["device_type"]),
serial=self._settings.get(["serial"]),
host=self._settings.get(["host"]),
username=(
{"device_type": self._settings.get(["device_type"]),
"serial": self._settings.get(["serial"]),
"host": self._settings.get(["host"]),
"username": (
"bblp"
if self._settings.get_boolean(["local_mqtt"])
else self._settings.get(["username"])
),
access_code=self._settings.get(["access_code"]),
local_mqtt=self._settings.get_boolean(["local_mqtt"]),
region=self._settings.get(["region"]),
email=self._settings.get(["email"]),
auth_token=self._settings.get(["auth_token"]) if self._settings.get_boolean(["local_mqtt"]) is False else "",
"access_code": self._settings.get(["access_code"]),
"local_mqtt": self._settings.get_boolean(["local_mqtt"]),
"region": self._settings.get(["region"]),
"email": self._settings.get(["email"]),
"auth_token": self._settings.get(["auth_token"]) if self._settings.get_boolean(["local_mqtt"]) is False else "",
}
)
bambu_client.on_disconnect = self.on_disconnect(bambu_client.on_disconnect)
bambu_client.on_connect = self.on_connect(bambu_client.on_connect)

View File

@ -237,7 +237,7 @@ class MqttThread(threading.Thread):
exceptionSeen = ""
while True:
try:
host = self._client.host if self._client.local_mqtt else self._client.bambu_cloud.cloud_mqtt_host
host = self._client.host if self._client._local_mqtt else self._client.bambu_cloud.cloud_mqtt_host
LOGGER.debug(f"Connect: Attempting Connection to {host}")
self._client.client.connect(host, self._client._port, keepalive=5)
@ -285,25 +285,32 @@ class BambuClient:
_camera = None
_usage_hours: float
def __init__(self, device_type: str, serial: str, host: str, local_mqtt: bool, region: str, email: str,
username: str, auth_token: str, access_code: str, usage_hours: float = 0, manual_refresh_mode: bool = False, chamber_image: bool = True):
self.callback = None
self.host = host
self.local_mqtt = local_mqtt
self._serial = serial
self._auth_token = auth_token
self._access_code = access_code
self._username = username
def __init__(self, config):
self.host = config['host']
self._callback = None
self._access_code = config.get('access_code', '')
self._auth_token = config.get('auth_token', '')
self._device_type = config.get('device_type', 'unknown')
self._local_mqtt = config.get('local_mqtt', False)
self._manual_refresh_mode = config.get('manual_refresh_mode', False)
self._serial = config.get('serial', '')
self._usage_hours = config.get('usage_hours', 0)
self._username = config.get('username', '')
self._enable_camera = config.get('enable_camera', True)
self._connected = False
self._device_type = device_type
self._usage_hours = usage_hours
self._port = 1883
self._refreshed = False
self._manual_refresh_mode = manual_refresh_mode
self._device = Device(self)
self.bambu_cloud = BambuCloud(region, email, username, auth_token)
self.bambu_cloud = BambuCloud(
config.get('region', ''),
config.get('email', ''),
config.get('username', ''),
config.get('auth_token', '')
)
self.slicer_settings = SlicerSettings(self)
self.use_chamber_image = chamber_image
@property
def connected(self):
@ -322,7 +329,22 @@ class BambuClient:
self.disconnect()
else:
# Reconnect normally
self.connect(self.callback)
self.connect(self._callback)
@property
def camera_enabled(self):
return self._enable_camera
def callback(self, event: str):
if self._callback is not None:
self._callback(event)
def set_camera_enabled(self, enable):
self._enable_camera = enable
if self._enable_camera:
self._start_camera()
else:
self._stop_camera()
def setup_tls(self):
self.client.tls_set(tls_version=ssl.PROTOCOL_TLS, cert_reqs=ssl.CERT_NONE)
@ -331,7 +353,7 @@ class BambuClient:
def connect(self, callback):
"""Connect to the MQTT Broker"""
self.client = mqtt.Client()
self.callback = callback
self._callback = callback
self.client.on_connect = self.on_connect
self.client.on_disconnect = self.on_disconnect
self.client.on_message = self.on_message
@ -342,7 +364,7 @@ class BambuClient:
self.setup_tls()
self._port = 8883
if self.local_mqtt:
if self._local_mqtt:
self.client.username_pw_set("bblp", password=self._access_code)
else:
self.client.username_pw_set(self._username, password=self._auth_token)
@ -371,6 +393,22 @@ class BambuClient:
LOGGER.info("On Connect: Connected to printer")
self._on_connect()
def _start_camera(self):
if not self._device.supports_feature(Features.CAMERA_RTSP):
if self._device.supports_feature(Features.CAMERA_IMAGE):
if self._enable_camera:
LOGGER.debug("Starting Chamber Image thread")
self._camera = ChamberImageThread(self)
self._camera.start()
elif (self.host == "") or (self._access_code == ""):
LOGGER.debug("Skipping camera setup as local access details not provided.")
def _stop_camera(self):
if self._camera is not None:
LOGGER.debug("Stopping camera thread")
self._camera.stop()
self._camera.join()
def _on_connect(self):
self._connected = True
self.subscribe_and_request_info()
@ -379,14 +417,7 @@ class BambuClient:
self._watchdog = WatchdogThread(self)
self._watchdog.start()
if not self._device.supports_feature(Features.CAMERA_RTSP):
if self._device.supports_feature(Features.CAMERA_IMAGE):
if self.use_chamber_image:
LOGGER.debug("Starting Chamber Image thread")
self._camera = ChamberImageThread(self)
self._camera.start()
elif (self.host == "") or (self._access_code == ""):
LOGGER.debug("Skipping camera setup as local access details not provided.")
self._start_camera()
def try_on_connect(self,
client_: mqtt.Client,
@ -419,10 +450,7 @@ class BambuClient:
LOGGER.debug("Stopping watchdog thread")
self._watchdog.stop()
self._watchdog.join()
if self._camera is not None:
LOGGER.debug("Stopping camera thread")
self._camera.stop()
self._camera.join()
self._stop_camera()
def _on_watchdog_fired(self):
LOGGER.info("Watch dog fired")
@ -487,7 +515,7 @@ class BambuClient:
"""Force refresh data"""
if self._manual_refresh_mode:
self.connect(self.callback)
self.connect(self._callback)
else:
LOGGER.debug("Force Refresh: Getting Version Info")
self._refreshed = True
@ -532,7 +560,7 @@ class BambuClient:
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, self.setup_tls)
if self.local_mqtt:
if self._local_mqtt:
self.client.username_pw_set("bblp", password=self._access_code)
else:
self.client.username_pw_set(self._username, password=self._auth_token)

View File

@ -1,11 +1,25 @@
from __future__ import annotations
from enum import (
Enum,
)
import base64
import cloudscraper
import json
import requests
curl_available = True
class ConnectionMechanismEnum(Enum):
CLOUDSCRAPER = 1,
CURL_CFFI = 2,
REQUESTS = 3
CONNECTION_MECHANISM = ConnectionMechanismEnum.CLOUDSCRAPER
curl_available = False
if CONNECTION_MECHANISM == ConnectionMechanismEnum.CURL_CFFI:
try:
from curl_cffi import requests as curl_requests
curl_available = True
except ImportError:
curl_available = False
@ -20,6 +34,36 @@ from .utils import get_Url
IMPERSONATE_BROWSER='chrome'
class CloudflareError(Exception):
def __init__(self):
super().__init__("Blocked by Cloudflare")
self.error_code = 403
class EmailCodeRequiredError(Exception):
def __init__(self):
super().__init__("Email code required")
self.error_code = 400
class EmailCodeExpiredError(Exception):
def __init__(self):
super().__init__("Email code expired")
self.error_code = 400
class EmailCodeIncorrectError(Exception):
def __init__(self):
super().__init__("Email code incorrect")
self.error_code = 400
class TfaCodeRequiredError(Exception):
def __init__(self):
super().__init__("Two factor authentication code required")
self.error_code = 400
class CurlUnavailableError(Exception):
def __init__(self):
super().__init__("curl library unavailable")
self.error_code = 400
@dataclass
class BambuCloud:
@ -30,16 +74,101 @@ class BambuCloud:
self._auth_token = auth_token
self._tfaKey = None
def _get_headers(self):
return {
'User-Agent': 'bambu_network_agent/01.09.05.01',
'X-BBL-Client-Name': 'OrcaSlicer',
'X-BBL-Client-Type': 'slicer',
'X-BBL-Client-Version': '01.09.05.51',
'X-BBL-Language': 'en-US',
'X-BBL-OS-Type': 'linux',
'X-BBL-OS-Version': '6.2.0',
'X-BBL-Agent-Version': '01.09.05.01',
'X-BBL-Executable-info': '{}',
'X-BBL-Agent-OS-Type': 'linux',
'accept': 'application/json',
'Content-Type': 'application/json'
}
# Orca/Bambu Studio also add this - need to work out what an appropriate ID is to put here:
# 'X-BBL-Device-ID': BBL_AUTH_UUID,
# Example: X-BBL-Device-ID: 370f9f43-c6fe-47d7-aec9-5fe5ef7e7673
def _get_headers_with_auth_token(self) -> dict:
if CONNECTION_MECHANISM == ConnectionMechanismEnum.CURL_CFFI:
headers = {}
else:
headers = self._get_headers()
headers['Authorization'] = f"Bearer {self._auth_token}"
return headers
def _get_authentication_token(self) -> dict:
LOGGER.debug("Getting accessToken from Bambu Cloud")
def _test_response(self, response, return400=False):
# Check specifically for cloudflare block
if response.status_code == 403 and 'cloudflare' in response.text:
LOGGER.debug("BLOCKED BY CLOUDFLARE")
raise CloudflareError()
if response.status_code == 400 and not return400:
LOGGER.error(f"Connection failed with error code: {response.status_code}")
LOGGER.debug(f"Response: '{response.text}'")
raise PermissionError(response.status_code, response.text)
if response.status_code > 400:
LOGGER.error(f"Connection failed with error code: {response.status_code}")
LOGGER.debug(f"Response: '{response.text}'")
raise PermissionError(response.status_code, response.text)
LOGGER.debug(f"Response: {response.status_code}")
def _get(self, urlenum: BambuUrl):
url = get_Url(urlenum, self._region)
headers=self._get_headers_with_auth_token()
if CONNECTION_MECHANISM == ConnectionMechanismEnum.CURL_CFFI:
if not curl_available:
LOGGER.debug(f"Curl library is unavailable.")
return 'curlUnavailable'
raise CurlUnavailableError()
response = curl_requests.get(url, headers=headers, timeout=10, impersonate=IMPERSONATE_BROWSER)
elif CONNECTION_MECHANISM == ConnectionMechanismEnum.CLOUDSCRAPER:
if len(headers) == 0:
headers = self._get_headers()
scraper = cloudscraper.create_scraper()
response = scraper.get(url, headers=headers, timeout=10)
elif CONNECTION_MECHANISM == ConnectionMechanismEnum.REQUESTS:
if len(headers) == 0:
headers = self._get_headers()
response = requests.get(url, headers=headers, timeout=10)
else:
raise NotImplementedError()
self._test_response(response)
return response
def _post(self, urlenum: BambuUrl, json: str, headers={}, return400=False):
url = get_Url(urlenum, self._region)
if CONNECTION_MECHANISM == ConnectionMechanismEnum.CURL_CFFI:
if not curl_available:
LOGGER.debug(f"Curl library is unavailable.")
raise CurlUnavailableError()
response = curl_requests.post(url, headers=headers, json=json, impersonate=IMPERSONATE_BROWSER)
elif CONNECTION_MECHANISM == ConnectionMechanismEnum.CLOUDSCRAPER:
if len(headers) == 0:
headers = self._get_headers()
scraper = cloudscraper.create_scraper()
response = scraper.post(url, headers=headers, json=json)
elif CONNECTION_MECHANISM == ConnectionMechanismEnum.REQUESTS:
if len(headers) == 0:
headers = self._get_headers()
response = requests.post(url, headers=headers, json=json)
else:
raise NotImplementedError()
self._test_response(response, return400)
return response
def _get_authentication_token(self) -> str:
LOGGER.debug("Getting accessToken from Bambu Cloud")
# First we need to find out how Bambu wants us to login.
data = {
@ -48,20 +177,7 @@ class BambuCloud:
"apiError": ""
}
response = curl_requests.post(get_Url(BambuUrl.LOGIN, self._region), json=data, impersonate=IMPERSONATE_BROWSER)
# Check specifically for cloudflare block
if response.status_code == 403:
if 'cloudflare' in response.text:
LOGGER.error('CloudFlare blocked connection attempt')
return 'cloudFlare'
if response.status_code >= 400:
LOGGER.error(f"Login attempt failed with error code: {response.status_code}")
LOGGER.debug(f"Response: '{response.text}'")
raise ValueError(response.status_code)
LOGGER.debug(f"Response: {response.status_code}")
response = self._post(BambuUrl.LOGIN, json=data)
auth_json = response.json()
accessToken = auth_json.get('accessToken', '')
@ -73,18 +189,19 @@ class BambuCloud:
if loginType is None:
LOGGER.error(f"loginType not present")
LOGGER.error(f"Response not understood: '{response.text}'")
return None
return ValueError(0) # FIXME
elif loginType == 'verifyCode':
LOGGER.debug(f"Received verifyCode response")
raise EmailCodeRequiredError()
elif loginType == 'tfa':
# Store the tfaKey for later use
LOGGER.debug(f"Received tfa response")
self._tfaKey = auth_json.get("tfaKey")
raise TfaCodeRequiredError()
else:
LOGGER.debug(f"Did not understand json. loginType = '{loginType}'")
LOGGER.error(f"Response not understood: '{response.text}'")
return loginType
return ValueError(1) # FIXME
def _get_email_verification_code(self):
# Send the verification code request
@ -94,14 +211,8 @@ class BambuCloud:
}
LOGGER.debug("Requesting verification code")
response = curl_requests.post(get_Url(BambuUrl.EMAIL_CODE, self._region), json=data, impersonate=IMPERSONATE_BROWSER)
if response.status_code == 200:
self._post(BambuUrl.EMAIL_CODE, json=data)
LOGGER.debug("Verification code requested successfully.")
else:
LOGGER.error(f"Received error trying to send verification code: {response.status_code}")
LOGGER.debug(f"Response: '{response.text}'")
raise ValueError(response.status_code)
def _get_authentication_token_with_verification_code(self, code) -> dict:
LOGGER.debug("Attempting to connect with provided verification code.")
@ -110,27 +221,24 @@ class BambuCloud:
"code": code
}
response = curl_requests.post(get_Url(BambuUrl.LOGIN, self._region), json=data, impersonate=IMPERSONATE_BROWSER)
response = self._post(BambuUrl.LOGIN, json=data, return400=True)
status_code = response.status_code
LOGGER.debug(f"Response: {response.status_code}")
if response.status_code == 200:
if status_code == 200:
LOGGER.debug("Authentication successful.")
elif response.status_code == 400:
LOGGER.debug(f"Response: '{response.json()}'")
LOGGER.debug(f"Response = '{response.json()}'")
elif status_code == 400:
LOGGER.debug(f"Received response: {response.json()}")
if response.json()['code'] == 1:
# Code has expired. Request a new one.
self._get_email_verification_code()
return 'codeExpired'
raise EmailCodeExpiredError()
elif response.json()['code'] == 2:
# Code was incorrect. Let the user try again.
return 'codeIncorrect'
raise EmailCodeIncorrectError()
else:
LOGGER.error(f"Response not understood: '{response.json()}'")
raise ValueError(response.json()['code'])
else:
LOGGER.error(f"Received error trying to authenticate with verification code: {response.status_code}")
LOGGER.debug(f"Response: '{response.text}'")
raise ValueError(response.status_code)
return response.json()['accessToken']
@ -142,30 +250,57 @@ class BambuCloud:
"tfaCode": code
}
response = curl_requests.post(get_Url(BambuUrl.TFA_LOGIN, self._region), json=data, impersonate=IMPERSONATE_BROWSER)
response = self._post(BambuUrl.TFA_LOGIN, json=data)
LOGGER.debug(f"Response: {response.status_code}")
if response.status_code == 200:
LOGGER.debug("Authentication successful.")
else:
LOGGER.error(f"Received error trying to authenticate with verification code: {response.status_code}")
LOGGER.debug(f"Response: '{response.text}'")
raise ValueError(response.status_code)
cookies = response.cookies.get_dict()
token_from_tfa = cookies.get("token")
LOGGER.debug(f"token_from_tfa: {token_from_tfa}")
#LOGGER.debug(f"token_from_tfa: {token_from_tfa}")
return token_from_tfa
def _get_username_from_authentication_token(self) -> str:
LOGGER.debug("Trying to get username from authentication token.")
# User name is in 2nd portion of the auth token (delimited with periods)
username = None
tokens = self._auth_token.split(".")
if len(tokens) != 3:
LOGGER.debug("Received authToken is not a JWT.")
LOGGER.debug("Trying to use project API to retrieve username instead")
response = self.get_projects();
if response is not None:
projectsnode = response.get('projects', None)
if projectsnode is None:
LOGGER.debug("Failed to find projects node")
else:
if len(projectsnode) == 0:
LOGGER.debug("No projects node in response")
else:
project=projectsnode[0]
if project.get('user_id', None) is None:
LOGGER.debug("No user_id entry")
else:
username = f"u_{project['user_id']}"
LOGGER.debug(f"Found user_id of {username}")
else:
LOGGER.debug("Authentication token looks to be a JWT")
try:
b64_string = self._auth_token.split(".")[1]
# String must be multiples of 4 chars in length. For decode pad with = character
b64_string += "=" * ((4 - len(b64_string) % 4) % 4)
jsonAuthToken = json.loads(base64.b64decode(b64_string))
# Gives json payload with "username":"u_<digits>" within it
return jsonAuthToken['username']
username = jsonAuthToken.get('username', None)
except:
LOGGER.debug("Unable to decode authToken to json to retrieve username.")
if username is None:
LOGGER.debug(f"Unable to decode authToken to retrieve username. AuthToken = {self._auth_token}")
return username
# Retrieves json description of devices in the form:
# {
@ -223,49 +358,25 @@ class BambuCloud:
self._password = password
result = self._get_authentication_token()
if result is None:
LOGGER.error("Unable to authenticate.")
return None
elif len(result) < 20:
return result
else:
self._auth_token = result
self._username = self._get_username_from_authentication_token()
return 'success'
def login_with_verification_code(self, code: str):
result = self._get_authentication_token_with_verification_code(code)
if len(result) < 20:
return result
self._auth_token = result
self._username = self._get_username_from_authentication_token()
return 'success'
def login_with_2fa_code(self, code: str):
result = self._get_authentication_token_with_2fa_code(code)
if len(result) < 20:
return result
self._auth_token = result
self._username = self._get_username_from_authentication_token()
return 'success'
def get_device_list(self) -> dict:
LOGGER.debug("Getting device list from Bambu Cloud")
if not curl_available:
LOGGER.debug(f"Curl library is unavailable.")
raise None
response = curl_requests.get(get_Url(BambuUrl.BIND, self._region), headers=self._get_headers_with_auth_token(), timeout=10, impersonate=IMPERSONATE_BROWSER)
if response.status_code == 403:
if 'cloudflare' in response.text:
LOGGER.error('CloudFlare blocked connection attempt')
raise ValueError(response.status_code)
if response.status_code >= 400:
LOGGER.debug(f"Received error: {response.status_code}")
LOGGER.error(f"Received error: '{response.text}'")
raise ValueError(response.status_code)
try:
response = self._get(BambuUrl.BIND)
except:
return None
return response.json()['devices']
# The slicer settings are of the following form:
@ -336,23 +447,13 @@ class BambuCloud:
# }
def get_slicer_settings(self) -> dict:
LOGGER.debug("DISABLED: Getting slicer settings from Bambu Cloud")
# Disabled for now since it may be contributing to cloudflare detection speed.
#
# if curl_available:
# response = curl_requests.get(get_Url(BambuUrl.SLICER_SETTINGS, self._region), headers=self._get_headers_with_auth_token(), timeout=10, impersonate=IMPERSONATE_BROWSER)
# if response.status_code == 403:
# if 'cloudflare' in response.text:
# LOGGER.error(f"Cloudflare blocked slicer settings lookup.")
# return None
# if response.status_code >= 400:
# LOGGER.error(f"Slicer settings load failed: {response.status_code}")
# LOGGER.error(f"Slicer settings load failed: '{response.text}'")
# return None
# return response.json()
LOGGER.debug("Getting slicer settings from Bambu Cloud")
try:
response = self._get(BambuUrl.SLICER_SETTINGS)
except:
return None
LOGGER.debug("Succeeded")
return response.json()
# The task list is of the following form with a 'hits' array with typical 20 entries.
#
@ -398,28 +499,37 @@ class BambuCloud:
def get_tasklist(self) -> dict:
LOGGER.debug("Getting full task list from Bambu Cloud")
if not curl_available:
LOGGER.debug(f"Curl library is unavailable.")
raise None
url = get_Url(BambuUrl.TASKS, self._region)
response = curl_requests.get(url, headers=self._get_headers_with_auth_token(), timeout=10, impersonate=IMPERSONATE_BROWSER)
if response.status_code == 403:
if 'cloudflare' in response.text:
LOGGER.error('CloudFlare blocked connection attempt')
try:
response = self._get(BambuUrl.TASKS)
except:
return None
return response.json()
# Check specifically for cloudflare block
if response.status_code == 403:
if 'cloudflare' in response.text:
LOGGER.error('CloudFlare blocked connection attempt')
# Returns a list of projects for the account.
#
# {
# "message": "success",
# "code": null,
# "error": null,
# "projects": [
# {
# "project_id": "164995388",
# "user_id": "1688388450",
# "model_id": "US48e2103d939bf8",
# "status": "ACTIVE",
# "name": "Alcohol_Marker_Storage_for_Copic,_Ohuhu_and_the_like",
# "content": "{'printed_plates': [{'plate': 1}]}",
# "create_time": "2024-11-17 06:12:33",
# "update_time": "2024-11-17 06:12:40"
# },
# ...
#
def get_projects(self) -> dict:
LOGGER.debug("Getting projects list from Bambu Cloud")
try:
response = self._get(BambuUrl.PROJECTS)
except:
return None
if response.status_code >= 400:
LOGGER.debug(f"Received error: {response.status_code}")
LOGGER.debug(f"Received error: '{response.text}'")
raise None
return response.json()
def get_latest_task_for_printer(self, deviceId: str) -> dict:
@ -429,10 +539,8 @@ class BambuCloud:
if len(data) != 0:
return data[0]
LOGGER.debug("No tasks found for printer")
except:
LOGGER.debug("Unable to make call")
return None
except:
return None
def get_tasklist_for_printer(self, deviceId: str) -> dict:
@ -451,21 +559,11 @@ class BambuCloud:
def download(self, url: str) -> bytearray:
LOGGER.debug(f"Downloading cover image: {url}")
if not curl_available:
LOGGER.debug(f"Curl library is unavailable.")
try:
# This is just a standard download from an unauthenticated end point.
response = requests.get(url)
except:
return None
response = curl_requests.get(url, timeout=10, impersonate=IMPERSONATE_BROWSER)
if response.status_code == 403:
if 'cloudflare' in response.text:
LOGGER.error('CloudFlare blocked connection attempt')
raise ValueError(response.status_code)
if response.status_code >= 400:
LOGGER.debug(f"Received error: {response.status_code}")
LOGGER.debug(f"Received error: {response.text}")
raise ValueError(response.status_code)
return response.content
@property

File diff suppressed because it is too large Load Diff

View File

@ -15,6 +15,7 @@
"GFB01": "Bambu ASA",
"GFB02": "Bambu ASA-Aero",
"GFB50": "Bambu ABS-GF",
"GFB51": "Bambu ASA-CF",
"GFB60": "PolyLite ABS",
"GFB61": "PolyLite ASA",
"GFB98": "Generic ASA",
@ -26,6 +27,7 @@
"GFG02": "Bambu PETG HF",
"GFG50": "Bambu PETG-CF",
"GFG60": "PolyLite PETG",
"GFG96": "Generic PETG HF",
"GFG97": "Generic PCTG",
"GFG98": "Generic PETG-CF",
"GFG99": "Generic PETG",
@ -34,6 +36,13 @@
"GFL03": "eSUN PLA+",
"GFL04": "Overture PLA",
"GFL05": "Overture Matte PLA",
"GFL06": "Fiberon PETG-ESD",
"GFL50": "Fiberon PA6-CF",
"GFL51": "Fiberon PA6-GF",
"GFL52": "Fiberon PA12-CF",
"GFL53": "Fiberon PA612-CF",
"GFL54": "Fiberon PET-CF",
"GFL55": "Fiberon PETG-rCF",
"GFL95": "Generic PLA High Speed",
"GFL96": "Generic PLA Silk",
"GFL98": "Generic PLA-CF",
@ -41,6 +50,7 @@
"GFN03": "Bambu PA-CF",
"GFN04": "Bambu PAHT-CF",
"GFN05": "Bambu PA6-CF",
"GFN06": "Bambu PPA-CF",
"GFN08": "Bambu PA6-GF",
"GFN96": "Generic PPA-GF",
"GFN97": "Generic PPA-CF",
@ -64,9 +74,12 @@
"GFS98": "Generic HIPS",
"GFS99": "Generic PVA",
"GFT01": "Bambu PET-CF",
"GFT02": "Bambu PPS-CF",
"GFT97": "Generic PPS",
"GFT98": "Generic PPS-CF",
"GFU00": "Bambu TPU 95A HF",
"GFU01": "Bambu TPU 95A",
"GFU02": "Bambu TPU for AMS",
"GFU98": "Generic TPU for AMS",
"GFU99": "Generic TPU"
}

View File

@ -22,6 +22,7 @@ from .utils import (
get_generic_AMS_HMS_error_code,
get_HMS_severity,
get_HMS_module,
set_temperature_to_gcode,
)
from .const import (
LOGGER,
@ -32,6 +33,7 @@ from .const import (
SPEED_PROFILE,
GCODE_STATE_OPTIONS,
PRINT_TYPE_OPTIONS,
TempEnum,
)
from .commands import (
CHAMBER_LIGHT_ON,
@ -42,7 +44,7 @@ from .commands import (
class Device:
def __init__(self, client):
self._client = client
self.temperature = Temperature()
self.temperature = Temperature(client = client)
self.lights = Lights(client = client)
self.info = Info(client = client)
self.print_job = PrintJob(client = client)
@ -53,7 +55,7 @@ class Device:
self.external_spool = ExternalSpool(client = client)
self.hms = HMSList(client = client)
self.print_error = PrintErrorList(client = client)
self.camera = Camera()
self.camera = Camera(client = client)
self.home_flag = HomeFlag(client=client)
self.push_all_data = None
self.get_version_data = None
@ -77,7 +79,6 @@ class Device:
send_event = send_event | self.camera.print_update(data = data)
send_event = send_event | self.home_flag.print_update(data = data)
if send_event and self._client.callback is not None:
self._client.callback("event_printer_data_update")
if data.get("msg", 0) == 0:
@ -90,6 +91,16 @@ class Device:
if data.get("command") == "get_version":
self.get_version_data = data
def _supports_temperature_set(self):
# When talking to the Bambu cloud mqtt, setting the temperatures is allowed.
if self.info.mqtt_mode == "bambu_cloud":
return True
# X1* have not yet blocked setting the temperatures when in nybrid connection mode.
if self.info.device_type == "X1" or self.info.device_type == "X1C" or self.info.device_type == "X1E":
return True
# What's left is P1 and A1 printers that we are connecting by local mqtt. These are supported only in pure Lan Mode.
return not self._client.bambu_cloud.bambu_connected
def supports_feature(self, feature):
if feature == Features.AUX_FAN:
return self.info.device_type != "A1" and self.info.device_type != "A1MINI"
@ -126,6 +137,8 @@ class Device:
elif feature == Features.AMS_FILAMENT_REMAINING:
# Technically this is not the AMS Lite but that's currently tied to only these printer types.
return self.info.device_type != "A1" and self.info.device_type != "A1MINI"
elif feature == Features.SET_TEMPERATURE:
return self._supports_temperature_set()
return False
@ -185,14 +198,12 @@ class Lights:
def TurnChamberLightOn(self):
self.chamber_light = "on"
self.chamber_light_override = "on"
if self._client.callback is not None:
self._client.callback("event_light_update")
self._client.publish(CHAMBER_LIGHT_ON)
def TurnChamberLightOff(self):
self.chamber_light = "off"
self.chamber_light_override = "off"
if self._client.callback is not None:
self._client.callback("event_light_update")
self._client.publish(CHAMBER_LIGHT_OFF)
@ -205,7 +216,8 @@ class Camera:
rtsp_url: str
timelapse: str
def __init__(self):
def __init__(self, client):
self._client = client
self.recording = ''
self.resolution = ''
self.rtsp_url = None
@ -227,7 +239,10 @@ class Camera:
self.timelapse = data.get("ipcam", {}).get("timelapse", self.timelapse)
self.recording = data.get("ipcam", {}).get("ipcam_record", self.recording)
self.resolution = data.get("ipcam", {}).get("resolution", self.resolution)
if self._client._enable_camera:
self.rtsp_url = data.get("ipcam", {}).get("rtsp_url", self.rtsp_url)
else:
self.rtsp_url = None
return (old_data != f"{self.__dict__}")
@ -240,7 +255,8 @@ class Temperature:
nozzle_temp: int
target_nozzle_temp: int
def __init__(self):
def __init__(self, client):
self._client = client
self.bed_temp = 0
self.target_bed_temp = 0
self.chamber_temp = 0
@ -258,6 +274,20 @@ class Temperature:
return (old_data != f"{self.__dict__}")
def set_target_temp(self, temp: TempEnum, temperature: int):
command = set_temperature_to_gcode(temp, temperature)
# if type == TempEnum.HEATBED:
# self.bed_temp = temperature
# elif type == TempEnum.NOZZLE:
# self.nozzle_temp = temperature
LOGGER.debug(command)
self._client.publish(command)
self._client.callback("event_printer_data_update")
@dataclass
class Fans:
"""Return all fan related info"""
@ -337,7 +367,6 @@ class Fans:
LOGGER.debug(command)
self._client.publish(command)
if self._client.callback is not None:
self._client.callback("event_printer_data_update")
def get_fan_speed(self, fan: FansEnum) -> int:
@ -483,7 +512,6 @@ class PrintJob:
currently_idle = self.gcode_state == "IDLE" or self.gcode_state == "FAILED" or self.gcode_state == "FINISH"
if previously_idle and not currently_idle:
if self._client.callback is not None:
self._client.callback("event_print_started")
# Generate the start_time for P1P/S when printer moves from idle to another state. Original attempt with remaining time
@ -508,19 +536,16 @@ class PrintJob:
isCanceledPrint = False
if data.get("print_error") == 50348044 and self.print_error == 0:
isCanceledPrint = True
if self._client.callback is not None:
self._client.callback("event_print_canceled")
self.print_error = data.get("print_error", self.print_error)
# Handle print failed
if previous_gcode_state != "unknown" and previous_gcode_state != "FAILED" and self.gcode_state == "FAILED":
if not isCanceledPrint:
if self._client.callback is not None:
self._client.callback("event_print_failed")
# Handle print finish
if previous_gcode_state != "unknown" and previous_gcode_state != "FINISH" and self.gcode_state == "FINISH":
if self._client.callback is not None:
self._client.callback("event_print_finished")
if currently_idle and not previously_idle and previous_gcode_state != "unknown":
@ -666,7 +691,7 @@ class Info:
self.sw_ver = "unknown"
self.online = False
self.new_version_state = 0
self.mqtt_mode = "local" if self._client.local_mqtt else "bambu_cloud"
self.mqtt_mode = "local" if self._client._local_mqtt else "bambu_cloud"
self.nozzle_diameter = 0
self.nozzle_type = "unknown"
self.usage_hours = client._usage_hours
@ -674,7 +699,6 @@ class Info:
def set_online(self, online):
if self.online != online:
self.online = online
if self._client.callback is not None:
self._client.callback("event_printer_data_update")
def info_update(self, data):
@ -704,7 +728,6 @@ class Info:
LOGGER.debug(f"Device is {self.device_type}")
self.hw_ver = get_hw_version(modules, self.hw_ver)
self.sw_ver = get_sw_version(modules, self.sw_ver)
if self._client.callback is not None:
self._client.callback("event_printer_info_update")
def print_update(self, data) -> bool:
@ -891,7 +914,6 @@ class AMSList:
data_changed = data_changed or (old_data != f"{self.__dict__}")
if data_changed:
if self._client.callback is not None:
self._client.callback("event_ams_info_update")
def print_update(self, data) -> bool:
@ -1121,7 +1143,6 @@ class Speed:
command = SPEED_PROFILE_TEMPLATE
command['print']['param'] = f"{id}"
self._client.publish(command)
if self._client.callback is not None:
self._client.callback("event_speed_update")
@ -1187,7 +1208,8 @@ class HMSList:
attr = int(hms['attr'])
code = int(hms['code'])
hms_notif = HMSNotification(attr=attr, code=code)
errors[f"{index}-Error"] = f"HMS_{hms_notif.hms_code}: {get_HMS_error_text(hms_notif.hms_code)}"
errors[f"{index}-Code"] = f"HMS_{hms_notif.hms_code}"
errors[f"{index}-Error"] = get_HMS_error_text(hms_notif.hms_code)
errors[f"{index}-Wiki"] = hms_notif.wiki_url
errors[f"{index}-Severity"] = hms_notif.severity
#LOGGER.debug(f"HMS error for '{hms_notif.module}' and severity '{hms_notif.severity}': HMS_{hms_notif.hms_code}")
@ -1198,8 +1220,7 @@ class HMSList:
self._errors = errors
if self._count != 0:
LOGGER.warning(f"HMS ERRORS: {errors}")
if self._client.callback is not None:
self._client.callback("event_hms_errors")
self._client.callback("event_printer_error")
return True
return False
@ -1217,11 +1238,9 @@ class HMSList:
class PrintErrorList:
"""Return all print_error related info"""
_error: dict
_count: int
def __init__(self, client):
self._error = None
self._count = 0
self._client = client
def print_update(self, data) -> bool:
@ -1238,13 +1257,12 @@ class PrintErrorList:
hex_conversion = f'0{int(print_error_code):x}'
print_error_code_hex = hex_conversion[slice(0,4,1)] + "_" + hex_conversion[slice(4,8,1)]
errors = {}
errors[f"Code"] = f"{print_error_code_hex.upper()}"
errors[f"Error"] = f"{print_error_code_hex.upper()}: {get_print_error_text(print_error_code)}"
errors[f"code"] = print_error_code_hex.upper()
errors[f"error"] = get_print_error_text(print_error_code)
# LOGGER.warning(f"PRINT ERRORS: {errors}") # This will emit a message to home assistant log every 1 second if enabled
if self._error != errors:
self._error = errors
if self._client.callback is not None:
self._client.callback("event_print_error")
# We send the error event directly so always return False for the general data event.
@ -1296,13 +1314,23 @@ class ChamberImage:
def __init__(self, client):
self._client = client
self._bytes = bytearray()
self._image_last_updated = datetime.now()
def set_jpeg(self, bytes):
self._bytes = bytes
self._image_last_updated = datetime.now()
self._client.callback("event_printer_chamber_image_update")
def get_jpeg(self) -> bytearray:
return self._bytes.copy()
def get_last_update_time(self) -> datetime:
return self._image_last_updated
@property
def available(self):
return self._client._enable_camera
@dataclass
class CoverImage:
"""Returns the cover image from the Bambu API"""
@ -1311,7 +1339,6 @@ class CoverImage:
self._client = client
self._bytes = bytearray()
self._image_last_updated = datetime.now()
if self._client.callback is not None:
self._client.callback("event_printer_cover_image_update")
def set_jpeg(self, bytes):
@ -1449,7 +1476,7 @@ class SlicerSettings:
def update(self):
self.custom_filaments = {}
if self._client.bambu_cloud.auth_token != "" and self._client.local_mqtt is False:
if self._client.bambu_cloud.auth_token != "":
LOGGER.debug("Loading slicer settings")
slicer_settings = self._client.bambu_cloud.get_slicer_settings()
if slicer_settings is not None:

View File

@ -11,8 +11,9 @@ from .const import (
HMS_SEVERITY_LEVELS,
HMS_MODULES,
LOGGER,
BAMBU_URL,
FansEnum,
BAMBU_URL
TempEnum
)
from .commands import SEND_GCODE_TEMPLATE
@ -49,6 +50,18 @@ def fan_percentage_to_gcode(fan: FansEnum, percentage: int):
return command
def set_temperature_to_gcode(temp: TempEnum, temperature: int):
"""Converts a temperature to the gcode command to set that"""
if temp == TempEnum.NOZZLE:
tempCommand = "M104"
elif temp == TempEnum.HEATBED:
tempCommand = "M140"
command = SEND_GCODE_TEMPLATE
command['print']['param'] = f"{tempCommand} S{temperature}\n"
return command
def to_whole(number):
if not number:
return 0

View File

@ -14,7 +14,7 @@ plugin_package = "octoprint_bambu_printer"
plugin_name = "OctoPrint-BambuPrinter"
# The plugin's version. Can be overwritten within OctoPrint's internal data via __plugin_version__ in the plugin module
plugin_version = "0.1.8rc11"
plugin_version = "0.1.8rc12"
# The plugin's description. Can be overwritten within OctoPrint's internal data via __plugin_description__ in the plugin
# module
@ -33,7 +33,7 @@ plugin_url = "https://github.com/jneilliii/OctoPrint-BambuPrinter"
plugin_license = "AGPLv3"
# Any additional requirements besides OctoPrint should be listed here
plugin_requires = ["paho-mqtt<2", "python-dateutil", "curl_cffi"]
plugin_requires = ["paho-mqtt<2", "python-dateutil", "cloudscraper"]
### --------------------------------------------------------------------------------------------------------------------
### More advanced options that you usually shouldn't have to touch follow after this point