Compare commits

...

7 Commits

Author SHA1 Message Date
52ba3ff214 0.1.8rc12
update pybambu module from upstream HA project
2024-12-01 16:30:12 -05:00
98ab94b371 0.1.8rc11
update pybambu/bambu_cloud.py from upstream HA project
2024-11-14 23:35:03 -05:00
b54e372342 0.1.8rc10
reduce settings save by refactoring ams data updates
2024-11-14 21:26:41 -05:00
76f706df19 0.1.8rc9
* check for filename in cache subfolder for files started through cloud connected printers
* send 0% progress when in prepare state and progress is 100
* minor UI tweaks
2024-11-09 21:57:17 -05:00
5c8a9787d4 0.1.8rc8
* updates to pybambu module
* update cloud login workflow, #59
2024-11-09 20:01:41 -05:00
e3fda73dd3 0.1.8rc7
* update pybambu module from upstream HA project, groundwork for fixing new cloud authorization process, #59
* potential fix for stuck progress/canceled printing status, #52
2024-11-09 16:15:13 -05:00
5633d6f6ea groundwork for plate processing contained in 3mf file, will store contents of plate_1.json in OctoPrint metadata for that file 2024-11-04 22:32:52 -05:00
13 changed files with 761 additions and 1278 deletions

View File

@ -8,6 +8,8 @@ from contextlib import contextmanager
import flask import flask
import logging.handlers import logging.handlers
from urllib.parse import quote as urlquote from urllib.parse import quote as urlquote
import os
import zipfile
import octoprint.printer import octoprint.printer
import octoprint.server import octoprint.server
@ -59,6 +61,7 @@ class BambuPrintPlugin(
_plugin_manager: octoprint.plugin.PluginManager _plugin_manager: octoprint.plugin.PluginManager
_bambu_file_system: RemoteSDCardFileList _bambu_file_system: RemoteSDCardFileList
_timelapse_files_view: CachedFileView _timelapse_files_view: CachedFileView
_bambu_cloud: None
def on_settings_initialized(self): def on_settings_initialized(self):
self._bambu_file_system = RemoteSDCardFileList(self._settings) self._bambu_file_system = RemoteSDCardFileList(self._settings)
@ -106,11 +109,17 @@ class BambuPrintPlugin(
"ams_current_tray": 255, "ams_current_tray": 255,
} }
def on_settings_save(self, data):
if data.get("local_mqtt", False) is True:
data["auth_token"] = ""
octoprint.plugin.SettingsPlugin.on_settings_save(self, data)
def is_api_adminonly(self): def is_api_adminonly(self):
return True return True
def get_api_commands(self): def get_api_commands(self):
return {"register": ["email", "password", "region", "auth_token"]} return {"register": ["email", "password", "region", "auth_token"],
"verify": ["auth_type", "password"]}
def on_api_command(self, command, data): def on_api_command(self, command, data):
if command == "register": if command == "register":
@ -121,20 +130,57 @@ class BambuPrintPlugin(
and "auth_token" in data and "auth_token" in data
): ):
self._logger.info(f"Registering user {data['email']}") self._logger.info(f"Registering user {data['email']}")
bambu_cloud = BambuCloud( self._bambu_cloud = BambuCloud(
data["region"], data["email"], data["password"], data["auth_token"] data["region"], data["email"], data["password"], data["auth_token"]
) )
bambu_cloud.login(data["region"], data["email"], data["password"]) auth_response = self._bambu_cloud.login(data["region"], data["email"], data["password"])
return flask.jsonify( return flask.jsonify(
{ {
"auth_token": bambu_cloud.auth_token, "auth_response": auth_response,
"username": bambu_cloud.username,
} }
) )
elif command == "verify":
auth_response = None
if (
"auth_type" in data
and "password" in data
and self._bambu_cloud is not None
):
self._logger.info(f"Verifying user {self._bambu_cloud._email}")
if data["auth_type"] == "verifyCode":
auth_response = self._bambu_cloud.login_with_verification_code(data["password"])
elif data["auth_type"] == "tfa":
auth_response = self._bambu_cloud.login_with_2fa_code(data["password"])
else:
self._logger.warning(f"Unknown verification type: {data['auth_type']}")
if auth_response == "success":
return flask.jsonify(
{
"auth_token": self._bambu_cloud.auth_token,
"username": self._bambu_cloud.username
}
)
else:
self._logger.info(f"Error verifying: {auth_response}")
return flask.jsonify(
{
"error": "Unable to verify"
}
)
def on_event(self, event, payload): def on_event(self, event, payload):
if event == Events.TRANSFER_DONE: if event == Events.TRANSFER_DONE:
self._printer.commands("M20 L T", force=True) self._printer.commands("M20 L T", force=True)
elif event == Events.FILE_ADDED:
if payload["operation"] == "add" and "3mf" in payload["type"]:
file_container = os.path.join(self._settings.getBaseFolder("uploads"), payload["path"])
with zipfile.ZipFile(file_container) as z:
with z.open("Metadata/plate_1.json", "r") as json_data:
plate_data = json.load(json_data)
if plate_data:
self._file_manager.set_additional_metadata("sdcard", payload["path"], "plate_data", plate_data, overwrite=True)
def support_3mf_files(self): def support_3mf_files(self):
return {"machinecode": {"3mf": ["3mf"]}} return {"machinecode": {"3mf": ["3mf"]}}

View File

@ -170,22 +170,6 @@ class BambuVirtualPrinter:
def change_state(self, new_state: APrinterState): def change_state(self, new_state: APrinterState):
self._state_change_queue.put(new_state) self._state_change_queue.put(new_state)
def _convert2serialize(self, obj):
if isinstance(obj, dict):
return {k: self._convert2serialize(v) for k, v in obj.items()}
elif hasattr(obj, "_ast"):
return self._convert2serialize(obj._ast())
elif not isinstance(obj, str) and hasattr(obj, "__iter__"):
return [self._convert2serialize(v) for v in obj]
elif hasattr(obj, "__dict__"):
return {
k: self._convert2serialize(v)
for k, v in obj.__dict__.items()
if not callable(v) and not k.startswith('_')
}
else:
return obj
def new_update(self, event_type): def new_update(self, event_type):
if event_type == "event_hms_errors": if event_type == "event_hms_errors":
self._update_hms_errors() self._update_hms_errors()
@ -196,7 +180,8 @@ class BambuVirtualPrinter:
device_data = self.bambu_client.get_device() device_data = self.bambu_client.get_device()
print_job_state = device_data.print_job.gcode_state print_job_state = device_data.print_job.gcode_state
temperatures = device_data.temperature temperatures = device_data.temperature
ams_data = self._convert2serialize(device_data.ams.data) # strip out extra data to avoid unneeded settings updates
ams_data = [{"tray": asdict(x).pop("tray", None)} for x in device_data.ams.data if x is not None]
if self.ams_data != ams_data: if self.ams_data != ams_data:
self._log.debug(f"Recieveid AMS Update: {ams_data}") self._log.debug(f"Recieveid AMS Update: {ams_data}")
@ -274,19 +259,20 @@ class BambuVirtualPrinter:
f"connecting via local mqtt: {self._settings.get_boolean(['local_mqtt'])}" f"connecting via local mqtt: {self._settings.get_boolean(['local_mqtt'])}"
) )
bambu_client = BambuClient( bambu_client = BambuClient(
device_type=self._settings.get(["device_type"]), {"device_type": self._settings.get(["device_type"]),
serial=self._settings.get(["serial"]), "serial": self._settings.get(["serial"]),
host=self._settings.get(["host"]), "host": self._settings.get(["host"]),
username=( "username": (
"bblp" "bblp"
if self._settings.get_boolean(["local_mqtt"]) if self._settings.get_boolean(["local_mqtt"])
else self._settings.get(["username"]) else self._settings.get(["username"])
), ),
access_code=self._settings.get(["access_code"]), "access_code": self._settings.get(["access_code"]),
local_mqtt=self._settings.get_boolean(["local_mqtt"]), "local_mqtt": self._settings.get_boolean(["local_mqtt"]),
region=self._settings.get(["region"]), "region": self._settings.get(["region"]),
email=self._settings.get(["email"]), "email": self._settings.get(["email"]),
auth_token=self._settings.get(["auth_token"]), "auth_token": self._settings.get(["auth_token"]) if self._settings.get_boolean(["local_mqtt"]) is False else "",
}
) )
bambu_client.on_disconnect = self.on_disconnect(bambu_client.on_disconnect) bambu_client.on_disconnect = self.on_disconnect(bambu_client.on_disconnect)
bambu_client.on_connect = self.on_connect(bambu_client.on_connect) bambu_client.on_connect = self.on_connect(bambu_client.on_connect)
@ -345,21 +331,21 @@ class BambuVirtualPrinter:
self._selected_project_file = None self._selected_project_file = None
def select_project_file(self, file_path: str) -> bool: def select_project_file(self, file_path: str) -> bool:
self._log.debug(f"Select project file: {file_path}") file_info = self._project_files_view.get_file_by_name(file_path)
file_info = self._project_files_view.get_file_by_stem(
file_path, [".gcode", ".3mf"]
)
if ( if (
self._selected_project_file is not None self._selected_project_file is not None
and file_info is not None and file_info is not None
and self._selected_project_file.path == file_info.path and self._selected_project_file.path == file_info.path
): ):
self._log.debug(f"File already selected: {file_path}")
return True return True
if file_info is None: if file_info is None:
self._log.error(f"Cannot select non-existent file: {file_path}") self._log.error(f"Cannot select non-existent file: {file_path}")
return False return False
self._log.debug(f"Select project file: {file_path}")
self._selected_project_file = file_info self._selected_project_file = file_info
self._send_file_selected_message() self._send_file_selected_message()
return True return True

View File

