Fix state synchronization bugs. Fix unittests.

This commit is contained in:
Anton Skrypnyk 2024-07-24 17:15:47 +03:00
parent 53e1f88e1a
commit ed33fd8fb1
8 changed files with 260 additions and 153 deletions

View File

@ -6,9 +6,12 @@ import collections
from dataclasses import dataclass, field from dataclasses import dataclass, field
import math import math
import os import os
import queue
import re import re
import threading
import time import time
import asyncio import asyncio
from octoprint_bambu_printer.printer.print_job import PrintJob
from pybambu import BambuClient, commands from pybambu import BambuClient, commands
import logging import logging
import logging.handlers import logging.handlers
@ -19,7 +22,6 @@ from octoprint_bambu_printer.printer.states.a_printer_state import APrinterState
from octoprint_bambu_printer.printer.states.idle_state import IdleState from octoprint_bambu_printer.printer.states.idle_state import IdleState
from .printer_serial_io import PrinterSerialIO from .printer_serial_io import PrinterSerialIO
from .states.print_finished_state import PrintFinishedState
from .states.paused_state import PausedState from .states.paused_state import PausedState
from .states.printing_state import PrintingState from .states.printing_state import PrintingState
@ -62,8 +64,17 @@ class BambuVirtualPrinter:
self._state_idle = IdleState(self) self._state_idle = IdleState(self)
self._state_printing = PrintingState(self) self._state_printing = PrintingState(self)
self._state_paused = PausedState(self) self._state_paused = PausedState(self)
self._state_finished = PrintFinishedState(self)
self._current_state = self._state_idle self._current_state = self._state_idle
self._running = True
self._printer_thread = threading.Thread(
target=self._printer_worker,
name="octoprint.plugins.bambu_printer.printer_worker",
)
self._state_change_queue = queue.Queue()
self._current_print_job: PrintJob | None = None
self._serial_io = PrinterSerialIO( self._serial_io = PrinterSerialIO(
handle_command_callback=self._process_gcode_serial_command, handle_command_callback=self._process_gcode_serial_command,
settings=settings, settings=settings,
@ -77,21 +88,16 @@ class BambuVirtualPrinter:
"heatedChamber" "heatedChamber"
) )
self._running = True
self.file_system = RemoteSDCardFileList(settings) self.file_system = RemoteSDCardFileList(settings)
self._busy_reason = None
self._busy_loop = None
self._busy_interval = 2.0
self._settings = settings self._settings = settings
self._printer_profile_manager = printer_profile_manager self._printer_profile_manager = printer_profile_manager
self._faked_baudrate = faked_baudrate self._faked_baudrate = faked_baudrate
self._plugin_data_folder = data_folder
self._last_hms_errors = None self._last_hms_errors = None
self._serial_io.start() self._serial_io.start()
self._printer_thread.start()
self._bambu_client: BambuClient = None self._bambu_client: BambuClient = None
asyncio.get_event_loop().run_until_complete(self._create_connection_async()) asyncio.get_event_loop().run_until_complete(self._create_connection_async())
@ -112,20 +118,12 @@ class BambuVirtualPrinter:
@property @property
def current_print_job(self): def current_print_job(self):
if isinstance(self._current_state, PrintingState): if self._current_print_job is None:
return self._current_state.print_job self.update_print_job_info()
return None return self._current_print_job
def change_state(self, new_state: APrinterState): def change_state(self, new_state: APrinterState):
if self._current_state == new_state: self._state_change_queue.put(new_state)
return
self._log.debug(
f"Changing state from {self._current_state.__class__.__name__} to {new_state.__class__.__name__}"
)
self._current_state.finalize()
self._current_state = new_state
self._current_state.init()
def new_update(self, event_type): def new_update(self, event_type):
if event_type == "event_hms_errors": if event_type == "event_hms_errors":
@ -133,28 +131,49 @@ class BambuVirtualPrinter:
elif event_type == "event_printer_data_update": elif event_type == "event_printer_data_update":
self._update_printer_info() self._update_printer_info()
def update_print_job_info(self):
print_job_info = self.bambu_client.get_device().print_job
filename: str = print_job_info.get("subtask_name")
project_file_info = self.file_system.get_data_by_suffix(
filename, [".3mf", ".gcode.3mf"]
)
if project_file_info is None:
self._log.debug(f"No 3mf file found for {print_job_info}")
self._current_print_job = None
return
if self.file_system.select_file(filename):
self.sendOk()
# fuzzy math here to get print percentage to match BambuStudio
progress = print_job_info.get("print_percentage")
self._current_print_job = PrintJob(project_file_info, 0)
self._current_print_job.progress = progress
def _update_printer_info(self): def _update_printer_info(self):
device_data = self.bambu_client.get_device() device_data = self.bambu_client.get_device()
ams = device_data.ams.__dict__
print_job = device_data.print_job print_job = device_data.print_job
temperatures = device_data.temperature.__dict__ temperatures = device_data.temperature
lights = device_data.lights.__dict__
fans = device_data.fans.__dict__
speed = device_data.speed.__dict__
self.lastTempAt = time.monotonic() self.lastTempAt = time.monotonic()
self._telemetry.temp[0] = temperatures.get("nozzle_temp", 0.0) self._telemetry.temp[0] = temperatures.nozzle_temp
self._telemetry.targetTemp[0] = temperatures.get("target_nozzle_temp", 0.0) self._telemetry.targetTemp[0] = temperatures.target_nozzle_temp
self.bedTemp = temperatures.get("bed_temp", 0.0) self._telemetry.bedTemp = temperatures.bed_temp
self.bedTargetTemp = temperatures.get("target_bed_temp", 0.0) self._telemetry.bedTargetTemp = temperatures.target_bed_temp
self.chamberTemp = temperatures.get("chamber_temp", 0.0) self._telemetry.chamberTemp = temperatures.chamber_temp
if print_job.gcode_state == "RUNNING": if (
print_job.gcode_state == "IDLE"
or print_job.gcode_state == "FINISH"
or print_job.gcode_state == "FAILED"
):
self.change_state(self._state_idle)
elif print_job.gcode_state == "RUNNING":
self.change_state(self._state_printing) self.change_state(self._state_printing)
if print_job.gcode_state == "PAUSE": elif print_job.gcode_state == "PAUSE":
self.change_state(self._state_paused) self.change_state(self._state_paused)
if print_job.gcode_state == "FINISH" or print_job.gcode_state == "FAILED": else:
self.change_state(self._state_finished) self._log.warn(f"Unknown print job state: {print_job.gcode_state}")
def _update_hms_errors(self): def _update_hms_errors(self):
bambu_printer = self.bambu_client.get_device() bambu_printer = self.bambu_client.get_device()
@ -280,6 +299,7 @@ class BambuVirtualPrinter:
def flush(self): def flush(self):
self._serial_io.flush() self._serial_io.flush()
self._wait_for_state_change()
##~~ command implementations ##~~ command implementations
@ -337,7 +357,6 @@ class BambuVirtualPrinter:
@gcode_executor.register("M117") @gcode_executor.register("M117")
def _get_lcd_message(self, data: str) -> bool: def _get_lcd_message(self, data: str) -> bool:
# we'll just use this to echo a message, to allow playing around with pause triggers
result = re.search(r"M117\s+(.*)", data).group(1) result = re.search(r"M117\s+(.*)", data).group(1)
self.sendIO(f"echo:{result}") self.sendIO(f"echo:{result}")
return True return True
@ -386,16 +405,9 @@ class BambuVirtualPrinter:
self._log.info(f"{percent}% speed adjustment command sent successfully") self._log.info(f"{percent}% speed adjustment command sent successfully")
return True return True
def _process_gcode_serial_command( def _process_gcode_serial_command(self, gcode: str, full_command: str):
self, gcode_letter: str, gcode: str, full_command: str self._log.debug(f"processing gcode {gcode} command = {full_command}")
): handled = self.gcode_executor.execute(self, gcode, full_command)
self._log.debug(
f"processing gcode command letter = {gcode_letter} | gcode = {gcode} | full = {full_command}"
)
if gcode_letter in self.gcode_executor:
handled = self.gcode_executor.execute(self, gcode_letter, full_command)
else:
handled = self.gcode_executor.execute(self, gcode, full_command)
if handled: if handled:
self._serial_io.sendOk() self._serial_io.sendOk()
return return
@ -453,9 +465,12 @@ class BambuVirtualPrinter:
template = "{heater}:{actual:.2f}/ {target:.2f}" template = "{heater}:{actual:.2f}/ {target:.2f}"
temps = collections.OrderedDict() temps = collections.OrderedDict()
temps["T"] = (self._telemetry.temp[0], self._telemetry.targetTemp[0]) temps["T"] = (self._telemetry.temp[0], self._telemetry.targetTemp[0])
temps["B"] = (self.bedTemp, self.bedTargetTemp) temps["B"] = (self._telemetry.bedTemp, self._telemetry.bedTargetTemp)
if self._telemetry.hasChamber: if self._telemetry.hasChamber:
temps["C"] = (self.chamberTemp, self._telemetry.chamberTargetTemp) temps["C"] = (
self._telemetry.chamberTemp,
self._telemetry.chamberTargetTemp,
)
output = " ".join( output = " ".join(
map( map(
@ -480,6 +495,34 @@ class BambuVirtualPrinter:
self.bambu_client.disconnect() self.bambu_client.disconnect()
self.change_state(self._state_idle) self.change_state(self._state_idle)
self._serial_io.close() self._serial_io.close()
self.stop()
def stop(self):
self._running = False
self._printer_thread.join()
def _wait_for_state_change(self):
self._state_change_queue.join()
def _printer_worker(self):
while self._running:
try:
next_state = self._state_change_queue.get(timeout=0.01)
self._trigger_change_state(next_state)
self._state_change_queue.task_done()
except queue.Empty:
continue
def _trigger_change_state(self, new_state: APrinterState):
if self._current_state == new_state:
return
self._log.debug(
f"Changing state from {self._current_state.__class__.__name__} to {new_state.__class__.__name__}"
)
self._current_state.finalize()
self._current_state = new_state
self._current_state.init()
def _showPrompt(self, text, choices): def _showPrompt(self, text, choices):
self._hidePrompt() self._hidePrompt()

View File

@ -310,7 +310,7 @@ class GCodeExecutor:
return self.gcode_handlers_no_data[gcode](printer) return self.gcode_handlers_no_data[gcode](printer)
else: else:
self._log.debug(f"ignoring {gcode_info} command.") self._log.debug(f"ignoring {gcode_info} command.")
return True return False
except Exception as e: except Exception as e:
self._log.error(f"Error during gcode {gcode_info}") self._log.error(f"Error during gcode {gcode_info}")
raise raise

View File

@ -19,7 +19,7 @@ class PrinterSerialIO(threading.Thread, BufferedIOBase):
def __init__( def __init__(
self, self,
handle_command_callback: Callable[[str, str, str], None], handle_command_callback: Callable[[str, str], None],
settings, settings,
serial_log_handler=None, serial_log_handler=None,
read_timeout=5.0, read_timeout=5.0,
@ -192,13 +192,12 @@ class PrinterSerialIO(threading.Thread, BufferedIOBase):
command = to_unicode(data, encoding="ascii", errors="replace").strip() command = to_unicode(data, encoding="ascii", errors="replace").strip()
# actual command handling
command_match = self.command_regex.match(command) command_match = self.command_regex.match(command)
if command_match is not None: if command_match is not None:
gcode = command_match.group(0) gcode = command_match.group(0)
gcode_letter = command_match.group(1) self._handle_command_callback(gcode, command)
else:
self._handle_command_callback(gcode_letter, gcode, command) self._log.warn(f'Not a valid gcode command "{command}"')
def _triggerResend( def _triggerResend(
self, self,

View File

@ -116,16 +116,13 @@ class RemoteSDCardFileList:
} }
self._file_data_cache = {info.file_name: info for info in file_info_list} self._file_data_cache = {info.file_name: info for info in file_info_list}
def search_by_stem(self, file_stem: str, allowed_suffixes: list[str]): def get_data_by_suffix(self, file_name: str, allowed_suffixes: list[str]):
for file_name in self._file_data_cache: file_data = self._get_file_data(file_name)
file_data = self._get_file_data(file_name) if file_data is None:
if file_data is None: return None
continue file_path = file_data.path
file_path = file_data.path if any(s in allowed_suffixes for s in file_path.suffixes):
if file_path.stem == file_stem and any( return file_data
s in allowed_suffixes for s in file_path.suffixes
):
return file_data
return None return None
def select_file(self, file_path: str, check_already_open: bool = False) -> bool: def select_file(self, file_path: str, check_already_open: bool = False) -> bool:

View File

@ -8,6 +8,7 @@ if TYPE_CHECKING:
import threading import threading
import pybambu.commands
from octoprint.util import RepeatedTimer from octoprint.util import RepeatedTimer
from octoprint_bambu_printer.printer.states.a_printer_state import APrinterState from octoprint_bambu_printer.printer.states.a_printer_state import APrinterState
@ -42,3 +43,11 @@ class PausedState(APrinterState):
condition=self._pausedLock.is_set, condition=self._pausedLock.is_set,
) )
paused_timer.start() paused_timer.start()
def resume_print(self):
if self._printer.bambu_client.connected:
if self._printer.bambu_client.publish(pybambu.commands.RESUME):
self._log.info("print resumed")
self._printer.change_state(self._printer._state_printing)
else:
self._log.info("print resume failed")

View File

@ -1,16 +0,0 @@
from __future__ import annotations
from octoprint_bambu_printer.printer.states.a_printer_state import APrinterState
class PrintFinishedState(APrinterState):
def init(self):
if self._printer.current_print_job is not None:
self._printer.current_print_job.progress = 100
self._finishSdPrint()
def _finishSdPrint(self):
if self._printer.is_running:
self._printer.sendIO("Done printing file")
self._printer.change_state(self._printer._state_idle)

View File

@ -22,7 +22,7 @@ class PrintingState(APrinterState):
def __init__(self, printer: BambuVirtualPrinter) -> None: def __init__(self, printer: BambuVirtualPrinter) -> None:
super().__init__(printer) super().__init__(printer)
self._printing_lock = threading.Event() self._is_printing = False
self._print_job: PrintJob | None = None self._print_job: PrintJob | None = None
self._sd_printing_thread = None self._sd_printing_thread = None
@ -31,74 +31,53 @@ class PrintingState(APrinterState):
return self._print_job return self._print_job
def init(self): def init(self):
self._printing_lock.set() self._is_printing = True
self.update_print_job_info() self._printer.update_print_job_info()
self._start_worker_thread() self._start_worker_thread()
def finalize(self): def finalize(self):
self._printing_lock.clear()
if self._sd_printing_thread is not None and self._sd_printing_thread.is_alive(): if self._sd_printing_thread is not None and self._sd_printing_thread.is_alive():
self._is_printing = False
self._sd_printing_thread.join() self._sd_printing_thread.join()
self._sd_printing_thread = None self._sd_printing_thread = None
def _start_worker_thread(self): def _start_worker_thread(self):
if self._sd_printing_thread is None: if self._sd_printing_thread is None:
if not self._printing_lock.is_set(): self._is_printing = True
self._printing_lock.set()
self._sd_printing_thread = threading.Thread(target=self._printing_worker) self._sd_printing_thread = threading.Thread(target=self._printing_worker)
self._sd_printing_thread.start() self._sd_printing_thread.start()
def update_print_job_info(self): def _printing_worker(self):
print_job_info = self._printer.bambu_client.get_device().print_job while (
filename: str = print_job_info.get("subtask_name") self._is_printing
project_file_info = self._printer.file_system.search_by_stem( and self._printer.current_print_job is not None
filename, [".3mf", ".gcode.3mf"] and self._printer.current_print_job.file_position
) < self._printer.current_print_job.file_info.size
if project_file_info is None: ):
self._log.debug(f"No 3mf file found for {print_job_info}") self._printer.update_print_job_info()
self._print_job = None self._printer.report_print_job_status()
time.sleep(3)
if self._printer.current_print_job is None:
self._log.warn("Printing state was triggered with empty print job")
return return
if self._printer.file_system.select_file(filename): if (
self._printer.sendOk() self._printer.current_print_job.file_position
>= self._printer.current_print_job.file_info.size
# fuzzy math here to get print percentage to match BambuStudio ):
progress = print_job_info.get("print_percentage") self._finish_print()
self._print_job = PrintJob(project_file_info, 0)
self._print_job.progress = progress
def _printing_worker(self):
if self._print_job is not None:
while (
self._printer.is_running
and self._print_job.file_info is not None
and self._print_job.file_position < self._print_job.file_info.size
):
self.update_print_job_info()
self._printer.report_print_job_status()
time.sleep(3)
self._printing_lock.wait()
self._log.debug(
f"SD File Print finishing: {self._print_job.file_info.file_name}"
)
self._printer.change_state(self._printer._state_finished)
def pause_print(self): def pause_print(self):
if self._printer.bambu_client.connected: if self._printer.bambu_client.connected:
if self._printer.bambu_client.publish(pybambu.commands.PAUSE): if self._printer.bambu_client.publish(pybambu.commands.PAUSE):
self._log.info("print paused") self._log.info("print paused")
self._printer.change_state(self._printer._state_finished) self._printer.change_state(self._printer._state_paused)
else: else:
self._log.info("print pause failed") self._log.info("print pause failed")
def resume_print(self):
if self._printer.bambu_client.connected:
if self._printer.bambu_client.publish(pybambu.commands.RESUME):
self._log.info("print resumed")
else:
self._log.info("print resume failed")
def cancel_print(self): def cancel_print(self):
if self._printer.bambu_client.connected: if self._printer.bambu_client.connected:
if self._printer.bambu_client.publish(pybambu.commands.STOP): if self._printer.bambu_client.publish(pybambu.commands.STOP):
@ -106,3 +85,11 @@ class PrintingState(APrinterState):
self._printer.change_state(self._printer._state_finished) self._printer.change_state(self._printer._state_finished)
else: else:
self._log.info("print cancel failed") self._log.info("print cancel failed")
def _finish_print(self):
if self._printer.current_print_job is not None:
self._log.debug(
f"SD File Print finishing: {self._printer.current_print_job.file_info.file_name}"
)
self._printer.change_state(self._printer._state_idle)

View File

@ -6,20 +6,17 @@ from pathlib import Path
import time import time
from typing import Any from typing import Any
import unittest import unittest
from unittest.mock import MagicMock from unittest.mock import MagicMock, Mock
import unittest.mock import unittest.mock
from octoprint_bambu_printer.bambu_print_plugin import BambuPrintPlugin import pybambu
import pybambu.commands
from octoprint_bambu_printer.printer.bambu_virtual_printer import BambuVirtualPrinter from octoprint_bambu_printer.printer.bambu_virtual_printer import BambuVirtualPrinter
from octoprint_bambu_printer.printer.remote_sd_card_file_list import ( from octoprint_bambu_printer.printer.remote_sd_card_file_list import (
FileInfo,
RemoteSDCardFileList, RemoteSDCardFileList,
) )
from octoprint_bambu_printer.printer.states.idle_state import IdleState from octoprint_bambu_printer.printer.states.idle_state import IdleState
from octoprint_bambu_printer.printer.states.paused_state import PausedState from octoprint_bambu_printer.printer.states.paused_state import PausedState
from octoprint_bambu_printer.printer.states.print_finished_state import (
PrintFinishedState,
)
from octoprint_bambu_printer.printer.states.printing_state import PrintingState from octoprint_bambu_printer.printer.states.printing_state import PrintingState
from pytest import fixture from pytest import fixture
@ -125,7 +122,43 @@ def ftps_session_mock(files_info_ftp):
@fixture(scope="function") @fixture(scope="function")
def printer(output_test_folder, settings, profile_manager, log_test, ftps_session_mock): def print_job_mock():
print_job = MagicMock()
print_job.get.side_effect = DictGetter({"subtask_name": "", "print_percentage": 0})
return print_job
@fixture(scope="function")
def temperatures_mock():
temperatures = MagicMock()
temperatures.nozzle_temp = 0
temperatures.target_nozzle_temp = 0
temperatures.bed_temp = 0
temperatures.target_bed_temp = 0
temperatures.chamber_temp = 0
return temperatures
@fixture(scope="function")
def bambu_client_mock(print_job_mock, temperatures_mock) -> pybambu.BambuClient:
bambu_client = MagicMock()
bambu_client.connected = True
device_mock = MagicMock()
device_mock.print_job = print_job_mock
device_mock.temperatures = temperatures_mock
bambu_client.get_device.return_value = device_mock
return bambu_client
@fixture(scope="function")
def printer(
output_test_folder,
settings,
profile_manager,
log_test,
ftps_session_mock,
bambu_client_mock,
):
async def _mock_connection(self): async def _mock_connection(self):
pass pass
@ -138,7 +171,7 @@ def printer(output_test_folder, settings, profile_manager, log_test, ftps_sessio
read_timeout=0.01, read_timeout=0.01,
faked_baudrate=115200, faked_baudrate=115200,
) )
serial_obj._bambu_client = MagicMock() serial_obj._bambu_client = bambu_client_mock
yield serial_obj yield serial_obj
serial_obj.close() serial_obj.close()
@ -166,7 +199,9 @@ def test_cannot_start_print_without_file(printer: BambuVirtualPrinter):
assert isinstance(printer.current_state, IdleState) assert isinstance(printer.current_state, IdleState)
def test_non_existing_file_not_ok(printer: BambuVirtualPrinter): def test_non_existing_file_not_selected(printer: BambuVirtualPrinter):
assert printer.file_system.selected_file is None
printer.write(b"M23 non_existing.3mf\n") printer.write(b"M23 non_existing.3mf\n")
printer.flush() printer.flush()
result = printer.readlines() result = printer.readlines()
@ -174,7 +209,7 @@ def test_non_existing_file_not_ok(printer: BambuVirtualPrinter):
assert printer.file_system.selected_file is None assert printer.file_system.selected_file is None
def test_print_started_with_selected_file(printer: BambuVirtualPrinter): def test_print_started_with_selected_file(printer: BambuVirtualPrinter, print_job_mock):
assert printer.file_system.selected_file is None assert printer.file_system.selected_file is None
printer.write(b"M20\n") printer.write(b"M20\n")
@ -189,6 +224,10 @@ def test_print_started_with_selected_file(printer: BambuVirtualPrinter):
assert printer.file_system.selected_file is not None assert printer.file_system.selected_file is not None
assert printer.file_system.selected_file.file_name == "print.3mf" assert printer.file_system.selected_file.file_name == "print.3mf"
print_job_mock.get.side_effect = DictGetter(
{"subtask_name": "print.3mf", "print_percentage": 0}
)
printer.write(b"M24\n") printer.write(b"M24\n")
printer.flush() printer.flush()
@ -197,32 +236,81 @@ def test_print_started_with_selected_file(printer: BambuVirtualPrinter):
assert isinstance(printer.current_state, PrintingState) assert isinstance(printer.current_state, PrintingState)
def test_pause_print(printer: BambuVirtualPrinter): def test_pause_print(printer: BambuVirtualPrinter, bambu_client_mock, print_job_mock):
gcode = b"G28\nG1 X10 Y10\n" print_job_mock.get.side_effect = DictGetter(
printer.write(gcode) {"subtask_name": "print.3mf", "print_percentage": 0}
)
printer.write(b"M20\n")
printer.write(b"M23 print.3mf\n")
printer.write(b"M24\n")
printer.flush()
printer.readlines()
assert isinstance(printer.current_state, PrintingState)
bambu_client_mock.publish.return_value = True
printer.write(b"M25\n") # GCode for pausing the print printer.write(b"M25\n") # GCode for pausing the print
result = printer.readline() printer.flush()
result = printer.readlines()
assert result[0] == b"ok"
assert isinstance(printer.current_state, PausedState) assert isinstance(printer.current_state, PausedState)
def test_get_printing_info(printer: BambuVirtualPrinter): def test_events_update_printer_state(printer: BambuVirtualPrinter, print_job_mock):
gcode = b"G28\nG1 X10 Y10\n" print_job_mock.gcode_state = "RUNNING"
printer.write(gcode) printer.new_update("event_printer_data_update")
printer.write(b"M27\n") # GCode for getting printing info printer.flush()
result = printer.readline() assert isinstance(printer.current_state, PrintingState)
assert result == ""
print_job_mock.gcode_state = "PAUSE"
printer.new_update("event_printer_data_update")
printer.flush()
assert isinstance(printer.current_state, PausedState)
def test_abort_print(printer: BambuVirtualPrinter): print_job_mock.gcode_state = "IDLE"
gcode = b"G28\nG1 X10 Y10\n" printer.new_update("event_printer_data_update")
printer.write(gcode) printer.flush()
printer.write(b"M26\n") # GCode for aborting the print assert isinstance(printer.current_state, IdleState)
result = printer.readline()
print_job_mock.gcode_state = "FINISH"
printer.new_update("event_printer_data_update")
printer.flush()
assert isinstance(printer.current_state, IdleState)
print_job_mock.gcode_state = "FAILED"
printer.new_update("event_printer_data_update")
printer.flush()
assert isinstance(printer.current_state, IdleState) assert isinstance(printer.current_state, IdleState)
def test_print_finished(printer: BambuVirtualPrinter): def test_printer_info_check(printer: BambuVirtualPrinter):
printer.write(b"M27\n") # printer get info
printer.flush()
result = printer.readlines()
assert result[-1] == b"ok"
assert isinstance(printer.current_state, IdleState)
def test_abort_print(printer: BambuVirtualPrinter):
printer.write(b"M26\n") # GCode for aborting the print
printer.flush()
result = printer.readlines()
assert result[-1] == b"ok"
assert isinstance(printer.current_state, IdleState)
def test_regular_move(printer: BambuVirtualPrinter, bambu_client_mock):
gcode = b"G28\nG1 X10 Y10\n" gcode = b"G28\nG1 X10 Y10\n"
printer.write(gcode) printer.write(gcode)
result = printer.readline() printer.flush()
assert isinstance(printer.current_state, PrintFinishedState) result = printer.readlines()
assert result[-1] == b"ok"
gcode_command = pybambu.commands.SEND_GCODE_TEMPLATE
gcode_command["print"]["param"] = "G28\n"
bambu_client_mock.publish.assert_called_with(gcode_command)
gcode_command["print"]["param"] = "G1 X10 Y10\n"
bambu_client_mock.publish.assert_called_with(gcode_command)