Compare commits

..

11 Commits

Author SHA1 Message Date
e3fda73dd3 0.1.8rc7
* update pybambu module from upstream HA project, groundwork for fixing new cloud authorization process, #59
* potential fix for stuck progress/canceled printing status, #52
2024-11-09 16:15:13 -05:00
5633d6f6ea groundwork for plate processing contained in 3mf file, will store contents of plate_1.json in OctoPrint metadata for that file 2024-11-04 22:32:52 -05:00
884101c0ba bump version 2024-11-04 14:19:27 -05:00
7c87ba9482 fix base path for start print command for non X1 devices 2024-11-04 14:18:46 -05:00
1d9f874560 update user-agent 2024-11-01 20:59:31 -04:00
21e30034d0 switch to any comparison instead of all for allowed suffixes to prevent issues with file names that contain additional periods, ie part_v2.1.gcode.3mf 2024-11-01 20:12:34 -04:00
3c8b904a26 revert access code requirement 2024-10-30 00:38:25 -04:00
55ad4c1718 Merge branch 'rc' of https://github.com/jneilliii/OctoPrint-BambuPrinter into rc 2024-10-30 00:34:44 -04:00
4ef8e40702 remove call to creating connection in printer_worker thread since it's called on initialization. 2024-10-30 00:34:15 -04:00
2537bc8f57 remove call to creating connection in printer_worker thread since it's called on initialization. 2024-10-30 00:28:37 -04:00
28be048300 improve initial connection sequence, wait for bambu client connection before returning firmware information. 2024-10-28 17:56:41 -04:00
11 changed files with 320 additions and 103 deletions

View File