@ -35,8 +35,8 @@ class CachedFileView:
result: list[FileInfo] = [] result: list[FileInfo] = []
with self.file_system.get_ftps_client() as ftp: with self.file_system.get_ftps_client() as ftp:
for filter in self.folder_view.keys(): for key in self.folder_view.keys():
result.extend(self.file_system.list_files(*filter, ftp, existing_files)) result.extend(self.file_system.list_files(*key, ftp, existing_files))
return result return result
def update(self): def update(self):
@ -56,6 +56,9 @@ class CachedFileView:
def get_all_cached_info(self): def get_all_cached_info(self):
return list(self._file_data_cache.values()) return list(self._file_data_cache.values())
def get_keys_as_list(self):
return list(self._file_data_cache.keys()) + list(self._file_alias_cache.keys())
def get_file_data(self, file_path: str | Path) -> FileInfo | None: def get_file_data(self, file_path: str | Path) -> FileInfo | None:
file_data = self.get_file_data_cached(file_path) file_data = self.get_file_data_cached(file_path)
if file_data is None: if file_data is None:
@ -73,22 +76,23 @@ class CachedFileView:
file_path = self._file_alias_cache.get(file_path, file_path) file_path = self._file_alias_cache.get(file_path, file_path)
return self._file_data_cache.get(file_path, None) return self._file_data_cache.get(file_path, None)
def get_file_by_stem(self, file_stem: str, allowed_suffixes: list[str]): def get_file_by_name(self, file_name: str):
if file_stem == "": if file_name == "":
return None return None
file_stem = Path(file_stem).with_suffix("").stem file_list = self.get_keys_as_list()
file_data = self._get_file_by_stem_cached(file_stem, allowed_suffixes) if not file_name in file_list:
if f"{file_name}.3mf" in file_list:
file_name = f"{file_name}.3mf"
elif f"{file_name}.gcode.3mf" in file_list:
file_name = f"{file_name}.gcode.3mf"
elif f"cache/{file_name}.3mf" in file_list:
file_name = f"cache/{file_name}.3mf"
elif f"cache/{file_name}.gcode.3mf" in file_list:
file_name = f"cache/{file_name}.gcode.3mf"
file_data = self.get_file_data_cached(file_name)
if file_data is None: if file_data is None:
self.update() self.update()
file_data = self._get_file_by_stem_cached(file_stem, allowed_suffixes) return self.get_file_by_name(file_name)
return file_data return file_data
def _get_file_by_stem_cached(self, file_stem: str, allowed_suffixes: list[str]):
for file_path_str in list(self._file_data_cache.keys()) + list(self._file_alias_cache.keys()):
file_path = Path(file_path_str)
if file_stem == file_path.with_suffix("").stem and any(
suffix in allowed_suffixes for suffix in file_path.suffixes
):
return self.get_file_data_cached(file_path)
return None

View File

