Implement idle state. Fix serial io

This commit is contained in:
Anton Skrypnyk 2024-07-24 17:15:46 +03:00
parent ba2eadb064
commit 98a1f59169
7 changed files with 218 additions and 206 deletions

View File

@ -67,13 +67,13 @@ class BambuVirtualPrinter:
self._state_finished = PrintFinishedState(self) self._state_finished = PrintFinishedState(self)
self._current_state = self._state_idle self._current_state = self._state_idle
self._serial_io = PrinterSerialIO( self._serial_io = PrinterSerialIO(
self._process_gcode_serial_command, handle_command_callback=self._process_gcode_serial_command,
serial_log_handler, settings=settings,
serial_log_handler=serial_log_handler,
read_timeout=read_timeout, read_timeout=read_timeout,
write_timeout=10.0, write_timeout=10.0,
) )
self.tick_rate = 2.0
self._telemetry = BambuPrinterTelemetry() self._telemetry = BambuPrinterTelemetry()
self._telemetry.hasChamber = printer_profile_manager.get_current().get( self._telemetry.hasChamber = printer_profile_manager.get_current().get(
"heatedChamber" "heatedChamber"
@ -153,7 +153,6 @@ class BambuVirtualPrinter:
if print_job.gcode_state == "RUNNING": if print_job.gcode_state == "RUNNING":
self.change_state(self._state_printing) self.change_state(self._state_printing)
self._state_printing.update_print_job_info()
if print_job.gcode_state == "PAUSE": if print_job.gcode_state == "PAUSE":
self.change_state(self._state_paused) self.change_state(self._state_paused)
if print_job.gcode_state == "FINISH" or print_job.gcode_state == "FAILED": if print_job.gcode_state == "FINISH" or print_job.gcode_state == "FAILED":
@ -228,17 +227,6 @@ class BambuVirtualPrinter:
def _reset(self): def _reset(self):
with self._serial_io.incoming_lock: with self._serial_io.incoming_lock:
self.lastN = 0 self.lastN = 0
self._debug_awol = False
self._debug_sleep = 0
# self._sleepAfterNext.clear()
# self._sleepAfter.clear()
self._dont_answer = False
self._broken_klipper_connection = False
self._debug_drop_connection = False
self._running = False self._running = False
if self._sdstatus_reporter is not None: if self._sdstatus_reporter is not None:
@ -249,7 +237,6 @@ class BambuVirtualPrinter:
for item in self._settings.get(["resetLines"]): for item in self._settings.get(["resetLines"]):
self.sendIO(item + "\n") self.sendIO(item + "\n")
self._locked = self._settings.get_boolean(["locked"])
self._serial_io.reset() self._serial_io.reset()
@property @property
@ -284,6 +271,15 @@ class BambuVirtualPrinter:
def readline(self) -> bytes: def readline(self) -> bytes:
return self._serial_io.readline() return self._serial_io.readline()
def readlines(self) -> list[bytes]:
result = []
self._serial_io.wait_for_input()
next_line = self._serial_io.readline()
while next_line != b"":
result.append(next_line)
next_line = self._serial_io.readline()
return result
def sendIO(self, line: str): def sendIO(self, line: str):
self.sendIO(line) self.sendIO(line)
@ -291,26 +287,24 @@ class BambuVirtualPrinter:
self._serial_io.sendOk() self._serial_io.sendOk()
##~~ command implementations ##~~ command implementations
def run_gcode_handler(self, gcode, data):
self.gcode_executor.execute(self, gcode, data)
@gcode_executor.register("M23") @gcode_executor.register("M23")
def _gcode_M23(self, data: str) -> bool: def _select_sd_file(self, data: str) -> bool:
filename = data.split(maxsplit=1)[1].strip() filename = data.split(maxsplit=1)[1].strip()
self.file_system.select_file(filename) self.file_system.select_file(filename)
return True return True
@gcode_executor.register("M26") @gcode_executor.register("M26")
def _gcode_M26(self, data: str) -> bool: def _set_sd_position(self, data: str) -> bool:
if data == "M26 S0": if data == "M26 S0":
return self._cancelSdPrint() return self._cancel_print()
else: else:
self._log.debug("ignoring M26 command.") self._log.debug("ignoring M26 command.")
self.sendIO("M26 disabled for Bambu") self.sendIO("M26 disabled for Bambu")
return True return True
@gcode_executor.register("M27") @gcode_executor.register("M27")
def _gcode_M27(self, data: str) -> bool: def _report_sd_print_status(self, data: str) -> bool:
matchS = re.search(r"S([0-9]+)", data) matchS = re.search(r"S([0-9]+)", data)
if matchS: if matchS:
interval = int(matchS.group(1)) interval = int(matchS.group(1))
@ -329,18 +323,18 @@ class BambuVirtualPrinter:
return True return True
@gcode_executor.register("M30") @gcode_executor.register("M30")
def _gcode_M30(self, data: str) -> bool: def _delete_sd_file(self, data: str) -> bool:
filename = data.split(None, 1)[1].strip() filename = data.split(None, 1)[1].strip()
self.file_system.delete_file(filename) self.file_system.delete_file(filename)
return True return True
@gcode_executor.register("M105") @gcode_executor.register("M105")
def _gcode_M105(self, data: str) -> bool: def _report_temperatures(self, data: str) -> bool:
return self._processTemperatureQuery() return self._processTemperatureQuery()
# noinspection PyUnusedLocal # noinspection PyUnusedLocal
@gcode_executor.register("M115") @gcode_executor.register("M115")
def _gcode_M115(self, data: str) -> bool: def _report_firmware_info(self, data: str) -> bool:
self.sendIO("Bambu Printer Integration") self.sendIO("Bambu Printer Integration")
self.sendIO("Cap:EXTENDED_M20:1") self.sendIO("Cap:EXTENDED_M20:1")
self.sendIO("Cap:LFN_WRITE:1") self.sendIO("Cap:LFN_WRITE:1")
@ -348,14 +342,14 @@ class BambuVirtualPrinter:
return True return True
@gcode_executor.register("M117") @gcode_executor.register("M117")
def _gcode_M117(self, data: str) -> bool: def _get_lcd_message(self, data: str) -> bool:
# we'll just use this to echo a message, to allow playing around with pause triggers # we'll just use this to echo a message, to allow playing around with pause triggers
result = re.search(r"M117\s+(.*)", data).group(1) result = re.search(r"M117\s+(.*)", data).group(1)
self.sendIO(f"echo:{result}") self.sendIO(f"echo:{result}")
return False return True
@gcode_executor.register("M118") @gcode_executor.register("M118")
def _gcode_M118(self, data: str) -> bool: def _serial_print(self, data: str) -> bool:
match = re.search(r"M118 (?:(?P<parameter>A1|E1|Pn[012])\s)?(?P<text>.*)", data) match = re.search(r"M118 (?:(?P<parameter>A1|E1|Pn[012])\s)?(?P<text>.*)", data)
if not match: if not match:
self.sendIO("Unrecognized command parameters for M118") self.sendIO("Unrecognized command parameters for M118")
@ -374,7 +368,7 @@ class BambuVirtualPrinter:
# noinspection PyUnusedLocal # noinspection PyUnusedLocal
@gcode_executor.register("M220") @gcode_executor.register("M220")
def _gcode_M220(self, data: str) -> bool: def _set_feedrate_percent(self, data: str) -> bool:
if self.bambu_client.connected: if self.bambu_client.connected:
gcode_command = commands.SEND_GCODE_TEMPLATE gcode_command = commands.SEND_GCODE_TEMPLATE
percent = int(data[1:]) percent = int(data[1:])
@ -401,9 +395,9 @@ class BambuVirtualPrinter:
def _process_gcode_serial_command(self, gcode_letter: str, gcode: str, data: bytes): def _process_gcode_serial_command(self, gcode_letter: str, gcode: str, data: bytes):
self._log.debug(f"processing gcode command {gcode_letter} {gcode} {data}") self._log.debug(f"processing gcode command {gcode_letter} {gcode} {data}")
if gcode_letter in self.gcode_executor: if gcode_letter in self.gcode_executor:
handled = self.run_gcode_handler(gcode_letter, data) handled = self.gcode_executor.execute(self, gcode_letter, data)
else: else:
handled = self.run_gcode_handler(gcode, data) handled = self.gcode_executor.execute(self, gcode, data)
if handled: if handled:
self._serial_io.sendOk() self._serial_io.sendOk()
return return
@ -416,39 +410,37 @@ class BambuVirtualPrinter:
self._log.info("command sent successfully") self._log.info("command sent successfully")
self._serial_io.sendOk() self._serial_io.sendOk()
##~~ further helpers
@gcode_executor.register_no_data("M112") @gcode_executor.register_no_data("M112")
def _kill(self): def _shutdown(self):
self._running = True self._running = True
if self.bambu_client.connected: if self.bambu_client.connected:
self.bambu_client.disconnect() self.bambu_client.disconnect()
self.sendIO("echo:EMERGENCY SHUTDOWN DETECTED. KILLED.") self.sendIO("echo:EMERGENCY SHUTDOWN DETECTED. KILLED.")
self._serial_io.stop() self._serial_io.stop()
return True
@gcode_executor.register_no_data("M20") @gcode_executor.register_no_data("M20")
def _listSd(self): def _list_sd(self):
self.sendIO("Begin file list") self.sendIO("Begin file list")
for item in map(lambda f: f.get_log_info(), self.file_system.get_all_files()): for item in map(lambda f: f.get_log_info(), self.file_system.get_all_files()):
self.sendIO(item) self.sendIO(item)
self.sendIO("End file list") self.sendIO("End file list")
return True
@gcode_executor.register_no_data("M24") @gcode_executor.register_no_data("M24")
def _startSdPrint(self, from_printer: bool = False) -> bool: def _start_print(self):
self._log.debug(f"_startSdPrint: from_printer={from_printer}") self._current_state.start_new_print()
self.change_state(self._state_printing) return True
@gcode_executor.register_no_data("M25") @gcode_executor.register_no_data("M25")
def _pauseSdPrint(self): def _pause_print(self):
if self.bambu_client.connected: self._current_state.pause_print()
if self.bambu_client.publish(commands.PAUSE): return True
self._log.info("print paused")
else:
self._log.info("print pause failed")
@gcode_executor.register("M524") @gcode_executor.register("M524")
def _cancelSdPrint(self) -> bool: def _cancel_print(self):
self._current_state.cancel() self._current_state.cancel_print()
return True
def report_print_job_status(self): def report_print_job_status(self):
print_job = self.current_print_job print_job = self.current_print_job

View File

@ -300,15 +300,19 @@ class GCodeExecutor:
def execute(self, printer, gcode, data): def execute(self, printer, gcode, data):
gcode_info = self._gcode_with_info(gcode) gcode_info = self._gcode_with_info(gcode)
if gcode in self.gcode_handlers: try:
self._log.debug(f"Executing {gcode_info}") if gcode in self.gcode_handlers:
return self.gcode_handlers[gcode](printer, data) self._log.debug(f"Executing {gcode_info}")
elif gcode in self.gcode_handlers_no_data: return self.gcode_handlers[gcode](printer, data)
self._log.debug(f"Executing {gcode_info}") elif gcode in self.gcode_handlers_no_data:
return self.gcode_handlers_no_data[gcode](printer) self._log.debug(f"Executing {gcode_info}")
else: return self.gcode_handlers_no_data[gcode](printer)
self._log.debug(f"ignoring {gcode_info} command.") else:
return True self._log.debug(f"ignoring {gcode_info} command.")
return True
except Exception as e:
self._log.error(f"Error {gcode_info}: {str(e)}")
return False
def _gcode_with_info(self, gcode): def _gcode_with_info(self, gcode):
return f"{gcode} ({GCODE_DOCUMENTATION.get(gcode, 'Info not specified')})" return f"{gcode} ({GCODE_DOCUMENTATION.get(gcode, 'Info not specified')})"

View File

@ -48,26 +48,19 @@ class PrinterSerialIO(threading.Thread):
self._rx_buffer_size = 64 self._rx_buffer_size = 64
self._incoming_lock = threading.RLock() self._incoming_lock = threading.RLock()
self._input_queue_empty = threading.Event()
self._input_queue_empty.set()
self._input_processing_finished = threading.Event()
self._input_processing_finished.set()
self.incoming = CharCountingQueue(self._rx_buffer_size, name="RxBuffer") self.incoming = CharCountingQueue(self._rx_buffer_size, name="RxBuffer")
self.outgoing = queue.Queue() self.outgoing = queue.Queue()
self.buffered = queue.Queue(maxsize=4)
self.command_queue = queue.Queue()
@property @property
def incoming_lock(self): def incoming_lock(self):
return self._incoming_lock return self._incoming_lock
def run(self) -> None: def run(self) -> None:
linenumber = 0
next_wait_timeout = 0
def recalculate_next_wait_timeout():
nonlocal next_wait_timeout
next_wait_timeout = time.monotonic() + self._wait_interval
recalculate_next_wait_timeout()
data = None data = None
buf = b"" buf = b""
@ -76,7 +69,9 @@ class PrinterSerialIO(threading.Thread):
data = self.incoming.get(timeout=0.01) data = self.incoming.get(timeout=0.01)
data = to_bytes(data, encoding="ascii", errors="replace") data = to_bytes(data, encoding="ascii", errors="replace")
self.incoming.task_done() self.incoming.task_done()
self._input_queue_empty.clear()
except queue.Empty: except queue.Empty:
self._input_queue_empty.set()
continue continue
except Exception: except Exception:
if self.incoming is None: if self.incoming is None:
@ -92,62 +87,69 @@ class PrinterSerialIO(threading.Thread):
else: else:
continue continue
recalculate_next_wait_timeout()
if data is None: if data is None:
continue continue
self._received_lines += 1 self._received_lines += 1
# strip checksum try:
if b"*" in data: self._process_input_line(data)
checksum = int(data[data.rfind(b"*") + 1 :]) finally:
data = data[: data.rfind(b"*")] self._input_processing_finished.set()
if not checksum == self._calculate_checksum(data):
self._triggerResend(expected=self.current_line + 1)
continue
self.current_line += 1 self._serial_log.debug("Closing down read loop")
elif self._settings.get_boolean(["forceChecksum"]):
self.send(self._format_error("checksum_missing"))
continue
# track N = N + 1
if data.startswith(b"N") and b"M110" in data:
linenumber = int(re.search(b"N([0-9]+)", data).group(1))
self.lastN = linenumber
self.current_line = linenumber
self.sendOk()
continue
elif data.startswith(b"N"):
linenumber = int(re.search(b"N([0-9]+)", data).group(1))
expected = self.lastN + 1
if linenumber != expected:
self._triggerResend(actual=linenumber)
continue
else:
self.lastN = linenumber
data = data.split(None, 1)[1].strip()
data += b"\n"
command = to_unicode(data, encoding="ascii", errors="replace").strip()
# actual command handling
command_match = self.command_regex.match(command)
if command_match is not None:
gcode = command_match.group(0)
gcode_letter = command_match.group(1)
self._handle_command_callback(gcode_letter, gcode, data)
self._serial_log.debug("Closing down read loop")
def stop(self): def stop(self):
self._running = False self._running = False
def wait_for_input(self):
self._input_queue_empty.wait()
self._input_processing_finished.wait()
def _process_input_line(self, data: bytes):
if b"*" in data:
checksum = int(data[data.rfind(b"*") + 1 :])
data = data[: data.rfind(b"*")]
if not checksum == self._calculate_checksum(data):
self._triggerResend(expected=self.current_line + 1)
return
self.current_line += 1
elif self._settings.get_boolean(["forceChecksum"]):
self.send(self._format_error("checksum_missing"))
return
# track N = N + 1
linenumber = 0
if data.startswith(b"N") and b"M110" in data:
linenumber = int(re.search(b"N([0-9]+)", data).group(1))
self.lastN = linenumber
self.current_line = linenumber
self.sendOk()
return
elif data.startswith(b"N"):
linenumber = int(re.search(b"N([0-9]+)", data).group(1))
expected = self.lastN + 1
if linenumber != expected:
self._triggerResend(actual=linenumber)
return
else:
self.lastN = linenumber
data = data.split(None, 1)[1].strip()
data += b"\n"
command = to_unicode(data, encoding="ascii", errors="replace").strip()
# actual command handling
command_match = self.command_regex.match(command)
if command_match is not None:
gcode = command_match.group(0)
gcode_letter = command_match.group(1)
self._handle_command_callback(gcode_letter, gcode, data)
def _showPrompt(self, text, choices): def _showPrompt(self, text, choices):
self._hidePrompt() self._hidePrompt()
self.send(f"//action:prompt_begin {text}") self.send(f"//action:prompt_begin {text}")
@ -180,11 +182,11 @@ class PrinterSerialIO(threading.Thread):
def readline(self) -> bytes: def readline(self) -> bytes:
assert self.outgoing is not None assert self.outgoing is not None
timeout = self._read_timeout
try: try:
# fetch a line from the queue, wait no longer than timeout # fetch a line from the queue, wait no longer than timeout
line = to_unicode(self.outgoing.get(timeout=timeout), errors="replace") line = to_unicode(
self.outgoing.get(timeout=self._read_timeout), errors="replace"
)
self._serial_log.debug(f">>> {line.strip()}") self._serial_log.debug(f">>> {line.strip()}")
self.outgoing.task_done() self.outgoing.task_done()
return to_bytes(line) return to_bytes(line)
@ -197,8 +199,6 @@ class PrinterSerialIO(threading.Thread):
self.outgoing.put(line) self.outgoing.put(line)
def sendOk(self): def sendOk(self):
if self.outgoing is None:
return
self.send("ok") self.send("ok")
def reset(self): def reset(self):

View File

@ -25,17 +25,19 @@ class APrinterState:
def handle_gcode(self, gcode): def handle_gcode(self, gcode):
self._log.debug(f"{self.__class__.__name__} gcode execution disabled") self._log.debug(f"{self.__class__.__name__} gcode execution disabled")
def connect(self): def start_new_print(self):
self._log_skip_state_transition("connect") self._log_skip_state_transition("start_new_print")
def pause(self): def pause_print(self):
self._log_skip_state_transition("pause") self._log_skip_state_transition("pause_print")
def cancel(self): def cancel_print(self):
self._log_skip_state_transition("cancel") self._log_skip_state_transition("cancel_print")
def resume(self): def resume_print(self):
self._log_skip_state_transition("resume") self._log_skip_state_transition("resume_print")
def _log_skip_state_transition(self, method): def _log_skip_state_transition(self, method):
self._log.debug(f"skipping {self.__class__.__name__} state transition {method}") self._log.debug(
f"skipping {self.__class__.__name__} state transition for '{method}'"
)

View File

@ -4,4 +4,50 @@ from octoprint_bambu_printer.printer.states.a_printer_state import APrinterState
class IdleState(APrinterState): class IdleState(APrinterState):
pass
def start_new_print(self):
selected_file = self._printer.file_system.selected_file
if selected_file is None:
self._log.warn("Cannot start print job if file was not selected")
self._printer.change_state(self._printer._state_idle)
return
print_command = self._get_print_command_for_file(selected_file)
if self._printer.bambu_client.publish(print_command):
self._log.info(f"Started print for {selected_file.file_name}")
self._printer.change_state(self._printer._state_printing)
else:
self._log.warn(f"Failed to start print for {selected_file.file_name}")
self._printer.change_state(self._printer._state_idle)
def _get_print_command_for_file(self, selected_file):
print_command = {
"print": {
"sequence_id": 0,
"command": "project_file",
"param": "Metadata/plate_1.gcode",
"md5": "",
"profile_id": "0",
"project_id": "0",
"subtask_id": "0",
"task_id": "0",
"subtask_name": f"{selected_file}",
"file": f"{selected_file}",
"url": (
f"file:///mnt/sdcard/{selected_file}"
if self._printer._settings.get_boolean(["device_type"])
in ["X1", "X1C"]
else f"file:///sdcard/{selected_file}"
),
"timelapse": self._printer._settings.get_boolean(["timelapse"]),
"bed_leveling": self._printer._settings.get_boolean(["bed_leveling"]),
"flow_cali": self._printer._settings.get_boolean(["flow_cali"]),
"vibration_cali": self._printer._settings.get_boolean(
["vibration_cali"]
),
"layer_inspect": self._printer._settings.get_boolean(["layer_inspect"]),
"use_ams": self._printer._settings.get_boolean(["use_ams"]),
}
}
return print_command

View File

@ -22,7 +22,7 @@ class PrintingState(APrinterState):
def __init__(self, printer: BambuVirtualPrinter) -> None: def __init__(self, printer: BambuVirtualPrinter) -> None:
super().__init__(printer) super().__init__(printer)
self._printingLock = threading.Event() self._printing_lock = threading.Event()
self._print_job: PrintJob | None = None self._print_job: PrintJob | None = None
self._sd_printing_thread = None self._sd_printing_thread = None
@ -31,20 +31,22 @@ class PrintingState(APrinterState):
return self._print_job return self._print_job
def init(self): def init(self):
if not self._printingLock.is_set(): self._printing_lock.set()
self._printingLock.set() self.update_print_job_info()
self._start_worker_thread()
def finalize(self): def finalize(self):
if self._printingLock.is_set(): self._printing_lock.clear()
self._printingLock.clear()
def _start_worker_thread(self, from_printer: bool = False): if self._sd_printing_thread is not None and self._sd_printing_thread.is_alive():
self._sd_printing_thread.join()
self._sd_printing_thread = None
def _start_worker_thread(self):
if self._sd_printing_thread is None: if self._sd_printing_thread is None:
self._sdPrinting = True if not self._printing_lock.is_set():
self._sdPrintStarting = True self._printing_lock.set()
self._sd_printing_thread = threading.Thread( self._sd_printing_thread = threading.Thread(target=self._printing_worker)
target=self._printing_worker, kwargs={"from_printer": from_printer}
)
self._sd_printing_thread.start() self._sd_printing_thread.start()
def update_print_job_info(self): def update_print_job_info(self):
@ -55,88 +57,52 @@ class PrintingState(APrinterState):
) )
if project_file_info is None: if project_file_info is None:
self._log.debug(f"No 3mf file found for {print_job_info}") self._log.debug(f"No 3mf file found for {print_job_info}")
self._print_job = None
return return
if self._printer.file_system.select_file(filename): if self._printer.file_system.select_file(filename):
self._printer.sendOk() self._printer.sendOk()
self.start_new_print(from_printer=True)
# fuzzy math here to get print percentage to match BambuStudio # fuzzy math here to get print percentage to match BambuStudio
progress = print_job_info.get("print_percentage") progress = print_job_info.get("print_percentage")
self._print_job = PrintJob(project_file_info, 0) self._print_job = PrintJob(project_file_info, 0)
self._print_job.progress = progress self._print_job.progress = progress
def start_new_print(self, from_printer: bool = False): def _printing_worker(self):
if self._printer.file_system.selected_file is not None: if self._print_job is not None:
self._start_worker_thread(from_printer) while (
self._printer.is_running
if self._sd_printing_thread is not None: and self._print_job.file_info is not None
if self._printer.bambu_client.connected: and self._print_job.file_position < self._print_job.file_info.size
if self._printer.bambu_client.publish(pybambu.commands.RESUME): ):
self._log.info("print resumed") self.update_print_job_info()
else:
self._log.info("print resume failed")
return True
def _printing_worker(self, from_printer: bool = False):
try:
if not from_printer and self._printer.bambu_client.connected:
selected_file = self._printer.file_system.selected_file
print_command = {
"print": {
"sequence_id": 0,
"command": "project_file",
"param": "Metadata/plate_1.gcode",
"md5": "",
"profile_id": "0",
"project_id": "0",
"subtask_id": "0",
"task_id": "0",
"subtask_name": f"{selected_file}",
"file": f"{selected_file}",
"url": (
f"file:///mnt/sdcard/{selected_file}"
if self._printer._settings.get_boolean(["device_type"])
in ["X1", "X1C"]
else f"file:///sdcard/{selected_file}"
),
"timelapse": self._printer._settings.get_boolean(["timelapse"]),
"bed_leveling": self._printer._settings.get_boolean(
["bed_leveling"]
),
"flow_cali": self._printer._settings.get_boolean(["flow_cali"]),
"vibration_cali": self._printer._settings.get_boolean(
["vibration_cali"]
),
"layer_inspect": self._printer._settings.get_boolean(
["layer_inspect"]
),
"use_ams": self._printer._settings.get_boolean(["use_ams"]),
}
}
self._printer.bambu_client.publish(print_command)
while self._print_job.file_position < self._print_job.file_info.size:
if self._printer.is_running:
break
self._printingLock.wait()
self._printer.report_print_job_status() self._printer.report_print_job_status()
time.sleep(3) time.sleep(3)
self._log.debug(f"SD File Print: {self._selectedSdFile}") self._printing_lock.wait()
except AttributeError: self._log.debug(
if self.outgoing is not None: f"SD File Print finishing: {self._print_job.file_info.file_name}"
raise )
self._printer.change_state(self._printer._state_finished) self._printer.change_state(self._printer._state_finished)
def cancel(self): def pause_print(self):
if self._printer.bambu_client.connected:
if self._printer.bambu_client.publish(pybambu.commands.PAUSE):
self._log.info("print paused")
self._printer.change_state(self._printer._state_finished)
else:
self._log.info("print pause failed")
def resume_print(self):
if self._printer.bambu_client.connected:
if self._printer.bambu_client.publish(pybambu.commands.RESUME):
self._log.info("print resumed")
else:
self._log.info("print resume failed")
def cancel_print(self):
if self._printer.bambu_client.connected: if self._printer.bambu_client.connected:
if self._printer.bambu_client.publish(pybambu.commands.STOP): if self._printer.bambu_client.publish(pybambu.commands.STOP):
self._log.info("print cancelled") self._log.info("print cancelled")
self._printer.change_state(self._printer._state_finished) self._printer.change_state(self._printer._state_finished)
return True
else: else:
self._log.info("print cancel failed") self._log.info("print cancel failed")
return False
return False