@ -8,6 +8,8 @@ from contextlib import contextmanager
import flask import flask
import logging.handlers import logging.handlers
from urllib.parse import quote as urlquote from urllib.parse import quote as urlquote
import os
import zipfile
import octoprint.printer import octoprint.printer
import octoprint.server import octoprint.server
@ -106,6 +108,11 @@ class BambuPrintPlugin(
"ams_current_tray": 255, "ams_current_tray": 255,
} }
def on_settings_save(self, data):
if data.get("local_mqtt", False) is True:
data["auth_token"] = ""
octoprint.plugin.SettingsPlugin.on_settings_save(self, data)
def is_api_adminonly(self): def is_api_adminonly(self):
return True return True
@ -135,6 +142,15 @@ class BambuPrintPlugin(
def on_event(self, event, payload): def on_event(self, event, payload):
if event == Events.TRANSFER_DONE: if event == Events.TRANSFER_DONE:
self._printer.commands("M20 L T", force=True) self._printer.commands("M20 L T", force=True)
elif event == Events.FILE_ADDED:
if payload["operation"] == "add" and "3mf" in payload["type"]:
file_container = os.path.join(self._settings.getBaseFolder("uploads"), payload["path"])
with zipfile.ZipFile(file_container) as z:
with z.open("Metadata/plate_1.json", "r") as json_data:
plate_data = json.load(json_data)
if plate_data:
self._file_manager.set_additional_metadata("sdcard", payload["path"], "plate_data", plate_data, overwrite=True)
def support_3mf_files(self): def support_3mf_files(self):
return {"machinecode": {"3mf": ["3mf"]}} return {"machinecode": {"3mf": ["3mf"]}}

View File

@ -210,7 +210,7 @@ class BambuVirtualPrinter:
self._telemetry.bedTemp = temperatures.bed_temp self._telemetry.bedTemp = temperatures.bed_temp
self._telemetry.bedTargetTemp = temperatures.target_bed_temp self._telemetry.bedTargetTemp = temperatures.target_bed_temp
self._telemetry.chamberTemp = temperatures.chamber_temp self._telemetry.chamberTemp = temperatures.chamber_temp
if device_data.push_all_data: if device_data.push_all_data and "ams" in device_data.push_all_data:
self._telemetry.ams_current_tray = device_data.push_all_data["ams"]["tray_now"] or 255 self._telemetry.ams_current_tray = device_data.push_all_data["ams"]["tray_now"] or 255
if self._telemetry.ams_current_tray != self._settings.get_int(["ams_current_tray"]): if self._telemetry.ams_current_tray != self._settings.get_int(["ams_current_tray"]):
@ -345,21 +345,21 @@ class BambuVirtualPrinter:
self._selected_project_file = None self._selected_project_file = None
def select_project_file(self, file_path: str) -> bool: def select_project_file(self, file_path: str) -> bool:
self._log.debug(f"Select project file: {file_path}") file_info = self._project_files_view.get_file_by_name(file_path)
file_info = self._project_files_view.get_file_by_stem(
file_path, [".gcode", ".3mf"]
)
if ( if (
self._selected_project_file is not None self._selected_project_file is not None
and file_info is not None and file_info is not None
and self._selected_project_file.path == file_info.path and self._selected_project_file.path == file_info.path
): ):
self._log.debug(f"File already selected: {file_path}")
return True return True
if file_info is None: if file_info is None:
self._log.error(f"Cannot select non-existent file: {file_path}") self._log.error(f"Cannot select non-existent file: {file_path}")
return False return False
self._log.debug(f"Select project file: {file_path}")
self._selected_project_file = file_info self._selected_project_file = file_info
self._send_file_selected_message() self._send_file_selected_message()
return True return True
@ -367,8 +367,9 @@ class BambuVirtualPrinter:
##~~ command implementations ##~~ command implementations
@gcode_executor.register_no_data("M21") @gcode_executor.register_no_data("M21")
def _sd_status(self) -> None: def _sd_status(self) -> bool:
self.sendIO("SD card ok") self.sendIO("SD card ok")
return True
@gcode_executor.register("M23") @gcode_executor.register("M23")
def _select_sd_file(self, data: str) -> bool: def _select_sd_file(self, data: str) -> bool:
@ -469,6 +470,9 @@ class BambuVirtualPrinter:
# noinspection PyUnusedLocal # noinspection PyUnusedLocal
@gcode_executor.register_no_data("M115") @gcode_executor.register_no_data("M115")
def _report_firmware_info(self) -> bool: def _report_firmware_info(self) -> bool:
# wait for connection to be established before sending back firmware info
while self.bambu_client.connected is False:
time.sleep(1)
self.sendIO("Bambu Printer Integration") self.sendIO("Bambu Printer Integration")
self.sendIO("Cap:AUTOREPORT_SD_STATUS:1") self.sendIO("Cap:AUTOREPORT_SD_STATUS:1")
self.sendIO("Cap:AUTOREPORT_TEMP:1") self.sendIO("Cap:AUTOREPORT_TEMP:1")
@ -681,7 +685,7 @@ class BambuVirtualPrinter:
self._state_change_queue.join() self._state_change_queue.join()
def _printer_worker(self): def _printer_worker(self):
self._create_client_connection_async() # self._create_client_connection_async()
self.sendIO("Printer connection complete") self.sendIO("Printer connection complete")
while self._running: while self._running:
try: try:

View File

@ -35,8 +35,8 @@ class CachedFileView:
result: list[FileInfo] = [] result: list[FileInfo] = []
with self.file_system.get_ftps_client() as ftp: with self.file_system.get_ftps_client() as ftp:
for filter in self.folder_view.keys(): for key in self.folder_view.keys():
result.extend(self.file_system.list_files(*filter, ftp, existing_files)) result.extend(self.file_system.list_files(*key, ftp, existing_files))
return result return result
def update(self): def update(self):
@ -56,6 +56,9 @@ class CachedFileView:
def get_all_cached_info(self): def get_all_cached_info(self):
return list(self._file_data_cache.values()) return list(self._file_data_cache.values())
def get_keys_as_list(self):
return list(self._file_data_cache.keys()) + list(self._file_alias_cache.keys())
def get_file_data(self, file_path: str | Path) -> FileInfo | None: def get_file_data(self, file_path: str | Path) -> FileInfo | None:
file_data = self.get_file_data_cached(file_path) file_data = self.get_file_data_cached(file_path)
if file_data is None: if file_data is None:
@ -73,22 +76,19 @@ class CachedFileView:
file_path = self._file_alias_cache.get(file_path, file_path) file_path = self._file_alias_cache.get(file_path, file_path)
return self._file_data_cache.get(file_path, None) return self._file_data_cache.get(file_path, None)
def get_file_by_stem(self, file_stem: str, allowed_suffixes: list[str]): def get_file_by_name(self, file_name: str):
if file_stem == "": if file_name == "":
return None return None
file_stem = Path(file_stem).with_suffix("").stem file_list = self.get_keys_as_list()
file_data = self._get_file_by_stem_cached(file_stem, allowed_suffixes) if not file_name in file_list:
if f"{file_name}.3mf" in file_list:
file_name = f"{file_name}.3mf"
elif f"{file_name}.gcode.3mf" in file_list:
file_name = f"{file_name}.gcode.3mf"
file_data = self.get_file_data_cached(file_name)
if file_data is None: if file_data is None:
self.update() self.update()
file_data = self._get_file_by_stem_cached(file_stem, allowed_suffixes) return self.get_file_by_name(file_name)
return file_data return file_data
def _get_file_by_stem_cached(self, file_stem: str, allowed_suffixes: list[str]):
for file_path_str in list(self._file_data_cache.keys()) + list(self._file_alias_cache.keys()):
file_path = Path(file_path_str)
if file_stem == file_path.with_suffix("").stem and all(
suffix in allowed_suffixes for suffix in file_path.suffixes
):
return self.get_file_data_cached(file_path)
return None

View File

@ -1,5 +1,6 @@
from __future__ import annotations from __future__ import annotations
import asyncio
import queue import queue
import json import json
import math import math
@ -36,6 +37,7 @@ class WatchdogThread(threading.Thread):
self._stop_event = threading.Event() self._stop_event = threading.Event()
self._last_received_data = time.time() self._last_received_data = time.time()
super().__init__() super().__init__()
self.daemon = True
self.setName(f"{self._client._device.info.device_type}-Watchdog-{threading.get_native_id()}") self.setName(f"{self._client._device.info.device_type}-Watchdog-{threading.get_native_id()}")
def stop(self): def stop(self):
@ -70,6 +72,7 @@ class ChamberImageThread(threading.Thread):
self._client = client self._client = client
self._stop_event = threading.Event() self._stop_event = threading.Event()
super().__init__() super().__init__()
self.daemon = True
self.setName(f"{self._client._device.info.device_type}-Chamber-{threading.get_native_id()}") self.setName(f"{self._client._device.info.device_type}-Chamber-{threading.get_native_id()}")
def stop(self): def stop(self):
@ -178,7 +181,7 @@ class ChamberImageThread(threading.Thread):
# Reset buffer # Reset buffer
img = None img = None
# else: # else:
# Otherwise we need to continue looping without reseting the buffer to receive the remaining data # Otherwise we need to continue looping without reseting the buffer to receive the remaining data
# and without delaying. # and without delaying.
@ -223,6 +226,7 @@ class MqttThread(threading.Thread):
self._client = client self._client = client
self._stop_event = threading.Event() self._stop_event = threading.Event()
super().__init__() super().__init__()
self.daemon = True
self.setName(f"{self._client._device.info.device_type}-Mqtt-{threading.get_native_id()}") self.setName(f"{self._client._device.info.device_type}-Mqtt-{threading.get_native_id()}")
def stop(self): def stop(self):
@ -282,7 +286,7 @@ class BambuClient:
_usage_hours: float _usage_hours: float
def __init__(self, device_type: str, serial: str, host: str, local_mqtt: bool, region: str, email: str, def __init__(self, device_type: str, serial: str, host: str, local_mqtt: bool, region: str, email: str,
username: str, auth_token: str, access_code: str, usage_hours: float = 0, manual_refresh_mode: bool = False): username: str, auth_token: str, access_code: str, usage_hours: float = 0, manual_refresh_mode: bool = False, chamber_image: bool = True):
self.callback = None self.callback = None
self.host = host self.host = host
self._local_mqtt = local_mqtt self._local_mqtt = local_mqtt
@ -299,6 +303,7 @@ class BambuClient:
self._device = Device(self) self._device = Device(self)
self.bambu_cloud = BambuCloud(region, email, username, auth_token) self.bambu_cloud = BambuCloud(region, email, username, auth_token)
self.slicer_settings = SlicerSettings(self) self.slicer_settings = SlicerSettings(self)
self.use_chamber_image = chamber_image
@property @property
def connected(self): def connected(self):
@ -319,6 +324,10 @@ class BambuClient:
# Reconnect normally # Reconnect normally
self.connect(self.callback) self.connect(self.callback)
def setup_tls(self):
self.client.tls_set(tls_version=ssl.PROTOCOL_TLS, cert_reqs=ssl.CERT_NONE)
self.client.tls_insecure_set(True)
def connect(self, callback): def connect(self, callback):
"""Connect to the MQTT Broker""" """Connect to the MQTT Broker"""
self.client = mqtt.Client() self.client = mqtt.Client()
@ -329,8 +338,9 @@ class BambuClient:
# Set aggressive reconnect polling. # Set aggressive reconnect polling.
self.client.reconnect_delay_set(min_delay=1, max_delay=1) self.client.reconnect_delay_set(min_delay=1, max_delay=1)
self.client.tls_set(tls_version=ssl.PROTOCOL_TLS, cert_reqs=ssl.CERT_NONE) # Run the blocking tls_set method in a separate thread
self.client.tls_insecure_set(True) self.setup_tls()
self._port = 8883 self._port = 8883
if self._local_mqtt: if self._local_mqtt:
self.client.username_pw_set("bblp", password=self._access_code) self.client.username_pw_set("bblp", password=self._access_code)
@ -369,10 +379,14 @@ class BambuClient:
self._watchdog = WatchdogThread(self) self._watchdog = WatchdogThread(self)
self._watchdog.start() self._watchdog.start()
if self._device.supports_feature(Features.CAMERA_IMAGE): if not self._device.supports_feature(Features.CAMERA_RTSP):
LOGGER.debug("Starting Chamber Image thread") if self._device.supports_feature(Features.CAMERA_IMAGE):
self._camera = ChamberImageThread(self) if self.use_chamber_image:
self._camera.start() LOGGER.debug("Starting Chamber Image thread")
self._camera = ChamberImageThread(self)
self._camera.start()
elif (self.host == "") or (self._access_code == ""):
LOGGER.debug("Skipping camera setup as local access details not provided.")
def try_on_connect(self, def try_on_connect(self,
client_: mqtt.Client, client_: mqtt.Client,
@ -396,7 +410,7 @@ class BambuClient:
"""Called when MQTT Disconnects""" """Called when MQTT Disconnects"""
LOGGER.warn(f"On Disconnect: Printer disconnected with error code: {result_code}") LOGGER.warn(f"On Disconnect: Printer disconnected with error code: {result_code}")
self._on_disconnect() self._on_disconnect()
def _on_disconnect(self): def _on_disconnect(self):
LOGGER.debug("_on_disconnect: Lost connection to the printer") LOGGER.debug("_on_disconnect: Lost connection to the printer")
self._connected = False self._connected = False
@ -451,9 +465,7 @@ class BambuClient:
LOGGER.debug("Got Version Data") LOGGER.debug("Got Version Data")
self._device.info_update(data=json_data.get("info")) self._device.info_update(data=json_data.get("info"))
except Exception as e: except Exception as e:
LOGGER.error("An exception occurred processing a message:") LOGGER.error("An exception occurred processing a message:", exc_info=e)
LOGGER.error(f"Exception type: {type(e)}")
LOGGER.error(f"Exception data: {e}")
def subscribe(self): def subscribe(self):
"""Subscribe to report topic""" """Subscribe to report topic"""
@ -516,8 +528,10 @@ class BambuClient:
self.client.on_disconnect = self.on_disconnect self.client.on_disconnect = self.on_disconnect
self.client.on_message = on_message self.client.on_message = on_message
self.client.tls_set(tls_version=ssl.PROTOCOL_TLS, cert_reqs=ssl.CERT_NONE) # Run the blocking tls_set method in a separate thread
self.client.tls_insecure_set(True) loop = asyncio.get_event_loop()
await loop.run_in_executor(None, self.setup_tls)
if self._local_mqtt: if self._local_mqtt:
self.client.username_pw_set("bblp", password=self._access_code) self.client.username_pw_set("bblp", password=self._access_code)
else: else:

View File

@ -2,11 +2,19 @@ from __future__ import annotations
import base64 import base64
import json import json
import httpx
from curl_cffi import requests
from dataclasses import dataclass from dataclasses import dataclass
from .const import LOGGER from .const import (
LOGGER,
BambuUrl
)
from .utils import get_Url
IMPERSONATE_BROWSER='chrome'
@dataclass @dataclass
class BambuCloud: class BambuCloud:
@ -16,22 +24,126 @@ class BambuCloud:
self._email = email self._email = email
self._username = username self._username = username
self._auth_token = auth_token self._auth_token = auth_token
self._tfaKey = None
def _get_headers_with_auth_token(self) -> dict:
headers = {}
headers['Authorization'] = f"Bearer {self._auth_token}"
return headers
def _get_authentication_token(self) -> dict: def _get_authentication_token(self) -> dict:
LOGGER.debug("Getting accessToken from Bambu Cloud") LOGGER.debug("Getting accessToken from Bambu Cloud")
if self._region == "China":
url = 'https://api.bambulab.cn/v1/user-service/user/login'
else:
url = 'https://api.bambulab.com/v1/user-service/user/login'
headers = {'User-Agent' : "HA Bambulab"}
data = {'account': self._email, 'password': self._password}
with httpx.Client(http2=True) as client:
response = client.post(url, headers=headers, json=data, timeout=10)
if response.status_code >= 400:
LOGGER.debug(f"Received error: {response.status_code}")
raise ValueError(response.status_code)
return response.json()['accessToken']
# First we need to find out how Bambu wants us to login.
data = {
"account": self._email,
"password": self._password,
"apiError": ""
}
response = requests.post(get_Url(BambuUrl.LOGIN, self._region), json=data, impersonate=IMPERSONATE_BROWSER)
if response.status_code >= 400:
LOGGER.error(f"Login attempt failed with error code: {response.status_code}")
LOGGER.debug(f"Response: '{response.text}'")
raise ValueError(response.status_code)
LOGGER.debug(f"Response: {response.status_code}")
auth_json = response.json()
accessToken = auth_json.get('accessToken', '')
if accessToken != '':
# We were provided the accessToken directly.
return accessToken
loginType = auth_json.get("loginType", None)
if loginType is None:
LOGGER.error(f"loginType not present")
LOGGER.error(f"Response not understood: '{response.text}'")
return None
elif loginType == 'verifyCode':
LOGGER.debug(f"Received verifyCode response")
elif loginType == 'tfa':
# Store the tfaKey for later use
LOGGER.debug(f"Received tfa response")
self._tfaKey = auth_json.get("tfaKey")
else:
LOGGER.debug(f"Did not understand json. loginType = '{loginType}'")
LOGGER.error(f"Response not understood: '{response.text}'")
return loginType
def _get_email_verification_code(self):
# Send the verification code request
data = {
"email": self._email,
"type": "codeLogin"
}
LOGGER.debug("Requesting verification code")
response = requests.post(get_Url(BambuUrl.EMAIL_CODE, self._region), json=data, impersonate=IMPERSONATE_BROWSER)
if response.status_code == 200:
LOGGER.debug("Verification code requested successfully.")
else:
LOGGER.error(f"Received error trying to send verification code: {response.status_code}")
LOGGER.debug(f"Response: '{response.text}'")
raise ValueError(response.status_code)
def _get_authentication_token_with_verification_code(self, code) -> dict:
LOGGER.debug("Attempting to connect with provided verification code.")
data = {
"account": self._email,
"code": code
}
response = requests.post(get_Url(BambuUrl.LOGIN, self._region), json=data, impersonate=IMPERSONATE_BROWSER)
LOGGER.debug(f"Response: {response.status_code}")
if response.status_code == 200:
LOGGER.debug("Authentication successful.")
elif response.status_code == 400:
LOGGER.debug(f"Response: '{response.json()}'")
if response.json()['code'] == 1:
# Code has expired. Request a new one.
self._get_email_verification_code()
return 'codeExpired'
elif response.json()['code'] == 2:
# Code was incorrect. Let the user try again.
return 'codeIncorrect'
else:
LOGGER.error(f"Response not understood: '{response.json()}'")
raise ValueError(response.json()['code'])
else:
LOGGER.error(f"Received error trying to authenticate with verification code: {response.status_code}")
LOGGER.debug(f"Response: '{response.text}'")
raise ValueError(response.status_code)
return response.json()['accessToken']
def _get_authentication_token_with_2fa_code(self, code: str) -> dict:
LOGGER.debug("Attempting to connect with provided 2FA code.")
data = {
"tfaKey": self._tfaKey,
"tfaCode": code
}
response = requests.post(get_Url(BambuUrl.TFA_LOGIN, self._region), json=data, impersonate=IMPERSONATE_BROWSER)
LOGGER.debug(f"Response: {response.status_code}")
if response.status_code == 200:
LOGGER.debug("Authentication successful.")
else:
LOGGER.error(f"Received error trying to authenticate with verification code: {response.status_code}")
LOGGER.debug(f"Response: '{response.text}'")
raise ValueError(response.status_code)
cookies = response.cookies.get_dict()
token_from_tfa = cookies.get("token")
LOGGER.debug(f"token_from_tfa: {token_from_tfa}")
return token_from_tfa
def _get_username_from_authentication_token(self) -> str: def _get_username_from_authentication_token(self) -> str:
# User name is in 2nd portion of the auth token (delimited with periods) # User name is in 2nd portion of the auth token (delimited with periods)
b64_string = self._auth_token.split(".")[1] b64_string = self._auth_token.split(".")[1]
@ -91,23 +203,41 @@ class BambuCloud:
return False return False
return True return True
def login(self, region: str, email: str, password: str): def login(self, region: str, email: str, password: str) -> str:
self._region = region self._region = region
self._email = email self._email = email
self._password = password self._password = password
self._auth_token = self._get_authentication_token() result = self._get_authentication_token()
if result == 'verifyCode':
return result
elif result == 'tfa':
return result
elif result is None:
LOGGER.error("Unable to authenticate.")
return None
else:
self._auth_token = result
self._username = self._get_username_from_authentication_token()
return 'success'
def login_with_verification_code(self, code: str):
result = self._get_authentication_token_with_verification_code(code)
if result == 'codeExpired' or result == 'codeIncorrect':
return result
self._auth_token = result
self._username = self._get_username_from_authentication_token() self._username = self._get_username_from_authentication_token()
return 'success'
def login_with_2fa_code(self, code: str):
result = self._get_authentication_token_with_2fa_code(code)
self._auth_token = result
self._username = self._get_username_from_authentication_token()
return 'success'
def get_device_list(self) -> dict: def get_device_list(self) -> dict:
LOGGER.debug("Getting device list from Bambu Cloud") LOGGER.debug("Getting device list from Bambu Cloud")
if self._region == "China": response = requests.get(get_Url(BambuUrl.BIND, self._region), headers=self._get_headers_with_auth_token(), timeout=10, impersonate=IMPERSONATE_BROWSER)
url = 'https://api.bambulab.cn/v1/iot-service/api/user/bind'
else:
url = 'https://api.bambulab.com/v1/iot-service/api/user/bind'
headers = {'Authorization': 'Bearer ' + self._auth_token, 'User-Agent' : "HA Bambulab"}
with httpx.Client(http2=True) as client:
response = client.get(url, headers=headers, timeout=10)
if response.status_code >= 400: if response.status_code >= 400:
LOGGER.debug(f"Received error: {response.status_code}") LOGGER.debug(f"Received error: {response.status_code}")
raise ValueError(response.status_code) raise ValueError(response.status_code)
@ -182,15 +312,10 @@ class BambuCloud:
def get_slicer_settings(self) -> dict: def get_slicer_settings(self) -> dict:
LOGGER.debug("Getting slicer settings from Bambu Cloud") LOGGER.debug("Getting slicer settings from Bambu Cloud")
if self._region == "China": response = requests.get(get_Url(BambuUrl.SLICER_SETTINGS, self._region), headers=self._get_headers_with_auth_token(), timeout=10, impersonate=IMPERSONATE_BROWSER)
url = 'https://api.bambulab.cn/v1/iot-service/api/slicer/setting?version=undefined'
else:
url = 'https://api.bambulab.com/v1/iot-service/api/slicer/setting?version=undefined'
headers = {'Authorization': 'Bearer ' + self._auth_token, 'User-Agent' : "HA Bambulab"}
with httpx.Client(http2=True) as client:
response = client.get(url, headers=headers, timeout=10)
if response.status_code >= 400: if response.status_code >= 400:
LOGGER.error(f"Slicer settings load failed: {response.status_code}") LOGGER.error(f"Slicer settings load failed: {response.status_code}")
LOGGER.error(f"Slicer settings load failed: '{response.text}'")
return None return None
return response.json() return response.json()
@ -237,20 +362,16 @@ class BambuCloud:
# }, # },
def get_tasklist(self) -> dict: def get_tasklist(self) -> dict:
if self._region == "China": url = get_Url(BambuUrl.TASKS, self._region)
url = 'https://api.bambulab.cn/v1/user-service/my/tasks' response = requests.get(url, headers=self._get_headers_with_auth_token(), timeout=10, impersonate=IMPERSONATE_BROWSER)
else:
url = 'https://api.bambulab.com/v1/user-service/my/tasks'
headers = {'Authorization': 'Bearer ' + self._auth_token, 'User-Agent' : "HA Bambulab"}
with httpx.Client(http2=True) as client:
response = client.get(url, headers=headers, timeout=10)
if response.status_code >= 400: if response.status_code >= 400:
LOGGER.debug(f"Received error: {response.status_code}") LOGGER.debug(f"Received error: {response.status_code}")
LOGGER.debug(f"Received error: '{response.text}'")
raise ValueError(response.status_code) raise ValueError(response.status_code)
return response.json() return response.json()
def get_latest_task_for_printer(self, deviceId: str) -> dict: def get_latest_task_for_printer(self, deviceId: str) -> dict:
LOGGER.debug(f"Getting latest task from Bambu Cloud for Printer: {deviceId}") LOGGER.debug(f"Getting latest task from Bambu Cloud")
data = self.get_tasklist_for_printer(deviceId) data = self.get_tasklist_for_printer(deviceId)
if len(data) != 0: if len(data) != 0:
return data[0] return data[0]
@ -258,7 +379,7 @@ class BambuCloud:
return None return None
def get_tasklist_for_printer(self, deviceId: str) -> dict: def get_tasklist_for_printer(self, deviceId: str) -> dict:
LOGGER.debug(f"Getting task list from Bambu Cloud for Printer: {deviceId}") LOGGER.debug(f"Getting task list from Bambu Cloud")
tasks = [] tasks = []
data = self.get_tasklist() data = self.get_tasklist()
for task in data['hits']: for task in data['hits']:
@ -273,8 +394,7 @@ class BambuCloud:
def download(self, url: str) -> bytearray: def download(self, url: str) -> bytearray:
LOGGER.debug(f"Downloading cover image: {url}") LOGGER.debug(f"Downloading cover image: {url}")
with httpx.Client(http2=True) as client: response = requests.get(url, timeout=10, impersonate=IMPERSONATE_BROWSER)
response = client.get(url, timeout=10)
if response.status_code >= 400: if response.status_code >= 400:
LOGGER.debug(f"Received error: {response.status_code}") LOGGER.debug(f"Received error: {response.status_code}")
raise ValueError(response.status_code) raise ValueError(response.status_code)

View File

@ -27,6 +27,7 @@ class Features(Enum):
CAMERA_IMAGE = 15, CAMERA_IMAGE = 15,
DOOR_SENSOR = 16, DOOR_SENSOR = 16,
MANUAL_MODE = 17, MANUAL_MODE = 17,
AMS_FILAMENT_REMAINING = 18,
class FansEnum(Enum): class FansEnum(Enum):
@ -1220,3 +1221,19 @@ class Home_Flag_Values(IntEnum):
SUPPORTED_PLUS = 0x08000000, SUPPORTED_PLUS = 0x08000000,
# Gap # Gap
class BambuUrl(Enum):
LOGIN = 1,
TFA_LOGIN = 2,
EMAIL_CODE = 3,
BIND = 4,
SLICER_SETTINGS = 5,
TASKS = 6,
BAMBU_URL = {
BambuUrl.LOGIN: 'https://api.bambulab.com/v1/user-service/user/login',
BambuUrl.TFA_LOGIN: 'https://bambulab.com/api/sign-in/tfa',
BambuUrl.EMAIL_CODE: 'https://api.bambulab.com/v1/user-service/user/sendemail/code',
BambuUrl.BIND: 'https://api.bambulab.com/v1/iot-service/api/user/bind',
BambuUrl.SLICER_SETTINGS: 'https://api.bambulab.com/v1/iot-service/api/slicer/setting?version=undefined',
BambuUrl.TASKS: 'https://api.bambulab.com/v1/user-service/my/tasks',
}

View File

@ -78,7 +78,6 @@ class Device:
send_event = send_event | self.home_flag.print_update(data = data) send_event = send_event | self.home_flag.print_update(data = data)
if send_event and self._client.callback is not None: if send_event and self._client.callback is not None:
LOGGER.debug("event_printer_data_update")
self._client.callback("event_printer_data_update") self._client.callback("event_printer_data_update")
if data.get("msg", 0) == 0: if data.get("msg", 0) == 0:
@ -93,7 +92,7 @@ class Device:
def supports_feature(self, feature): def supports_feature(self, feature):
if feature == Features.AUX_FAN: if feature == Features.AUX_FAN:
return True return self.info.device_type != "A1" and self.info.device_type != "A1MINI"
elif feature == Features.CHAMBER_LIGHT: elif feature == Features.CHAMBER_LIGHT:
return True return True
elif feature == Features.CHAMBER_FAN: elif feature == Features.CHAMBER_FAN:
@ -124,6 +123,9 @@ class Device:
return self.info.device_type == "X1" or self.info.device_type == "X1C" or self.info.device_type == "X1E" return self.info.device_type == "X1" or self.info.device_type == "X1C" or self.info.device_type == "X1E"
elif feature == Features.MANUAL_MODE: elif feature == Features.MANUAL_MODE:
return self.info.device_type == "P1P" or self.info.device_type == "P1S" or self.info.device_type == "A1" or self.info.device_type == "A1MINI" return self.info.device_type == "P1P" or self.info.device_type == "P1S" or self.info.device_type == "A1" or self.info.device_type == "A1MINI"
elif feature == Features.AMS_FILAMENT_REMAINING:
# Technically this is not the AMS Lite but that's currently tied to only these printer types.
return self.info.device_type != "A1" and self.info.device_type != "A1MINI"
return False return False
@ -384,7 +386,7 @@ class PrintJob:
values = {} values = {}
for i in range(16): for i in range(16):
if self._ams_print_weights[i] != 0: if self._ams_print_weights[i] != 0:
values[f"AMS Slot {i}"] = self._ams_print_weights[i] values[f"AMS Slot {i+1}"] = self._ams_print_weights[i]
return values return values
@property @property
@ -392,7 +394,7 @@ class PrintJob:
values = {} values = {}
for i in range(16): for i in range(16):
if self._ams_print_lengths[i] != 0: if self._ams_print_lengths[i] != 0:
values[f"AMS Slot {i}"] = self._ams_print_lengths[i] values[f"AMS Slot {i+1}"] = self._ams_print_lengths[i]
return values return values
def __init__(self, client): def __init__(self, client):
@ -450,7 +452,8 @@ class PrintJob:
self.gcode_file = data.get("gcode_file", self.gcode_file) self.gcode_file = data.get("gcode_file", self.gcode_file)
self.print_type = data.get("print_type", self.print_type) self.print_type = data.get("print_type", self.print_type)
if self.print_type.lower() not in PRINT_TYPE_OPTIONS: if self.print_type.lower() not in PRINT_TYPE_OPTIONS:
LOGGER.debug(f"Unknown print_type. Please log an issue : '{self.print_type}'") if self.print_type != "":
LOGGER.debug(f"Unknown print_type. Please log an issue : '{self.print_type}'")
self.print_type = "unknown" self.print_type = "unknown"
self.subtask_name = data.get("subtask_name", self.subtask_name) self.subtask_name = data.get("subtask_name", self.subtask_name)
self.file_type_icon = "mdi:file" if self.print_type != "cloud" else "mdi:cloud-outline" self.file_type_icon = "mdi:file" if self.print_type != "cloud" else "mdi:cloud-outline"
@ -471,9 +474,7 @@ class PrintJob:
if data.get("mc_remaining_time") is not None: if data.get("mc_remaining_time") is not None:
existing_remaining_time = self.remaining_time existing_remaining_time = self.remaining_time
self.remaining_time = data.get("mc_remaining_time") self.remaining_time = data.get("mc_remaining_time")
if self.start_time is None: if existing_remaining_time != self.remaining_time:
self.end_time = None
elif existing_remaining_time != self.remaining_time:
self.end_time = get_end_time(self.remaining_time) self.end_time = get_end_time(self.remaining_time)
LOGGER.debug(f"END TIME2: {self.end_time}") LOGGER.debug(f"END TIME2: {self.end_time}")
@ -796,6 +797,12 @@ class Info:
@dataclass @dataclass
class AMSInstance: class AMSInstance:
"""Return all AMS instance related info""" """Return all AMS instance related info"""
serial: str
sw_version: str
hw_version: str
humidity_index: int
temperature: int
tray: list["AMSTray"]
def __init__(self, client): def __init__(self, client):
self.serial = "" self.serial = ""
@ -813,11 +820,14 @@ class AMSInstance:
@dataclass @dataclass
class AMSList: class AMSList:
"""Return all AMS related info""" """Return all AMS related info"""
tray_now: int
data: list[AMSInstance]
def __init__(self, client): def __init__(self, client):
self._client = client self._client = client
self.tray_now = 0 self.tray_now = 0
self.data = [None] * 4 self.data = [None] * 4
self._first_initialization_done = False
def info_update(self, data): def info_update(self, data):
old_data = f"{self.__dict__}" old_data = f"{self.__dict__}"
@ -859,7 +869,7 @@ class AMSList:
if index != -1: if index != -1:
# Sometimes we get incomplete version data. We have to skip if that occurs since the serial number is # Sometimes we get incomplete version data. We have to skip if that occurs since the serial number is
# requires as part of the home assistant device identity. # required as part of the home assistant device identity.
if not module['sn'] == '': if not module['sn'] == '':
# May get data before info so create entries if necessary # May get data before info so create entries if necessary
if self.data[index] is None: if self.data[index] is None:
@ -874,6 +884,9 @@ class AMSList:
if self.data[index].hw_version != module['hw_ver']: if self.data[index].hw_version != module['hw_ver']:
data_changed = True data_changed = True
self.data[index].hw_version = module['hw_ver'] self.data[index].hw_version = module['hw_ver']
elif not self._first_initialization_done:
self._first_initialization_done = True
data_changed = True
data_changed = data_changed or (old_data != f"{self.__dict__}") data_changed = data_changed or (old_data != f"{self.__dict__}")
@ -969,6 +982,19 @@ class AMSList:
@dataclass @dataclass
class AMSTray: class AMSTray:
"""Return all AMS tray related info""" """Return all AMS tray related info"""
empty: bool
idx: int
name: str
type: str
sub_brands: str
color: str
nozzle_temp_min: int
nozzle_temp_max: int
remain: int
k: float
tag_uid: str
tray_uuid: str
def __init__(self, client): def __init__(self, client):
self._client = client self._client = client
@ -982,7 +1008,8 @@ class AMSTray:
self.nozzle_temp_max = 0 self.nozzle_temp_max = 0
self.remain = 0 self.remain = 0
self.k = 0 self.k = 0
self.tag_uid = "0000000000000000" self.tag_uid = ""
self.tray_uuid = ""
def print_update(self, data) -> bool: def print_update(self, data) -> bool:
old_data = f"{self.__dict__}" old_data = f"{self.__dict__}"
@ -998,7 +1025,8 @@ class AMSTray:
self.nozzle_temp_min = 0 self.nozzle_temp_min = 0
self.nozzle_temp_max = 0 self.nozzle_temp_max = 0
self.remain = 0 self.remain = 0
self.tag_uid = "0000000000000000" self.tag_uid = ""
self.tray_uuid = ""
self.k = 0 self.k = 0
else: else:
self.empty = False self.empty = False
@ -1011,6 +1039,7 @@ class AMSTray:
self.nozzle_temp_max = data.get('nozzle_temp_max', self.nozzle_temp_max) self.nozzle_temp_max = data.get('nozzle_temp_max', self.nozzle_temp_max)
self.remain = data.get('remain', self.remain) self.remain = data.get('remain', self.remain)
self.tag_uid = data.get('tag_uid', self.tag_uid) self.tag_uid = data.get('tag_uid', self.tag_uid)
self.tray_uuid = data.get('tray_uuid', self.tray_uuid)
self.k = data.get('k', self.k) self.k = data.get('k', self.k)
return (old_data != f"{self.__dict__}") return (old_data != f"{self.__dict__}")
@ -1191,8 +1220,9 @@ class PrintErrorList:
_count: int _count: int
def __init__(self, client): def __init__(self, client):
self._error = None
self._count = 0
self._client = client self._client = client
self._error = {}
def print_update(self, data) -> bool: def print_update(self, data) -> bool:
# Example payload: # Example payload:
@ -1202,7 +1232,7 @@ class PrintErrorList:
# 'Unable to feed filament into the extruder. This could be due to entangled filament or a stuck spool. If not, please check if the AMS PTFE tube is connected.' # 'Unable to feed filament into the extruder. This could be due to entangled filament or a stuck spool. If not, please check if the AMS PTFE tube is connected.'
if 'print_error' in data.keys(): if 'print_error' in data.keys():
errors = {} errors = None
print_error_code = data.get('print_error') print_error_code = data.get('print_error')
if print_error_code != 0: if print_error_code != 0:
hex_conversion = f'0{int(print_error_code):x}' hex_conversion = f'0{int(print_error_code):x}'
@ -1232,6 +1262,8 @@ class PrintErrorList:
@dataclass @dataclass
class HMSNotification: class HMSNotification:
"""Return an HMS object and all associated details""" """Return an HMS object and all associated details"""
attr: int
code: int
def __init__(self, attr: int = 0, code: int = 0): def __init__(self, attr: int = 0, code: int = 0):
self.attr = attr self.attr = attr

View File

@ -12,6 +12,7 @@ from .const import (
HMS_MODULES, HMS_MODULES,
LOGGER, LOGGER,
FansEnum, FansEnum,
BAMBU_URL
) )
from .commands import SEND_GCODE_TEMPLATE from .commands import SEND_GCODE_TEMPLATE
@ -59,8 +60,8 @@ def get_filament_name(idx, custom_filaments: dict):
result = FILAMENT_NAMES.get(idx, "unknown") result = FILAMENT_NAMES.get(idx, "unknown")
if result == "unknown" and idx != "": if result == "unknown" and idx != "":
result = custom_filaments.get(idx, "unknown") result = custom_filaments.get(idx, "unknown")
if result == "unknown" and idx != "": # if result == "unknown" and idx != "":
LOGGER.debug(f"UNKNOWN FILAMENT IDX: '{idx}'") # LOGGER.debug(f"UNKNOWN FILAMENT IDX: '{idx}'")
return result return result
@ -225,3 +226,10 @@ def round_minute(date: datetime = None, round_to: int = 1):
date = date.replace(second=0, microsecond=0) date = date.replace(second=0, microsecond=0)
delta = date.minute % round_to delta = date.minute % round_to
return date.replace(minute=date.minute - delta) return date.replace(minute=date.minute - delta)
def get_Url(url: str, region: str):
urlstr = BAMBU_URL[url]
if region == "China":
urlstr = urlstr.replace('.com', '.cn')
return urlstr

View File

@ -25,7 +25,7 @@ class IdleState(APrinterState):
filesystem_root = ( filesystem_root = (
"file:///mnt/sdcard/" "file:///mnt/sdcard/"
if self._printer._settings.get(["device_type"]) in ["X1", "X1C"] if self._printer._settings.get(["device_type"]) in ["X1", "X1C"]
else "file:///" else "file:///sdcard/"
) )
print_command = { print_command = {

View File

@ -22,7 +22,7 @@ class PrintingState(APrinterState):
def __init__(self, printer: BambuVirtualPrinter) -> None: def __init__(self, printer: BambuVirtualPrinter) -> None:
super().__init__(printer) super().__init__(printer)
self._current_print_job = None self._printer.current_print_job = None
self._is_printing = False self._is_printing = False
self._sd_printing_thread = None self._sd_printing_thread = None
@ -40,12 +40,15 @@ class PrintingState(APrinterState):
self._printer.current_print_job = None self._printer.current_print_job = None
def _start_worker_thread(self): def _start_worker_thread(self):
self._is_printing = True
if self._sd_printing_thread is None: if self._sd_printing_thread is None:
self._is_printing = True
self._sd_printing_thread = threading.Thread(target=self._printing_worker) self._sd_printing_thread = threading.Thread(target=self._printing_worker)
self._sd_printing_thread.start() self._sd_printing_thread.start()
else:
self._sd_printing_thread.join()
def _printing_worker(self): def _printing_worker(self):
self._log.debug(f"_printing_worker before while loop: {self._printer.current_print_job}")
while ( while (
self._is_printing self._is_printing
and self._printer.current_print_job is not None and self._printer.current_print_job is not None
@ -55,6 +58,7 @@ class PrintingState(APrinterState):
self._printer.report_print_job_status() self._printer.report_print_job_status()
time.sleep(3) time.sleep(3)
self._log.debug(f"_printing_worker after while loop: {self._printer.current_print_job}")
self.update_print_job_info() self.update_print_job_info()
if ( if (
self._printer.current_print_job is not None self._printer.current_print_job is not None
@ -64,13 +68,15 @@ class PrintingState(APrinterState):
def update_print_job_info(self): def update_print_job_info(self):
print_job_info = self._printer.bambu_client.get_device().print_job print_job_info = self._printer.bambu_client.get_device().print_job
task_name: str = print_job_info.subtask_name subtask_name: str = print_job_info.subtask_name
project_file_info = self._printer.project_files.get_file_by_stem( gcode_file: str = print_job_info.gcode_file
task_name, [".gcode", ".3mf"]
) self._log.info(f"update_print_job_info: {print_job_info}")
project_file_info = self._printer.project_files.get_file_by_name(subtask_name) or self._printer.project_files.get_file_by_name(gcode_file)
if project_file_info is None: if project_file_info is None:
self._log.debug(f"No 3mf file found for {print_job_info}") self._log.debug(f"No 3mf file found for {print_job_info}")
self._current_print_job = None self._printer.current_print_job = None
self._printer.change_state(self._printer._state_idle) self._printer.change_state(self._printer._state_idle)
return return

View File

@ -14,7 +14,7 @@ plugin_package = "octoprint_bambu_printer"
plugin_name = "OctoPrint-BambuPrinter" plugin_name = "OctoPrint-BambuPrinter"
# The plugin's version. Can be overwritten within OctoPrint's internal data via __plugin_version__ in the plugin module # The plugin's version. Can be overwritten within OctoPrint's internal data via __plugin_version__ in the plugin module
plugin_version = "0.1.8rc2" plugin_version = "0.1.8rc7"
# The plugin's description. Can be overwritten within OctoPrint's internal data via __plugin_description__ in the plugin # The plugin's description. Can be overwritten within OctoPrint's internal data via __plugin_description__ in the plugin
# module # module
@ -33,7 +33,7 @@ plugin_url = "https://github.com/jneilliii/OctoPrint-BambuPrinter"
plugin_license = "AGPLv3" plugin_license = "AGPLv3"
# Any additional requirements besides OctoPrint should be listed here # Any additional requirements besides OctoPrint should be listed here
plugin_requires = ["paho-mqtt<2", "python-dateutil", "httpx[http2]>=0.27.0"] plugin_requires = ["paho-mqtt<2", "python-dateutil", "curl_cffi"]
### -------------------------------------------------------------------------------------------------------------------- ### --------------------------------------------------------------------------------------------------------------------
### More advanced options that you usually shouldn't have to touch follow after this point ### More advanced options that you usually shouldn't have to touch follow after this point