@ -1,5 +1,6 @@
from __future__ import annotations from __future__ import annotations
import asyncio
import queue import queue
import json import json
import math import math
@ -36,6 +37,7 @@ class WatchdogThread(threading.Thread):
self._stop_event = threading.Event() self._stop_event = threading.Event()
self._last_received_data = time.time() self._last_received_data = time.time()
super().__init__() super().__init__()
self.daemon = True
self.setName(f"{self._client._device.info.device_type}-Watchdog-{threading.get_native_id()}") self.setName(f"{self._client._device.info.device_type}-Watchdog-{threading.get_native_id()}")
def stop(self): def stop(self):
@ -70,6 +72,7 @@ class ChamberImageThread(threading.Thread):
self._client = client self._client = client
self._stop_event = threading.Event() self._stop_event = threading.Event()
super().__init__() super().__init__()
self.daemon = True
self.setName(f"{self._client._device.info.device_type}-Chamber-{threading.get_native_id()}") self.setName(f"{self._client._device.info.device_type}-Chamber-{threading.get_native_id()}")
def stop(self): def stop(self):
@ -223,6 +226,7 @@ class MqttThread(threading.Thread):
self._client = client self._client = client
self._stop_event = threading.Event() self._stop_event = threading.Event()
super().__init__() super().__init__()
self.daemon = True
self.setName(f"{self._client._device.info.device_type}-Mqtt-{threading.get_native_id()}") self.setName(f"{self._client._device.info.device_type}-Mqtt-{threading.get_native_id()}")
def stop(self): def stop(self):
@ -281,23 +285,31 @@ class BambuClient:
_camera = None _camera = None
_usage_hours: float _usage_hours: float
def __init__(self, device_type: str, serial: str, host: str, local_mqtt: bool, region: str, email: str, def __init__(self, config):
username: str, auth_token: str, access_code: str, usage_hours: float = 0, manual_refresh_mode: bool = False): self.host = config['host']
self.callback = None self._callback = None
self.host = host
self._local_mqtt = local_mqtt self._access_code = config.get('access_code', '')
self._serial = serial self._auth_token = config.get('auth_token', '')
self._auth_token = auth_token self._device_type = config.get('device_type', 'unknown')
self._access_code = access_code self._local_mqtt = config.get('local_mqtt', False)
self._username = username self._manual_refresh_mode = config.get('manual_refresh_mode', False)
self._serial = config.get('serial', '')
self._usage_hours = config.get('usage_hours', 0)
self._username = config.get('username', '')
self._enable_camera = config.get('enable_camera', True)
self._connected = False self._connected = False
self._device_type = device_type
self._usage_hours = usage_hours
self._port = 1883 self._port = 1883
self._refreshed = False self._refreshed = False
self._manual_refresh_mode = manual_refresh_mode
self._device = Device(self) self._device = Device(self)
self.bambu_cloud = BambuCloud(region, email, username, auth_token) self.bambu_cloud = BambuCloud(
config.get('region', ''),
config.get('email', ''),
config.get('username', ''),
config.get('auth_token', '')
)
self.slicer_settings = SlicerSettings(self) self.slicer_settings = SlicerSettings(self)
@property @property
@ -317,20 +329,40 @@ class BambuClient:
self.disconnect() self.disconnect()
else: else:
# Reconnect normally # Reconnect normally
self.connect(self.callback) self.connect(self._callback)
@property
def camera_enabled(self):
return self._enable_camera
def callback(self, event: str):
if self._callback is not None:
self._callback(event)
def set_camera_enabled(self, enable):
self._enable_camera = enable
if self._enable_camera:
self._start_camera()
else:
self._stop_camera()
def setup_tls(self):
self.client.tls_set(tls_version=ssl.PROTOCOL_TLS, cert_reqs=ssl.CERT_NONE)
self.client.tls_insecure_set(True)
def connect(self, callback): def connect(self, callback):
"""Connect to the MQTT Broker""" """Connect to the MQTT Broker"""
self.client = mqtt.Client() self.client = mqtt.Client()
self.callback = callback self._callback = callback
self.client.on_connect = self.on_connect self.client.on_connect = self.on_connect
self.client.on_disconnect = self.on_disconnect self.client.on_disconnect = self.on_disconnect
self.client.on_message = self.on_message self.client.on_message = self.on_message
# Set aggressive reconnect polling. # Set aggressive reconnect polling.
self.client.reconnect_delay_set(min_delay=1, max_delay=1) self.client.reconnect_delay_set(min_delay=1, max_delay=1)
self.client.tls_set(tls_version=ssl.PROTOCOL_TLS, cert_reqs=ssl.CERT_NONE) # Run the blocking tls_set method in a separate thread
self.client.tls_insecure_set(True) self.setup_tls()
self._port = 8883 self._port = 8883
if self._local_mqtt: if self._local_mqtt:
self.client.username_pw_set("bblp", password=self._access_code) self.client.username_pw_set("bblp", password=self._access_code)
@ -361,6 +393,22 @@ class BambuClient:
LOGGER.info("On Connect: Connected to printer") LOGGER.info("On Connect: Connected to printer")
self._on_connect() self._on_connect()
def _start_camera(self):
if not self._device.supports_feature(Features.CAMERA_RTSP):
if self._device.supports_feature(Features.CAMERA_IMAGE):
if self._enable_camera:
LOGGER.debug("Starting Chamber Image thread")
self._camera = ChamberImageThread(self)
self._camera.start()
elif (self.host == "") or (self._access_code == ""):
LOGGER.debug("Skipping camera setup as local access details not provided.")
def _stop_camera(self):
if self._camera is not None:
LOGGER.debug("Stopping camera thread")
self._camera.stop()
self._camera.join()
def _on_connect(self): def _on_connect(self):
self._connected = True self._connected = True
self.subscribe_and_request_info() self.subscribe_and_request_info()
@ -369,10 +417,7 @@ class BambuClient:
self._watchdog = WatchdogThread(self) self._watchdog = WatchdogThread(self)
self._watchdog.start() self._watchdog.start()
if self._device.supports_feature(Features.CAMERA_IMAGE): self._start_camera()
LOGGER.debug("Starting Chamber Image thread")
self._camera = ChamberImageThread(self)
self._camera.start()
def try_on_connect(self, def try_on_connect(self,
client_: mqtt.Client, client_: mqtt.Client,
@ -405,10 +450,7 @@ class BambuClient:
LOGGER.debug("Stopping watchdog thread") LOGGER.debug("Stopping watchdog thread")
self._watchdog.stop() self._watchdog.stop()
self._watchdog.join() self._watchdog.join()
if self._camera is not None: self._stop_camera()
LOGGER.debug("Stopping camera thread")
self._camera.stop()
self._camera.join()
def _on_watchdog_fired(self): def _on_watchdog_fired(self):
LOGGER.info("Watch dog fired") LOGGER.info("Watch dog fired")
@ -451,9 +493,7 @@ class BambuClient:
LOGGER.debug("Got Version Data") LOGGER.debug("Got Version Data")
self._device.info_update(data=json_data.get("info")) self._device.info_update(data=json_data.get("info"))
except Exception as e: except Exception as e:
LOGGER.error("An exception occurred processing a message:") LOGGER.error("An exception occurred processing a message:", exc_info=e)
LOGGER.error(f"Exception type: {type(e)}")
LOGGER.error(f"Exception data: {e}")
def subscribe(self): def subscribe(self):
"""Subscribe to report topic""" """Subscribe to report topic"""
@ -475,7 +515,7 @@ class BambuClient:
"""Force refresh data""" """Force refresh data"""
if self._manual_refresh_mode: if self._manual_refresh_mode:
self.connect(self.callback) self.connect(self._callback)
else: else:
LOGGER.debug("Force Refresh: Getting Version Info") LOGGER.debug("Force Refresh: Getting Version Info")
self._refreshed = True self._refreshed = True
@ -516,8 +556,10 @@ class BambuClient:
self.client.on_disconnect = self.on_disconnect self.client.on_disconnect = self.on_disconnect
self.client.on_message = on_message self.client.on_message = on_message
self.client.tls_set(tls_version=ssl.PROTOCOL_TLS, cert_reqs=ssl.CERT_NONE) # Run the blocking tls_set method in a separate thread
self.client.tls_insecure_set(True) loop = asyncio.get_event_loop()
await loop.run_in_executor(None, self.setup_tls)
if self._local_mqtt: if self._local_mqtt:
self.client.username_pw_set("bblp", password=self._access_code) self.client.username_pw_set("bblp", password=self._access_code)
else: else:

View File

@ -1,46 +1,307 @@
from __future__ import annotations from __future__ import annotations
from enum import (
Enum,
)
import base64 import base64
import cloudscraper
import json import json
import httpx import requests
class ConnectionMechanismEnum(Enum):
CLOUDSCRAPER = 1,
CURL_CFFI = 2,
REQUESTS = 3
CONNECTION_MECHANISM = ConnectionMechanismEnum.CLOUDSCRAPER
curl_available = False
if CONNECTION_MECHANISM == ConnectionMechanismEnum.CURL_CFFI:
try:
from curl_cffi import requests as curl_requests
curl_available = True
except ImportError:
curl_available = False
from dataclasses import dataclass from dataclasses import dataclass
from .const import LOGGER from .const import (
LOGGER,
BambuUrl
)
from .utils import get_Url
IMPERSONATE_BROWSER='chrome'
class CloudflareError(Exception):
def __init__(self):
super().__init__("Blocked by Cloudflare")
self.error_code = 403
class EmailCodeRequiredError(Exception):
def __init__(self):
super().__init__("Email code required")
self.error_code = 400
class EmailCodeExpiredError(Exception):
def __init__(self):
super().__init__("Email code expired")
self.error_code = 400
class EmailCodeIncorrectError(Exception):
def __init__(self):
super().__init__("Email code incorrect")
self.error_code = 400
class TfaCodeRequiredError(Exception):
def __init__(self):
super().__init__("Two factor authentication code required")
self.error_code = 400
class CurlUnavailableError(Exception):
def __init__(self):
super().__init__("curl library unavailable")
self.error_code = 400
@dataclass @dataclass
class BambuCloud: class BambuCloud:
def __init__(self, region: str, email: str, username: str, auth_token: str): def __init__(self, region: str, email: str, username: str, auth_token: str):
self._region = region self._region = region
self._email = email self._email = email
self._username = username self._username = username
self._auth_token = auth_token self._auth_token = auth_token
self._tfaKey = None
def _get_authentication_token(self) -> dict: def _get_headers(self):
LOGGER.debug("Getting accessToken from Bambu Cloud") return {
if self._region == "China": 'User-Agent': 'bambu_network_agent/01.09.05.01',
url = 'https://api.bambulab.cn/v1/user-service/user/login' 'X-BBL-Client-Name': 'OrcaSlicer',
'X-BBL-Client-Type': 'slicer',
'X-BBL-Client-Version': '01.09.05.51',
'X-BBL-Language': 'en-US',
'X-BBL-OS-Type': 'linux',
'X-BBL-OS-Version': '6.2.0',
'X-BBL-Agent-Version': '01.09.05.01',
'X-BBL-Executable-info': '{}',
'X-BBL-Agent-OS-Type': 'linux',
'accept': 'application/json',
'Content-Type': 'application/json'
}
# Orca/Bambu Studio also add this - need to work out what an appropriate ID is to put here:
# 'X-BBL-Device-ID': BBL_AUTH_UUID,
# Example: X-BBL-Device-ID: 370f9f43-c6fe-47d7-aec9-5fe5ef7e7673
def _get_headers_with_auth_token(self) -> dict:
if CONNECTION_MECHANISM == ConnectionMechanismEnum.CURL_CFFI:
headers = {}
else: else:
url = 'https://api.bambulab.com/v1/user-service/user/login' headers = self._get_headers()
headers = {'User-Agent' : "OctoPrint Plugin"} headers['Authorization'] = f"Bearer {self._auth_token}"
data = {'account': self._email, 'password': self._password} return headers
with httpx.Client(http2=True) as client:
response = client.post(url, headers=headers, json=data, timeout=10) def _test_response(self, response, return400=False):
if response.status_code >= 400: # Check specifically for cloudflare block
LOGGER.debug(f"Received error: {response.status_code}") if response.status_code == 403 and 'cloudflare' in response.text:
raise ValueError(response.status_code) LOGGER.debug("BLOCKED BY CLOUDFLARE")
raise CloudflareError()
if response.status_code == 400 and not return400:
LOGGER.error(f"Connection failed with error code: {response.status_code}")
LOGGER.debug(f"Response: '{response.text}'")
raise PermissionError(response.status_code, response.text)
if response.status_code > 400:
LOGGER.error(f"Connection failed with error code: {response.status_code}")
LOGGER.debug(f"Response: '{response.text}'")
raise PermissionError(response.status_code, response.text)
LOGGER.debug(f"Response: {response.status_code}")
def _get(self, urlenum: BambuUrl):
url = get_Url(urlenum, self._region)
headers=self._get_headers_with_auth_token()
if CONNECTION_MECHANISM == ConnectionMechanismEnum.CURL_CFFI:
if not curl_available:
LOGGER.debug(f"Curl library is unavailable.")
raise CurlUnavailableError()
response = curl_requests.get(url, headers=headers, timeout=10, impersonate=IMPERSONATE_BROWSER)
elif CONNECTION_MECHANISM == ConnectionMechanismEnum.CLOUDSCRAPER:
if len(headers) == 0:
headers = self._get_headers()
scraper = cloudscraper.create_scraper()
response = scraper.get(url, headers=headers, timeout=10)
elif CONNECTION_MECHANISM == ConnectionMechanismEnum.REQUESTS:
if len(headers) == 0:
headers = self._get_headers()
response = requests.get(url, headers=headers, timeout=10)
else:
raise NotImplementedError()
self._test_response(response)
return response
def _post(self, urlenum: BambuUrl, json: str, headers={}, return400=False):
url = get_Url(urlenum, self._region)
if CONNECTION_MECHANISM == ConnectionMechanismEnum.CURL_CFFI:
if not curl_available:
LOGGER.debug(f"Curl library is unavailable.")
raise CurlUnavailableError()
response = curl_requests.post(url, headers=headers, json=json, impersonate=IMPERSONATE_BROWSER)
elif CONNECTION_MECHANISM == ConnectionMechanismEnum.CLOUDSCRAPER:
if len(headers) == 0:
headers = self._get_headers()
scraper = cloudscraper.create_scraper()
response = scraper.post(url, headers=headers, json=json)
elif CONNECTION_MECHANISM == ConnectionMechanismEnum.REQUESTS:
if len(headers) == 0:
headers = self._get_headers()
response = requests.post(url, headers=headers, json=json)
else:
raise NotImplementedError()
self._test_response(response, return400)
return response
def _get_authentication_token(self) -> str:
LOGGER.debug("Getting accessToken from Bambu Cloud")
# First we need to find out how Bambu wants us to login.
data = {
"account": self._email,
"password": self._password,
"apiError": ""
}
response = self._post(BambuUrl.LOGIN, json=data)
auth_json = response.json()
accessToken = auth_json.get('accessToken', '')
if accessToken != '':
# We were provided the accessToken directly.
return accessToken
loginType = auth_json.get("loginType", None)
if loginType is None:
LOGGER.error(f"loginType not present")
LOGGER.error(f"Response not understood: '{response.text}'")
return ValueError(0) # FIXME
elif loginType == 'verifyCode':
LOGGER.debug(f"Received verifyCode response")
raise EmailCodeRequiredError()
elif loginType == 'tfa':
# Store the tfaKey for later use
LOGGER.debug(f"Received tfa response")
self._tfaKey = auth_json.get("tfaKey")
raise TfaCodeRequiredError()
else:
LOGGER.debug(f"Did not understand json. loginType = '{loginType}'")
LOGGER.error(f"Response not understood: '{response.text}'")
return ValueError(1) # FIXME
def _get_email_verification_code(self):
# Send the verification code request
data = {
"email": self._email,
"type": "codeLogin"
}
LOGGER.debug("Requesting verification code")
self._post(BambuUrl.EMAIL_CODE, json=data)
LOGGER.debug("Verification code requested successfully.")
def _get_authentication_token_with_verification_code(self, code) -> dict:
LOGGER.debug("Attempting to connect with provided verification code.")
data = {
"account": self._email,
"code": code
}
response = self._post(BambuUrl.LOGIN, json=data, return400=True)
status_code = response.status_code
if status_code == 200:
LOGGER.debug("Authentication successful.")
LOGGER.debug(f"Response = '{response.json()}'")
elif status_code == 400:
LOGGER.debug(f"Received response: {response.json()}")
if response.json()['code'] == 1:
# Code has expired. Request a new one.
self._get_email_verification_code()
raise EmailCodeExpiredError()
elif response.json()['code'] == 2:
# Code was incorrect. Let the user try again.
raise EmailCodeIncorrectError()
else:
LOGGER.error(f"Response not understood: '{response.json()}'")
raise ValueError(response.json()['code'])
return response.json()['accessToken'] return response.json()['accessToken']
def _get_authentication_token_with_2fa_code(self, code: str) -> dict:
LOGGER.debug("Attempting to connect with provided 2FA code.")
data = {
"tfaKey": self._tfaKey,
"tfaCode": code
}
response = self._post(BambuUrl.TFA_LOGIN, json=data)
LOGGER.debug(f"Response: {response.status_code}")
if response.status_code == 200:
LOGGER.debug("Authentication successful.")
cookies = response.cookies.get_dict()
token_from_tfa = cookies.get("token")
#LOGGER.debug(f"token_from_tfa: {token_from_tfa}")
return token_from_tfa
def _get_username_from_authentication_token(self) -> str: def _get_username_from_authentication_token(self) -> str:
LOGGER.debug("Trying to get username from authentication token.")
# User name is in 2nd portion of the auth token (delimited with periods) # User name is in 2nd portion of the auth token (delimited with periods)
b64_string = self._auth_token.split(".")[1] username = None
# String must be multiples of 4 chars in length. For decode pad with = character tokens = self._auth_token.split(".")
b64_string += "=" * ((4 - len(b64_string) % 4) % 4) if len(tokens) != 3:
jsonAuthToken = json.loads(base64.b64decode(b64_string)) LOGGER.debug("Received authToken is not a JWT.")
# Gives json payload with "username":"u_<digits>" within it LOGGER.debug("Trying to use project API to retrieve username instead")
return jsonAuthToken['username'] response = self.get_projects();
if response is not None:
projectsnode = response.get('projects', None)
if projectsnode is None:
LOGGER.debug("Failed to find projects node")
else:
if len(projectsnode) == 0:
LOGGER.debug("No projects node in response")
else:
project=projectsnode[0]
if project.get('user_id', None) is None:
LOGGER.debug("No user_id entry")
else:
username = f"u_{project['user_id']}"
LOGGER.debug(f"Found user_id of {username}")
else:
LOGGER.debug("Authentication token looks to be a JWT")
try:
b64_string = self._auth_token.split(".")[1]
# String must be multiples of 4 chars in length. For decode pad with = character
b64_string += "=" * ((4 - len(b64_string) % 4) % 4)
jsonAuthToken = json.loads(base64.b64decode(b64_string))
# Gives json payload with "username":"u_<digits>" within it
username = jsonAuthToken.get('username', None)
except:
LOGGER.debug("Unable to decode authToken to json to retrieve username.")
if username is None:
LOGGER.debug(f"Unable to decode authToken to retrieve username. AuthToken = {self._auth_token}")
return username
# Retrieves json description of devices in the form: # Retrieves json description of devices in the form:
# { # {
# 'message': 'success', # 'message': 'success',
@ -56,7 +317,7 @@ class BambuCloud:
# 'dev_product_name': 'P1S', # 'dev_product_name': 'P1S',
# 'dev_access_code': 'REDACTED', # 'dev_access_code': 'REDACTED',
# 'nozzle_diameter': 0.4 # 'nozzle_diameter': 0.4
# }, # },
# { # {
# 'dev_id': 'REDACTED', # 'dev_id': 'REDACTED',
# 'name': 'Bambu P1P', # 'name': 'Bambu P1P',
@ -66,7 +327,7 @@ class BambuCloud:
# 'dev_product_name': 'P1P', # 'dev_product_name': 'P1P',
# 'dev_access_code': 'REDACTED', # 'dev_access_code': 'REDACTED',
# 'nozzle_diameter': 0.4 # 'nozzle_diameter': 0.4
# }, # },
# { # {
# 'dev_id': 'REDACTED', # 'dev_id': 'REDACTED',
# 'name': 'Bambu X1C', # 'name': 'Bambu X1C',
@ -76,10 +337,10 @@ class BambuCloud:
# 'dev_product_name': 'X1 Carbon', # 'dev_product_name': 'X1 Carbon',
# 'dev_access_code': 'REDACTED', # 'dev_access_code': 'REDACTED',
# 'nozzle_diameter': 0.4 # 'nozzle_diameter': 0.4
# } # }
# ] # ]
# } # }
def test_authentication(self, region: str, email: str, username: str, auth_token: str) -> bool: def test_authentication(self, region: str, email: str, username: str, auth_token: str) -> bool:
self._region = region self._region = region
self._email = email self._email = email
@ -91,26 +352,31 @@ class BambuCloud:
return False return False
return True return True
def login(self, region: str, email: str, password: str): def login(self, region: str, email: str, password: str) -> str:
self._region = region self._region = region
self._email = email self._email = email
self._password = password self._password = password
self._auth_token = self._get_authentication_token() result = self._get_authentication_token()
self._auth_token = result
self._username = self._get_username_from_authentication_token()
def login_with_verification_code(self, code: str):
result = self._get_authentication_token_with_verification_code(code)
self._auth_token = result
self._username = self._get_username_from_authentication_token()
def login_with_2fa_code(self, code: str):
result = self._get_authentication_token_with_2fa_code(code)
self._auth_token = result
self._username = self._get_username_from_authentication_token() self._username = self._get_username_from_authentication_token()
def get_device_list(self) -> dict: def get_device_list(self) -> dict:
LOGGER.debug("Getting device list from Bambu Cloud") LOGGER.debug("Getting device list from Bambu Cloud")
if self._region == "China": try:
url = 'https://api.bambulab.cn/v1/iot-service/api/user/bind' response = self._get(BambuUrl.BIND)
else: except:
url = 'https://api.bambulab.com/v1/iot-service/api/user/bind' return None
headers = {'Authorization': 'Bearer ' + self._auth_token, 'User-Agent' : "OctoPrint Plugin"}
with httpx.Client(http2=True) as client:
response = client.get(url, headers=headers, timeout=10)
if response.status_code >= 400:
LOGGER.debug(f"Received error: {response.status_code}")
raise ValueError(response.status_code)
return response.json()['devices'] return response.json()['devices']
# The slicer settings are of the following form: # The slicer settings are of the following form:
@ -182,18 +448,13 @@ class BambuCloud:
def get_slicer_settings(self) -> dict: def get_slicer_settings(self) -> dict:
LOGGER.debug("Getting slicer settings from Bambu Cloud") LOGGER.debug("Getting slicer settings from Bambu Cloud")
if self._region == "China": try:
url = 'https://api.bambulab.cn/v1/iot-service/api/slicer/setting?version=undefined' response = self._get(BambuUrl.SLICER_SETTINGS)
else: except:
url = 'https://api.bambulab.com/v1/iot-service/api/slicer/setting?version=undefined'
headers = {'Authorization': 'Bearer ' + self._auth_token, 'User-Agent' : "OctoPrint Plugin"}
with httpx.Client(http2=True) as client:
response = client.get(url, headers=headers, timeout=10)
if response.status_code >= 400:
LOGGER.error(f"Slicer settings load failed: {response.status_code}")
return None return None
LOGGER.debug("Succeeded")
return response.json() return response.json()
# The task list is of the following form with a 'hits' array with typical 20 entries. # The task list is of the following form with a 'hits' array with typical 20 entries.
# #
# "total": 531, # "total": 531,
@ -237,28 +498,53 @@ class BambuCloud:
# }, # },
def get_tasklist(self) -> dict: def get_tasklist(self) -> dict:
if self._region == "China": LOGGER.debug("Getting full task list from Bambu Cloud")
url = 'https://api.bambulab.cn/v1/user-service/my/tasks' try:
else: response = self._get(BambuUrl.TASKS)
url = 'https://api.bambulab.com/v1/user-service/my/tasks' except:
headers = {'Authorization': 'Bearer ' + self._auth_token, 'User-Agent' : "OctoPrint Plugin"} return None
with httpx.Client(http2=True) as client: return response.json()
response = client.get(url, headers=headers, timeout=10)
if response.status_code >= 400: # Returns a list of projects for the account.
LOGGER.debug(f"Received error: {response.status_code}") #
raise ValueError(response.status_code) # {
# "message": "success",
# "code": null,
# "error": null,
# "projects": [
# {
# "project_id": "164995388",
# "user_id": "1688388450",
# "model_id": "US48e2103d939bf8",
# "status": "ACTIVE",
# "name": "Alcohol_Marker_Storage_for_Copic,_Ohuhu_and_the_like",
# "content": "{'printed_plates': [{'plate': 1}]}",
# "create_time": "2024-11-17 06:12:33",
# "update_time": "2024-11-17 06:12:40"
# },
# ...
#
def get_projects(self) -> dict:
LOGGER.debug("Getting projects list from Bambu Cloud")
try:
response = self._get(BambuUrl.PROJECTS)
except:
return None
return response.json() return response.json()
def get_latest_task_for_printer(self, deviceId: str) -> dict: def get_latest_task_for_printer(self, deviceId: str) -> dict:
LOGGER.debug(f"Getting latest task from Bambu Cloud for Printer: {deviceId}") LOGGER.debug(f"Getting latest task for printer from Bambu Cloud")
data = self.get_tasklist_for_printer(deviceId) try:
if len(data) != 0: data = self.get_tasklist_for_printer(deviceId)
return data[0] if len(data) != 0:
LOGGER.debug("No tasks found for printer") return data[0]
return None LOGGER.debug("No tasks found for printer")
return None
except:
return None
def get_tasklist_for_printer(self, deviceId: str) -> dict: def get_tasklist_for_printer(self, deviceId: str) -> dict:
LOGGER.debug(f"Getting task list from Bambu Cloud for Printer: {deviceId}") LOGGER.debug(f"Getting full task list for printer from Bambu Cloud")
tasks = [] tasks = []
data = self.get_tasklist() data = self.get_tasklist()
for task in data['hits']: for task in data['hits']:
@ -273,21 +559,25 @@ class BambuCloud:
def download(self, url: str) -> bytearray: def download(self, url: str) -> bytearray:
LOGGER.debug(f"Downloading cover image: {url}") LOGGER.debug(f"Downloading cover image: {url}")
with httpx.Client(http2=True) as client: try:
response = client.get(url, timeout=10) # This is just a standard download from an unauthenticated end point.
if response.status_code >= 400: response = requests.get(url)
LOGGER.debug(f"Received error: {response.status_code}") except:
raise ValueError(response.status_code) return None
return response.content return response.content
@property @property
def username(self): def username(self):
return self._username return self._username
@property @property
def auth_token(self): def auth_token(self):
return self._auth_token return self._auth_token
@property
def bambu_connected(self) -> bool:
return self._auth_token != "" and self._auth_token != None
@property @property
def cloud_mqtt_host(self): def cloud_mqtt_host(self):
return "cn.mqtt.bambulab.com" if self._region == "China" else "us.mqtt.bambulab.com" return "cn.mqtt.bambulab.com" if self._region == "China" else "us.mqtt.bambulab.com"

File diff suppressed because it is too large Load Diff

View File

@ -15,6 +15,7 @@
"GFB01": "Bambu ASA", "GFB01": "Bambu ASA",
"GFB02": "Bambu ASA-Aero", "GFB02": "Bambu ASA-Aero",
"GFB50": "Bambu ABS-GF", "GFB50": "Bambu ABS-GF",
"GFB51": "Bambu ASA-CF",
"GFB60": "PolyLite ABS", "GFB60": "PolyLite ABS",
"GFB61": "PolyLite ASA", "GFB61": "PolyLite ASA",
"GFB98": "Generic ASA", "GFB98": "Generic ASA",
@ -26,6 +27,7 @@
"GFG02": "Bambu PETG HF", "GFG02": "Bambu PETG HF",
"GFG50": "Bambu PETG-CF", "GFG50": "Bambu PETG-CF",
"GFG60": "PolyLite PETG", "GFG60": "PolyLite PETG",
"GFG96": "Generic PETG HF",
"GFG97": "Generic PCTG", "GFG97": "Generic PCTG",
"GFG98": "Generic PETG-CF", "GFG98": "Generic PETG-CF",
"GFG99": "Generic PETG", "GFG99": "Generic PETG",
@ -34,6 +36,13 @@
"GFL03": "eSUN PLA+", "GFL03": "eSUN PLA+",
"GFL04": "Overture PLA", "GFL04": "Overture PLA",
"GFL05": "Overture Matte PLA", "GFL05": "Overture Matte PLA",
"GFL06": "Fiberon PETG-ESD",
"GFL50": "Fiberon PA6-CF",
"GFL51": "Fiberon PA6-GF",
"GFL52": "Fiberon PA12-CF",
"GFL53": "Fiberon PA612-CF",
"GFL54": "Fiberon PET-CF",
"GFL55": "Fiberon PETG-rCF",
"GFL95": "Generic PLA High Speed", "GFL95": "Generic PLA High Speed",
"GFL96": "Generic PLA Silk", "GFL96": "Generic PLA Silk",
"GFL98": "Generic PLA-CF", "GFL98": "Generic PLA-CF",
@ -41,6 +50,7 @@
"GFN03": "Bambu PA-CF", "GFN03": "Bambu PA-CF",
"GFN04": "Bambu PAHT-CF", "GFN04": "Bambu PAHT-CF",
"GFN05": "Bambu PA6-CF", "GFN05": "Bambu PA6-CF",
"GFN06": "Bambu PPA-CF",
"GFN08": "Bambu PA6-GF", "GFN08": "Bambu PA6-GF",
"GFN96": "Generic PPA-GF", "GFN96": "Generic PPA-GF",
"GFN97": "Generic PPA-CF", "GFN97": "Generic PPA-CF",
@ -64,9 +74,12 @@
"GFS98": "Generic HIPS", "GFS98": "Generic HIPS",
"GFS99": "Generic PVA", "GFS99": "Generic PVA",
"GFT01": "Bambu PET-CF", "GFT01": "Bambu PET-CF",
"GFT02": "Bambu PPS-CF",
"GFT97": "Generic PPS", "GFT97": "Generic PPS",
"GFT98": "Generic PPS-CF", "GFT98": "Generic PPS-CF",
"GFU00": "Bambu TPU 95A HF", "GFU00": "Bambu TPU 95A HF",
"GFU01": "Bambu TPU 95A", "GFU01": "Bambu TPU 95A",
"GFU02": "Bambu TPU for AMS",
"GFU98": "Generic TPU for AMS",
"GFU99": "Generic TPU" "GFU99": "Generic TPU"
} }

