Compare commits
18 Commits
Author | SHA1 | Date | |
---|---|---|---|
52ba3ff214 | |||
98ab94b371 | |||
b54e372342 | |||
76f706df19 | |||
5c8a9787d4 | |||
e3fda73dd3 | |||
5633d6f6ea | |||
884101c0ba | |||
7c87ba9482 | |||
1d9f874560 | |||
21e30034d0 | |||
3c8b904a26 | |||
55ad4c1718 | |||
4ef8e40702 | |||
2537bc8f57 | |||
28be048300 | |||
eaa0ed94c0 | |||
14af93b1d0 |
@ -2,3 +2,4 @@ include README.md
|
||||
recursive-include octoprint_bambu_printer/templates *
|
||||
recursive-include octoprint_bambu_printer/translations *
|
||||
recursive-include octoprint_bambu_printer/static *
|
||||
include octoprint_bambu_printer/printer/pybambu/filaments.json
|
||||
|
@ -1,4 +1,6 @@
|
||||
from __future__ import absolute_import, annotations
|
||||
|
||||
import json
|
||||
from pathlib import Path
|
||||
import threading
|
||||
from time import perf_counter
|
||||
@ -6,6 +8,8 @@ from contextlib import contextmanager
|
||||
import flask
|
||||
import logging.handlers
|
||||
from urllib.parse import quote as urlquote
|
||||
import os
|
||||
import zipfile
|
||||
|
||||
import octoprint.printer
|
||||
import octoprint.server
|
||||
@ -57,6 +61,7 @@ class BambuPrintPlugin(
|
||||
_plugin_manager: octoprint.plugin.PluginManager
|
||||
_bambu_file_system: RemoteSDCardFileList
|
||||
_timelapse_files_view: CachedFileView
|
||||
_bambu_cloud: None
|
||||
|
||||
def on_settings_initialized(self):
|
||||
self._bambu_file_system = RemoteSDCardFileList(self._settings)
|
||||
@ -101,14 +106,20 @@ class BambuPrintPlugin(
|
||||
"always_use_default_options": False,
|
||||
"ams_data": [],
|
||||
"ams_mapping": [],
|
||||
"ams_current_tray": None,
|
||||
"ams_current_tray": 255,
|
||||
}
|
||||
|
||||
def on_settings_save(self, data):
|
||||
if data.get("local_mqtt", False) is True:
|
||||
data["auth_token"] = ""
|
||||
octoprint.plugin.SettingsPlugin.on_settings_save(self, data)
|
||||
|
||||
def is_api_adminonly(self):
|
||||
return True
|
||||
|
||||
def get_api_commands(self):
|
||||
return {"register": ["email", "password", "region", "auth_token"]}
|
||||
return {"register": ["email", "password", "region", "auth_token"],
|
||||
"verify": ["auth_type", "password"]}
|
||||
|
||||
def on_api_command(self, command, data):
|
||||
if command == "register":
|
||||
@ -119,20 +130,57 @@ class BambuPrintPlugin(
|
||||
and "auth_token" in data
|
||||
):
|
||||
self._logger.info(f"Registering user {data['email']}")
|
||||
bambu_cloud = BambuCloud(
|
||||
self._bambu_cloud = BambuCloud(
|
||||
data["region"], data["email"], data["password"], data["auth_token"]
|
||||
)
|
||||
bambu_cloud.login(data["region"], data["email"], data["password"])
|
||||
auth_response = self._bambu_cloud.login(data["region"], data["email"], data["password"])
|
||||
return flask.jsonify(
|
||||
{
|
||||
"auth_token": bambu_cloud.auth_token,
|
||||
"username": bambu_cloud.username,
|
||||
"auth_response": auth_response,
|
||||
}
|
||||
)
|
||||
elif command == "verify":
|
||||
auth_response = None
|
||||
if (
|
||||
"auth_type" in data
|
||||
and "password" in data
|
||||
and self._bambu_cloud is not None
|
||||
):
|
||||
self._logger.info(f"Verifying user {self._bambu_cloud._email}")
|
||||
if data["auth_type"] == "verifyCode":
|
||||
auth_response = self._bambu_cloud.login_with_verification_code(data["password"])
|
||||
elif data["auth_type"] == "tfa":
|
||||
auth_response = self._bambu_cloud.login_with_2fa_code(data["password"])
|
||||
else:
|
||||
self._logger.warning(f"Unknown verification type: {data['auth_type']}")
|
||||
|
||||
if auth_response == "success":
|
||||
return flask.jsonify(
|
||||
{
|
||||
"auth_token": self._bambu_cloud.auth_token,
|
||||
"username": self._bambu_cloud.username
|
||||
}
|
||||
)
|
||||
else:
|
||||
self._logger.info(f"Error verifying: {auth_response}")
|
||||
return flask.jsonify(
|
||||
{
|
||||
"error": "Unable to verify"
|
||||
}
|
||||
)
|
||||
|
||||
def on_event(self, event, payload):
|
||||
if event == Events.TRANSFER_DONE:
|
||||
self._printer.commands("M20 L T", force=True)
|
||||
elif event == Events.FILE_ADDED:
|
||||
if payload["operation"] == "add" and "3mf" in payload["type"]:
|
||||
file_container = os.path.join(self._settings.getBaseFolder("uploads"), payload["path"])
|
||||
with zipfile.ZipFile(file_container) as z:
|
||||
with z.open("Metadata/plate_1.json", "r") as json_data:
|
||||
plate_data = json.load(json_data)
|
||||
|
||||
if plate_data:
|
||||
self._file_manager.set_additional_metadata("sdcard", payload["path"], "plate_data", plate_data, overwrite=True)
|
||||
|
||||
def support_3mf_files(self):
|
||||
return {"machinecode": {"3mf": ["3mf"]}}
|
||||
|
@ -43,7 +43,7 @@ class BambuPrinterTelemetry:
|
||||
lastTempAt: float = time.monotonic()
|
||||
firmwareName: str = "Bambu"
|
||||
extruderCount: int = 1
|
||||
ams_current_tray: int = -1
|
||||
ams_current_tray: int = 255
|
||||
|
||||
|
||||
# noinspection PyBroadException
|
||||
@ -170,22 +170,6 @@ class BambuVirtualPrinter:
|
||||
def change_state(self, new_state: APrinterState):
|
||||
self._state_change_queue.put(new_state)
|
||||
|
||||
def _convert2serialize(self, obj):
|
||||
if isinstance(obj, dict):
|
||||
return {k: self._convert2serialize(v) for k, v in obj.items()}
|
||||
elif hasattr(obj, "_ast"):
|
||||
return self._convert2serialize(obj._ast())
|
||||
elif not isinstance(obj, str) and hasattr(obj, "__iter__"):
|
||||
return [self._convert2serialize(v) for v in obj]
|
||||
elif hasattr(obj, "__dict__"):
|
||||
return {
|
||||
k: self._convert2serialize(v)
|
||||
for k, v in obj.__dict__.items()
|
||||
if not callable(v) and not k.startswith('_')
|
||||
}
|
||||
else:
|
||||
return obj
|
||||
|
||||
def new_update(self, event_type):
|
||||
if event_type == "event_hms_errors":
|
||||
self._update_hms_errors()
|
||||
@ -196,7 +180,8 @@ class BambuVirtualPrinter:
|
||||
device_data = self.bambu_client.get_device()
|
||||
print_job_state = device_data.print_job.gcode_state
|
||||
temperatures = device_data.temperature
|
||||
ams_data = self._convert2serialize(device_data.ams.data)
|
||||
# strip out extra data to avoid unneeded settings updates
|
||||
ams_data = [{"tray": asdict(x).pop("tray", None)} for x in device_data.ams.data if x is not None]
|
||||
|
||||
if self.ams_data != ams_data:
|
||||
self._log.debug(f"Recieveid AMS Update: {ams_data}")
|
||||
@ -210,7 +195,8 @@ class BambuVirtualPrinter:
|
||||
self._telemetry.bedTemp = temperatures.bed_temp
|
||||
self._telemetry.bedTargetTemp = temperatures.target_bed_temp
|
||||
self._telemetry.chamberTemp = temperatures.chamber_temp
|
||||
self._telemetry.ams_current_tray = device_data.push_all_data["ams"]["tray_now"] or -1
|
||||
if device_data.push_all_data and "ams" in device_data.push_all_data:
|
||||
self._telemetry.ams_current_tray = device_data.push_all_data["ams"]["tray_now"] or 255
|
||||
|
||||
if self._telemetry.ams_current_tray != self._settings.get_int(["ams_current_tray"]):
|
||||
self._settings.set_int(["ams_current_tray"], self._telemetry.ams_current_tray)
|
||||
@ -273,19 +259,20 @@ class BambuVirtualPrinter:
|
||||
f"connecting via local mqtt: {self._settings.get_boolean(['local_mqtt'])}"
|
||||
)
|
||||
bambu_client = BambuClient(
|
||||
device_type=self._settings.get(["device_type"]),
|
||||
serial=self._settings.get(["serial"]),
|
||||
host=self._settings.get(["host"]),
|
||||
username=(
|
||||
{"device_type": self._settings.get(["device_type"]),
|
||||
"serial": self._settings.get(["serial"]),
|
||||
"host": self._settings.get(["host"]),
|
||||
"username": (
|
||||
"bblp"
|
||||
if self._settings.get_boolean(["local_mqtt"])
|
||||
else self._settings.get(["username"])
|
||||
),
|
||||
access_code=self._settings.get(["access_code"]),
|
||||
local_mqtt=self._settings.get_boolean(["local_mqtt"]),
|
||||
region=self._settings.get(["region"]),
|
||||
email=self._settings.get(["email"]),
|
||||
auth_token=self._settings.get(["auth_token"]),
|
||||
"access_code": self._settings.get(["access_code"]),
|
||||
"local_mqtt": self._settings.get_boolean(["local_mqtt"]),
|
||||
"region": self._settings.get(["region"]),
|
||||
"email": self._settings.get(["email"]),
|
||||
"auth_token": self._settings.get(["auth_token"]) if self._settings.get_boolean(["local_mqtt"]) is False else "",
|
||||
}
|
||||
)
|
||||
bambu_client.on_disconnect = self.on_disconnect(bambu_client.on_disconnect)
|
||||
bambu_client.on_connect = self.on_connect(bambu_client.on_connect)
|
||||
@ -344,21 +331,21 @@ class BambuVirtualPrinter:
|
||||
self._selected_project_file = None
|
||||
|
||||
def select_project_file(self, file_path: str) -> bool:
|
||||
self._log.debug(f"Select project file: {file_path}")
|
||||
file_info = self._project_files_view.get_file_by_stem(
|
||||
file_path, [".gcode", ".3mf"]
|
||||
)
|
||||
file_info = self._project_files_view.get_file_by_name(file_path)
|
||||
if (
|
||||
self._selected_project_file is not None
|
||||
and file_info is not None
|
||||
and self._selected_project_file.path == file_info.path
|
||||
):
|
||||
self._log.debug(f"File already selected: {file_path}")
|
||||
return True
|
||||
|
||||
if file_info is None:
|
||||
self._log.error(f"Cannot select not existing file: {file_path}")
|
||||
self._log.error(f"Cannot select non-existent file: {file_path}")
|
||||
return False
|
||||
|
||||
self._log.debug(f"Select project file: {file_path}")
|
||||
|
||||
self._selected_project_file = file_info
|
||||
self._send_file_selected_message()
|
||||
return True
|
||||
@ -366,8 +353,9 @@ class BambuVirtualPrinter:
|
||||
##~~ command implementations
|
||||
|
||||
@gcode_executor.register_no_data("M21")
|
||||
def _sd_status(self) -> None:
|
||||
def _sd_status(self) -> bool:
|
||||
self.sendIO("SD card ok")
|
||||
return True
|
||||
|
||||
@gcode_executor.register("M23")
|
||||
def _select_sd_file(self, data: str) -> bool:
|
||||
@ -468,6 +456,9 @@ class BambuVirtualPrinter:
|
||||
# noinspection PyUnusedLocal
|
||||
@gcode_executor.register_no_data("M115")
|
||||
def _report_firmware_info(self) -> bool:
|
||||
# wait for connection to be established before sending back firmware info
|
||||
while self.bambu_client.connected is False:
|
||||
time.sleep(1)
|
||||
self.sendIO("Bambu Printer Integration")
|
||||
self.sendIO("Cap:AUTOREPORT_SD_STATUS:1")
|
||||
self.sendIO("Cap:AUTOREPORT_TEMP:1")
|
||||
@ -680,7 +671,7 @@ class BambuVirtualPrinter:
|
||||
self._state_change_queue.join()
|
||||
|
||||
def _printer_worker(self):
|
||||
self._create_client_connection_async()
|
||||
# self._create_client_connection_async()
|
||||
self.sendIO("Printer connection complete")
|
||||
while self._running:
|
||||
try:
|
||||
|
@ -35,8 +35,8 @@ class CachedFileView:
|
||||
result: list[FileInfo] = []
|
||||
|
||||
with self.file_system.get_ftps_client() as ftp:
|
||||
for filter in self.folder_view.keys():
|
||||
result.extend(self.file_system.list_files(*filter, ftp, existing_files))
|
||||
for key in self.folder_view.keys():
|
||||
result.extend(self.file_system.list_files(*key, ftp, existing_files))
|
||||
return result
|
||||
|
||||
def update(self):
|
||||
@ -56,6 +56,9 @@ class CachedFileView:
|
||||
def get_all_cached_info(self):
|
||||
return list(self._file_data_cache.values())
|
||||
|
||||
def get_keys_as_list(self):
|
||||
return list(self._file_data_cache.keys()) + list(self._file_alias_cache.keys())
|
||||
|
||||
def get_file_data(self, file_path: str | Path) -> FileInfo | None:
|
||||
file_data = self.get_file_data_cached(file_path)
|
||||
if file_data is None:
|
||||
@ -73,22 +76,23 @@ class CachedFileView:
|
||||
file_path = self._file_alias_cache.get(file_path, file_path)
|
||||
return self._file_data_cache.get(file_path, None)
|
||||
|
||||
def get_file_by_stem(self, file_stem: str, allowed_suffixes: list[str]):
|
||||
if file_stem == "":
|
||||
def get_file_by_name(self, file_name: str):
|
||||
if file_name == "":
|
||||
return None
|
||||
|
||||
file_stem = Path(file_stem).with_suffix("").stem
|
||||
file_data = self._get_file_by_stem_cached(file_stem, allowed_suffixes)
|
||||
file_list = self.get_keys_as_list()
|
||||
if not file_name in file_list:
|
||||
if f"{file_name}.3mf" in file_list:
|
||||
file_name = f"{file_name}.3mf"
|
||||
elif f"{file_name}.gcode.3mf" in file_list:
|
||||
file_name = f"{file_name}.gcode.3mf"
|
||||
elif f"cache/{file_name}.3mf" in file_list:
|
||||
file_name = f"cache/{file_name}.3mf"
|
||||
elif f"cache/{file_name}.gcode.3mf" in file_list:
|
||||
file_name = f"cache/{file_name}.gcode.3mf"
|
||||
|
||||
file_data = self.get_file_data_cached(file_name)
|
||||
if file_data is None:
|
||||
self.update()
|
||||
file_data = self._get_file_by_stem_cached(file_stem, allowed_suffixes)
|
||||
return self.get_file_by_name(file_name)
|
||||
return file_data
|
||||
|
||||
def _get_file_by_stem_cached(self, file_stem: str, allowed_suffixes: list[str]):
|
||||
for file_path_str in list(self._file_data_cache.keys()) + list(self._file_alias_cache.keys()):
|
||||
file_path = Path(file_path_str)
|
||||
if file_stem == file_path.with_suffix("").stem and all(
|
||||
suffix in allowed_suffixes for suffix in file_path.suffixes
|
||||
):
|
||||
return self.get_file_data_cached(file_path)
|
||||
return None
|
||||
|
@ -10,6 +10,9 @@ from octoprint_bambu_printer.printer.file_system.remote_sd_card_file_list import
|
||||
class PrintJob:
|
||||
file_info: FileInfo
|
||||
progress: int
|
||||
remaining_time: int
|
||||
current_layer: int
|
||||
total_layers: int
|
||||
|
||||
@property
|
||||
def file_position(self):
|
||||
|
@ -1,5 +1,6 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import queue
|
||||
import json
|
||||
import math
|
||||
@ -36,6 +37,7 @@ class WatchdogThread(threading.Thread):
|
||||
self._stop_event = threading.Event()
|
||||
self._last_received_data = time.time()
|
||||
super().__init__()
|
||||
self.daemon = True
|
||||
self.setName(f"{self._client._device.info.device_type}-Watchdog-{threading.get_native_id()}")
|
||||
|
||||
def stop(self):
|
||||
@ -70,6 +72,7 @@ class ChamberImageThread(threading.Thread):
|
||||
self._client = client
|
||||
self._stop_event = threading.Event()
|
||||
super().__init__()
|
||||
self.daemon = True
|
||||
self.setName(f"{self._client._device.info.device_type}-Chamber-{threading.get_native_id()}")
|
||||
|
||||
def stop(self):
|
||||
@ -223,6 +226,7 @@ class MqttThread(threading.Thread):
|
||||
self._client = client
|
||||
self._stop_event = threading.Event()
|
||||
super().__init__()
|
||||
self.daemon = True
|
||||
self.setName(f"{self._client._device.info.device_type}-Mqtt-{threading.get_native_id()}")
|
||||
|
||||
def stop(self):
|
||||
@ -281,23 +285,31 @@ class BambuClient:
|
||||
_camera = None
|
||||
_usage_hours: float
|
||||
|
||||
def __init__(self, device_type: str, serial: str, host: str, local_mqtt: bool, region: str, email: str,
|
||||
username: str, auth_token: str, access_code: str, usage_hours: float = 0, manual_refresh_mode: bool = False):
|
||||
self.callback = None
|
||||
self.host = host
|
||||
self._local_mqtt = local_mqtt
|
||||
self._serial = serial
|
||||
self._auth_token = auth_token
|
||||
self._access_code = access_code
|
||||
self._username = username
|
||||
def __init__(self, config):
|
||||
self.host = config['host']
|
||||
self._callback = None
|
||||
|
||||
self._access_code = config.get('access_code', '')
|
||||
self._auth_token = config.get('auth_token', '')
|
||||
self._device_type = config.get('device_type', 'unknown')
|
||||
self._local_mqtt = config.get('local_mqtt', False)
|
||||
self._manual_refresh_mode = config.get('manual_refresh_mode', False)
|
||||
self._serial = config.get('serial', '')
|
||||
self._usage_hours = config.get('usage_hours', 0)
|
||||
self._username = config.get('username', '')
|
||||
self._enable_camera = config.get('enable_camera', True)
|
||||
|
||||
self._connected = False
|
||||
self._device_type = device_type
|
||||
self._usage_hours = usage_hours
|
||||
self._port = 1883
|
||||
self._refreshed = False
|
||||
self._manual_refresh_mode = manual_refresh_mode
|
||||
|
||||
self._device = Device(self)
|
||||
self.bambu_cloud = BambuCloud(region, email, username, auth_token)
|
||||
self.bambu_cloud = BambuCloud(
|
||||
config.get('region', ''),
|
||||
config.get('email', ''),
|
||||
config.get('username', ''),
|
||||
config.get('auth_token', '')
|
||||
)
|
||||
self.slicer_settings = SlicerSettings(self)
|
||||
|
||||
@property
|
||||
@ -317,20 +329,40 @@ class BambuClient:
|
||||
self.disconnect()
|
||||
else:
|
||||
# Reconnect normally
|
||||
self.connect(self.callback)
|
||||
self.connect(self._callback)
|
||||
|
||||
@property
|
||||
def camera_enabled(self):
|
||||
return self._enable_camera
|
||||
|
||||
def callback(self, event: str):
|
||||
if self._callback is not None:
|
||||
self._callback(event)
|
||||
|
||||
def set_camera_enabled(self, enable):
|
||||
self._enable_camera = enable
|
||||
if self._enable_camera:
|
||||
self._start_camera()
|
||||
else:
|
||||
self._stop_camera()
|
||||
|
||||
def setup_tls(self):
|
||||
self.client.tls_set(tls_version=ssl.PROTOCOL_TLS, cert_reqs=ssl.CERT_NONE)
|
||||
self.client.tls_insecure_set(True)
|
||||
|
||||
def connect(self, callback):
|
||||
"""Connect to the MQTT Broker"""
|
||||
self.client = mqtt.Client()
|
||||
self.callback = callback
|
||||
self._callback = callback
|
||||
self.client.on_connect = self.on_connect
|
||||
self.client.on_disconnect = self.on_disconnect
|
||||
self.client.on_message = self.on_message
|
||||
# Set aggressive reconnect polling.
|
||||
self.client.reconnect_delay_set(min_delay=1, max_delay=1)
|
||||
|
||||
self.client.tls_set(tls_version=ssl.PROTOCOL_TLS, cert_reqs=ssl.CERT_NONE)
|
||||
self.client.tls_insecure_set(True)
|
||||
# Run the blocking tls_set method in a separate thread
|
||||
self.setup_tls()
|
||||
|
||||
self._port = 8883
|
||||
if self._local_mqtt:
|
||||
self.client.username_pw_set("bblp", password=self._access_code)
|
||||
@ -361,6 +393,22 @@ class BambuClient:
|
||||
LOGGER.info("On Connect: Connected to printer")
|
||||
self._on_connect()
|
||||
|
||||
def _start_camera(self):
|
||||
if not self._device.supports_feature(Features.CAMERA_RTSP):
|
||||
if self._device.supports_feature(Features.CAMERA_IMAGE):
|
||||
if self._enable_camera:
|
||||
LOGGER.debug("Starting Chamber Image thread")
|
||||
self._camera = ChamberImageThread(self)
|
||||
self._camera.start()
|
||||
elif (self.host == "") or (self._access_code == ""):
|
||||
LOGGER.debug("Skipping camera setup as local access details not provided.")
|
||||
|
||||
def _stop_camera(self):
|
||||
if self._camera is not None:
|
||||
LOGGER.debug("Stopping camera thread")
|
||||
self._camera.stop()
|
||||
self._camera.join()
|
||||
|
||||
def _on_connect(self):
|
||||
self._connected = True
|
||||
self.subscribe_and_request_info()
|
||||
@ -369,10 +417,7 @@ class BambuClient:
|
||||
self._watchdog = WatchdogThread(self)
|
||||
self._watchdog.start()
|
||||
|
||||
if self._device.supports_feature(Features.CAMERA_IMAGE):
|
||||
LOGGER.debug("Starting Chamber Image thread")
|
||||
self._camera = ChamberImageThread(self)
|
||||
self._camera.start()
|
||||
self._start_camera()
|
||||
|
||||
def try_on_connect(self,
|
||||
client_: mqtt.Client,
|
||||
@ -405,10 +450,7 @@ class BambuClient:
|
||||
LOGGER.debug("Stopping watchdog thread")
|
||||
self._watchdog.stop()
|
||||
self._watchdog.join()
|
||||
if self._camera is not None:
|
||||
LOGGER.debug("Stopping camera thread")
|
||||
self._camera.stop()
|
||||
self._camera.join()
|
||||
self._stop_camera()
|
||||
|
||||
def _on_watchdog_fired(self):
|
||||
LOGGER.info("Watch dog fired")
|
||||
@ -451,9 +493,7 @@ class BambuClient:
|
||||
LOGGER.debug("Got Version Data")
|
||||
self._device.info_update(data=json_data.get("info"))
|
||||
except Exception as e:
|
||||
LOGGER.error("An exception occurred processing a message:")
|
||||
LOGGER.error(f"Exception type: {type(e)}")
|
||||
LOGGER.error(f"Exception data: {e}")
|
||||
LOGGER.error("An exception occurred processing a message:", exc_info=e)
|
||||
|
||||
def subscribe(self):
|
||||
"""Subscribe to report topic"""
|
||||
@ -475,7 +515,7 @@ class BambuClient:
|
||||
"""Force refresh data"""
|
||||
|
||||
if self._manual_refresh_mode:
|
||||
self.connect(self.callback)
|
||||
self.connect(self._callback)
|
||||
else:
|
||||
LOGGER.debug("Force Refresh: Getting Version Info")
|
||||
self._refreshed = True
|
||||
@ -516,8 +556,10 @@ class BambuClient:
|
||||
self.client.on_disconnect = self.on_disconnect
|
||||
self.client.on_message = on_message
|
||||
|
||||
self.client.tls_set(tls_version=ssl.PROTOCOL_TLS, cert_reqs=ssl.CERT_NONE)
|
||||
self.client.tls_insecure_set(True)
|
||||
# Run the blocking tls_set method in a separate thread
|
||||
loop = asyncio.get_event_loop()
|
||||
await loop.run_in_executor(None, self.setup_tls)
|
||||
|
||||
if self._local_mqtt:
|
||||
self.client.username_pw_set("bblp", password=self._access_code)
|
||||
else:
|
||||
|
@ -1,12 +1,68 @@
|
||||
from __future__ import annotations
|
||||
from enum import (
|
||||
Enum,
|
||||
)
|
||||
|
||||
import base64
|
||||
import cloudscraper
|
||||
import json
|
||||
import httpx
|
||||
import requests
|
||||
|
||||
class ConnectionMechanismEnum(Enum):
|
||||
CLOUDSCRAPER = 1,
|
||||
CURL_CFFI = 2,
|
||||
REQUESTS = 3
|
||||
|
||||
CONNECTION_MECHANISM = ConnectionMechanismEnum.CLOUDSCRAPER
|
||||
|
||||
curl_available = False
|
||||
if CONNECTION_MECHANISM == ConnectionMechanismEnum.CURL_CFFI:
|
||||
try:
|
||||
from curl_cffi import requests as curl_requests
|
||||
curl_available = True
|
||||
except ImportError:
|
||||
curl_available = False
|
||||
|
||||
from dataclasses import dataclass
|
||||
|
||||
from .const import LOGGER
|
||||
from .const import (
|
||||
LOGGER,
|
||||
BambuUrl
|
||||
)
|
||||
|
||||
from .utils import get_Url
|
||||
|
||||
IMPERSONATE_BROWSER='chrome'
|
||||
|
||||
class CloudflareError(Exception):
|
||||
def __init__(self):
|
||||
super().__init__("Blocked by Cloudflare")
|
||||
self.error_code = 403
|
||||
|
||||
class EmailCodeRequiredError(Exception):
|
||||
def __init__(self):
|
||||
super().__init__("Email code required")
|
||||
self.error_code = 400
|
||||
|
||||
class EmailCodeExpiredError(Exception):
|
||||
def __init__(self):
|
||||
super().__init__("Email code expired")
|
||||
self.error_code = 400
|
||||
|
||||
class EmailCodeIncorrectError(Exception):
|
||||
def __init__(self):
|
||||
super().__init__("Email code incorrect")
|
||||
self.error_code = 400
|
||||
|
||||
class TfaCodeRequiredError(Exception):
|
||||
def __init__(self):
|
||||
super().__init__("Two factor authentication code required")
|
||||
self.error_code = 400
|
||||
|
||||
class CurlUnavailableError(Exception):
|
||||
def __init__(self):
|
||||
super().__init__("curl library unavailable")
|
||||
self.error_code = 400
|
||||
|
||||
@dataclass
|
||||
class BambuCloud:
|
||||
@ -16,30 +72,235 @@ class BambuCloud:
|
||||
self._email = email
|
||||
self._username = username
|
||||
self._auth_token = auth_token
|
||||
self._tfaKey = None
|
||||
|
||||
def _get_authentication_token(self) -> dict:
|
||||
LOGGER.debug("Getting accessToken from Bambu Cloud")
|
||||
if self._region == "China":
|
||||
url = 'https://api.bambulab.cn/v1/user-service/user/login'
|
||||
def _get_headers(self):
|
||||
return {
|
||||
'User-Agent': 'bambu_network_agent/01.09.05.01',
|
||||
'X-BBL-Client-Name': 'OrcaSlicer',
|
||||
'X-BBL-Client-Type': 'slicer',
|
||||
'X-BBL-Client-Version': '01.09.05.51',
|
||||
'X-BBL-Language': 'en-US',
|
||||
'X-BBL-OS-Type': 'linux',
|
||||
'X-BBL-OS-Version': '6.2.0',
|
||||
'X-BBL-Agent-Version': '01.09.05.01',
|
||||
'X-BBL-Executable-info': '{}',
|
||||
'X-BBL-Agent-OS-Type': 'linux',
|
||||
'accept': 'application/json',
|
||||
'Content-Type': 'application/json'
|
||||
}
|
||||
# Orca/Bambu Studio also add this - need to work out what an appropriate ID is to put here:
|
||||
# 'X-BBL-Device-ID': BBL_AUTH_UUID,
|
||||
# Example: X-BBL-Device-ID: 370f9f43-c6fe-47d7-aec9-5fe5ef7e7673
|
||||
|
||||
def _get_headers_with_auth_token(self) -> dict:
|
||||
if CONNECTION_MECHANISM == ConnectionMechanismEnum.CURL_CFFI:
|
||||
headers = {}
|
||||
else:
|
||||
url = 'https://api.bambulab.com/v1/user-service/user/login'
|
||||
headers = {'User-Agent' : "HA Bambulab"}
|
||||
data = {'account': self._email, 'password': self._password}
|
||||
with httpx.Client(http2=True) as client:
|
||||
response = client.post(url, headers=headers, json=data, timeout=10)
|
||||
if response.status_code >= 400:
|
||||
LOGGER.debug(f"Received error: {response.status_code}")
|
||||
raise ValueError(response.status_code)
|
||||
headers = self._get_headers()
|
||||
headers['Authorization'] = f"Bearer {self._auth_token}"
|
||||
return headers
|
||||
|
||||
def _test_response(self, response, return400=False):
|
||||
# Check specifically for cloudflare block
|
||||
if response.status_code == 403 and 'cloudflare' in response.text:
|
||||
LOGGER.debug("BLOCKED BY CLOUDFLARE")
|
||||
raise CloudflareError()
|
||||
|
||||
if response.status_code == 400 and not return400:
|
||||
LOGGER.error(f"Connection failed with error code: {response.status_code}")
|
||||
LOGGER.debug(f"Response: '{response.text}'")
|
||||
raise PermissionError(response.status_code, response.text)
|
||||
|
||||
if response.status_code > 400:
|
||||
LOGGER.error(f"Connection failed with error code: {response.status_code}")
|
||||
LOGGER.debug(f"Response: '{response.text}'")
|
||||
raise PermissionError(response.status_code, response.text)
|
||||
|
||||
LOGGER.debug(f"Response: {response.status_code}")
|
||||
|
||||
def _get(self, urlenum: BambuUrl):
|
||||
url = get_Url(urlenum, self._region)
|
||||
headers=self._get_headers_with_auth_token()
|
||||
if CONNECTION_MECHANISM == ConnectionMechanismEnum.CURL_CFFI:
|
||||
if not curl_available:
|
||||
LOGGER.debug(f"Curl library is unavailable.")
|
||||
raise CurlUnavailableError()
|
||||
response = curl_requests.get(url, headers=headers, timeout=10, impersonate=IMPERSONATE_BROWSER)
|
||||
elif CONNECTION_MECHANISM == ConnectionMechanismEnum.CLOUDSCRAPER:
|
||||
if len(headers) == 0:
|
||||
headers = self._get_headers()
|
||||
scraper = cloudscraper.create_scraper()
|
||||
response = scraper.get(url, headers=headers, timeout=10)
|
||||
elif CONNECTION_MECHANISM == ConnectionMechanismEnum.REQUESTS:
|
||||
if len(headers) == 0:
|
||||
headers = self._get_headers()
|
||||
response = requests.get(url, headers=headers, timeout=10)
|
||||
else:
|
||||
raise NotImplementedError()
|
||||
|
||||
self._test_response(response)
|
||||
|
||||
return response
|
||||
|
||||
|
||||
def _post(self, urlenum: BambuUrl, json: str, headers={}, return400=False):
|
||||
url = get_Url(urlenum, self._region)
|
||||
if CONNECTION_MECHANISM == ConnectionMechanismEnum.CURL_CFFI:
|
||||
if not curl_available:
|
||||
LOGGER.debug(f"Curl library is unavailable.")
|
||||
raise CurlUnavailableError()
|
||||
response = curl_requests.post(url, headers=headers, json=json, impersonate=IMPERSONATE_BROWSER)
|
||||
elif CONNECTION_MECHANISM == ConnectionMechanismEnum.CLOUDSCRAPER:
|
||||
if len(headers) == 0:
|
||||
headers = self._get_headers()
|
||||
scraper = cloudscraper.create_scraper()
|
||||
response = scraper.post(url, headers=headers, json=json)
|
||||
elif CONNECTION_MECHANISM == ConnectionMechanismEnum.REQUESTS:
|
||||
if len(headers) == 0:
|
||||
headers = self._get_headers()
|
||||
response = requests.post(url, headers=headers, json=json)
|
||||
else:
|
||||
raise NotImplementedError()
|
||||
|
||||
self._test_response(response, return400)
|
||||
|
||||
return response
|
||||
|
||||
def _get_authentication_token(self) -> str:
|
||||
LOGGER.debug("Getting accessToken from Bambu Cloud")
|
||||
|
||||
# First we need to find out how Bambu wants us to login.
|
||||
data = {
|
||||
"account": self._email,
|
||||
"password": self._password,
|
||||
"apiError": ""
|
||||
}
|
||||
|
||||
response = self._post(BambuUrl.LOGIN, json=data)
|
||||
|
||||
auth_json = response.json()
|
||||
accessToken = auth_json.get('accessToken', '')
|
||||
if accessToken != '':
|
||||
# We were provided the accessToken directly.
|
||||
return accessToken
|
||||
|
||||
loginType = auth_json.get("loginType", None)
|
||||
if loginType is None:
|
||||
LOGGER.error(f"loginType not present")
|
||||
LOGGER.error(f"Response not understood: '{response.text}'")
|
||||
return ValueError(0) # FIXME
|
||||
elif loginType == 'verifyCode':
|
||||
LOGGER.debug(f"Received verifyCode response")
|
||||
raise EmailCodeRequiredError()
|
||||
elif loginType == 'tfa':
|
||||
# Store the tfaKey for later use
|
||||
LOGGER.debug(f"Received tfa response")
|
||||
self._tfaKey = auth_json.get("tfaKey")
|
||||
raise TfaCodeRequiredError()
|
||||
else:
|
||||
LOGGER.debug(f"Did not understand json. loginType = '{loginType}'")
|
||||
LOGGER.error(f"Response not understood: '{response.text}'")
|
||||
return ValueError(1) # FIXME
|
||||
|
||||
def _get_email_verification_code(self):
|
||||
# Send the verification code request
|
||||
data = {
|
||||
"email": self._email,
|
||||
"type": "codeLogin"
|
||||
}
|
||||
|
||||
LOGGER.debug("Requesting verification code")
|
||||
self._post(BambuUrl.EMAIL_CODE, json=data)
|
||||
LOGGER.debug("Verification code requested successfully.")
|
||||
|
||||
def _get_authentication_token_with_verification_code(self, code) -> dict:
|
||||
LOGGER.debug("Attempting to connect with provided verification code.")
|
||||
data = {
|
||||
"account": self._email,
|
||||
"code": code
|
||||
}
|
||||
|
||||
response = self._post(BambuUrl.LOGIN, json=data, return400=True)
|
||||
status_code = response.status_code
|
||||
|
||||
if status_code == 200:
|
||||
LOGGER.debug("Authentication successful.")
|
||||
LOGGER.debug(f"Response = '{response.json()}'")
|
||||
elif status_code == 400:
|
||||
LOGGER.debug(f"Received response: {response.json()}")
|
||||
if response.json()['code'] == 1:
|
||||
# Code has expired. Request a new one.
|
||||
self._get_email_verification_code()
|
||||
raise EmailCodeExpiredError()
|
||||
elif response.json()['code'] == 2:
|
||||
# Code was incorrect. Let the user try again.
|
||||
raise EmailCodeIncorrectError()
|
||||
else:
|
||||
LOGGER.error(f"Response not understood: '{response.json()}'")
|
||||
raise ValueError(response.json()['code'])
|
||||
|
||||
return response.json()['accessToken']
|
||||
|
||||
def _get_authentication_token_with_2fa_code(self, code: str) -> dict:
|
||||
LOGGER.debug("Attempting to connect with provided 2FA code.")
|
||||
|
||||
data = {
|
||||
"tfaKey": self._tfaKey,
|
||||
"tfaCode": code
|
||||
}
|
||||
|
||||
response = self._post(BambuUrl.TFA_LOGIN, json=data)
|
||||
|
||||
LOGGER.debug(f"Response: {response.status_code}")
|
||||
if response.status_code == 200:
|
||||
LOGGER.debug("Authentication successful.")
|
||||
|
||||
cookies = response.cookies.get_dict()
|
||||
token_from_tfa = cookies.get("token")
|
||||
#LOGGER.debug(f"token_from_tfa: {token_from_tfa}")
|
||||
|
||||
return token_from_tfa
|
||||
|
||||
def _get_username_from_authentication_token(self) -> str:
|
||||
LOGGER.debug("Trying to get username from authentication token.")
|
||||
# User name is in 2nd portion of the auth token (delimited with periods)
|
||||
b64_string = self._auth_token.split(".")[1]
|
||||
# String must be multiples of 4 chars in length. For decode pad with = character
|
||||
b64_string += "=" * ((4 - len(b64_string) % 4) % 4)
|
||||
jsonAuthToken = json.loads(base64.b64decode(b64_string))
|
||||
# Gives json payload with "username":"u_<digits>" within it
|
||||
return jsonAuthToken['username']
|
||||
username = None
|
||||
tokens = self._auth_token.split(".")
|
||||
if len(tokens) != 3:
|
||||
LOGGER.debug("Received authToken is not a JWT.")
|
||||
LOGGER.debug("Trying to use project API to retrieve username instead")
|
||||
response = self.get_projects();
|
||||
if response is not None:
|
||||
projectsnode = response.get('projects', None)
|
||||
if projectsnode is None:
|
||||
LOGGER.debug("Failed to find projects node")
|
||||
else:
|
||||
if len(projectsnode) == 0:
|
||||
LOGGER.debug("No projects node in response")
|
||||
else:
|
||||
project=projectsnode[0]
|
||||
if project.get('user_id', None) is None:
|
||||
LOGGER.debug("No user_id entry")
|
||||
else:
|
||||
username = f"u_{project['user_id']}"
|
||||
LOGGER.debug(f"Found user_id of {username}")
|
||||
else:
|
||||
LOGGER.debug("Authentication token looks to be a JWT")
|
||||
try:
|
||||
b64_string = self._auth_token.split(".")[1]
|
||||
# String must be multiples of 4 chars in length. For decode pad with = character
|
||||
b64_string += "=" * ((4 - len(b64_string) % 4) % 4)
|
||||
jsonAuthToken = json.loads(base64.b64decode(b64_string))
|
||||
# Gives json payload with "username":"u_<digits>" within it
|
||||
username = jsonAuthToken.get('username', None)
|
||||
except:
|
||||
LOGGER.debug("Unable to decode authToken to json to retrieve username.")
|
||||
|
||||
if username is None:
|
||||
LOGGER.debug(f"Unable to decode authToken to retrieve username. AuthToken = {self._auth_token}")
|
||||
|
||||
return username
|
||||
|
||||
# Retrieves json description of devices in the form:
|
||||
# {
|
||||
@ -56,7 +317,7 @@ class BambuCloud:
|
||||
# 'dev_product_name': 'P1S',
|
||||
# 'dev_access_code': 'REDACTED',
|
||||
# 'nozzle_diameter': 0.4
|
||||
# },
|
||||
# },
|
||||
# {
|
||||
# 'dev_id': 'REDACTED',
|
||||
# 'name': 'Bambu P1P',
|
||||
@ -66,7 +327,7 @@ class BambuCloud:
|
||||
# 'dev_product_name': 'P1P',
|
||||
# 'dev_access_code': 'REDACTED',
|
||||
# 'nozzle_diameter': 0.4
|
||||
# },
|
||||
# },
|
||||
# {
|
||||
# 'dev_id': 'REDACTED',
|
||||
# 'name': 'Bambu X1C',
|
||||
@ -76,7 +337,7 @@ class BambuCloud:
|
||||
# 'dev_product_name': 'X1 Carbon',
|
||||
# 'dev_access_code': 'REDACTED',
|
||||
# 'nozzle_diameter': 0.4
|
||||
# }
|
||||
# }
|
||||
# ]
|
||||
# }
|
||||
|
||||
@ -91,26 +352,31 @@ class BambuCloud:
|
||||
return False
|
||||
return True
|
||||
|
||||
def login(self, region: str, email: str, password: str):
|
||||
def login(self, region: str, email: str, password: str) -> str:
|
||||
self._region = region
|
||||
self._email = email
|
||||
self._password = password
|
||||
|
||||
self._auth_token = self._get_authentication_token()
|
||||
result = self._get_authentication_token()
|
||||
self._auth_token = result
|
||||
self._username = self._get_username_from_authentication_token()
|
||||
|
||||
def login_with_verification_code(self, code: str):
|
||||
result = self._get_authentication_token_with_verification_code(code)
|
||||
self._auth_token = result
|
||||
self._username = self._get_username_from_authentication_token()
|
||||
|
||||
def login_with_2fa_code(self, code: str):
|
||||
result = self._get_authentication_token_with_2fa_code(code)
|
||||
self._auth_token = result
|
||||
self._username = self._get_username_from_authentication_token()
|
||||
|
||||
def get_device_list(self) -> dict:
|
||||
LOGGER.debug("Getting device list from Bambu Cloud")
|
||||
if self._region == "China":
|
||||
url = 'https://api.bambulab.cn/v1/iot-service/api/user/bind'
|
||||
else:
|
||||
url = 'https://api.bambulab.com/v1/iot-service/api/user/bind'
|
||||
headers = {'Authorization': 'Bearer ' + self._auth_token, 'User-Agent' : "HA Bambulab"}
|
||||
with httpx.Client(http2=True) as client:
|
||||
response = client.get(url, headers=headers, timeout=10)
|
||||
if response.status_code >= 400:
|
||||
LOGGER.debug(f"Received error: {response.status_code}")
|
||||
raise ValueError(response.status_code)
|
||||
try:
|
||||
response = self._get(BambuUrl.BIND)
|
||||
except:
|
||||
return None
|
||||
return response.json()['devices']
|
||||
|
||||
# The slicer settings are of the following form:
|
||||
@ -182,16 +448,11 @@ class BambuCloud:
|
||||
|
||||
def get_slicer_settings(self) -> dict:
|
||||
LOGGER.debug("Getting slicer settings from Bambu Cloud")
|
||||
if self._region == "China":
|
||||
url = 'https://api.bambulab.cn/v1/iot-service/api/slicer/setting?version=undefined'
|
||||
else:
|
||||
url = 'https://api.bambulab.com/v1/iot-service/api/slicer/setting?version=undefined'
|
||||
headers = {'Authorization': 'Bearer ' + self._auth_token, 'User-Agent' : "HA Bambulab"}
|
||||
with httpx.Client(http2=True) as client:
|
||||
response = client.get(url, headers=headers, timeout=10)
|
||||
if response.status_code >= 400:
|
||||
LOGGER.error(f"Slicer settings load failed: {response.status_code}")
|
||||
try:
|
||||
response = self._get(BambuUrl.SLICER_SETTINGS)
|
||||
except:
|
||||
return None
|
||||
LOGGER.debug("Succeeded")
|
||||
return response.json()
|
||||
|
||||
# The task list is of the following form with a 'hits' array with typical 20 entries.
|
||||
@ -237,28 +498,53 @@ class BambuCloud:
|
||||
# },
|
||||
|
||||
def get_tasklist(self) -> dict:
|
||||
if self._region == "China":
|
||||
url = 'https://api.bambulab.cn/v1/user-service/my/tasks'
|
||||
else:
|
||||
url = 'https://api.bambulab.com/v1/user-service/my/tasks'
|
||||
headers = {'Authorization': 'Bearer ' + self._auth_token, 'User-Agent' : "HA Bambulab"}
|
||||
with httpx.Client(http2=True) as client:
|
||||
response = client.get(url, headers=headers, timeout=10)
|
||||
if response.status_code >= 400:
|
||||
LOGGER.debug(f"Received error: {response.status_code}")
|
||||
raise ValueError(response.status_code)
|
||||
LOGGER.debug("Getting full task list from Bambu Cloud")
|
||||
try:
|
||||
response = self._get(BambuUrl.TASKS)
|
||||
except:
|
||||
return None
|
||||
return response.json()
|
||||
|
||||
# Returns a list of projects for the account.
|
||||
#
|
||||
# {
|
||||
# "message": "success",
|
||||
# "code": null,
|
||||
# "error": null,
|
||||
# "projects": [
|
||||
# {
|
||||
# "project_id": "164995388",
|
||||
# "user_id": "1688388450",
|
||||
# "model_id": "US48e2103d939bf8",
|
||||
# "status": "ACTIVE",
|
||||
# "name": "Alcohol_Marker_Storage_for_Copic,_Ohuhu_and_the_like",
|
||||
# "content": "{'printed_plates': [{'plate': 1}]}",
|
||||
# "create_time": "2024-11-17 06:12:33",
|
||||
# "update_time": "2024-11-17 06:12:40"
|
||||
# },
|
||||
# ...
|
||||
#
|
||||
def get_projects(self) -> dict:
|
||||
LOGGER.debug("Getting projects list from Bambu Cloud")
|
||||
try:
|
||||
response = self._get(BambuUrl.PROJECTS)
|
||||
except:
|
||||
return None
|
||||
return response.json()
|
||||
|
||||
def get_latest_task_for_printer(self, deviceId: str) -> dict:
|
||||
LOGGER.debug(f"Getting latest task from Bambu Cloud for Printer: {deviceId}")
|
||||
data = self.get_tasklist_for_printer(deviceId)
|
||||
if len(data) != 0:
|
||||
return data[0]
|
||||
LOGGER.debug("No tasks found for printer")
|
||||
return None
|
||||
LOGGER.debug(f"Getting latest task for printer from Bambu Cloud")
|
||||
try:
|
||||
data = self.get_tasklist_for_printer(deviceId)
|
||||
if len(data) != 0:
|
||||
return data[0]
|
||||
LOGGER.debug("No tasks found for printer")
|
||||
return None
|
||||
except:
|
||||
return None
|
||||
|
||||
def get_tasklist_for_printer(self, deviceId: str) -> dict:
|
||||
LOGGER.debug(f"Getting task list from Bambu Cloud for Printer: {deviceId}")
|
||||
LOGGER.debug(f"Getting full task list for printer from Bambu Cloud")
|
||||
tasks = []
|
||||
data = self.get_tasklist()
|
||||
for task in data['hits']:
|
||||
@ -273,11 +559,11 @@ class BambuCloud:
|
||||
|
||||
def download(self, url: str) -> bytearray:
|
||||
LOGGER.debug(f"Downloading cover image: {url}")
|
||||
with httpx.Client(http2=True) as client:
|
||||
response = client.get(url, timeout=10)
|
||||
if response.status_code >= 400:
|
||||
LOGGER.debug(f"Received error: {response.status_code}")
|
||||
raise ValueError(response.status_code)
|
||||
try:
|
||||
# This is just a standard download from an unauthenticated end point.
|
||||
response = requests.get(url)
|
||||
except:
|
||||
return None
|
||||
return response.content
|
||||
|
||||
@property
|
||||
@ -288,6 +574,10 @@ class BambuCloud:
|
||||
def auth_token(self):
|
||||
return self._auth_token
|
||||
|
||||
@property
|
||||
def bambu_connected(self) -> bool:
|
||||
return self._auth_token != "" and self._auth_token != None
|
||||
|
||||
@property
|
||||
def cloud_mqtt_host(self):
|
||||
return "cn.mqtt.bambulab.com" if self._region == "China" else "us.mqtt.bambulab.com"
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -15,6 +15,7 @@
|
||||
"GFB01": "Bambu ASA",
|
||||
"GFB02": "Bambu ASA-Aero",
|
||||
"GFB50": "Bambu ABS-GF",
|
||||
"GFB51": "Bambu ASA-CF",
|
||||
"GFB60": "PolyLite ABS",
|
||||
"GFB61": "PolyLite ASA",
|
||||
"GFB98": "Generic ASA",
|
||||
@ -26,6 +27,7 @@
|
||||
"GFG02": "Bambu PETG HF",
|
||||
"GFG50": "Bambu PETG-CF",
|
||||
"GFG60": "PolyLite PETG",
|
||||
"GFG96": "Generic PETG HF",
|
||||
"GFG97": "Generic PCTG",
|
||||
"GFG98": "Generic PETG-CF",
|
||||
"GFG99": "Generic PETG",
|
||||
@ -34,6 +36,13 @@
|
||||
"GFL03": "eSUN PLA+",
|
||||
"GFL04": "Overture PLA",
|
||||
"GFL05": "Overture Matte PLA",
|
||||
"GFL06": "Fiberon PETG-ESD",
|
||||
"GFL50": "Fiberon PA6-CF",
|
||||
"GFL51": "Fiberon PA6-GF",
|
||||
"GFL52": "Fiberon PA12-CF",
|
||||
"GFL53": "Fiberon PA612-CF",
|
||||
"GFL54": "Fiberon PET-CF",
|
||||
"GFL55": "Fiberon PETG-rCF",
|
||||
"GFL95": "Generic PLA High Speed",
|
||||
"GFL96": "Generic PLA Silk",
|
||||
"GFL98": "Generic PLA-CF",
|
||||
@ -41,6 +50,7 @@
|
||||
"GFN03": "Bambu PA-CF",
|
||||
"GFN04": "Bambu PAHT-CF",
|
||||
"GFN05": "Bambu PA6-CF",
|
||||
"GFN06": "Bambu PPA-CF",
|
||||
"GFN08": "Bambu PA6-GF",
|
||||
"GFN96": "Generic PPA-GF",
|
||||
"GFN97": "Generic PPA-CF",
|
||||
@ -64,9 +74,12 @@
|
||||
"GFS98": "Generic HIPS",
|
||||
"GFS99": "Generic PVA",
|
||||
"GFT01": "Bambu PET-CF",
|
||||
"GFT02": "Bambu PPS-CF",
|
||||
"GFT97": "Generic PPS",
|
||||
"GFT98": "Generic PPS-CF",
|
||||
"GFU00": "Bambu TPU 95A HF",
|
||||
"GFU01": "Bambu TPU 95A",
|
||||
"GFU02": "Bambu TPU for AMS",
|
||||
"GFU98": "Generic TPU for AMS",
|
||||
"GFU99": "Generic TPU"
|
||||
}
|
@ -22,6 +22,7 @@ from .utils import (
|
||||
get_generic_AMS_HMS_error_code,
|
||||
get_HMS_severity,
|
||||
get_HMS_module,
|
||||
set_temperature_to_gcode,
|
||||
)
|
||||
from .const import (
|
||||
LOGGER,
|
||||
@ -32,6 +33,7 @@ from .const import (
|
||||
SPEED_PROFILE,
|
||||
GCODE_STATE_OPTIONS,
|
||||
PRINT_TYPE_OPTIONS,
|
||||
TempEnum,
|
||||
)
|
||||
from .commands import (
|
||||
CHAMBER_LIGHT_ON,
|
||||
@ -42,7 +44,7 @@ from .commands import (
|
||||
class Device:
|
||||
def __init__(self, client):
|
||||
self._client = client
|
||||
self.temperature = Temperature()
|
||||
self.temperature = Temperature(client = client)
|
||||
self.lights = Lights(client = client)
|
||||
self.info = Info(client = client)
|
||||
self.print_job = PrintJob(client = client)
|
||||
@ -53,7 +55,7 @@ class Device:
|
||||
self.external_spool = ExternalSpool(client = client)
|
||||
self.hms = HMSList(client = client)
|
||||
self.print_error = PrintErrorList(client = client)
|
||||
self.camera = Camera()
|
||||
self.camera = Camera(client = client)
|
||||
self.home_flag = HomeFlag(client=client)
|
||||
self.push_all_data = None
|
||||
self.get_version_data = None
|
||||
@ -77,9 +79,7 @@ class Device:
|
||||
send_event = send_event | self.camera.print_update(data = data)
|
||||
send_event = send_event | self.home_flag.print_update(data = data)
|
||||
|
||||
if send_event and self._client.callback is not None:
|
||||
LOGGER.debug("event_printer_data_update")
|
||||
self._client.callback("event_printer_data_update")
|
||||
self._client.callback("event_printer_data_update")
|
||||
|
||||
if data.get("msg", 0) == 0:
|
||||
self.push_all_data = data
|
||||
@ -91,9 +91,19 @@ class Device:
|
||||
if data.get("command") == "get_version":
|
||||
self.get_version_data = data
|
||||
|
||||
def _supports_temperature_set(self):
|
||||
# When talking to the Bambu cloud mqtt, setting the temperatures is allowed.
|
||||
if self.info.mqtt_mode == "bambu_cloud":
|
||||
return True
|
||||
# X1* have not yet blocked setting the temperatures when in nybrid connection mode.
|
||||
if self.info.device_type == "X1" or self.info.device_type == "X1C" or self.info.device_type == "X1E":
|
||||
return True
|
||||
# What's left is P1 and A1 printers that we are connecting by local mqtt. These are supported only in pure Lan Mode.
|
||||
return not self._client.bambu_cloud.bambu_connected
|
||||
|
||||
def supports_feature(self, feature):
|
||||
if feature == Features.AUX_FAN:
|
||||
return True
|
||||
return self.info.device_type != "A1" and self.info.device_type != "A1MINI"
|
||||
elif feature == Features.CHAMBER_LIGHT:
|
||||
return True
|
||||
elif feature == Features.CHAMBER_FAN:
|
||||
@ -124,6 +134,11 @@ class Device:
|
||||
return self.info.device_type == "X1" or self.info.device_type == "X1C" or self.info.device_type == "X1E"
|
||||
elif feature == Features.MANUAL_MODE:
|
||||
return self.info.device_type == "P1P" or self.info.device_type == "P1S" or self.info.device_type == "A1" or self.info.device_type == "A1MINI"
|
||||
elif feature == Features.AMS_FILAMENT_REMAINING:
|
||||
# Technically this is not the AMS Lite but that's currently tied to only these printer types.
|
||||
return self.info.device_type != "A1" and self.info.device_type != "A1MINI"
|
||||
elif feature == Features.SET_TEMPERATURE:
|
||||
return self._supports_temperature_set()
|
||||
|
||||
return False
|
||||
|
||||
@ -183,15 +198,13 @@ class Lights:
|
||||
def TurnChamberLightOn(self):
|
||||
self.chamber_light = "on"
|
||||
self.chamber_light_override = "on"
|
||||
if self._client.callback is not None:
|
||||
self._client.callback("event_light_update")
|
||||
self._client.callback("event_light_update")
|
||||
self._client.publish(CHAMBER_LIGHT_ON)
|
||||
|
||||
def TurnChamberLightOff(self):
|
||||
self.chamber_light = "off"
|
||||
self.chamber_light_override = "off"
|
||||
if self._client.callback is not None:
|
||||
self._client.callback("event_light_update")
|
||||
self._client.callback("event_light_update")
|
||||
self._client.publish(CHAMBER_LIGHT_OFF)
|
||||
|
||||
|
||||
@ -203,7 +216,8 @@ class Camera:
|
||||
rtsp_url: str
|
||||
timelapse: str
|
||||
|
||||
def __init__(self):
|
||||
def __init__(self, client):
|
||||
self._client = client
|
||||
self.recording = ''
|
||||
self.resolution = ''
|
||||
self.rtsp_url = None
|
||||
@ -225,7 +239,10 @@ class Camera:
|
||||
self.timelapse = data.get("ipcam", {}).get("timelapse", self.timelapse)
|
||||
self.recording = data.get("ipcam", {}).get("ipcam_record", self.recording)
|
||||
self.resolution = data.get("ipcam", {}).get("resolution", self.resolution)
|
||||
self.rtsp_url = data.get("ipcam", {}).get("rtsp_url", self.rtsp_url)
|
||||
if self._client._enable_camera:
|
||||
self.rtsp_url = data.get("ipcam", {}).get("rtsp_url", self.rtsp_url)
|
||||
else:
|
||||
self.rtsp_url = None
|
||||
|
||||
return (old_data != f"{self.__dict__}")
|
||||
|
||||
@ -238,7 +255,8 @@ class Temperature:
|
||||
nozzle_temp: int
|
||||
target_nozzle_temp: int
|
||||
|
||||
def __init__(self):
|
||||
def __init__(self, client):
|
||||
self._client = client
|
||||
self.bed_temp = 0
|
||||
self.target_bed_temp = 0
|
||||
self.chamber_temp = 0
|
||||
@ -256,6 +274,20 @@ class Temperature:
|
||||
|
||||
return (old_data != f"{self.__dict__}")
|
||||
|
||||
def set_target_temp(self, temp: TempEnum, temperature: int):
|
||||
command = set_temperature_to_gcode(temp, temperature)
|
||||
|
||||
# if type == TempEnum.HEATBED:
|
||||
# self.bed_temp = temperature
|
||||
# elif type == TempEnum.NOZZLE:
|
||||
# self.nozzle_temp = temperature
|
||||
|
||||
LOGGER.debug(command)
|
||||
self._client.publish(command)
|
||||
|
||||
self._client.callback("event_printer_data_update")
|
||||
|
||||
|
||||
@dataclass
|
||||
class Fans:
|
||||
"""Return all fan related info"""
|
||||
@ -335,8 +367,7 @@ class Fans:
|
||||
LOGGER.debug(command)
|
||||
self._client.publish(command)
|
||||
|
||||
if self._client.callback is not None:
|
||||
self._client.callback("event_printer_data_update")
|
||||
self._client.callback("event_printer_data_update")
|
||||
|
||||
def get_fan_speed(self, fan: FansEnum) -> int:
|
||||
if fan == FansEnum.PART_COOLING:
|
||||
@ -384,7 +415,7 @@ class PrintJob:
|
||||
values = {}
|
||||
for i in range(16):
|
||||
if self._ams_print_weights[i] != 0:
|
||||
values[f"AMS Slot {i}"] = self._ams_print_weights[i]
|
||||
values[f"AMS Slot {i+1}"] = self._ams_print_weights[i]
|
||||
return values
|
||||
|
||||
@property
|
||||
@ -392,7 +423,7 @@ class PrintJob:
|
||||
values = {}
|
||||
for i in range(16):
|
||||
if self._ams_print_lengths[i] != 0:
|
||||
values[f"AMS Slot {i}"] = self._ams_print_lengths[i]
|
||||
values[f"AMS Slot {i+1}"] = self._ams_print_lengths[i]
|
||||
return values
|
||||
|
||||
def __init__(self, client):
|
||||
@ -450,7 +481,8 @@ class PrintJob:
|
||||
self.gcode_file = data.get("gcode_file", self.gcode_file)
|
||||
self.print_type = data.get("print_type", self.print_type)
|
||||
if self.print_type.lower() not in PRINT_TYPE_OPTIONS:
|
||||
LOGGER.debug(f"Unknown print_type. Please log an issue : '{self.print_type}'")
|
||||
if self.print_type != "":
|
||||
LOGGER.debug(f"Unknown print_type. Please log an issue : '{self.print_type}'")
|
||||
self.print_type = "unknown"
|
||||
self.subtask_name = data.get("subtask_name", self.subtask_name)
|
||||
self.file_type_icon = "mdi:file" if self.print_type != "cloud" else "mdi:cloud-outline"
|
||||
@ -471,9 +503,7 @@ class PrintJob:
|
||||
if data.get("mc_remaining_time") is not None:
|
||||
existing_remaining_time = self.remaining_time
|
||||
self.remaining_time = data.get("mc_remaining_time")
|
||||
if self.start_time is None:
|
||||
self.end_time = None
|
||||
elif existing_remaining_time != self.remaining_time:
|
||||
if existing_remaining_time != self.remaining_time:
|
||||
self.end_time = get_end_time(self.remaining_time)
|
||||
LOGGER.debug(f"END TIME2: {self.end_time}")
|
||||
|
||||
@ -482,8 +512,7 @@ class PrintJob:
|
||||
currently_idle = self.gcode_state == "IDLE" or self.gcode_state == "FAILED" or self.gcode_state == "FINISH"
|
||||
|
||||
if previously_idle and not currently_idle:
|
||||
if self._client.callback is not None:
|
||||
self._client.callback("event_print_started")
|
||||
self._client.callback("event_print_started")
|
||||
|
||||
# Generate the start_time for P1P/S when printer moves from idle to another state. Original attempt with remaining time
|
||||
# becoming non-zero didn't work as it never bounced to zero in at least the scenario where a print was canceled.
|
||||
@ -507,20 +536,17 @@ class PrintJob:
|
||||
isCanceledPrint = False
|
||||
if data.get("print_error") == 50348044 and self.print_error == 0:
|
||||
isCanceledPrint = True
|
||||
if self._client.callback is not None:
|
||||
self._client.callback("event_print_canceled")
|
||||
self._client.callback("event_print_canceled")
|
||||
self.print_error = data.get("print_error", self.print_error)
|
||||
|
||||
# Handle print failed
|
||||
if previous_gcode_state != "unknown" and previous_gcode_state != "FAILED" and self.gcode_state == "FAILED":
|
||||
if not isCanceledPrint:
|
||||
if self._client.callback is not None:
|
||||
self._client.callback("event_print_failed")
|
||||
self._client.callback("event_print_failed")
|
||||
|
||||
# Handle print finish
|
||||
if previous_gcode_state != "unknown" and previous_gcode_state != "FINISH" and self.gcode_state == "FINISH":
|
||||
if self._client.callback is not None:
|
||||
self._client.callback("event_print_finished")
|
||||
self._client.callback("event_print_finished")
|
||||
|
||||
if currently_idle and not previously_idle and previous_gcode_state != "unknown":
|
||||
if self.start_time != None:
|
||||
@ -673,8 +699,7 @@ class Info:
|
||||
def set_online(self, online):
|
||||
if self.online != online:
|
||||
self.online = online
|
||||
if self._client.callback is not None:
|
||||
self._client.callback("event_printer_data_update")
|
||||
self._client.callback("event_printer_data_update")
|
||||
|
||||
def info_update(self, data):
|
||||
|
||||
@ -703,8 +728,7 @@ class Info:
|
||||
LOGGER.debug(f"Device is {self.device_type}")
|
||||
self.hw_ver = get_hw_version(modules, self.hw_ver)
|
||||
self.sw_ver = get_sw_version(modules, self.sw_ver)
|
||||
if self._client.callback is not None:
|
||||
self._client.callback("event_printer_info_update")
|
||||
self._client.callback("event_printer_info_update")
|
||||
|
||||
def print_update(self, data) -> bool:
|
||||
old_data = f"{self.__dict__}"
|
||||
@ -796,6 +820,12 @@ class Info:
|
||||
@dataclass
|
||||
class AMSInstance:
|
||||
"""Return all AMS instance related info"""
|
||||
serial: str
|
||||
sw_version: str
|
||||
hw_version: str
|
||||
humidity_index: int
|
||||
temperature: int
|
||||
tray: list["AMSTray"]
|
||||
|
||||
def __init__(self, client):
|
||||
self.serial = ""
|
||||
@ -813,11 +843,14 @@ class AMSInstance:
|
||||
@dataclass
|
||||
class AMSList:
|
||||
"""Return all AMS related info"""
|
||||
tray_now: int
|
||||
data: list[AMSInstance]
|
||||
|
||||
def __init__(self, client):
|
||||
self._client = client
|
||||
self.tray_now = 0
|
||||
self.data = [None] * 4
|
||||
self._first_initialization_done = False
|
||||
|
||||
def info_update(self, data):
|
||||
old_data = f"{self.__dict__}"
|
||||
@ -859,7 +892,7 @@ class AMSList:
|
||||
|
||||
if index != -1:
|
||||
# Sometimes we get incomplete version data. We have to skip if that occurs since the serial number is
|
||||
# requires as part of the home assistant device identity.
|
||||
# required as part of the home assistant device identity.
|
||||
if not module['sn'] == '':
|
||||
# May get data before info so create entries if necessary
|
||||
if self.data[index] is None:
|
||||
@ -874,12 +907,14 @@ class AMSList:
|
||||
if self.data[index].hw_version != module['hw_ver']:
|
||||
data_changed = True
|
||||
self.data[index].hw_version = module['hw_ver']
|
||||
elif not self._first_initialization_done:
|
||||
self._first_initialization_done = True
|
||||
data_changed = True
|
||||
|
||||
data_changed = data_changed or (old_data != f"{self.__dict__}")
|
||||
|
||||
if data_changed:
|
||||
if self._client.callback is not None:
|
||||
self._client.callback("event_ams_info_update")
|
||||
self._client.callback("event_ams_info_update")
|
||||
|
||||
def print_update(self, data) -> bool:
|
||||
old_data = f"{self.__dict__}"
|
||||
@ -969,6 +1004,19 @@ class AMSList:
|
||||
@dataclass
|
||||
class AMSTray:
|
||||
"""Return all AMS tray related info"""
|
||||
empty: bool
|
||||
idx: int
|
||||
name: str
|
||||
type: str
|
||||
sub_brands: str
|
||||
color: str
|
||||
nozzle_temp_min: int
|
||||
nozzle_temp_max: int
|
||||
remain: int
|
||||
k: float
|
||||
tag_uid: str
|
||||
tray_uuid: str
|
||||
|
||||
|
||||
def __init__(self, client):
|
||||
self._client = client
|
||||
@ -982,7 +1030,8 @@ class AMSTray:
|
||||
self.nozzle_temp_max = 0
|
||||
self.remain = 0
|
||||
self.k = 0
|
||||
self.tag_uid = "0000000000000000"
|
||||
self.tag_uid = ""
|
||||
self.tray_uuid = ""
|
||||
|
||||
def print_update(self, data) -> bool:
|
||||
old_data = f"{self.__dict__}"
|
||||
@ -998,7 +1047,8 @@ class AMSTray:
|
||||
self.nozzle_temp_min = 0
|
||||
self.nozzle_temp_max = 0
|
||||
self.remain = 0
|
||||
self.tag_uid = "0000000000000000"
|
||||
self.tag_uid = ""
|
||||
self.tray_uuid = ""
|
||||
self.k = 0
|
||||
else:
|
||||
self.empty = False
|
||||
@ -1011,6 +1061,7 @@ class AMSTray:
|
||||
self.nozzle_temp_max = data.get('nozzle_temp_max', self.nozzle_temp_max)
|
||||
self.remain = data.get('remain', self.remain)
|
||||
self.tag_uid = data.get('tag_uid', self.tag_uid)
|
||||
self.tray_uuid = data.get('tray_uuid', self.tray_uuid)
|
||||
self.k = data.get('k', self.k)
|
||||
|
||||
return (old_data != f"{self.__dict__}")
|
||||
@ -1092,8 +1143,7 @@ class Speed:
|
||||
command = SPEED_PROFILE_TEMPLATE
|
||||
command['print']['param'] = f"{id}"
|
||||
self._client.publish(command)
|
||||
if self._client.callback is not None:
|
||||
self._client.callback("event_speed_update")
|
||||
self._client.callback("event_speed_update")
|
||||
|
||||
|
||||
@dataclass
|
||||
@ -1158,7 +1208,8 @@ class HMSList:
|
||||
attr = int(hms['attr'])
|
||||
code = int(hms['code'])
|
||||
hms_notif = HMSNotification(attr=attr, code=code)
|
||||
errors[f"{index}-Error"] = f"HMS_{hms_notif.hms_code}: {get_HMS_error_text(hms_notif.hms_code)}"
|
||||
errors[f"{index}-Code"] = f"HMS_{hms_notif.hms_code}"
|
||||
errors[f"{index}-Error"] = get_HMS_error_text(hms_notif.hms_code)
|
||||
errors[f"{index}-Wiki"] = hms_notif.wiki_url
|
||||
errors[f"{index}-Severity"] = hms_notif.severity
|
||||
#LOGGER.debug(f"HMS error for '{hms_notif.module}' and severity '{hms_notif.severity}': HMS_{hms_notif.hms_code}")
|
||||
@ -1169,8 +1220,7 @@ class HMSList:
|
||||
self._errors = errors
|
||||
if self._count != 0:
|
||||
LOGGER.warning(f"HMS ERRORS: {errors}")
|
||||
if self._client.callback is not None:
|
||||
self._client.callback("event_hms_errors")
|
||||
self._client.callback("event_printer_error")
|
||||
return True
|
||||
|
||||
return False
|
||||
@ -1188,11 +1238,10 @@ class HMSList:
|
||||
class PrintErrorList:
|
||||
"""Return all print_error related info"""
|
||||
_error: dict
|
||||
_count: int
|
||||
|
||||
def __init__(self, client):
|
||||
self._error = None
|
||||
self._client = client
|
||||
self._error = {}
|
||||
|
||||
def print_update(self, data) -> bool:
|
||||
# Example payload:
|
||||
@ -1202,20 +1251,19 @@ class PrintErrorList:
|
||||
# 'Unable to feed filament into the extruder. This could be due to entangled filament or a stuck spool. If not, please check if the AMS PTFE tube is connected.'
|
||||
|
||||
if 'print_error' in data.keys():
|
||||
errors = {}
|
||||
errors = None
|
||||
print_error_code = data.get('print_error')
|
||||
if print_error_code != 0:
|
||||
hex_conversion = f'0{int(print_error_code):x}'
|
||||
print_error_code_hex = hex_conversion[slice(0,4,1)] + "_" + hex_conversion[slice(4,8,1)]
|
||||
errors = {}
|
||||
errors[f"Code"] = f"{print_error_code_hex.upper()}"
|
||||
errors[f"Error"] = f"{print_error_code_hex.upper()}: {get_print_error_text(print_error_code)}"
|
||||
errors[f"code"] = print_error_code_hex.upper()
|
||||
errors[f"error"] = get_print_error_text(print_error_code)
|
||||
# LOGGER.warning(f"PRINT ERRORS: {errors}") # This will emit a message to home assistant log every 1 second if enabled
|
||||
|
||||
if self._error != errors:
|
||||
self._error = errors
|
||||
if self._client.callback is not None:
|
||||
self._client.callback("event_print_error")
|
||||
self._client.callback("event_print_error")
|
||||
|
||||
# We send the error event directly so always return False for the general data event.
|
||||
return False
|
||||
@ -1232,6 +1280,8 @@ class PrintErrorList:
|
||||
@dataclass
|
||||
class HMSNotification:
|
||||
"""Return an HMS object and all associated details"""
|
||||
attr: int
|
||||
code: int
|
||||
|
||||
def __init__(self, attr: int = 0, code: int = 0):
|
||||
self.attr = attr
|
||||
@ -1264,13 +1314,23 @@ class ChamberImage:
|
||||
def __init__(self, client):
|
||||
self._client = client
|
||||
self._bytes = bytearray()
|
||||
self._image_last_updated = datetime.now()
|
||||
|
||||
def set_jpeg(self, bytes):
|
||||
self._bytes = bytes
|
||||
self._image_last_updated = datetime.now()
|
||||
self._client.callback("event_printer_chamber_image_update")
|
||||
|
||||
def get_jpeg(self) -> bytearray:
|
||||
return self._bytes.copy()
|
||||
|
||||
def get_last_update_time(self) -> datetime:
|
||||
return self._image_last_updated
|
||||
|
||||
@property
|
||||
def available(self):
|
||||
return self._client._enable_camera
|
||||
|
||||
@dataclass
|
||||
class CoverImage:
|
||||
"""Returns the cover image from the Bambu API"""
|
||||
@ -1279,8 +1339,7 @@ class CoverImage:
|
||||
self._client = client
|
||||
self._bytes = bytearray()
|
||||
self._image_last_updated = datetime.now()
|
||||
if self._client.callback is not None:
|
||||
self._client.callback("event_printer_cover_image_update")
|
||||
self._client.callback("event_printer_cover_image_update")
|
||||
|
||||
def set_jpeg(self, bytes):
|
||||
self._bytes = bytes
|
||||
|
@ -11,7 +11,9 @@ from .const import (
|
||||
HMS_SEVERITY_LEVELS,
|
||||
HMS_MODULES,
|
||||
LOGGER,
|
||||
BAMBU_URL,
|
||||
FansEnum,
|
||||
TempEnum
|
||||
)
|
||||
from .commands import SEND_GCODE_TEMPLATE
|
||||
|
||||
@ -48,6 +50,18 @@ def fan_percentage_to_gcode(fan: FansEnum, percentage: int):
|
||||
return command
|
||||
|
||||
|
||||
def set_temperature_to_gcode(temp: TempEnum, temperature: int):
|
||||
"""Converts a temperature to the gcode command to set that"""
|
||||
if temp == TempEnum.NOZZLE:
|
||||
tempCommand = "M104"
|
||||
elif temp == TempEnum.HEATBED:
|
||||
tempCommand = "M140"
|
||||
|
||||
command = SEND_GCODE_TEMPLATE
|
||||
command['print']['param'] = f"{tempCommand} S{temperature}\n"
|
||||
return command
|
||||
|
||||
|
||||
def to_whole(number):
|
||||
if not number:
|
||||
return 0
|
||||
@ -59,8 +73,8 @@ def get_filament_name(idx, custom_filaments: dict):
|
||||
result = FILAMENT_NAMES.get(idx, "unknown")
|
||||
if result == "unknown" and idx != "":
|
||||
result = custom_filaments.get(idx, "unknown")
|
||||
if result == "unknown" and idx != "":
|
||||
LOGGER.debug(f"UNKNOWN FILAMENT IDX: '{idx}'")
|
||||
# if result == "unknown" and idx != "":
|
||||
# LOGGER.debug(f"UNKNOWN FILAMENT IDX: '{idx}'")
|
||||
return result
|
||||
|
||||
|
||||
@ -225,3 +239,10 @@ def round_minute(date: datetime = None, round_to: int = 1):
|
||||
date = date.replace(second=0, microsecond=0)
|
||||
delta = date.minute % round_to
|
||||
return date.replace(minute=date.minute - delta)
|
||||
|
||||
|
||||
def get_Url(url: str, region: str):
|
||||
urlstr = BAMBU_URL[url]
|
||||
if region == "China":
|
||||
urlstr = urlstr.replace('.com', '.cn')
|
||||
return urlstr
|
||||
|
@ -25,7 +25,7 @@ class IdleState(APrinterState):
|
||||
filesystem_root = (
|
||||
"file:///mnt/sdcard/"
|
||||
if self._printer._settings.get(["device_type"]) in ["X1", "X1C"]
|
||||
else "file:///"
|
||||
else "file:///sdcard/"
|
||||
)
|
||||
|
||||
print_command = {
|
||||
|
@ -37,14 +37,14 @@ class PausedState(APrinterState):
|
||||
|
||||
def start_new_print(self):
|
||||
if self._printer.bambu_client.connected:
|
||||
if self._printer.bambu_client.publish(pybambu.commands.RESUME):
|
||||
if self._printer.bambu_client.publish(octoprint_bambu_printer.printer.pybambu.commands.RESUME):
|
||||
self._log.info("print resumed")
|
||||
else:
|
||||
self._log.info("print resume failed")
|
||||
|
||||
def cancel_print(self):
|
||||
if self._printer.bambu_client.connected:
|
||||
if self._printer.bambu_client.publish(pybambu.commands.STOP):
|
||||
if self._printer.bambu_client.publish(octoprint_bambu_printer.printer.pybambu.commands.STOP):
|
||||
self._log.info("print cancelled")
|
||||
self._printer.finalize_print_job()
|
||||
else:
|
||||
|
@ -22,7 +22,7 @@ class PrintingState(APrinterState):
|
||||
|
||||
def __init__(self, printer: BambuVirtualPrinter) -> None:
|
||||
super().__init__(printer)
|
||||
self._current_print_job = None
|
||||
self._printer.current_print_job = None
|
||||
self._is_printing = False
|
||||
self._sd_printing_thread = None
|
||||
|
||||
@ -40,12 +40,15 @@ class PrintingState(APrinterState):
|
||||
self._printer.current_print_job = None
|
||||
|
||||
def _start_worker_thread(self):
|
||||
self._is_printing = True
|
||||
if self._sd_printing_thread is None:
|
||||
self._is_printing = True
|
||||
self._sd_printing_thread = threading.Thread(target=self._printing_worker)
|
||||
self._sd_printing_thread.start()
|
||||
else:
|
||||
self._sd_printing_thread.join()
|
||||
|
||||
def _printing_worker(self):
|
||||
self._log.debug(f"_printing_worker before while loop: {self._printer.current_print_job}")
|
||||
while (
|
||||
self._is_printing
|
||||
and self._printer.current_print_job is not None
|
||||
@ -55,6 +58,7 @@ class PrintingState(APrinterState):
|
||||
self._printer.report_print_job_status()
|
||||
time.sleep(3)
|
||||
|
||||
self._log.debug(f"_printing_worker after while loop: {self._printer.current_print_job}")
|
||||
self.update_print_job_info()
|
||||
if (
|
||||
self._printer.current_print_job is not None
|
||||
@ -64,30 +68,34 @@ class PrintingState(APrinterState):
|
||||
|
||||
def update_print_job_info(self):
|
||||
print_job_info = self._printer.bambu_client.get_device().print_job
|
||||
task_name: str = print_job_info.subtask_name
|
||||
project_file_info = self._printer.project_files.get_file_by_stem(
|
||||
task_name, [".gcode", ".3mf"]
|
||||
)
|
||||
subtask_name: str = print_job_info.subtask_name
|
||||
gcode_file: str = print_job_info.gcode_file
|
||||
|
||||
self._log.debug(f"update_print_job_info: {print_job_info}")
|
||||
|
||||
project_file_info = self._printer.project_files.get_file_by_name(subtask_name) or self._printer.project_files.get_file_by_name(gcode_file)
|
||||
if project_file_info is None:
|
||||
self._log.debug(f"No 3mf file found for {print_job_info}")
|
||||
self._current_print_job = None
|
||||
self._printer.current_print_job = None
|
||||
self._printer.change_state(self._printer._state_idle)
|
||||
return
|
||||
|
||||
progress = print_job_info.print_percentage
|
||||
self._printer.current_print_job = PrintJob(project_file_info, progress)
|
||||
if print_job_info.gcode_state == "PREPARE" and progress == 100:
|
||||
progress = 0
|
||||
self._printer.current_print_job = PrintJob(project_file_info, progress, print_job_info.remaining_time, print_job_info.current_layer, print_job_info.total_layers)
|
||||
self._printer.select_project_file(project_file_info.path.as_posix())
|
||||
|
||||
def pause_print(self):
|
||||
if self._printer.bambu_client.connected:
|
||||
if self._printer.bambu_client.publish(pybambu.commands.PAUSE):
|
||||
if self._printer.bambu_client.publish(octoprint_bambu_printer.printer.pybambu.commands.PAUSE):
|
||||
self._log.info("print paused")
|
||||
else:
|
||||
self._log.info("print pause failed")
|
||||
|
||||
def cancel_print(self):
|
||||
if self._printer.bambu_client.connected:
|
||||
if self._printer.bambu_client.publish(pybambu.commands.STOP):
|
||||
if self._printer.bambu_client.publish(octoprint_bambu_printer.printer.pybambu.commands.STOP):
|
||||
self._log.info("print cancelled")
|
||||
self._printer.finalize_print_job()
|
||||
else:
|
||||
|
@ -18,6 +18,18 @@ $(function () {
|
||||
self.use_ams = true;
|
||||
self.ams_mapping = ko.observableArray([]);
|
||||
|
||||
self.job_info = ko.observable();
|
||||
|
||||
self.auth_type = ko.observable("");
|
||||
|
||||
self.show_password = ko.pureComputed(function(){
|
||||
return self.settingsViewModel.settings.plugins.bambu_printer.auth_token() === '';
|
||||
});
|
||||
|
||||
self.show_verification = ko.pureComputed(function(){
|
||||
return self.auth_type() !== '';
|
||||
});
|
||||
|
||||
self.ams_mapping_computed = function(){
|
||||
var output_list = [];
|
||||
var index = 0;
|
||||
@ -38,16 +50,34 @@ $(function () {
|
||||
|
||||
self.getAuthToken = function (data) {
|
||||
self.settingsViewModel.settings.plugins.bambu_printer.auth_token("");
|
||||
self.auth_type("");
|
||||
OctoPrint.simpleApiCommand("bambu_printer", "register", {
|
||||
"email": self.settingsViewModel.settings.plugins.bambu_printer.email(),
|
||||
"password": $("#bambu_cloud_password").val(),
|
||||
"region": self.settingsViewModel.settings.plugins.bambu_printer.region(),
|
||||
"auth_token": self.settingsViewModel.settings.plugins.bambu_printer.auth_token()
|
||||
})
|
||||
.done(function (response) {
|
||||
self.auth_type(response.auth_response);
|
||||
});
|
||||
};
|
||||
|
||||
self.verifyCode = function (data) {
|
||||
self.settingsViewModel.settings.plugins.bambu_printer.auth_token("");
|
||||
OctoPrint.simpleApiCommand("bambu_printer", "verify", {
|
||||
"password": $("#bambu_cloud_verify_code").val(),
|
||||
"auth_type": self.auth_type(),
|
||||
})
|
||||
.done(function (response) {
|
||||
console.log(response);
|
||||
self.settingsViewModel.settings.plugins.bambu_printer.auth_token(response.auth_token);
|
||||
self.settingsViewModel.settings.plugins.bambu_printer.username(response.username);
|
||||
if (response.auth_token) {
|
||||
self.settingsViewModel.settings.plugins.bambu_printer.auth_token(response.auth_token);
|
||||
self.settingsViewModel.settings.plugins.bambu_printer.username(response.username);
|
||||
self.auth_type("");
|
||||
} else if (response.error) {
|
||||
self.settingsViewModel.settings.plugins.bambu_printer.auth_token("");
|
||||
$("#bambu_cloud_verify_code").val("");
|
||||
}
|
||||
});
|
||||
};
|
||||
|
||||
@ -92,6 +122,10 @@ $(function () {
|
||||
self.listHelper.updateItems(data.files);
|
||||
self.listHelper.resetPage();
|
||||
}
|
||||
|
||||
if (data.job_info !== undefined) {
|
||||
self.job_info(data.job_info);
|
||||
}
|
||||
};
|
||||
|
||||
self.onBeforeBinding = function () {
|
||||
|
@ -40,15 +40,24 @@
|
||||
<div class="control-group" data-bind="visible: !settingsViewModel.settings.plugins.bambu_printer.local_mqtt()">
|
||||
<label class="control-label">{{ _('Email') }}</label>
|
||||
<div class="controls">
|
||||
<input type="text" class="input-block-level" data-bind="value: settingsViewModel.settings.plugins.bambu_printer.email" title="{{ _('Registered email address') }}"></input>
|
||||
<input type="text" class="input-block-level" data-bind="value: settingsViewModel.settings.plugins.bambu_printer.email" title="{{ _('Registered email address') }}" autocomplete="off"></input>
|
||||
</div>
|
||||
</div>
|
||||
<div class="control-group" data-bind="visible: !settingsViewModel.settings.plugins.bambu_printer.local_mqtt()">
|
||||
<div class="control-group" data-bind="visible: !settingsViewModel.settings.plugins.bambu_printer.local_mqtt() && show_password()">
|
||||
<label class="control-label">{{ _('Password') }}</label>
|
||||
<div class="controls">
|
||||
<div class="input-block-level input-append" data-bind="css: {'input-append': !show_verification()}">
|
||||
<input id="bambu_cloud_password" type="password" class="input-text input-block-level" title="{{ _('Password to generate verification code') }}" autocomplete="new-password"></input>
|
||||
<span class="btn btn-primary add-on" data-bind="visible: !show_verification(), click: getAuthToken">{{ _('Login') }}</span>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
<div class="control-group" data-bind="visible: show_verification()">
|
||||
<label class="control-label">{{ _('Verify') }}</label>
|
||||
<div class="controls">
|
||||
<div class="input-block-level input-append">
|
||||
<input id="bambu_cloud_password" type="password" class="input-text input-block-level" title="{{ _('Password to generate Auth Token') }}"></input>
|
||||
<span class="btn btn-primary add-on" data-bind="click: getAuthToken">{{ _('Login') }}</span>
|
||||
<input id="bambu_cloud_verify_code" type="password" class="input-text input-block-level" title="{{ _('Verification code to generate auth token') }}"></input>
|
||||
<span class="btn btn-primary add-on" data-bind="click: verifyCode">{{ _('Verify') }}</span>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
|
@ -10,3 +10,6 @@
|
||||
</div>
|
||||
<!-- /ko -->
|
||||
</div>
|
||||
<div class="row-fluid" data-bind="visible: job_info">
|
||||
<div class="span6">{{ _('Layer') }}:</div><div class="span6" data-bind="text: function(){return (job_info.current_layer() + ' of ' + job_info.total_layers);}"></div>
|
||||
</div>
|
||||
|
6
setup.py
6
setup.py
@ -14,7 +14,7 @@ plugin_package = "octoprint_bambu_printer"
|
||||
plugin_name = "OctoPrint-BambuPrinter"
|
||||
|
||||
# The plugin's version. Can be overwritten within OctoPrint's internal data via __plugin_version__ in the plugin module
|
||||
plugin_version = "0.1.8rc1"
|
||||
plugin_version = "0.1.8rc12"
|
||||
|
||||
# The plugin's description. Can be overwritten within OctoPrint's internal data via __plugin_description__ in the plugin
|
||||
# module
|
||||
@ -33,7 +33,7 @@ plugin_url = "https://github.com/jneilliii/OctoPrint-BambuPrinter"
|
||||
plugin_license = "AGPLv3"
|
||||
|
||||
# Any additional requirements besides OctoPrint should be listed here
|
||||
plugin_requires = ["paho-mqtt<2", "python-dateutil", "httpx[http2]>=0.27.0"]
|
||||
plugin_requires = ["paho-mqtt<2", "python-dateutil", "cloudscraper"]
|
||||
|
||||
### --------------------------------------------------------------------------------------------------------------------
|
||||
### More advanced options that you usually shouldn't have to touch follow after this point
|
||||
@ -43,7 +43,7 @@ plugin_requires = ["paho-mqtt<2", "python-dateutil", "httpx[http2]>=0.27.0"]
|
||||
# already be installed automatically if they exist. Note that if you add something here you'll also need to update
|
||||
# MANIFEST.in to match to ensure that python setup.py sdist produces a source distribution that contains all your
|
||||
# files. This is sadly due to how python's setup.py works, see also http://stackoverflow.com/a/14159430/2028598
|
||||
plugin_additional_data = []
|
||||
plugin_additional_data = ["octoprint_bambu_printer/printer/pybambu/filaments.json"]
|
||||
|
||||
# Any additional python packages you need to install with your plugin that are not contained in <plugin_package>.*
|
||||
plugin_additional_packages = []
|
||||
|
Reference in New Issue
Block a user