from __future__ import annotations import asyncio import queue import json import math import re import socket import ssl import struct import threading import time from dataclasses import dataclass from typing import Any import paho.mqtt.client as mqtt from .bambu_cloud import BambuCloud from .const import ( LOGGER, Features, ) from .models import Device, SlicerSettings from .commands import ( GET_VERSION, PUSH_ALL, START_PUSH, ) class WatchdogThread(threading.Thread): def __init__(self, client): self._client = client self._watchdog_fired = False self._stop_event = threading.Event() self._last_received_data = time.time() super().__init__() self.daemon = True self.setName(f"{self._client._device.info.device_type}-Watchdog-{threading.get_native_id()}") def stop(self): self._stop_event.set() def received_data(self): self._last_received_data = time.time() def run(self): LOGGER.info("Watchdog thread started.") WATCHDOG_TIMER = 30 while True: # Wait out the remainder of the watchdog delay or 1s, whichever is higher. interval = time.time() - self._last_received_data wait_time = max(1, WATCHDOG_TIMER - interval) if self._stop_event.wait(wait_time): # Stop event has been set. Exit thread. break interval = time.time() - self._last_received_data if not self._watchdog_fired and (interval > WATCHDOG_TIMER): LOGGER.debug(f"Watchdog fired. No data received for {math.floor(interval)} seconds for {self._client._serial}.") self._watchdog_fired = True self._client._on_watchdog_fired() elif interval < WATCHDOG_TIMER: self._watchdog_fired = False LOGGER.info("Watchdog thread exited.") class ChamberImageThread(threading.Thread): def __init__(self, client): self._client = client self._stop_event = threading.Event() super().__init__() self.daemon = True self.setName(f"{self._client._device.info.device_type}-Chamber-{threading.get_native_id()}") def stop(self): self._stop_event.set() def run(self): LOGGER.debug("Chamber image thread started.") auth_data = bytearray() username = 'bblp' access_code = self._client._access_code hostname = self._client.host port = 6000 MAX_CONNECT_ATTEMPTS = 12 connect_attempts = 0 auth_data += struct.pack(" 0: img += dr if len(img) > payload_size: # We got more data than we expected. LOGGER.error(f"Unexpected image payload received: {len(img)} > {payload_size}") # Reset buffer img = None elif len(img) == payload_size: # We should have the full image now. if img[:4] != jpeg_start: LOGGER.error("JPEG start magic bytes missing.") elif img[-2:] != jpeg_end: LOGGER.error("JPEG end magic bytes missing.") else: # Content is as expected. Send it. self._client.on_jpeg_received(img) # Reset buffer img = None # else: # Otherwise we need to continue looping without reseting the buffer to receive the remaining data # and without delaying. elif len(dr) == 16: # We got the header bytes. Get the expected payload size from it and create the image buffer bytearray. # Reset connect_attempts now we know the connect was successful. connect_attempts = 0 img = bytearray() payload_size = int.from_bytes(dr[0:3], byteorder='little') elif len(dr) == 0: # This occurs if the wrong access code was provided. LOGGER.error("Chamber image connection rejected by the printer. Check provided access code and IP address.") # Sleep for a short while and then re-attempt the connection. time.sleep(5) break else: LOGGER.error(f"UNEXPECTED DATA RECEIVED: {len(dr)}") time.sleep(1) except OSError as e: if e.errno == 113: LOGGER.debug("Host is unreachable") else: LOGGER.error("A Chamber Image thread outer exception occurred:") LOGGER.error(f"Exception. Type: {type(e)} Args: {e}") if not self._stop_event.is_set(): time.sleep(1) # Avoid a tight loop if this is a persistent error. except Exception as e: LOGGER.error(f"A Chamber Image thread outer exception occurred:") LOGGER.error(f"Exception. Type: {type(e)} Args: {e}") if not self._stop_event.is_set(): time.sleep(1) # Avoid a tight loop if this is a persistent error. LOGGER.debug("Chamber image thread exited.") class MqttThread(threading.Thread): def __init__(self, client): self._client = client self._stop_event = threading.Event() super().__init__() self.daemon = True self.setName(f"{self._client._device.info.device_type}-Mqtt-{threading.get_native_id()}") def stop(self): self._stop_event.set() def run(self): LOGGER.info("MQTT listener thread started.") exceptionSeen = "" while True: try: host = self._client.host if self._client.local_mqtt else self._client.bambu_cloud.cloud_mqtt_host LOGGER.debug(f"Connect: Attempting Connection to {host}") self._client.client.connect(host, self._client._port, keepalive=5) LOGGER.debug("Starting listen loop") self._client.client.loop_forever() LOGGER.debug("Ended listen loop.") break except TimeoutError as e: if exceptionSeen != "TimeoutError": LOGGER.debug(f"TimeoutError: {e}.") exceptionSeen = "TimeoutError" time.sleep(5) except ConnectionError as e: if exceptionSeen != "ConnectionError": LOGGER.debug(f"ConnectionError: {e}.") exceptionSeen = "ConnectionError" time.sleep(5) except OSError as e: if e.errno == 113: if exceptionSeen != "OSError113": LOGGER.debug(f"OSError: {e}.") exceptionSeen = "OSError113" time.sleep(5) else: LOGGER.error("A listener loop thread exception occurred:") LOGGER.error(f"Exception. Type: {type(e)} Args: {e}") time.sleep(1) # Avoid a tight loop if this is a persistent error. except Exception as e: LOGGER.error("A listener loop thread exception occurred:") LOGGER.error(f"Exception. Type: {type(e)} Args: {e}") time.sleep(1) # Avoid a tight loop if this is a persistent error. if self._client.client is None: break self._client.client.disconnect() LOGGER.info("MQTT listener thread exited.") @dataclass class BambuClient: """Initialize Bambu Client to connect to MQTT Broker""" _watchdog = None _camera = None _usage_hours: float def __init__(self, device_type: str, serial: str, host: str, local_mqtt: bool, region: str, email: str, username: str, auth_token: str, access_code: str, usage_hours: float = 0, manual_refresh_mode: bool = False, chamber_image: bool = True): self.callback = None self.host = host self.local_mqtt = local_mqtt self._serial = serial self._auth_token = auth_token self._access_code = access_code self._username = username self._connected = False self._device_type = device_type self._usage_hours = usage_hours self._port = 1883 self._refreshed = False self._manual_refresh_mode = manual_refresh_mode self._device = Device(self) self.bambu_cloud = BambuCloud(region, email, username, auth_token) self.slicer_settings = SlicerSettings(self) self.use_chamber_image = chamber_image @property def connected(self): """Return if connected to server""" return self._connected @property def manual_refresh_mode(self): """Return if the integration is running in poll mode""" return self._manual_refresh_mode async def set_manual_refresh_mode(self, on): self._manual_refresh_mode = on if self._manual_refresh_mode: # Disconnect from the server. User must manually hit the refresh button to connect to refresh and then it will immediately disconnect. self.disconnect() else: # Reconnect normally self.connect(self.callback) def setup_tls(self): self.client.tls_set(tls_version=ssl.PROTOCOL_TLS, cert_reqs=ssl.CERT_NONE) self.client.tls_insecure_set(True) def connect(self, callback): """Connect to the MQTT Broker""" self.client = mqtt.Client() self.callback = callback self.client.on_connect = self.on_connect self.client.on_disconnect = self.on_disconnect self.client.on_message = self.on_message # Set aggressive reconnect polling. self.client.reconnect_delay_set(min_delay=1, max_delay=1) # Run the blocking tls_set method in a separate thread self.setup_tls() self._port = 8883 if self.local_mqtt: self.client.username_pw_set("bblp", password=self._access_code) else: self.client.username_pw_set(self._username, password=self._auth_token) LOGGER.debug("Starting MQTT listener thread") self._mqtt = MqttThread(self) self._mqtt.start() def subscribe_and_request_info(self): LOGGER.debug("Loading slicer settings...") self.slicer_settings.update() LOGGER.debug("Now subscribing...") self.subscribe() LOGGER.debug("On Connect: Getting version info") self.publish(GET_VERSION) LOGGER.debug("On Connect: Request push all") self.publish(PUSH_ALL) def on_connect(self, client_: mqtt.Client, userdata: None, flags: dict[str, Any], result_code: int, properties: mqtt.Properties | None = None, ): """Handle connection""" LOGGER.info("On Connect: Connected to printer") self._on_connect() def _on_connect(self): self._connected = True self.subscribe_and_request_info() LOGGER.debug("Starting watchdog thread") self._watchdog = WatchdogThread(self) self._watchdog.start() if not self._device.supports_feature(Features.CAMERA_RTSP): if self._device.supports_feature(Features.CAMERA_IMAGE): if self.use_chamber_image: LOGGER.debug("Starting Chamber Image thread") self._camera = ChamberImageThread(self) self._camera.start() elif (self.host == "") or (self._access_code == ""): LOGGER.debug("Skipping camera setup as local access details not provided.") def try_on_connect(self, client_: mqtt.Client, userdata: None, flags: dict[str, Any], result_code: int, properties: mqtt.Properties | None = None, ): """Handle connection""" LOGGER.info("On Connect: Connected to printer") self._connected = True LOGGER.debug("Now test subscribing...") self.subscribe() # For the initial configuration connection attempt, we just need version info. LOGGER.debug("On Connect: Getting version info") self.publish(GET_VERSION) def on_disconnect(self, client_: mqtt.Client, userdata: None, result_code: int): """Called when MQTT Disconnects""" LOGGER.warn(f"On Disconnect: Printer disconnected with error code: {result_code}") self._on_disconnect() def _on_disconnect(self): LOGGER.debug("_on_disconnect: Lost connection to the printer") self._connected = False self._device.info.set_online(False) if self._watchdog is not None: LOGGER.debug("Stopping watchdog thread") self._watchdog.stop() self._watchdog.join() if self._camera is not None: LOGGER.debug("Stopping camera thread") self._camera.stop() self._camera.join() def _on_watchdog_fired(self): LOGGER.info("Watch dog fired") self._device.info.set_online(False) self.publish(START_PUSH) def on_jpeg_received(self, bytes): self._device.chamber_image.set_jpeg(bytes) def on_message(self, client, userdata, message): """Return the payload when received""" try: # X1 mqtt payload is inconsistent. Adjust it for consistent logging. clean_msg = re.sub(r"\\n *", "", str(message.payload)) if self._refreshed: LOGGER.debug(f"Received data: {clean_msg}") json_data = json.loads(message.payload) if json_data.get("event"): # These are events from the bambu cloud mqtt feed and allow us to detect when a local # device has connected/disconnected (e.g. turned on/off) if json_data.get("event").get("event") == "client.connected": LOGGER.debug("Client connected event received.") self._on_disconnect() # We aren't guaranteed to recieve a client.disconnected event. self._on_connect() elif json_data.get("event").get("event") == "client.disconnected": LOGGER.debug("Client disconnected event received.") self._on_disconnect() else: self._device.info.set_online(True) self._watchdog.received_data() if json_data.get("print"): self._device.print_update(data=json_data.get("print")) # Once we receive data, if in manual refresh mode, we disconnect again. if self._manual_refresh_mode: self.disconnect() if json_data.get("print").get("msg", 0) == 0: self._refreshed= False elif json_data.get("info") and json_data.get("info").get("command") == "get_version": LOGGER.debug("Got Version Data") self._device.info_update(data=json_data.get("info")) except Exception as e: LOGGER.error("An exception occurred processing a message:", exc_info=e) def subscribe(self): """Subscribe to report topic""" LOGGER.debug(f"Subscribing: device/{self._serial}/report") self.client.subscribe(f"device/{self._serial}/report") def publish(self, msg): """Publish a custom message""" result = self.client.publish(f"device/{self._serial}/request", json.dumps(msg)) status = result[0] if status == 0: LOGGER.debug(f"Sent {msg} to topic device/{self._serial}/request") return True LOGGER.error(f"Failed to send message to topic device/{self._serial}/request") return False async def refresh(self): """Force refresh data""" if self._manual_refresh_mode: self.connect(self.callback) else: LOGGER.debug("Force Refresh: Getting Version Info") self._refreshed = True self.publish(GET_VERSION) LOGGER.debug("Force Refresh: Request Push All") self._refreshed = True self.publish(PUSH_ALL) self.slicer_settings.update() def get_device(self): """Return device""" return self._device def disconnect(self): """Disconnect the Bambu Client from server""" LOGGER.debug(" Disconnect: Client Disconnecting") if self.client is not None: self.client.disconnect() self.client = None async def try_connection(self): """Test if we can connect to an MQTT broker.""" LOGGER.debug("Try Connection") result: queue.Queue[bool] = queue.Queue(maxsize=1) def on_message(client, userdata, message): json_data = json.loads(message.payload) LOGGER.debug(f"Try Connection: Got '{json_data}'") if json_data.get("info") and json_data.get("info").get("command") == "get_version": LOGGER.debug("Got Version Command Data") self._device.info_update(data=json_data.get("info")) result.put(True) self.client = mqtt.Client() self.client.on_connect = self.try_on_connect self.client.on_disconnect = self.on_disconnect self.client.on_message = on_message # Run the blocking tls_set method in a separate thread loop = asyncio.get_event_loop() await loop.run_in_executor(None, self.setup_tls) if self.local_mqtt: self.client.username_pw_set("bblp", password=self._access_code) else: self.client.username_pw_set(self._username, password=self._auth_token) self._port = 8883 LOGGER.debug("Test connection: Connecting to %s", self.host) try: self.client.connect(self.host, self._port) self.client.loop_start() if result.get(timeout=10): return True except OSError as e: return False except queue.Empty: return False finally: self.disconnect() async def __aenter__(self): """Async enter. Returns: The BambuLab object. """ return self async def __aexit__(self, *_exc_info): """Async exit. Args: _exc_info: Exec type. """ self.disconnect()