diff --git a/octoprint_bambu_printer/printer/bambu_virtual_printer.py b/octoprint_bambu_printer/printer/bambu_virtual_printer.py index 08991a6..d24b0fa 100644 --- a/octoprint_bambu_printer/printer/bambu_virtual_printer.py +++ b/octoprint_bambu_printer/printer/bambu_virtual_printer.py @@ -273,7 +273,6 @@ class BambuVirtualPrinter: def readlines(self) -> list[bytes]: result = [] - self._serial_io.wait_for_input() next_line = self._serial_io.readline() while next_line != b"": result.append(next_line) @@ -281,11 +280,14 @@ class BambuVirtualPrinter: return result def sendIO(self, line: str): - self.sendIO(line) + self._serial_io.send(line) def sendOk(self): self._serial_io.sendOk() + def flush(self): + self._serial_io.flush() + ##~~ command implementations @gcode_executor.register("M23") @@ -392,12 +394,16 @@ class BambuVirtualPrinter: self._log.info(f"{percent}% speed adjustment command sent successfully") return True - def _process_gcode_serial_command(self, gcode_letter: str, gcode: str, data: bytes): - self._log.debug(f"processing gcode command {gcode_letter} {gcode} {data}") + def _process_gcode_serial_command( + self, gcode_letter: str, gcode: str, full_command: str + ): + self._log.debug( + f"processing gcode command letter = {gcode_letter} | gcode = {gcode} | full = {full_command}" + ) if gcode_letter in self.gcode_executor: - handled = self.gcode_executor.execute(self, gcode_letter, data) + handled = self.gcode_executor.execute(self, gcode_letter, full_command) else: - handled = self.gcode_executor.execute(self, gcode, data) + handled = self.gcode_executor.execute(self, gcode, full_command) if handled: self._serial_io.sendOk() return @@ -405,7 +411,7 @@ class BambuVirtualPrinter: # post gcode to printer otherwise if self.bambu_client.connected: GCODE_COMMAND = commands.SEND_GCODE_TEMPLATE - GCODE_COMMAND["print"]["param"] = data + "\n" + GCODE_COMMAND["print"]["param"] = full_command + "\n" if self.bambu_client.publish(GCODE_COMMAND): self._log.info("command sent successfully") self._serial_io.sendOk() @@ -416,7 +422,7 @@ class BambuVirtualPrinter: if self.bambu_client.connected: self.bambu_client.disconnect() self.sendIO("echo:EMERGENCY SHUTDOWN DETECTED. KILLED.") - self._serial_io.stop() + self._serial_io.close() return True @gcode_executor.register_no_data("M20") @@ -481,6 +487,7 @@ class BambuVirtualPrinter: self.sendIO(f"Writing to file: {filename}") def _finishSdFile(self): + # FIXME: maybe remove or move to remote SD card try: self._writingToSdHandle.close() except Exception: @@ -515,4 +522,14 @@ class BambuVirtualPrinter: def close(self): if self.bambu_client.connected: self.bambu_client.disconnect() - self._serial_io.stop() + self._serial_io.close() + + def _showPrompt(self, text, choices): + self._hidePrompt() + self.sendIO(f"//action:prompt_begin {text}") + for choice in choices: + self.sendIO(f"//action:prompt_button {choice}") + self.sendIO("//action:prompt_show") + + def _hidePrompt(self): + self.sendIO("//action:prompt_end") diff --git a/octoprint_bambu_printer/printer/gcode_executor.py b/octoprint_bambu_printer/printer/gcode_executor.py index d03881b..0f0b21f 100644 --- a/octoprint_bambu_printer/printer/gcode_executor.py +++ b/octoprint_bambu_printer/printer/gcode_executor.py @@ -1,6 +1,7 @@ import itertools import logging from inspect import signature +import traceback GCODE_DOCUMENTATION = { @@ -311,7 +312,8 @@ class GCodeExecutor: self._log.debug(f"ignoring {gcode_info} command.") return True except Exception as e: - self._log.error(f"Error {gcode_info}: {str(e)}") + self._log.error(f"Error during gcode {gcode_info}") + self._log.error(e, exc_info=True) return False def _gcode_with_info(self, gcode): diff --git a/octoprint_bambu_printer/printer/printer_serial_io.py b/octoprint_bambu_printer/printer/printer_serial_io.py index 8cfe6f1..35324a2 100644 --- a/octoprint_bambu_printer/printer/printer_serial_io.py +++ b/octoprint_bambu_printer/printer/printer_serial_io.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import logging import queue import re @@ -16,28 +18,18 @@ class PrinterSerialIO(threading.Thread): def __init__( self, - handle_command_callback: Callable[[str, str, bytes], None], + handle_command_callback: Callable[[str, str, str], None], settings, serial_log_handler=None, read_timeout=5.0, write_timeout=10.0, ) -> None: super().__init__( - name="octoprint.plugins.bambu_printer.wait_thread", daemon=True + name="octoprint.plugins.bambu_printer.serial_io_thread", daemon=True ) self._handle_command_callback = handle_command_callback self._settings = settings - self._serial_log = logging.getLogger( - "octoprint.plugins.bambu_printer.BambuPrinter.serial" - ) - self._serial_log.setLevel(logging.CRITICAL) - self._serial_log.propagate = False - - if serial_log_handler is not None: - self._serial_log.addHandler(serial_log_handler) - self._serial_log.setLevel(logging.INFO) - - self._serial_log.debug("-" * 78) + self._log = self._init_logger(serial_log_handler) self._read_timeout = read_timeout self._write_timeout = write_timeout @@ -48,65 +40,106 @@ class PrinterSerialIO(threading.Thread): self._rx_buffer_size = 64 self._incoming_lock = threading.RLock() - self._input_queue_empty = threading.Event() - self._input_queue_empty.set() - self._input_processing_finished = threading.Event() - self._input_processing_finished.set() - self.incoming = CharCountingQueue(self._rx_buffer_size, name="RxBuffer") - self.outgoing = queue.Queue() + self.input_bytes = CharCountingQueue(self._rx_buffer_size, name="RxBuffer") + self.output_bytes = queue.Queue() + + def _init_logger(self, log_handler): + log = logging.getLogger("octoprint.plugins.bambu_printer.BambuPrinter.serial") + if log_handler is not None: + log.addHandler(log_handler) + log.debug("-" * 78) + return log @property def incoming_lock(self): return self._incoming_lock def run(self) -> None: - data = None + buffer = b"" - buf = b"" - while self.incoming is not None and self._running: + while self._running: try: - data = self.incoming.get(timeout=0.01) + data = self.input_bytes.get(block=True, timeout=0.01) data = to_bytes(data, encoding="ascii", errors="replace") - self.incoming.task_done() - self._input_queue_empty.clear() + self.input_bytes.task_done() + + line, buffer = self._read_next_line_buffered(data, buffer) + while line is not None: + self._received_lines += 1 + self._process_input_gcode_line(line) + line, buffer = self._read_next_line_buffered(data, buffer) except queue.Empty: - self._input_queue_empty.set() - continue - except Exception: - if self.incoming is None: - # just got closed - break - - if data is not None: - buf += data - nl = buf.find(b"\n") + 1 - if nl > 0: - data = buf[:nl] - buf = buf[nl:] - else: - continue - - if data is None: continue - self._received_lines += 1 + self._log.debug("Closing IO read loop") + + def _read_next_line_buffered(self, additional: bytes, buffer: bytes): + buffer += additional + new_line_pos = buffer.find(b"\n") + 1 + if new_line_pos > 0: + additional = buffer[:new_line_pos] + buffer = buffer[new_line_pos:] + + return additional, buffer + + def close(self): + self.flush() + self._running = False + self.join() + + def flush(self): + with self.input_bytes.all_tasks_done: + self.input_bytes.all_tasks_done.wait() + + def write(self, data: bytes) -> int: + data = to_bytes(data, errors="replace") + u_data = to_unicode(data, errors="replace") + + with self._incoming_lock: + if self.is_closed(): + return 0 try: - self._process_input_line(data) - finally: - self._input_processing_finished.set() + written = self.input_bytes.put( + data, timeout=self._write_timeout, partial=True + ) + self._log.debug(f"<<< {u_data}") + return written + except queue.Full: + self._log.error( + "Incoming queue is full, raising SerialTimeoutException" + ) + raise SerialTimeoutException() - self._serial_log.debug("Closing down read loop") + def readline(self) -> bytes: + try: + # fetch a line from the queue, wait no longer than timeout + line = to_unicode( + self.output_bytes.get(timeout=self._read_timeout), errors="replace" + ) + self._log.debug(f">>> {line.strip()}") + self.output_bytes.task_done() + return to_bytes(line) + except queue.Empty: + # queue empty? return empty line + return b"" - def stop(self): - self._running = False + def send(self, line: str) -> None: + if self.output_bytes is not None: + self.output_bytes.put(line) - def wait_for_input(self): - self._input_queue_empty.wait() - self._input_processing_finished.wait() + def sendOk(self): + self.send("ok") - def _process_input_line(self, data: bytes): + def reset(self): + self._clearQueue(self.input_bytes) + self._clearQueue(self.output_bytes) + + def is_closed(self): + return not self._running + + def _process_input_gcode_line(self, data: bytes): if b"*" in data: checksum = int(data[data.rfind(b"*") + 1 :]) data = data[: data.rfind(b"*")] @@ -148,75 +181,13 @@ class PrinterSerialIO(threading.Thread): gcode = command_match.group(0) gcode_letter = command_match.group(1) - self._handle_command_callback(gcode_letter, gcode, data) - - def _showPrompt(self, text, choices): - self._hidePrompt() - self.send(f"//action:prompt_begin {text}") - for choice in choices: - self.send(f"//action:prompt_button {choice}") - self.send("//action:prompt_show") - - def _hidePrompt(self): - self.send("//action:prompt_end") - - def write(self, data: bytes) -> int: - data = to_bytes(data, errors="replace") - u_data = to_unicode(data, errors="replace") - - with self._incoming_lock: - if self.is_closed(): - return 0 - - try: - written = self.incoming.put( - data, timeout=self._write_timeout, partial=True - ) - self._serial_log.debug(f"<<< {u_data}") - return written - except queue.Full: - self._serial_log.error( - "Incoming queue is full, raising SerialTimeoutException" - ) - raise SerialTimeoutException() - - def readline(self) -> bytes: - assert self.outgoing is not None - try: - # fetch a line from the queue, wait no longer than timeout - line = to_unicode( - self.outgoing.get(timeout=self._read_timeout), errors="replace" - ) - self._serial_log.debug(f">>> {line.strip()}") - self.outgoing.task_done() - return to_bytes(line) - except queue.Empty: - # queue empty? return empty line - return b"" - - def send(self, line: str) -> None: - if self.outgoing is not None: - self.outgoing.put(line) - - def sendOk(self): - self.send("ok") - - def reset(self): - if self.incoming is not None: - self._clearQueue(self.incoming) - if self.outgoing is not None: - self._clearQueue(self.outgoing) - - def close(self): - self.stop() - self.incoming = None - self.outgoing = None - - def is_closed(self): - return self.incoming is None or self.outgoing is None + self._handle_command_callback(gcode_letter, gcode, command) def _triggerResend( - self, expected: int = None, actual: int = None, checksum: int = None + self, + expected: int | None = None, + actual: int | None = None, + checksum: int | None = None, ) -> None: with self._incoming_lock: if expected is None: diff --git a/test/test_gcode_execution.py b/test/test_gcode_execution.py index 73e4d54..40787b1 100644 --- a/test/test_gcode_execution.py +++ b/test/test_gcode_execution.py @@ -86,7 +86,7 @@ def files_info_ftp(): def ftps_session_mock(files_info_ftp): with unittest.mock.patch( "octoprint_bambu_printer.printer.ftpsclient.ftpsclient.IoTFTPSClient" - ) as ftps_client: + ) as ftps_client_mock: ftps_session = MagicMock() ftps_session.size.side_effect = DictGetter( {file: info[0] for file, info in files_info_ftp.items()} @@ -96,13 +96,13 @@ def ftps_session_mock(files_info_ftp): ) all_files = list(files_info_ftp.keys()) - ftps_client.list_files.side_effect = DictGetter( + ftps_client_mock.list_files.side_effect = DictGetter( { ("", ".3mf"): all_files, ("cache/", ".3mf"): [f"cache/{file}" for file in all_files], } ) - ftps_client.ftps_session = ftps_session + ftps_client_mock.ftps_session = ftps_session yield @@ -130,7 +130,7 @@ def test_initial_state(printer: BambuVirtualPrinter): def test_list_sd_card(printer: BambuVirtualPrinter): printer.write(b"M20\n") # GCode for listing SD card - time.sleep(0.1) + printer.flush() result = printer.readlines() assert result == "" # Replace with the actual expected result