Fix serial IO synchronization.

This commit is contained in:
Anton Skrypnyk 2024-07-24 17:15:46 +03:00
parent 98a1f59169
commit 07f601694d
4 changed files with 124 additions and 134 deletions

View File

@ -273,7 +273,6 @@ class BambuVirtualPrinter:
def readlines(self) -> list[bytes]:
result = []
self._serial_io.wait_for_input()
next_line = self._serial_io.readline()
while next_line != b"":
result.append(next_line)
@ -281,11 +280,14 @@ class BambuVirtualPrinter:
return result
def sendIO(self, line: str):
self.sendIO(line)
self._serial_io.send(line)
def sendOk(self):
self._serial_io.sendOk()
def flush(self):
self._serial_io.flush()
##~~ command implementations
@gcode_executor.register("M23")
@ -392,12 +394,16 @@ class BambuVirtualPrinter:
self._log.info(f"{percent}% speed adjustment command sent successfully")
return True
def _process_gcode_serial_command(self, gcode_letter: str, gcode: str, data: bytes):
self._log.debug(f"processing gcode command {gcode_letter} {gcode} {data}")
def _process_gcode_serial_command(
self, gcode_letter: str, gcode: str, full_command: str
):
self._log.debug(
f"processing gcode command letter = {gcode_letter} | gcode = {gcode} | full = {full_command}"
)
if gcode_letter in self.gcode_executor:
handled = self.gcode_executor.execute(self, gcode_letter, data)
handled = self.gcode_executor.execute(self, gcode_letter, full_command)
else:
handled = self.gcode_executor.execute(self, gcode, data)
handled = self.gcode_executor.execute(self, gcode, full_command)
if handled:
self._serial_io.sendOk()
return
@ -405,7 +411,7 @@ class BambuVirtualPrinter:
# post gcode to printer otherwise
if self.bambu_client.connected:
GCODE_COMMAND = commands.SEND_GCODE_TEMPLATE
GCODE_COMMAND["print"]["param"] = data + "\n"
GCODE_COMMAND["print"]["param"] = full_command + "\n"
if self.bambu_client.publish(GCODE_COMMAND):
self._log.info("command sent successfully")
self._serial_io.sendOk()
@ -416,7 +422,7 @@ class BambuVirtualPrinter:
if self.bambu_client.connected:
self.bambu_client.disconnect()
self.sendIO("echo:EMERGENCY SHUTDOWN DETECTED. KILLED.")
self._serial_io.stop()
self._serial_io.close()
return True
@gcode_executor.register_no_data("M20")
@ -481,6 +487,7 @@ class BambuVirtualPrinter:
self.sendIO(f"Writing to file: {filename}")
def _finishSdFile(self):
# FIXME: maybe remove or move to remote SD card
try:
self._writingToSdHandle.close()
except Exception:
@ -515,4 +522,14 @@ class BambuVirtualPrinter:
def close(self):
if self.bambu_client.connected:
self.bambu_client.disconnect()
self._serial_io.stop()
self._serial_io.close()
def _showPrompt(self, text, choices):
self._hidePrompt()
self.sendIO(f"//action:prompt_begin {text}")
for choice in choices:
self.sendIO(f"//action:prompt_button {choice}")
self.sendIO("//action:prompt_show")
def _hidePrompt(self):
self.sendIO("//action:prompt_end")

View File