View File

@ -2,6 +2,7 @@ from __future__ import annotations
from datetime import datetime, timezone from datetime import datetime, timezone
import logging import logging
from pathlib import Path from pathlib import Path
import time
import unittest import unittest
from unittest.mock import MagicMock from unittest.mock import MagicMock
import unittest.mock import unittest.mock
@ -50,6 +51,7 @@ def settings(output_test_folder):
"access_code": "12345", "access_code": "12345",
} }
) )
_settings.get_boolean.side_effect = DictGetter({"forceChecksum": False})
log_file_path = output_test_folder / "log.txt" log_file_path = output_test_folder / "log.txt"
log_file_path.touch() log_file_path.touch()
@ -128,13 +130,13 @@ def test_initial_state(printer: BambuVirtualPrinter):
def test_list_sd_card(printer: BambuVirtualPrinter): def test_list_sd_card(printer: BambuVirtualPrinter):
printer.write(b"M20\n") # GCode for listing SD card printer.write(b"M20\n") # GCode for listing SD card
result = printer.readline() time.sleep(0.1)
result = printer.readlines()
assert result == "" # Replace with the actual expected result assert result == "" # Replace with the actual expected result
def test_start_print(printer: BambuVirtualPrinter): def test_start_print(printer: BambuVirtualPrinter):
gcode = b"G28\nG1 X10 Y10\n" printer.write(b"M\n")
printer.write(gcode)
result = printer.readline() result = printer.readline()
assert isinstance(printer.current_state, PrintingState) assert isinstance(printer.current_state, PrintingState)