from __future__ import annotations import collections from dataclasses import dataclass, field import math from pathlib import Path import queue import re import threading import time from octoprint_bambu_printer.printer.file_system.cached_file_view import CachedFileView from octoprint_bambu_printer.printer.file_system.file_info import FileInfo from octoprint_bambu_printer.printer.print_job import PrintJob from pybambu import BambuClient, commands import logging import logging.handlers import paho.mqtt.client as mqtt import json import ssl from octoprint.util import RepeatedTimer from octoprint_bambu_printer.printer.states.a_printer_state import APrinterState from octoprint_bambu_printer.printer.states.idle_state import IdleState from .printer_serial_io import PrinterSerialIO from .states.paused_state import PausedState from .states.printing_state import PrintingState from .gcode_executor import GCodeExecutor from .file_system.remote_sd_card_file_list import RemoteSDCardFileList AMBIENT_TEMPERATURE: float = 21.3 @dataclass class BambuPrinterTelemetry: temp: list[float] = field(default_factory=lambda: [AMBIENT_TEMPERATURE]) targetTemp: list[float] = field(default_factory=lambda: [0.0]) bedTemp: float = AMBIENT_TEMPERATURE bedTargetTemp = 0.0 hasChamber: bool = False chamberTemp: float = AMBIENT_TEMPERATURE chamberTargetTemp: float = 0.0 lastTempAt: float = time.monotonic() firmwareName: str = "Bambu" extruderCount: int = 1 # noinspection PyBroadException class BambuVirtualPrinter: gcode_executor = GCodeExecutor() def __init__( self, settings, printer_profile_manager, data_folder, serial_log_handler=None, read_timeout=5.0, faked_baudrate=115200, ): self._settings = settings self._printer_profile_manager = printer_profile_manager self._faked_baudrate = faked_baudrate self._data_folder = data_folder self._last_hms_errors = None self._log = logging.getLogger("octoprint.plugins.bambu_printer.BambuPrinter") self._state_idle = IdleState(self) self._state_printing = PrintingState(self) self._state_paused = PausedState(self) self._current_state = self._state_idle self._running = True self._print_status_reporter = None self._print_temp_reporter = None self._printer_thread = threading.Thread( target=self._printer_worker, name="octoprint.plugins.bambu_printer.printer_state", ) self._state_change_queue = queue.Queue() self._current_print_job: PrintJob | None = None self._serial_io = PrinterSerialIO( handle_command_callback=self._process_gcode_serial_command, settings=settings, serial_log_handler=serial_log_handler, read_timeout=read_timeout, write_timeout=10.0, ) self._telemetry = BambuPrinterTelemetry() self._telemetry.hasChamber = printer_profile_manager.get_current().get( "heatedChamber" ) self.file_system = RemoteSDCardFileList(settings) self._selected_project_file: FileInfo | None = None self._project_files_view = ( CachedFileView(self.file_system, on_update=self._list_cached_project_files) .with_filter("", ".3mf") .with_filter("cache/", ".3mf") ) self._serial_io.start() self._printer_thread.start() self._mqtt_client = None self._mqtt_connected = False self._bambu_client = None self._custom_connected = False self._bambu_client: BambuClient = self._create_client_connection_async() @property def bambu_client(self): return self._bambu_client @property def is_running(self): return self._running @property def current_state(self): return self._current_state @property def current_print_job(self): return self._current_print_job @current_print_job.setter def current_print_job(self, value): self._current_print_job = value @property def selected_file(self): return self._selected_project_file @property def has_selected_file(self): return self._selected_project_file is not None @property def timeout(self): return self._serial_io._read_timeout @timeout.setter def timeout(self, value): self._log.debug(f"Setting read timeout to {value}s") self._serial_io._read_timeout = value @property def write_timeout(self): return self._serial_io._write_timeout @write_timeout.setter def write_timeout(self, value): self._log.debug(f"Setting write timeout to {value}s") self._serial_io._write_timeout = value @property def port(self): return "BAMBU" @property def baudrate(self): return self._faked_baudrate @property def project_files(self): return self._project_files_view @property def is_connected(self): """Custom property to track connection status without modifying BambuClient directly""" connection_status = self._custom_connected and self._mqtt_connected self._log.debug(f"Connection status check: custom_connected={self._custom_connected}, mqtt_connected={self._mqtt_connected}, result={connection_status}") return connection_status def change_state(self, new_state: APrinterState): self._state_change_queue.put(new_state) def new_update(self, event_type): if event_type == "event_hms_errors": self._update_hms_errors() elif event_type == "event_printer_data_update": self._update_printer_info() def _update_printer_info(self): # Verwende direkt die Telemetrie-Daten statt der BambuClient-Struktur self.lastTempAt = time.monotonic() # Der Rest der Methode kann unverändert bleiben, da wir die Telemetrie # direkt in _process_print_data aktualisieren # Gib den aktuellen Status detaillierter aus self._log.debug(f"Current temperatures - Nozzle: {self._telemetry.temp[0]}/{self._telemetry.targetTemp[0]}, " + f"Bed: {self._telemetry.bedTemp}/{self._telemetry.bedTargetTemp}, " + f"Chamber: {self._telemetry.chamberTemp}") # Rufe trotzdem die BambuClient-Daten ab, falls verfügbar try: device_data = self.bambu_client.get_device() print_job_state = device_data.print_job.gcode_state self._log.debug(f"BambuClient printer state: {print_job_state}") if ( print_job_state == "IDLE" or print_job_state == "FINISH" or print_job_state == "FAILED" ): self.change_state(self._state_idle) elif print_job_state == "RUNNING" or print_job_state == "PREPARE": self.change_state(self._state_printing) elif print_job_state == "PAUSE": self.change_state(self._state_paused) else: self._log.warn(f"Unknown print job state: {print_job_state}") except Exception as e: self._log.error(f"Error reading BambuClient device state: {e}") def _update_hms_errors(self): bambu_printer = self.bambu_client.get_device() if ( bambu_printer.hms.errors != self._last_hms_errors and bambu_printer.hms.errors["Count"] > 0 ): self._log.debug(f"HMS Error: {bambu_printer.hms.errors}") for n in range(1, bambu_printer.hms.errors["Count"] + 1): error = bambu_printer.hms.errors[f"{n}-Error"].strip() self.sendIO(f"// action:notification {error}") self._last_hms_errors = bambu_printer.hms.errors def on_disconnect(self, on_disconnect): self._log.debug(f"on disconnect called") return on_disconnect def on_connect(self, on_connect): self._log.debug(f"on connect called") return on_connect def _on_mqtt_connect(self, client, userdata, flags, rc): self._log.debug(f"MQTT connected with result code: {rc}") if rc == 0: self._mqtt_connected = True self._custom_connected = True # Subscribe to the relevant topics for the Bambu printer device_topic = f"device/{self._settings.get(['serial'])}/report" client.subscribe(device_topic) self._log.debug(f"Subscribed to topic: {device_topic}") self._log.info(f"MQTT connection successful. Connected: {self.is_connected}") # Notify that we're connected self.sendOk() else: self._mqtt_connected = False self._custom_connected = False self._log.error(f"Failed to connect to MQTT broker with result code: {rc}") def _on_mqtt_disconnect(self, client, userdata, rc): self._mqtt_connected = False self._custom_connected = False self._log.debug(f"MQTT disconnected with result code: {rc}") def _on_mqtt_message(self, client, userdata, msg): try: # Decode message and update client data payload = json.loads(msg.payload.decode('utf-8')) self._log.debug(f"MQTT message received on topic {msg.topic}: {list(payload.keys())}") # Direkte Verarbeitung der Daten self._process_mqtt_payload(payload) # Auch an Bambu Client weiterleiten try: # Wenn der BambuClient eine eigene Verarbeitungsmethode hat, nutzen wir diese if hasattr(self._bambu_client, '_process_message') and callable(self._bambu_client._process_message): self._bambu_client._process_message(msg.topic, payload) self._log.debug("Message forwarded to pybambu via _process_message") elif hasattr(self._bambu_client, '_handle_mqtt_message') and callable(self._bambu_client._handle_mqtt_message): self._bambu_client._handle_mqtt_message(client, userdata, msg) self._log.debug("Message forwarded to pybambu via _handle_mqtt_message") else: # Wenn keine Methode zur Verarbeitung verfügbar ist, aktualisieren wir die Datenstruktur manuell self._log.debug("No message handler found in BambuClient, updating state manually") self._update_bambu_client_state(payload) except Exception as e: self._log.error(f"Error forwarding to pybambu: {e}", exc_info=True) except Exception as e: self._log.error(f"Error processing MQTT message: {e}", exc_info=True) def _process_mqtt_payload(self, payload): """Zentrale Methode zur Verarbeitung von MQTT-Payloads""" try: # Verarbeite print-Daten if 'print' in payload: print_data = payload['print'] self._log.info(f"Processing print data with keys: {list(print_data.keys())}") # Temperaturdaten verarbeiten if 'temperature' in print_data: self._process_temperature_data(print_data['temperature']) # Status verarbeiten if 'gcode_state' in print_data: self._process_print_state(print_data['gcode_state']) # Trigger update self.new_update("event_printer_data_update") # Verarbeite info-Daten if 'info' in payload: info_data = payload['info'] self._log.info(f"Processing info data with keys: {list(info_data.keys())}") # HMS-Fehler verarbeiten if 'hms' in info_data: self._process_hms_errors(info_data['hms']) self.new_update("event_hms_errors") except Exception as e: self._log.error(f"Error processing MQTT payload: {e}", exc_info=True) def _process_temperature_data(self, temp_data): """Verarbeitet Temperaturdaten aus MQTT-Nachrichten""" try: # Extruder Temperatur if 'nozzle_temp' in temp_data: self._telemetry.temp[0] = float(temp_data['nozzle_temp']) if 'target_nozzle_temp' in temp_data: self._telemetry.targetTemp[0] = float(temp_data['target_nozzle_temp']) # Bett Temperatur if 'bed_temp' in temp_data: self._telemetry.bedTemp = float(temp_data['bed_temp']) if 'target_bed_temp' in temp_data: self._telemetry.bedTargetTemp = float(temp_data['target_bed_temp']) # Kammer Temperatur if 'chamber_temp' in temp_data: self._telemetry.chamberTemp = float(temp_data['chamber_temp']) self._log.debug(f"Updated temperatures - Nozzle: {self._telemetry.temp[0]}/{self._telemetry.targetTemp[0]}, " + f"Bed: {self._telemetry.bedTemp}/{self._telemetry.bedTargetTemp}, " + f"Chamber: {self._telemetry.chamberTemp}") except Exception as e: self._log.error(f"Error processing temperature data: {e}", exc_info=True) def _process_print_state(self, print_job_state): """Verarbeitet den Druckerstatus aus MQTT-Nachrichten""" try: self._log.debug(f"Received printer state update: {print_job_state}") if print_job_state in ["IDLE", "FINISH", "FAILED"]: self.change_state(self._state_idle) elif print_job_state in ["RUNNING", "PREPARE"]: self.change_state(self._state_printing) elif print_job_state == "PAUSE": self.change_state(self._state_paused) else: self._log.warn(f"Unknown print job state: {print_job_state}") # Aktualisiere auch die pybambu-Datenstruktur if hasattr(self._bambu_client, 'device') and hasattr(self._bambu_client.device, 'print_job'): self._bambu_client.device.print_job.gcode_state = print_job_state except Exception as e: self._log.error(f"Error processing print state: {e}", exc_info=True) def _process_hms_errors(self, hms_data): """Verarbeitet HMS-Fehlerdaten aus MQTT-Nachrichten""" try: if hasattr(self._bambu_client, 'device') and hasattr(self._bambu_client.device, 'hms'): self._bambu_client.device.hms.update_from_payload(hms_data) self._log.debug("HMS error data updated") # Überprüfe auf Fehler und zeige sie an if self._bambu_client.device.hms.errors != self._last_hms_errors and self._bambu_client.device.hms.errors["Count"] > 0: self._log.debug(f"HMS Error: {self._bambu_client.device.hms.errors}") for n in range(1, self._bambu_client.device.hms.errors["Count"] + 1): error = self._bambu_client.device.hms.errors[f"{n}-Error"].strip() self.sendIO(f"// action:notification {error}") self._last_hms_errors = self._bambu_client.device.hms.errors except Exception as e: self._log.error(f"Error processing HMS errors: {e}", exc_info=True) def _update_bambu_client_state(self, payload): """Aktualisiert die internen Zustände des BambuClient""" try: if not hasattr(self._bambu_client, 'device'): self._log.debug("BambuClient has no device attribute, initializing") return if 'print' in payload: print_data = payload['print'] # Temperatur aktualisieren if 'temperature' in print_data and hasattr(self._bambu_client.device, 'temperature'): temp_obj = self._bambu_client.device.temperature temp_data = print_data['temperature'] # Direkte Zuweisung der Temperaturen if 'nozzle_temp' in temp_data: temp_obj.nozzle_temp = float(temp_data['nozzle_temp']) if 'target_nozzle_temp' in temp_data: temp_obj.target_nozzle_temp = float(temp_data['target_nozzle_temp']) if 'bed_temp' in temp_data: temp_obj.bed_temp = float(temp_data['bed_temp']) if 'target_bed_temp' in temp_data: temp_obj.target_bed_temp = float(temp_data['target_bed_temp']) if 'chamber_temp' in temp_data: temp_obj.chamber_temp = float(temp_data['chamber_temp']) except Exception as e: self._log.error(f"Error updating BambuClient state: {e}") def _create_client_connection_async(self): self._create_client_connection() if self._bambu_client is None: raise RuntimeError("Connection with Bambu Client not established") return self._bambu_client def _create_client_connection(self): if ( self._settings.get(["device_type"]) == "" or self._settings.get(["serial"]) == "" or self._settings.get(["username"]) == "" or self._settings.get(["access_code"]) == "" ): msg = "invalid settings to start connection with Bambu Printer" self._log.debug(msg) raise ValueError(msg) use_local_mqtt = self._settings.get_boolean(['local_mqtt']) self._log.debug(f"connecting via local mqtt: {use_local_mqtt}") # Create a BambuClient but don't let it handle the MQTT connection bambu_client = BambuClient( device_type=self._settings.get(["device_type"]), serial=self._settings.get(["serial"]), host=self._settings.get(["host"]), username="bambuocto", access_code=self._settings.get(["access_code"]), local_mqtt=use_local_mqtt, region=self._settings.get(["region"]), email=self._settings.get(["email"]), auth_token=self._settings.get(["auth_token"]), ) # Initialisiere die device-Eigenschaft manuell, ohne connect() zu benutzen # da die connect()-Methode ein Callback als Parameter erwartet if not hasattr(bambu_client, 'device'): self._log.debug("BambuClient has no device attribute, initializing manually") # Statt bambu_client.connect() zu verwenden, initialisieren wir die Attribute direkt try: # Wir prüfen, ob wir auf private Attribute zugreifen können if hasattr(bambu_client, '_init_attributes'): bambu_client._init_attributes() self._log.debug("Initialized BambuClient attributes manually via _init_attributes") else: # Wenn keine _init_attributes Methode vorhanden ist, erstellen wir die grundlegenden Attribute manuell from pybambu import BambuDevice bambu_client.device = BambuDevice() self._log.debug("Created device attribute manually") except Exception as e: self._log.error(f"Error initializing BambuClient: {e}", exc_info=True) # Set up our own MQTT client self._mqtt_client = mqtt.Client() self._mqtt_client.on_connect = self._on_mqtt_connect self._mqtt_client.on_disconnect = self._on_mqtt_disconnect self._mqtt_client.on_message = self._on_mqtt_message # Configure connection based on local or cloud if use_local_mqtt: host = self._settings.get(["host"]) port = 1883 username = "octobambu" self._mqtt_client.username_pw_set(username) else: # Cloud connection settings region = self._settings.get(["region"]) host = f"mqtt-{region}.bambulab.com" port = 8883 username = self._settings.get(["email"]) password = self._settings.get(["auth_token"]) self._mqtt_client.username_pw_set(username, password) self._mqtt_client.tls_set() # Connect MQTT try: self._mqtt_client.connect(host, port, 60) self._mqtt_client.loop_start() self._log.info(f"MQTT client started with {host}:{port}") # Explicitly set the connection status self._custom_connected = True except Exception as e: self._log.error(f"Failed to connect to MQTT broker: {e}") raise # Inject our MQTT client into the BambuClient without modifying 'connected' bambu_client._mqtt_client = self._mqtt_client # Instead of modifying bambu_client.connected, we'll use our custom property self._custom_connected = True # Store the Bambu client self._bambu_client = bambu_client self._log.info(f"Custom connection status: {self.is_connected}") self.sendOk() def publish_mqtt(self, topic, payload): """Publish a message to the MQTT broker""" if self._mqtt_client and self._mqtt_connected: return self._mqtt_client.publish(topic, json.dumps(payload)) return False # Override BambuClient's publish method to use our MQTT client def publish(self, command): """Publish a command using our MQTT client""" if not self.is_connected: self._log.error("Cannot publish command: MQTT not connected") return False serial = self._settings.get(["serial"]) topic = f"device/{serial}/request" return self.publish_mqtt(topic, command) def __str__(self): return "BAMBU(read_timeout={read_timeout},write_timeout={write_timeout},options={options})".format( read_timeout=self.timeout, write_timeout=self.write_timeout, options={ "device_type": self._settings.get(["device_type"]), "host": self._settings.get(["host"]), }, ) def _reset(self): with self._serial_io.incoming_lock: self.lastN = 0 self._running = False if self._print_status_reporter is not None: self._print_status_reporter.cancel() self._print_status_reporter = None if self._settings.get_boolean(["simulateReset"]): for item in self._settings.get(["resetLines"]): self.sendIO(item + "\n") self._serial_io.reset() def write(self, data: bytes) -> int: return self._serial_io.write(data) def readline(self) -> bytes: return self._serial_io.readline() def readlines(self) -> list[bytes]: return self._serial_io.readlines() def sendIO(self, line: str): self._serial_io.send(line) def sendOk(self): self._serial_io.sendOk() def flush(self): self._serial_io.flush() self._wait_for_state_change() ##~~ project file functions def remove_project_selection(self): self._selected_project_file = None def select_project_file(self, file_path: str) -> bool: self._log.debug(f"Select project file: {file_path}") file_info = self._project_files_view.get_file_by_stem( file_path, [".gcode", ".3mf"] ) if ( self._selected_project_file is not None and file_info is not None and self._selected_project_file.path == file_info.path ): return True if file_info is None: self._log.error(f"Cannot select not existing file: {file_path}") return False self._selected_project_file = file_info self._send_file_selected_message() return True ##~~ command implementations @gcode_executor.register_no_data("M21") def _sd_status(self) -> None: self.sendIO("SD card ok") @gcode_executor.register("M23") def _select_sd_file(self, data: str) -> bool: filename = data.split(maxsplit=1)[1].strip() return self.select_project_file(filename) def _send_file_selected_message(self): if self.selected_file is None: return self.sendIO( f"File opened: {self.selected_file.file_name} " f"Size: {self.selected_file.size}" ) self.sendIO("File selected") @gcode_executor.register("M26") def _set_sd_position(self, data: str) -> bool: if data == "M26 S0": return self._cancel_print() else: self._log.debug("ignoring M26 command.") self.sendIO("M26 disabled for Bambu") return True @gcode_executor.register("M27") def _report_sd_print_status(self, data: str) -> bool: matchS = re.search(r"S([0-9]+)", data) if matchS: interval = int(matchS.group(1)) if interval > 0: self.start_continuous_status_report(interval) return False else: self.stop_continuous_status_report() return False self.report_print_job_status() return True def start_continuous_status_report(self, interval: int): if self._print_status_reporter is not None: self._print_status_reporter.cancel() self._print_status_reporter = RepeatedTimer( interval, self.report_print_job_status ) self._print_status_reporter.start() def stop_continuous_status_report(self): if self._print_status_reporter is not None: self._print_status_reporter.cancel() self._print_status_reporter = None @gcode_executor.register("M30") def _delete_project_file(self, data: str) -> bool: file_path = data.split(maxsplit=1)[1].strip() file_info = self.project_files.get_file_data(file_path) if file_info is not None: self.file_system.delete_file(file_info.path) self._update_project_file_list() else: self._log.error(f"File not found to delete {file_path}") return True @gcode_executor.register("M105") def _report_temperatures(self, data: str) -> bool: self._processTemperatureQuery() return True @gcode_executor.register("M155") def _auto_report_temperatures(self, data: str) -> bool: matchS = re.search(r"S([0-9]+)", data) if matchS: interval = int(matchS.group(1)) if interval > 0: self.start_continuous_temp_report(interval) else: self.stop_continuous_temp_report() self.report_print_job_status() return True def start_continuous_temp_report(self, interval: int): if self._print_temp_reporter is not None: self._print_temp_reporter.cancel() self._print_temp_reporter = RepeatedTimer( interval, self._processTemperatureQuery ) self._print_temp_reporter.start() def stop_continuous_temp_report(self): if self._print_temp_reporter is not None: self._print_temp_reporter.cancel() self._print_temp_reporter = None # noinspection PyUnusedLocal @gcode_executor.register_no_data("M115") def _report_firmware_info(self) -> bool: self.sendIO("Bambu Printer Integration") self.sendIO("Cap:AUTOREPORT_SD_STATUS:1") self.sendIO("Cap:AUTOREPORT_TEMP:1") self.sendIO("Cap:EXTENDED_M20:1") self.sendIO("Cap:LFN_WRITE:1") return True @gcode_executor.register("M117") def _get_lcd_message(self, data: str) -> bool: result = re.search(r"M117\s+(.*)", data).group(1) self.sendIO(f"echo:{result}") return True @gcode_executor.register("M118") def _serial_print(self, data: str) -> bool: match = re.search(r"M118 (?:(?PA1|E1|Pn[012])\s)?(?P.*)", data) if not match: self.sendIO("Unrecognized command parameters for M118") else: result = match.groupdict() text = result["text"] parameter = result["parameter"] if parameter == "A1": self.sendIO(f"//{text}") elif parameter == "E1": self.sendIO(f"echo:{text}") else: self.sendIO(text) return True # noinspection PyUnusedLocal @gcode_executor.register("M220") def _set_feedrate_percent(self, data: str) -> bool: if self.bambu_client.connected: gcode_command = commands.SEND_GCODE_TEMPLATE percent = int(data.replace("M220 S", "")) def speed_fraction(speed_percent): return math.floor(10000 / speed_percent) / 100 def acceleration_magnitude(speed_percent): return math.exp((speed_fraction(speed_percent) - 1.0191) / -0.8139) def feed_rate(speed_percent): return 6.426e-5 * speed_percent ** 2 - 2.484e-3 * speed_percent + 0.654 def linear_interpolate(x, x_points, y_points): if x <= x_points[0]: return y_points[0] if x >= x_points[-1]: return y_points[-1] for i in range(len(x_points) - 1): if x_points[i] <= x < x_points[i + 1]: t = (x - x_points[i]) / (x_points[i + 1] - x_points[i]) return y_points[i] * (1 - t) + y_points[i + 1] * t def scale_to_data_points(func, data_points): data_points.sort(key=lambda x: x[0]) speeds, values = zip(*data_points) scaling_factors = [v / func(s) for s, v in zip(speeds, values)] return lambda x: func(x) * linear_interpolate(x, speeds, scaling_factors) def speed_adjust(speed_percentage): if not 30 <= speed_percentage <= 180: speed_percentage = 100 bambu_params = { "speed": [50, 100, 124, 166], "acceleration": [0.3, 1.0, 1.4, 1.6], "feed_rate": [0.7, 1.0, 1.4, 2.0] } acc_mag_scaled = scale_to_data_points(acceleration_magnitude, list(zip(bambu_params["speed"], bambu_params["acceleration"]))) feed_rate_scaled = scale_to_data_points(feed_rate, list(zip(bambu_params["speed"], bambu_params["feed_rate"]))) speed_frac = speed_fraction(speed_percentage) acc_mag = acc_mag_scaled(speed_percentage) feed = feed_rate_scaled(speed_percentage) # speed_level = 1.539 * (acc_mag**2) - 0.7032 * acc_mag + 4.0834 return f"M204.2 K{acc_mag:.2f}\nM220 K{feed:.2f}\nM73.2 R{speed_frac:.2f}\n" # M1002 set_gcode_claim_speed_level ${speed_level:.0f}\n speed_command = speed_adjust(percent) gcode_command["print"]["param"] = speed_command if self.bambu_client.publish(gcode_command): self._log.info(f"{percent}% speed adjustment command sent successfully") return True def _process_gcode_serial_command(self, gcode: str, full_command: str): self._log.debug(f"processing gcode {gcode} command = {full_command}") handled = self.gcode_executor.execute(self, gcode, full_command) if handled: self.sendOk() return # post gcode to printer otherwise if self.is_connected: GCODE_COMMAND = commands.SEND_GCODE_TEMPLATE GCODE_COMMAND["print"]["param"] = full_command + "\n" if self.publish(GCODE_COMMAND): self._log.info("command sent successfully") self.sendOk() @gcode_executor.register_no_data("M112") def _shutdown(self): self._running = True if self.bambu_client.connected: self.bambu_client.disconnect() self.sendIO("echo:EMERGENCY SHUTDOWN DETECTED. KILLED.") self._serial_io.close() return True @gcode_executor.register("M20") def _update_project_file_list(self, data: str = ""): self._project_files_view.update() # internally sends list to serial io return True def _list_cached_project_files(self): self.sendIO("Begin file list") for item in map( FileInfo.get_gcode_info, self._project_files_view.get_all_cached_info() ): self.sendIO(item) self.sendIO("End file list") self.sendOk() @gcode_executor.register_no_data("M24") def _start_resume_sd_print(self): self._current_state.start_new_print() return True @gcode_executor.register_no_data("M25") def _pause_print(self): self._current_state.pause_print() return True @gcode_executor.register("M524") def _cancel_print(self): self._current_state.cancel_print() return True def report_print_job_status(self): if self.current_print_job is not None: file_position = 1 if self.current_print_job.file_position == 0 else self.current_print_job.file_position self.sendIO( f"SD printing byte {file_position}" f"/{self.current_print_job.file_info.size}" ) else: self.sendIO("Not SD printing") def report_print_finished(self): if self.current_print_job is None: return self._log.debug( f"SD File Print finishing: {self.current_print_job.file_info.file_name}" ) self.sendIO("Done printing file") def finalize_print_job(self): if self.current_print_job is not None: self.report_print_job_status() self.report_print_finished() self.current_print_job = None self.report_print_job_status() self.change_state(self._state_idle) def _create_temperature_message(self) -> str: template = "{heater}:{actual:.2f}/ {target:.2f}" temps = collections.OrderedDict() temps["T"] = (self._telemetry.temp[0], self._telemetry.targetTemp[0]) temps["B"] = (self._telemetry.bedTemp, self._telemetry.bedTargetTemp) if self._telemetry.hasChamber: temps["C"] = ( self._telemetry.chamberTemp, self._telemetry.chamberTargetTemp, ) output = " ".join( map( lambda x: template.format(heater=x[0], actual=x[1][0], target=x[1][1]), temps.items(), ) ) output += " @:64\n" return output def _processTemperatureQuery(self) -> bool: # Debug-Log hinzufügen, um zu prüfen, ob die Methode aufgerufen wird self._log.debug(f"Processing temperature query - connected: {self.is_connected}") # Aktuelle Temperaturdaten ausgeben self._log.debug(f"Current temperature data: Nozzle={self._telemetry.temp[0]}/{self._telemetry.targetTemp[0]}, " + f"Bed={self._telemetry.bedTemp}/{self._telemetry.bedTargetTemp}") # Temperaturmeldung erzeugen und senden, unabhängig von Connected-Status output = self._create_temperature_message() self._log.debug(f"Sending temperature message: {output.strip()}") self.sendIO(output) return True def close(self): if self._mqtt_client and self._mqtt_connected: self._mqtt_client.loop_stop() self._mqtt_client.disconnect() self._mqtt_connected = False self._custom_connected = False # Ändern Sie jeden Verweis auf bambu_client.connected zu self.is_connected if self._bambu_client: self._bambu_client.disconnect() self.change_state(self._state_idle) self._serial_io.close() self.stop() def stop(self): self._running = False self._printer_thread.join() def _wait_for_state_change(self): self._state_change_queue.join() def _printer_worker(self): self._create_client_connection_async() self.sendIO("Printer connection complete") while self._running: try: next_state = self._state_change_queue.get(timeout=0.01) self._trigger_change_state(next_state) self._state_change_queue.task_done() except queue.Empty: continue except Exception as e: self._state_change_queue.task_done() raise e self._current_state.finalize() def _trigger_change_state(self, new_state: APrinterState): if self._current_state == new_state: return self._log.debug( f"Changing state from {self._current_state.__class__.__name__} to {new_state.__class__.__name__}" ) self._current_state.finalize() self._current_state = new_state self._current_state.init() def _showPrompt(self, text, choices): self._hidePrompt() self.sendIO(f"//action:prompt_begin {text}") for choice in choices: self.sendIO(f"//action:prompt_button {choice}") self.sendIO("//action:prompt_show") def _hidePrompt(self): self.sendIO("//action:prompt_end")