@ -1,6 +1,7 @@
import itertools
import logging
from inspect import signature
import traceback
GCODE_DOCUMENTATION = {
@ -311,7 +312,8 @@ class GCodeExecutor:
self._log.debug(f"ignoring {gcode_info} command.")
return True
except Exception as e:
self._log.error(f"Error {gcode_info}: {str(e)}")
self._log.error(f"Error during gcode {gcode_info}")
self._log.error(e, exc_info=True)
return False
def _gcode_with_info(self, gcode):

View File

@ -1,3 +1,5 @@
from __future__ import annotations
import logging
import queue
import re
@ -16,28 +18,18 @@ class PrinterSerialIO(threading.Thread):
def __init__(
self,
handle_command_callback: Callable[[str, str, bytes], None],
handle_command_callback: Callable[[str, str, str], None],
settings,
serial_log_handler=None,
read_timeout=5.0,
write_timeout=10.0,
) -> None:
super().__init__(
name="octoprint.plugins.bambu_printer.wait_thread", daemon=True
name="octoprint.plugins.bambu_printer.serial_io_thread", daemon=True
)
self._handle_command_callback = handle_command_callback
self._settings = settings
self._serial_log = logging.getLogger(
"octoprint.plugins.bambu_printer.BambuPrinter.serial"
)
self._serial_log.setLevel(logging.CRITICAL)
self._serial_log.propagate = False
if serial_log_handler is not None:
self._serial_log.addHandler(serial_log_handler)
self._serial_log.setLevel(logging.INFO)
self._serial_log.debug("-" * 78)
self._log = self._init_logger(serial_log_handler)
self._read_timeout = read_timeout
self._write_timeout = write_timeout
@ -48,65 +40,106 @@ class PrinterSerialIO(threading.Thread):
self._rx_buffer_size = 64
self._incoming_lock = threading.RLock()
self._input_queue_empty = threading.Event()
self._input_queue_empty.set()
self._input_processing_finished = threading.Event()
self._input_processing_finished.set()
self.incoming = CharCountingQueue(self._rx_buffer_size, name="RxBuffer")
self.outgoing = queue.Queue()
self.input_bytes = CharCountingQueue(self._rx_buffer_size, name="RxBuffer")
self.output_bytes = queue.Queue()
def _init_logger(self, log_handler):
log = logging.getLogger("octoprint.plugins.bambu_printer.BambuPrinter.serial")
if log_handler is not None:
log.addHandler(log_handler)
log.debug("-" * 78)
return log
@property
def incoming_lock(self):
return self._incoming_lock
def run(self) -> None:
data = None
buffer = b""
buf = b""
while self.incoming is not None and self._running:
while self._running:
try:
data = self.incoming.get(timeout=0.01)
data = self.input_bytes.get(block=True, timeout=0.01)
data = to_bytes(data, encoding="ascii", errors="replace")
self.incoming.task_done()
self._input_queue_empty.clear()
self.input_bytes.task_done()
line, buffer = self._read_next_line_buffered(data, buffer)
while line is not None:
self._received_lines += 1
self._process_input_gcode_line(line)
line, buffer = self._read_next_line_buffered(data, buffer)
except queue.Empty:
self._input_queue_empty.set()
continue
except Exception:
if self.incoming is None:
# just got closed
break
if data is not None:
buf += data
nl = buf.find(b"\n") + 1
if nl > 0:
data = buf[:nl]
buf = buf[nl:]
else:
continue
if data is None:
continue
self._received_lines += 1
self._log.debug("Closing IO read loop")
def _read_next_line_buffered(self, additional: bytes, buffer: bytes):
buffer += additional
new_line_pos = buffer.find(b"\n") + 1
if new_line_pos > 0:
additional = buffer[:new_line_pos]
buffer = buffer[new_line_pos:]
return additional, buffer
def close(self):
self.flush()
self._running = False
self.join()
def flush(self):
with self.input_bytes.all_tasks_done:
self.input_bytes.all_tasks_done.wait()
def write(self, data: bytes) -> int:
data = to_bytes(data, errors="replace")
u_data = to_unicode(data, errors="replace")
with self._incoming_lock:
if self.is_closed():
return 0
try:
self._process_input_line(data)
finally:
self._input_processing_finished.set()
written = self.input_bytes.put(
data, timeout=self._write_timeout, partial=True
)
self._log.debug(f"<<< {u_data}")
return written
except queue.Full:
self._log.error(
"Incoming queue is full, raising SerialTimeoutException"
)
raise SerialTimeoutException()
self._serial_log.debug("Closing down read loop")
def readline(self) -> bytes:
try:
# fetch a line from the queue, wait no longer than timeout
line = to_unicode(
self.output_bytes.get(timeout=self._read_timeout), errors="replace"
)
self._log.debug(f">>> {line.strip()}")
self.output_bytes.task_done()
return to_bytes(line)
except queue.Empty:
# queue empty? return empty line
return b""
def stop(self):
self._running = False
def send(self, line: str) -> None:
if self.output_bytes is not None:
self.output_bytes.put(line)
def wait_for_input(self):
self._input_queue_empty.wait()
self._input_processing_finished.wait()
def sendOk(self):
self.send("ok")
def _process_input_line(self, data: bytes):
def reset(self):
self._clearQueue(self.input_bytes)
self._clearQueue(self.output_bytes)
def is_closed(self):
return not self._running
def _process_input_gcode_line(self, data: bytes):
if b"*" in data:
checksum = int(data[data.rfind(b"*") + 1 :])
data = data[: data.rfind(b"*")]
@ -148,75 +181,13 @@ class PrinterSerialIO(threading.Thread):
gcode = command_match.group(0)
gcode_letter = command_match.group(1)
self._handle_command_callback(gcode_letter, gcode, data)
def _showPrompt(self, text, choices):
self._hidePrompt()
self.send(f"//action:prompt_begin {text}")
for choice in choices:
self.send(f"//action:prompt_button {choice}")
self.send("//action:prompt_show")
def _hidePrompt(self):
self.send("//action:prompt_end")
def write(self, data: bytes) -> int:
data = to_bytes(data, errors="replace")
u_data = to_unicode(data, errors="replace")
with self._incoming_lock:
if self.is_closed():
return 0
try:
written = self.incoming.put(
data, timeout=self._write_timeout, partial=True
)
self._serial_log.debug(f"<<< {u_data}")
return written
except queue.Full:
self._serial_log.error(
"Incoming queue is full, raising SerialTimeoutException"
)
raise SerialTimeoutException()
def readline(self) -> bytes:
assert self.outgoing is not None
try:
# fetch a line from the queue, wait no longer than timeout
line = to_unicode(
self.outgoing.get(timeout=self._read_timeout), errors="replace"
)
self._serial_log.debug(f">>> {line.strip()}")
self.outgoing.task_done()
return to_bytes(line)
except queue.Empty:
# queue empty? return empty line
return b""
def send(self, line: str) -> None:
if self.outgoing is not None:
self.outgoing.put(line)
def sendOk(self):
self.send("ok")
def reset(self):
if self.incoming is not None:
self._clearQueue(self.incoming)
if self.outgoing is not None:
self._clearQueue(self.outgoing)
def close(self):
self.stop()
self.incoming = None
self.outgoing = None
def is_closed(self):
return self.incoming is None or self.outgoing is None
self._handle_command_callback(gcode_letter, gcode, command)
def _triggerResend(
self, expected: int = None, actual: int = None, checksum: int = None
self,
expected: int | None = None,
actual: int | None = None,
checksum: int | None = None,
) -> None:
with self._incoming_lock:
if expected is None:

View File

@ -86,7 +86,7 @@ def files_info_ftp():
def ftps_session_mock(files_info_ftp):
with unittest.mock.patch(
"octoprint_bambu_printer.printer.ftpsclient.ftpsclient.IoTFTPSClient"
) as ftps_client:
) as ftps_client_mock:
ftps_session = MagicMock()
ftps_session.size.side_effect = DictGetter(
{file: info[0] for file, info in files_info_ftp.items()}
@ -96,13 +96,13 @@ def ftps_session_mock(files_info_ftp):
)
all_files = list(files_info_ftp.keys())
ftps_client.list_files.side_effect = DictGetter(
ftps_client_mock.list_files.side_effect = DictGetter(
{
("", ".3mf"): all_files,
("cache/", ".3mf"): [f"cache/{file}" for file in all_files],
}
)
ftps_client.ftps_session = ftps_session
ftps_client_mock.ftps_session = ftps_session
yield
@ -130,7 +130,7 @@ def test_initial_state(printer: BambuVirtualPrinter):
def test_list_sd_card(printer: BambuVirtualPrinter):
printer.write(b"M20\n") # GCode for listing SD card
time.sleep(0.1)
printer.flush()
result = printer.readlines()
assert result == "" # Replace with the actual expected result