View File

@ -22,6 +22,7 @@ from .utils import (
get_generic_AMS_HMS_error_code, get_generic_AMS_HMS_error_code,
get_HMS_severity, get_HMS_severity,
get_HMS_module, get_HMS_module,
set_temperature_to_gcode,
) )
from .const import ( from .const import (
LOGGER, LOGGER,
@ -32,6 +33,7 @@ from .const import (
SPEED_PROFILE, SPEED_PROFILE,
GCODE_STATE_OPTIONS, GCODE_STATE_OPTIONS,
PRINT_TYPE_OPTIONS, PRINT_TYPE_OPTIONS,
TempEnum,
) )
from .commands import ( from .commands import (
CHAMBER_LIGHT_ON, CHAMBER_LIGHT_ON,
@ -42,7 +44,7 @@ from .commands import (
class Device: class Device:
def __init__(self, client): def __init__(self, client):
self._client = client self._client = client
self.temperature = Temperature() self.temperature = Temperature(client = client)
self.lights = Lights(client = client) self.lights = Lights(client = client)
self.info = Info(client = client) self.info = Info(client = client)
self.print_job = PrintJob(client = client) self.print_job = PrintJob(client = client)
@ -53,7 +55,7 @@ class Device:
self.external_spool = ExternalSpool(client = client) self.external_spool = ExternalSpool(client = client)
self.hms = HMSList(client = client) self.hms = HMSList(client = client)
self.print_error = PrintErrorList(client = client) self.print_error = PrintErrorList(client = client)
self.camera = Camera() self.camera = Camera(client = client)
self.home_flag = HomeFlag(client=client) self.home_flag = HomeFlag(client=client)
self.push_all_data = None self.push_all_data = None
self.get_version_data = None self.get_version_data = None
@ -77,9 +79,7 @@ class Device:
send_event = send_event | self.camera.print_update(data = data) send_event = send_event | self.camera.print_update(data = data)
send_event = send_event | self.home_flag.print_update(data = data) send_event = send_event | self.home_flag.print_update(data = data)
if send_event and self._client.callback is not None: self._client.callback("event_printer_data_update")
LOGGER.debug("event_printer_data_update")
self._client.callback("event_printer_data_update")
if data.get("msg", 0) == 0: if data.get("msg", 0) == 0:
self.push_all_data = data self.push_all_data = data
@ -91,9 +91,19 @@ class Device:
if data.get("command") == "get_version": if data.get("command") == "get_version":
self.get_version_data = data self.get_version_data = data
def _supports_temperature_set(self):
# When talking to the Bambu cloud mqtt, setting the temperatures is allowed.
if self.info.mqtt_mode == "bambu_cloud":
return True
# X1* have not yet blocked setting the temperatures when in nybrid connection mode.
if self.info.device_type == "X1" or self.info.device_type == "X1C" or self.info.device_type == "X1E":
return True
# What's left is P1 and A1 printers that we are connecting by local mqtt. These are supported only in pure Lan Mode.
return not self._client.bambu_cloud.bambu_connected
def supports_feature(self, feature): def supports_feature(self, feature):
if feature == Features.AUX_FAN: if feature == Features.AUX_FAN:
return True return self.info.device_type != "A1" and self.info.device_type != "A1MINI"
elif feature == Features.CHAMBER_LIGHT: elif feature == Features.CHAMBER_LIGHT:
return True return True
elif feature == Features.CHAMBER_FAN: elif feature == Features.CHAMBER_FAN:
@ -124,6 +134,11 @@ class Device:
return self.info.device_type == "X1" or self.info.device_type == "X1C" or self.info.device_type == "X1E" return self.info.device_type == "X1" or self.info.device_type == "X1C" or self.info.device_type == "X1E"
elif feature == Features.MANUAL_MODE: elif feature == Features.MANUAL_MODE:
return self.info.device_type == "P1P" or self.info.device_type == "P1S" or self.info.device_type == "A1" or self.info.device_type == "A1MINI" return self.info.device_type == "P1P" or self.info.device_type == "P1S" or self.info.device_type == "A1" or self.info.device_type == "A1MINI"
elif feature == Features.AMS_FILAMENT_REMAINING:
# Technically this is not the AMS Lite but that's currently tied to only these printer types.
return self.info.device_type != "A1" and self.info.device_type != "A1MINI"
elif feature == Features.SET_TEMPERATURE:
return self._supports_temperature_set()
return False return False
@ -183,15 +198,13 @@ class Lights:
def TurnChamberLightOn(self): def TurnChamberLightOn(self):
self.chamber_light = "on" self.chamber_light = "on"
self.chamber_light_override = "on" self.chamber_light_override = "on"
if self._client.callback is not None: self._client.callback("event_light_update")
self._client.callback("event_light_update")
self._client.publish(CHAMBER_LIGHT_ON) self._client.publish(CHAMBER_LIGHT_ON)
def TurnChamberLightOff(self): def TurnChamberLightOff(self):
self.chamber_light = "off" self.chamber_light = "off"
self.chamber_light_override = "off" self.chamber_light_override = "off"
if self._client.callback is not None: self._client.callback("event_light_update")
self._client.callback("event_light_update")
self._client.publish(CHAMBER_LIGHT_OFF) self._client.publish(CHAMBER_LIGHT_OFF)
@ -203,7 +216,8 @@ class Camera:
rtsp_url: str rtsp_url: str
timelapse: str timelapse: str
def __init__(self): def __init__(self, client):
self._client = client
self.recording = '' self.recording = ''
self.resolution = '' self.resolution = ''
self.rtsp_url = None self.rtsp_url = None
@ -225,7 +239,10 @@ class Camera:
self.timelapse = data.get("ipcam", {}).get("timelapse", self.timelapse) self.timelapse = data.get("ipcam", {}).get("timelapse", self.timelapse)
self.recording = data.get("ipcam", {}).get("ipcam_record", self.recording) self.recording = data.get("ipcam", {}).get("ipcam_record", self.recording)
self.resolution = data.get("ipcam", {}).get("resolution", self.resolution) self.resolution = data.get("ipcam", {}).get("resolution", self.resolution)
self.rtsp_url = data.get("ipcam", {}).get("rtsp_url", self.rtsp_url) if self._client._enable_camera:
self.rtsp_url = data.get("ipcam", {}).get("rtsp_url", self.rtsp_url)
else:
self.rtsp_url = None
return (old_data != f"{self.__dict__}") return (old_data != f"{self.__dict__}")
@ -238,7 +255,8 @@ class Temperature:
nozzle_temp: int nozzle_temp: int
target_nozzle_temp: int target_nozzle_temp: int
def __init__(self): def __init__(self, client):
self._client = client
self.bed_temp = 0 self.bed_temp = 0
self.target_bed_temp = 0 self.target_bed_temp = 0
self.chamber_temp = 0 self.chamber_temp = 0
@ -256,6 +274,20 @@ class Temperature:
return (old_data != f"{self.__dict__}") return (old_data != f"{self.__dict__}")
def set_target_temp(self, temp: TempEnum, temperature: int):
command = set_temperature_to_gcode(temp, temperature)
# if type == TempEnum.HEATBED:
# self.bed_temp = temperature
# elif type == TempEnum.NOZZLE:
# self.nozzle_temp = temperature
LOGGER.debug(command)
self._client.publish(command)
self._client.callback("event_printer_data_update")
@dataclass @dataclass
class Fans: class Fans:
"""Return all fan related info""" """Return all fan related info"""
@ -335,8 +367,7 @@ class Fans:
LOGGER.debug(command) LOGGER.debug(command)
self._client.publish(command) self._client.publish(command)
if self._client.callback is not None: self._client.callback("event_printer_data_update")
self._client.callback("event_printer_data_update")
def get_fan_speed(self, fan: FansEnum) -> int: def get_fan_speed(self, fan: FansEnum) -> int:
if fan == FansEnum.PART_COOLING: if fan == FansEnum.PART_COOLING:
@ -384,7 +415,7 @@ class PrintJob:
values = {} values = {}
for i in range(16): for i in range(16):
if self._ams_print_weights[i] != 0: if self._ams_print_weights[i] != 0:
values[f"AMS Slot {i}"] = self._ams_print_weights[i] values[f"AMS Slot {i+1}"] = self._ams_print_weights[i]
return values return values
@property @property
@ -392,7 +423,7 @@ class PrintJob:
values = {} values = {}
for i in range(16): for i in range(16):
if self._ams_print_lengths[i] != 0: if self._ams_print_lengths[i] != 0:
values[f"AMS Slot {i}"] = self._ams_print_lengths[i] values[f"AMS Slot {i+1}"] = self._ams_print_lengths[i]
return values return values
def __init__(self, client): def __init__(self, client):
@ -450,7 +481,8 @@ class PrintJob:
self.gcode_file = data.get("gcode_file", self.gcode_file) self.gcode_file = data.get("gcode_file", self.gcode_file)
self.print_type = data.get("print_type", self.print_type) self.print_type = data.get("print_type", self.print_type)
if self.print_type.lower() not in PRINT_TYPE_OPTIONS: if self.print_type.lower() not in PRINT_TYPE_OPTIONS:
LOGGER.debug(f"Unknown print_type. Please log an issue : '{self.print_type}'") if self.print_type != "":
LOGGER.debug(f"Unknown print_type. Please log an issue : '{self.print_type}'")
self.print_type = "unknown" self.print_type = "unknown"
self.subtask_name = data.get("subtask_name", self.subtask_name) self.subtask_name = data.get("subtask_name", self.subtask_name)
self.file_type_icon = "mdi:file" if self.print_type != "cloud" else "mdi:cloud-outline" self.file_type_icon = "mdi:file" if self.print_type != "cloud" else "mdi:cloud-outline"
@ -471,9 +503,7 @@ class PrintJob:
if data.get("mc_remaining_time") is not None: if data.get("mc_remaining_time") is not None:
existing_remaining_time = self.remaining_time existing_remaining_time = self.remaining_time
self.remaining_time = data.get("mc_remaining_time") self.remaining_time = data.get("mc_remaining_time")
if self.start_time is None: if existing_remaining_time != self.remaining_time:
self.end_time = None
elif existing_remaining_time != self.remaining_time:
self.end_time = get_end_time(self.remaining_time) self.end_time = get_end_time(self.remaining_time)
LOGGER.debug(f"END TIME2: {self.end_time}") LOGGER.debug(f"END TIME2: {self.end_time}")
@ -482,8 +512,7 @@ class PrintJob:
currently_idle = self.gcode_state == "IDLE" or self.gcode_state == "FAILED" or self.gcode_state == "FINISH" currently_idle = self.gcode_state == "IDLE" or self.gcode_state == "FAILED" or self.gcode_state == "FINISH"
if previously_idle and not currently_idle: if previously_idle and not currently_idle:
if self._client.callback is not None: self._client.callback("event_print_started")
self._client.callback("event_print_started")
# Generate the start_time for P1P/S when printer moves from idle to another state. Original attempt with remaining time # Generate the start_time for P1P/S when printer moves from idle to another state. Original attempt with remaining time
# becoming non-zero didn't work as it never bounced to zero in at least the scenario where a print was canceled. # becoming non-zero didn't work as it never bounced to zero in at least the scenario where a print was canceled.
@ -507,20 +536,17 @@ class PrintJob:
isCanceledPrint = False isCanceledPrint = False
if data.get("print_error") == 50348044 and self.print_error == 0: if data.get("print_error") == 50348044 and self.print_error == 0:
isCanceledPrint = True isCanceledPrint = True
if self._client.callback is not None: self._client.callback("event_print_canceled")
self._client.callback("event_print_canceled")
self.print_error = data.get("print_error", self.print_error) self.print_error = data.get("print_error", self.print_error)
# Handle print failed # Handle print failed
if previous_gcode_state != "unknown" and previous_gcode_state != "FAILED" and self.gcode_state == "FAILED": if previous_gcode_state != "unknown" and previous_gcode_state != "FAILED" and self.gcode_state == "FAILED":
if not isCanceledPrint: if not isCanceledPrint:
if self._client.callback is not None: self._client.callback("event_print_failed")
self._client.callback("event_print_failed")
# Handle print finish # Handle print finish
if previous_gcode_state != "unknown" and previous_gcode_state != "FINISH" and self.gcode_state == "FINISH": if previous_gcode_state != "unknown" and previous_gcode_state != "FINISH" and self.gcode_state == "FINISH":
if self._client.callback is not None: self._client.callback("event_print_finished")
self._client.callback("event_print_finished")
if currently_idle and not previously_idle and previous_gcode_state != "unknown": if currently_idle and not previously_idle and previous_gcode_state != "unknown":
if self.start_time != None: if self.start_time != None:
@ -673,8 +699,7 @@ class Info:
def set_online(self, online): def set_online(self, online):
if self.online != online: if self.online != online:
self.online = online self.online = online
if self._client.callback is not None: self._client.callback("event_printer_data_update")
self._client.callback("event_printer_data_update")
def info_update(self, data): def info_update(self, data):
@ -703,8 +728,7 @@ class Info:
LOGGER.debug(f"Device is {self.device_type}") LOGGER.debug(f"Device is {self.device_type}")
self.hw_ver = get_hw_version(modules, self.hw_ver) self.hw_ver = get_hw_version(modules, self.hw_ver)
self.sw_ver = get_sw_version(modules, self.sw_ver) self.sw_ver = get_sw_version(modules, self.sw_ver)
if self._client.callback is not None: self._client.callback("event_printer_info_update")
self._client.callback("event_printer_info_update")
def print_update(self, data) -> bool: def print_update(self, data) -> bool:
old_data = f"{self.__dict__}" old_data = f"{self.__dict__}"
@ -796,6 +820,12 @@ class Info:
@dataclass @dataclass
class AMSInstance: class AMSInstance:
"""Return all AMS instance related info""" """Return all AMS instance related info"""
serial: str
sw_version: str
hw_version: str
humidity_index: int
temperature: int
tray: list["AMSTray"]
def __init__(self, client): def __init__(self, client):
self.serial = "" self.serial = ""
@ -813,11 +843,14 @@ class AMSInstance:
@dataclass @dataclass
class AMSList: class AMSList:
"""Return all AMS related info""" """Return all AMS related info"""
tray_now: int
data: list[AMSInstance]
def __init__(self, client): def __init__(self, client):
self._client = client self._client = client
self.tray_now = 0 self.tray_now = 0
self.data = [None] * 4 self.data = [None] * 4
self._first_initialization_done = False
def info_update(self, data): def info_update(self, data):
old_data = f"{self.__dict__}" old_data = f"{self.__dict__}"
@ -859,7 +892,7 @@ class AMSList:
if index != -1: if index != -1:
# Sometimes we get incomplete version data. We have to skip if that occurs since the serial number is # Sometimes we get incomplete version data. We have to skip if that occurs since the serial number is
# requires as part of the home assistant device identity. # required as part of the home assistant device identity.
if not module['sn'] == '': if not module['sn'] == '':
# May get data before info so create entries if necessary # May get data before info so create entries if necessary
if self.data[index] is None: if self.data[index] is None:
@ -874,12 +907,14 @@ class AMSList:
if self.data[index].hw_version != module['hw_ver']: if self.data[index].hw_version != module['hw_ver']:
data_changed = True data_changed = True
self.data[index].hw_version = module['hw_ver'] self.data[index].hw_version = module['hw_ver']
elif not self._first_initialization_done:
self._first_initialization_done = True
data_changed = True
data_changed = data_changed or (old_data != f"{self.__dict__}") data_changed = data_changed or (old_data != f"{self.__dict__}")
if data_changed: if data_changed:
if self._client.callback is not None: self._client.callback("event_ams_info_update")
self._client.callback("event_ams_info_update")
def print_update(self, data) -> bool: def print_update(self, data) -> bool:
old_data = f"{self.__dict__}" old_data = f"{self.__dict__}"
@ -969,6 +1004,19 @@ class AMSList:
@dataclass @dataclass
class AMSTray: class AMSTray:
"""Return all AMS tray related info""" """Return all AMS tray related info"""
empty: bool
idx: int
name: str
type: str
sub_brands: str
color: str
nozzle_temp_min: int
nozzle_temp_max: int
remain: int
k: float
tag_uid: str
tray_uuid: str
def __init__(self, client): def __init__(self, client):
self._client = client self._client = client
@ -982,7 +1030,8 @@ class AMSTray:
self.nozzle_temp_max = 0 self.nozzle_temp_max = 0
self.remain = 0 self.remain = 0
self.k = 0 self.k = 0
self.tag_uid = "0000000000000000" self.tag_uid = ""
self.tray_uuid = ""
def print_update(self, data) -> bool: def print_update(self, data) -> bool:
old_data = f"{self.__dict__}" old_data = f"{self.__dict__}"
@ -998,7 +1047,8 @@ class AMSTray:
self.nozzle_temp_min = 0 self.nozzle_temp_min = 0
self.nozzle_temp_max = 0 self.nozzle_temp_max = 0
self.remain = 0 self.remain = 0
self.tag_uid = "0000000000000000" self.tag_uid = ""
self.tray_uuid = ""
self.k = 0 self.k = 0
else: else:
self.empty = False self.empty = False
@ -1011,6 +1061,7 @@ class AMSTray:
self.nozzle_temp_max = data.get('nozzle_temp_max', self.nozzle_temp_max) self.nozzle_temp_max = data.get('nozzle_temp_max', self.nozzle_temp_max)
self.remain = data.get('remain', self.remain) self.remain = data.get('remain', self.remain)
self.tag_uid = data.get('tag_uid', self.tag_uid) self.tag_uid = data.get('tag_uid', self.tag_uid)
self.tray_uuid = data.get('tray_uuid', self.tray_uuid)
self.k = data.get('k', self.k) self.k = data.get('k', self.k)
return (old_data != f"{self.__dict__}") return (old_data != f"{self.__dict__}")
@ -1092,8 +1143,7 @@ class Speed:
command = SPEED_PROFILE_TEMPLATE command = SPEED_PROFILE_TEMPLATE
command['print']['param'] = f"{id}" command['print']['param'] = f"{id}"
self._client.publish(command) self._client.publish(command)
if self._client.callback is not None: self._client.callback("event_speed_update")
self._client.callback("event_speed_update")
@dataclass @dataclass
@ -1158,7 +1208,8 @@ class HMSList:
attr = int(hms['attr']) attr = int(hms['attr'])
code = int(hms['code']) code = int(hms['code'])
hms_notif = HMSNotification(attr=attr, code=code) hms_notif = HMSNotification(attr=attr, code=code)
errors[f"{index}-Error"] = f"HMS_{hms_notif.hms_code}: {get_HMS_error_text(hms_notif.hms_code)}" errors[f"{index}-Code"] = f"HMS_{hms_notif.hms_code}"
errors[f"{index}-Error"] = get_HMS_error_text(hms_notif.hms_code)
errors[f"{index}-Wiki"] = hms_notif.wiki_url errors[f"{index}-Wiki"] = hms_notif.wiki_url
errors[f"{index}-Severity"] = hms_notif.severity errors[f"{index}-Severity"] = hms_notif.severity
#LOGGER.debug(f"HMS error for '{hms_notif.module}' and severity '{hms_notif.severity}': HMS_{hms_notif.hms_code}") #LOGGER.debug(f"HMS error for '{hms_notif.module}' and severity '{hms_notif.severity}': HMS_{hms_notif.hms_code}")
@ -1169,8 +1220,7 @@ class HMSList:
self._errors = errors self._errors = errors
if self._count != 0: if self._count != 0:
LOGGER.warning(f"HMS ERRORS: {errors}") LOGGER.warning(f"HMS ERRORS: {errors}")
if self._client.callback is not None: self._client.callback("event_printer_error")
self._client.callback("event_hms_errors")
return True return True
return False return False
@ -1188,11 +1238,10 @@ class HMSList:
class PrintErrorList: class PrintErrorList:
"""Return all print_error related info""" """Return all print_error related info"""
_error: dict _error: dict
_count: int
def __init__(self, client): def __init__(self, client):
self._error = None
self._client = client self._client = client
self._error = {}
def print_update(self, data) -> bool: def print_update(self, data) -> bool:
# Example payload: # Example payload:
@ -1202,20 +1251,19 @@ class PrintErrorList:
# 'Unable to feed filament into the extruder. This could be due to entangled filament or a stuck spool. If not, please check if the AMS PTFE tube is connected.' # 'Unable to feed filament into the extruder. This could be due to entangled filament or a stuck spool. If not, please check if the AMS PTFE tube is connected.'
if 'print_error' in data.keys(): if 'print_error' in data.keys():
errors = {} errors = None
print_error_code = data.get('print_error') print_error_code = data.get('print_error')
if print_error_code != 0: if print_error_code != 0:
hex_conversion = f'0{int(print_error_code):x}' hex_conversion = f'0{int(print_error_code):x}'
print_error_code_hex = hex_conversion[slice(0,4,1)] + "_" + hex_conversion[slice(4,8,1)] print_error_code_hex = hex_conversion[slice(0,4,1)] + "_" + hex_conversion[slice(4,8,1)]
errors = {} errors = {}
errors[f"Code"] = f"{print_error_code_hex.upper()}" errors[f"code"] = print_error_code_hex.upper()
errors[f"Error"] = f"{print_error_code_hex.upper()}: {get_print_error_text(print_error_code)}" errors[f"error"] = get_print_error_text(print_error_code)
# LOGGER.warning(f"PRINT ERRORS: {errors}") # This will emit a message to home assistant log every 1 second if enabled # LOGGER.warning(f"PRINT ERRORS: {errors}") # This will emit a message to home assistant log every 1 second if enabled
if self._error != errors: if self._error != errors:
self._error = errors self._error = errors
if self._client.callback is not None: self._client.callback("event_print_error")
self._client.callback("event_print_error")
# We send the error event directly so always return False for the general data event. # We send the error event directly so always return False for the general data event.
return False return False
@ -1232,6 +1280,8 @@ class PrintErrorList:
@dataclass @dataclass
class HMSNotification: class HMSNotification:
"""Return an HMS object and all associated details""" """Return an HMS object and all associated details"""
attr: int
code: int
def __init__(self, attr: int = 0, code: int = 0): def __init__(self, attr: int = 0, code: int = 0):
self.attr = attr self.attr = attr
@ -1264,13 +1314,23 @@ class ChamberImage:
def __init__(self, client): def __init__(self, client):
self._client = client self._client = client
self._bytes = bytearray() self._bytes = bytearray()
self._image_last_updated = datetime.now()
def set_jpeg(self, bytes): def set_jpeg(self, bytes):
self._bytes = bytes self._bytes = bytes
self._image_last_updated = datetime.now()
self._client.callback("event_printer_chamber_image_update")
def get_jpeg(self) -> bytearray: def get_jpeg(self) -> bytearray:
return self._bytes.copy() return self._bytes.copy()
def get_last_update_time(self) -> datetime:
return self._image_last_updated
@property
def available(self):
return self._client._enable_camera
@dataclass @dataclass
class CoverImage: class CoverImage:
"""Returns the cover image from the Bambu API""" """Returns the cover image from the Bambu API"""
@ -1279,8 +1339,7 @@ class CoverImage:
self._client = client self._client = client
self._bytes = bytearray() self._bytes = bytearray()
self._image_last_updated = datetime.now() self._image_last_updated = datetime.now()
if self._client.callback is not None: self._client.callback("event_printer_cover_image_update")
self._client.callback("event_printer_cover_image_update")
def set_jpeg(self, bytes): def set_jpeg(self, bytes):
self._bytes = bytes self._bytes = bytes

View File

@ -11,7 +11,9 @@ from .const import (
HMS_SEVERITY_LEVELS, HMS_SEVERITY_LEVELS,
HMS_MODULES, HMS_MODULES,
LOGGER, LOGGER,
BAMBU_URL,
FansEnum, FansEnum,
TempEnum
) )
from .commands import SEND_GCODE_TEMPLATE from .commands import SEND_GCODE_TEMPLATE
@ -48,6 +50,18 @@ def fan_percentage_to_gcode(fan: FansEnum, percentage: int):
return command return command
def set_temperature_to_gcode(temp: TempEnum, temperature: int):
"""Converts a temperature to the gcode command to set that"""
if temp == TempEnum.NOZZLE:
tempCommand = "M104"
elif temp == TempEnum.HEATBED:
tempCommand = "M140"
command = SEND_GCODE_TEMPLATE
command['print']['param'] = f"{tempCommand} S{temperature}\n"
return command
def to_whole(number): def to_whole(number):
if not number: if not number:
return 0 return 0
@ -59,8 +73,8 @@ def get_filament_name(idx, custom_filaments: dict):
result = FILAMENT_NAMES.get(idx, "unknown") result = FILAMENT_NAMES.get(idx, "unknown")
if result == "unknown" and idx != "": if result == "unknown" and idx != "":
result = custom_filaments.get(idx, "unknown") result = custom_filaments.get(idx, "unknown")
if result == "unknown" and idx != "": # if result == "unknown" and idx != "":
LOGGER.debug(f"UNKNOWN FILAMENT IDX: '{idx}'") # LOGGER.debug(f"UNKNOWN FILAMENT IDX: '{idx}'")
return result return result
@ -225,3 +239,10 @@ def round_minute(date: datetime = None, round_to: int = 1):
date = date.replace(second=0, microsecond=0) date = date.replace(second=0, microsecond=0)
delta = date.minute % round_to delta = date.minute % round_to
return date.replace(minute=date.minute - delta) return date.replace(minute=date.minute - delta)
def get_Url(url: str, region: str):
urlstr = BAMBU_URL[url]
if region == "China":
urlstr = urlstr.replace('.com', '.cn')
return urlstr

View File

@ -22,7 +22,7 @@ class PrintingState(APrinterState):
def __init__(self, printer: BambuVirtualPrinter) -> None: def __init__(self, printer: BambuVirtualPrinter) -> None:
super().__init__(printer) super().__init__(printer)
self._current_print_job = None self._printer.current_print_job = None
self._is_printing = False self._is_printing = False
self._sd_printing_thread = None self._sd_printing_thread = None
@ -40,12 +40,15 @@ class PrintingState(APrinterState):
self._printer.current_print_job = None self._printer.current_print_job = None
def _start_worker_thread(self): def _start_worker_thread(self):
self._is_printing = True
if self._sd_printing_thread is None: if self._sd_printing_thread is None:
self._is_printing = True
self._sd_printing_thread = threading.Thread(target=self._printing_worker) self._sd_printing_thread = threading.Thread(target=self._printing_worker)
self._sd_printing_thread.start() self._sd_printing_thread.start()
else:
self._sd_printing_thread.join()
def _printing_worker(self): def _printing_worker(self):
self._log.debug(f"_printing_worker before while loop: {self._printer.current_print_job}")
while ( while (
self._is_printing self._is_printing
and self._printer.current_print_job is not None and self._printer.current_print_job is not None
@ -55,6 +58,7 @@ class PrintingState(APrinterState):
self._printer.report_print_job_status() self._printer.report_print_job_status()
time.sleep(3) time.sleep(3)
self._log.debug(f"_printing_worker after while loop: {self._printer.current_print_job}")
self.update_print_job_info() self.update_print_job_info()
if ( if (
self._printer.current_print_job is not None self._printer.current_print_job is not None
@ -64,17 +68,21 @@ class PrintingState(APrinterState):
def update_print_job_info(self): def update_print_job_info(self):
print_job_info = self._printer.bambu_client.get_device().print_job print_job_info = self._printer.bambu_client.get_device().print_job
task_name: str = print_job_info.subtask_name subtask_name: str = print_job_info.subtask_name
project_file_info = self._printer.project_files.get_file_by_stem( gcode_file: str = print_job_info.gcode_file
task_name, [".gcode", ".3mf"]
) self._log.debug(f"update_print_job_info: {print_job_info}")
project_file_info = self._printer.project_files.get_file_by_name(subtask_name) or self._printer.project_files.get_file_by_name(gcode_file)
if project_file_info is None: if project_file_info is None:
self._log.debug(f"No 3mf file found for {print_job_info}") self._log.debug(f"No 3mf file found for {print_job_info}")
self._current_print_job = None self._printer.current_print_job = None
self._printer.change_state(self._printer._state_idle) self._printer.change_state(self._printer._state_idle)
return return
progress = print_job_info.print_percentage progress = print_job_info.print_percentage
if print_job_info.gcode_state == "PREPARE" and progress == 100:
progress = 0
self._printer.current_print_job = PrintJob(project_file_info, progress, print_job_info.remaining_time, print_job_info.current_layer, print_job_info.total_layers) self._printer.current_print_job = PrintJob(project_file_info, progress, print_job_info.remaining_time, print_job_info.current_layer, print_job_info.total_layers)
self._printer.select_project_file(project_file_info.path.as_posix()) self._printer.select_project_file(project_file_info.path.as_posix())

View File

@ -20,6 +20,16 @@ $(function () {
self.job_info = ko.observable(); self.job_info = ko.observable();
self.auth_type = ko.observable("");
self.show_password = ko.pureComputed(function(){
return self.settingsViewModel.settings.plugins.bambu_printer.auth_token() === '';
});
self.show_verification = ko.pureComputed(function(){
return self.auth_type() !== '';
});
self.ams_mapping_computed = function(){ self.ams_mapping_computed = function(){
var output_list = []; var output_list = [];
var index = 0; var index = 0;
@ -40,16 +50,34 @@ $(function () {
self.getAuthToken = function (data) { self.getAuthToken = function (data) {
self.settingsViewModel.settings.plugins.bambu_printer.auth_token(""); self.settingsViewModel.settings.plugins.bambu_printer.auth_token("");
self.auth_type("");
OctoPrint.simpleApiCommand("bambu_printer", "register", { OctoPrint.simpleApiCommand("bambu_printer", "register", {
"email": self.settingsViewModel.settings.plugins.bambu_printer.email(), "email": self.settingsViewModel.settings.plugins.bambu_printer.email(),
"password": $("#bambu_cloud_password").val(), "password": $("#bambu_cloud_password").val(),
"region": self.settingsViewModel.settings.plugins.bambu_printer.region(), "region": self.settingsViewModel.settings.plugins.bambu_printer.region(),
"auth_token": self.settingsViewModel.settings.plugins.bambu_printer.auth_token() "auth_token": self.settingsViewModel.settings.plugins.bambu_printer.auth_token()
})
.done(function (response) {
self.auth_type(response.auth_response);
});
};
self.verifyCode = function (data) {
self.settingsViewModel.settings.plugins.bambu_printer.auth_token("");
OctoPrint.simpleApiCommand("bambu_printer", "verify", {
"password": $("#bambu_cloud_verify_code").val(),
"auth_type": self.auth_type(),
}) })
.done(function (response) { .done(function (response) {
console.log(response); console.log(response);
self.settingsViewModel.settings.plugins.bambu_printer.auth_token(response.auth_token); if (response.auth_token) {
self.settingsViewModel.settings.plugins.bambu_printer.username(response.username); self.settingsViewModel.settings.plugins.bambu_printer.auth_token(response.auth_token);
self.settingsViewModel.settings.plugins.bambu_printer.username(response.username);
self.auth_type("");
} else if (response.error) {
self.settingsViewModel.settings.plugins.bambu_printer.auth_token("");
$("#bambu_cloud_verify_code").val("");
}
}); });
}; };

View File

@ -40,15 +40,24 @@
<div class="control-group" data-bind="visible: !settingsViewModel.settings.plugins.bambu_printer.local_mqtt()"> <div class="control-group" data-bind="visible: !settingsViewModel.settings.plugins.bambu_printer.local_mqtt()">
<label class="control-label">{{ _('Email') }}</label> <label class="control-label">{{ _('Email') }}</label>
<div class="controls"> <div class="controls">
<input type="text" class="input-block-level" data-bind="value: settingsViewModel.settings.plugins.bambu_printer.email" title="{{ _('Registered email address') }}"></input> <input type="text" class="input-block-level" data-bind="value: settingsViewModel.settings.plugins.bambu_printer.email" title="{{ _('Registered email address') }}" autocomplete="off"></input>
</div> </div>
</div> </div>
<div class="control-group" data-bind="visible: !settingsViewModel.settings.plugins.bambu_printer.local_mqtt()"> <div class="control-group" data-bind="visible: !settingsViewModel.settings.plugins.bambu_printer.local_mqtt() && show_password()">
<label class="control-label">{{ _('Password') }}</label> <label class="control-label">{{ _('Password') }}</label>
<div class="controls"> <div class="controls">
<div class="input-block-level input-append" data-bind="css: {'input-append': !show_verification()}">
<input id="bambu_cloud_password" type="password" class="input-text input-block-level" title="{{ _('Password to generate verification code') }}" autocomplete="new-password"></input>
<span class="btn btn-primary add-on" data-bind="visible: !show_verification(), click: getAuthToken">{{ _('Login') }}</span>
</div>
</div>
</div>
<div class="control-group" data-bind="visible: show_verification()">
<label class="control-label">{{ _('Verify') }}</label>
<div class="controls">
<div class="input-block-level input-append"> <div class="input-block-level input-append">
<input id="bambu_cloud_password" type="password" class="input-text input-block-level" title="{{ _('Password to generate Auth Token') }}"></input> <input id="bambu_cloud_verify_code" type="password" class="input-text input-block-level" title="{{ _('Verification code to generate auth token') }}"></input>
<span class="btn btn-primary add-on" data-bind="click: getAuthToken">{{ _('Login') }}</span> <span class="btn btn-primary add-on" data-bind="click: verifyCode">{{ _('Verify') }}</span>
</div> </div>
</div> </div>
</div> </div>

View File

@ -14,7 +14,7 @@ plugin_package = "octoprint_bambu_printer"
plugin_name = "OctoPrint-BambuPrinter" plugin_name = "OctoPrint-BambuPrinter"
# The plugin's version. Can be overwritten within OctoPrint's internal data via __plugin_version__ in the plugin module # The plugin's version. Can be overwritten within OctoPrint's internal data via __plugin_version__ in the plugin module
plugin_version = "0.1.8rc6" plugin_version = "0.1.8rc12"
# The plugin's description. Can be overwritten within OctoPrint's internal data via __plugin_description__ in the plugin # The plugin's description. Can be overwritten within OctoPrint's internal data via __plugin_description__ in the plugin
# module # module
@ -33,7 +33,7 @@ plugin_url = "https://github.com/jneilliii/OctoPrint-BambuPrinter"
plugin_license = "AGPLv3" plugin_license = "AGPLv3"
# Any additional requirements besides OctoPrint should be listed here # Any additional requirements besides OctoPrint should be listed here
plugin_requires = ["paho-mqtt<2", "python-dateutil", "httpx[http2]>=0.27.0"] plugin_requires = ["paho-mqtt<2", "python-dateutil", "cloudscraper"]
### -------------------------------------------------------------------------------------------------------------------- ### --------------------------------------------------------------------------------------------------------------------
### More advanced options that you usually shouldn't have to touch follow after this point ### More advanced options that you usually shouldn't have to touch follow after this point