Fix response messages. Fix filesystem name transformations.
This commit is contained in:
parent
19cac21db6
commit
42ba306e4f
@ -1,5 +1,6 @@
|
|||||||
from __future__ import absolute_import, annotations
|
from __future__ import absolute_import, annotations
|
||||||
import os
|
import os
|
||||||
|
from pathlib import Path
|
||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
import flask
|
import flask
|
||||||
@ -195,17 +196,14 @@ class BambuPrintPlugin(
|
|||||||
f"{host}", 990, "bblp", f"{access_code}", ssl_implicit=True
|
f"{host}", 990, "bblp", f"{access_code}", ssl_implicit=True
|
||||||
)
|
)
|
||||||
if self._settings.get(["device_type"]) in ["X1", "X1C"]:
|
if self._settings.get(["device_type"]) in ["X1", "X1C"]:
|
||||||
timelapse_file_list = ftp.list_files("timelapse/", ".mp4") or []
|
timelapse_file_list = ftp.list_files("timelapse/", ".mp4")
|
||||||
else:
|
else:
|
||||||
timelapse_file_list = ftp.list_files("timelapse/", ".avi") or []
|
timelapse_file_list = ftp.list_files("timelapse/", ".avi")
|
||||||
for entry in timelapse_file_list:
|
for entry in timelapse_file_list:
|
||||||
if entry.startswith("/"):
|
filename = entry.name
|
||||||
filename = entry[1:].replace("timelapse/", "")
|
filesize = ftp.ftps_session.size(entry.as_posix())
|
||||||
else:
|
|
||||||
filename = entry.replace("timelapse/", "")
|
|
||||||
filesize = ftp.ftps_session.size(f"timelapse/{filename}")
|
|
||||||
date_str = ftp.ftps_session.sendcmd(
|
date_str = ftp.ftps_session.sendcmd(
|
||||||
f"MDTM timelapse/{filename}"
|
f"MDTM {entry.as_posix()}"
|
||||||
).replace("213 ", "")
|
).replace("213 ", "")
|
||||||
filedate = (
|
filedate = (
|
||||||
datetime.datetime.strptime(date_str, "%Y%m%d%H%M%S")
|
datetime.datetime.strptime(date_str, "%Y%m%d%H%M%S")
|
||||||
|
@ -67,7 +67,7 @@ class BambuVirtualPrinter:
|
|||||||
self._running = True
|
self._running = True
|
||||||
self._printer_thread = threading.Thread(
|
self._printer_thread = threading.Thread(
|
||||||
target=self._printer_worker,
|
target=self._printer_worker,
|
||||||
name="octoprint.plugins.bambu_printer.printer_worker",
|
name="octoprint.plugins.bambu_printer.printer_state",
|
||||||
)
|
)
|
||||||
self._state_change_queue = queue.Queue()
|
self._state_change_queue = queue.Queue()
|
||||||
|
|
||||||
@ -229,7 +229,7 @@ class BambuVirtualPrinter:
|
|||||||
bambu_client.on_connect = self.on_connect(bambu_client.on_connect)
|
bambu_client.on_connect = self.on_connect(bambu_client.on_connect)
|
||||||
bambu_client.connect(callback=self.new_update)
|
bambu_client.connect(callback=self.new_update)
|
||||||
self._log.info(f"bambu connection status: {bambu_client.connected}")
|
self._log.info(f"bambu connection status: {bambu_client.connected}")
|
||||||
self._serial_io.sendOk()
|
self.sendOk()
|
||||||
self._bambu_client = bambu_client
|
self._bambu_client = bambu_client
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
@ -304,11 +304,21 @@ class BambuVirtualPrinter:
|
|||||||
|
|
||||||
##~~ command implementations
|
##~~ command implementations
|
||||||
|
|
||||||
|
@gcode_executor.register_no_data("M21")
|
||||||
|
def _sd_status(self) -> None:
|
||||||
|
self.sendIO("SD card ok")
|
||||||
|
|
||||||
@gcode_executor.register("M23")
|
@gcode_executor.register("M23")
|
||||||
def _select_sd_file(self, data: str) -> bool:
|
def _select_sd_file(self, data: str) -> bool:
|
||||||
filename = data.split(maxsplit=1)[1].strip()
|
filename = data.split(maxsplit=1)[1].strip()
|
||||||
self._list_sd()
|
self._list_sd()
|
||||||
return self.file_system.select_file(filename)
|
if self.file_system.select_file(filename):
|
||||||
|
assert self.file_system.selected_file is not None
|
||||||
|
self.sendIO(
|
||||||
|
f"File opened: {self.file_system.selected_file.file_name} "
|
||||||
|
f"Size: {self.file_system.selected_file.size}"
|
||||||
|
)
|
||||||
|
self.sendIO("File selected")
|
||||||
|
|
||||||
@gcode_executor.register("M26")
|
@gcode_executor.register("M26")
|
||||||
def _set_sd_position(self, data: str) -> bool:
|
def _set_sd_position(self, data: str) -> bool:
|
||||||
@ -335,6 +345,7 @@ class BambuVirtualPrinter:
|
|||||||
else:
|
else:
|
||||||
self._sdstatus_reporter = None
|
self._sdstatus_reporter = None
|
||||||
|
|
||||||
|
self.update_print_job_info()
|
||||||
self.report_print_job_status()
|
self.report_print_job_status()
|
||||||
return True
|
return True
|
||||||
|
|
||||||
@ -350,8 +361,8 @@ class BambuVirtualPrinter:
|
|||||||
return self._processTemperatureQuery()
|
return self._processTemperatureQuery()
|
||||||
|
|
||||||
# noinspection PyUnusedLocal
|
# noinspection PyUnusedLocal
|
||||||
@gcode_executor.register("M115")
|
@gcode_executor.register_no_data("M115")
|
||||||
def _report_firmware_info(self, data: str) -> bool:
|
def _report_firmware_info(self) -> bool:
|
||||||
self.sendIO("Bambu Printer Integration")
|
self.sendIO("Bambu Printer Integration")
|
||||||
self.sendIO("Cap:EXTENDED_M20:1")
|
self.sendIO("Cap:EXTENDED_M20:1")
|
||||||
self.sendIO("Cap:LFN_WRITE:1")
|
self.sendIO("Cap:LFN_WRITE:1")
|
||||||
@ -412,7 +423,7 @@ class BambuVirtualPrinter:
|
|||||||
self._log.debug(f"processing gcode {gcode} command = {full_command}")
|
self._log.debug(f"processing gcode {gcode} command = {full_command}")
|
||||||
handled = self.gcode_executor.execute(self, gcode, full_command)
|
handled = self.gcode_executor.execute(self, gcode, full_command)
|
||||||
if handled:
|
if handled:
|
||||||
self._serial_io.sendOk()
|
self.sendOk()
|
||||||
return
|
return
|
||||||
|
|
||||||
# post gcode to printer otherwise
|
# post gcode to printer otherwise
|
||||||
@ -421,7 +432,7 @@ class BambuVirtualPrinter:
|
|||||||
GCODE_COMMAND["print"]["param"] = full_command + "\n"
|
GCODE_COMMAND["print"]["param"] = full_command + "\n"
|
||||||
if self.bambu_client.publish(GCODE_COMMAND):
|
if self.bambu_client.publish(GCODE_COMMAND):
|
||||||
self._log.info("command sent successfully")
|
self._log.info("command sent successfully")
|
||||||
self._serial_io.sendOk()
|
self.sendOk()
|
||||||
|
|
||||||
@gcode_executor.register_no_data("M112")
|
@gcode_executor.register_no_data("M112")
|
||||||
def _shutdown(self):
|
def _shutdown(self):
|
||||||
@ -432,8 +443,8 @@ class BambuVirtualPrinter:
|
|||||||
self._serial_io.close()
|
self._serial_io.close()
|
||||||
return True
|
return True
|
||||||
|
|
||||||
@gcode_executor.register_no_data("M20")
|
@gcode_executor.register("M20")
|
||||||
def _list_sd(self):
|
def _list_sd(self, data: str = ""):
|
||||||
self.sendIO("Begin file list")
|
self.sendIO("Begin file list")
|
||||||
for item in map(lambda f: f.get_log_info(), self.file_system.get_all_files()):
|
for item in map(lambda f: f.get_log_info(), self.file_system.get_all_files()):
|
||||||
self.sendIO(item)
|
self.sendIO(item)
|
||||||
@ -464,7 +475,7 @@ class BambuVirtualPrinter:
|
|||||||
else:
|
else:
|
||||||
self.sendIO("Not SD printing")
|
self.sendIO("Not SD printing")
|
||||||
|
|
||||||
def _generateTemperatureOutput(self) -> str:
|
def _create_temperature_message(self) -> str:
|
||||||
template = "{heater}:{actual:.2f}/ {target:.2f}"
|
template = "{heater}:{actual:.2f}/ {target:.2f}"
|
||||||
temps = collections.OrderedDict()
|
temps = collections.OrderedDict()
|
||||||
temps["T"] = (self._telemetry.temp[0], self._telemetry.targetTemp[0])
|
temps["T"] = (self._telemetry.temp[0], self._telemetry.targetTemp[0])
|
||||||
@ -487,7 +498,7 @@ class BambuVirtualPrinter:
|
|||||||
def _processTemperatureQuery(self) -> bool:
|
def _processTemperatureQuery(self) -> bool:
|
||||||
# includeOk = not self._okBeforeCommandOutput
|
# includeOk = not self._okBeforeCommandOutput
|
||||||
if self.bambu_client.connected:
|
if self.bambu_client.connected:
|
||||||
output = self._generateTemperatureOutput()
|
output = self._create_temperature_message()
|
||||||
self.sendIO(output)
|
self.sendIO(output)
|
||||||
return True
|
return True
|
||||||
else:
|
else:
|
||||||
|
@ -26,6 +26,7 @@ wrapper for FTPS server interactions
|
|||||||
|
|
||||||
import ftplib
|
import ftplib
|
||||||
import os
|
import os
|
||||||
|
from pathlib import Path
|
||||||
import socket
|
import socket
|
||||||
import ssl
|
import ssl
|
||||||
from typing import Optional, Union, List
|
from typing import Optional, Union, List
|
||||||
@ -34,6 +35,7 @@ from contextlib import redirect_stdout
|
|||||||
import io
|
import io
|
||||||
import re
|
import re
|
||||||
|
|
||||||
|
|
||||||
class ImplicitTLS(ftplib.FTP_TLS):
|
class ImplicitTLS(ftplib.FTP_TLS):
|
||||||
"""ftplib.FTP_TLS sub-class to support implicit SSL FTPS"""
|
"""ftplib.FTP_TLS sub-class to support implicit SSL FTPS"""
|
||||||
|
|
||||||
@ -57,9 +59,9 @@ class ImplicitTLS(ftplib.FTP_TLS):
|
|||||||
conn, size = ftplib.FTP.ntransfercmd(self, cmd, rest)
|
conn, size = ftplib.FTP.ntransfercmd(self, cmd, rest)
|
||||||
|
|
||||||
if self._prot_p:
|
if self._prot_p:
|
||||||
conn = self.context.wrap_socket(conn,
|
conn = self.context.wrap_socket(
|
||||||
server_hostname=self.host,
|
conn, server_hostname=self.host, session=self.sock.session
|
||||||
session=self.sock.session) # this is the fix
|
) # this is the fix
|
||||||
return conn, size
|
return conn, size
|
||||||
|
|
||||||
|
|
||||||
@ -106,7 +108,8 @@ class IoTFTPSClient:
|
|||||||
self.ftps_session.set_debuglevel(0)
|
self.ftps_session.set_debuglevel(0)
|
||||||
|
|
||||||
self.welcome = self.ftps_session.connect(
|
self.welcome = self.ftps_session.connect(
|
||||||
host=self.ftps_host, port=self.ftps_port)
|
host=self.ftps_host, port=self.ftps_port
|
||||||
|
)
|
||||||
|
|
||||||
if self.ftps_user and self.ftps_pass:
|
if self.ftps_user and self.ftps_pass:
|
||||||
self.ftps_session.login(user=self.ftps_user, passwd=self.ftps_pass)
|
self.ftps_session.login(user=self.ftps_user, passwd=self.ftps_pass)
|
||||||
@ -137,7 +140,7 @@ class IoTFTPSClient:
|
|||||||
# Taken from ftplib.storbinary but with custom ssl handling
|
# Taken from ftplib.storbinary but with custom ssl handling
|
||||||
# due to the shitty bambu p1p ftps server TODO fix properly.
|
# due to the shitty bambu p1p ftps server TODO fix properly.
|
||||||
with open(source, "rb") as fp:
|
with open(source, "rb") as fp:
|
||||||
self.ftps_session.voidcmd('TYPE I')
|
self.ftps_session.voidcmd("TYPE I")
|
||||||
|
|
||||||
with self.ftps_session.transfercmd(f"STOR {dest}", rest) as conn:
|
with self.ftps_session.transfercmd(f"STOR {dest}", rest) as conn:
|
||||||
while 1:
|
while 1:
|
||||||
@ -152,7 +155,9 @@ class IoTFTPSClient:
|
|||||||
callback(buf)
|
callback(buf)
|
||||||
|
|
||||||
# shutdown ssl layer
|
# shutdown ssl layer
|
||||||
if ftplib._SSLSocket is not None and isinstance(conn, ftplib._SSLSocket):
|
if ftplib._SSLSocket is not None and isinstance(
|
||||||
|
conn, ftplib._SSLSocket
|
||||||
|
):
|
||||||
# Yeah this is suposed to be conn.unwrap
|
# Yeah this is suposed to be conn.unwrap
|
||||||
# But since we operate in prot p mode
|
# But since we operate in prot p mode
|
||||||
# we can close the connection always.
|
# we can close the connection always.
|
||||||
@ -185,19 +190,24 @@ class IoTFTPSClient:
|
|||||||
def mkdir(self, path: str) -> str:
|
def mkdir(self, path: str) -> str:
|
||||||
return self.ftps_session.mkd(path)
|
return self.ftps_session.mkd(path)
|
||||||
|
|
||||||
def list_files(self, path: str, file_pattern: Optional[str] = None) -> Union[List[str], None]:
|
def list_files(self, list_path: str, extensions: str | list[str] | None = None):
|
||||||
"""list files under a path inside the FTPS server"""
|
"""list files under a path inside the FTPS server"""
|
||||||
|
|
||||||
|
if extensions is None:
|
||||||
|
_extension_acceptable = lambda p: True
|
||||||
|
else:
|
||||||
|
if isinstance(extensions, str):
|
||||||
|
extensions = [extensions]
|
||||||
|
_extension_acceptable = lambda p: any(s in p.suffixes for s in extensions)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
files = self.ftps_session.nlst(path)
|
list_result = self.ftps_session.nlst(list_path) or []
|
||||||
if not files:
|
for file_name in list_result:
|
||||||
return
|
path = Path(list_path) / file_name
|
||||||
if file_pattern:
|
if _extension_acceptable(path):
|
||||||
return [f for f in files if file_pattern in f]
|
yield path
|
||||||
return files
|
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
print(f"unexpected exception occurred: {ex}")
|
print(f"unexpected exception occurred: {ex}")
|
||||||
pass
|
|
||||||
return
|
|
||||||
|
|
||||||
def list_files_ex(self, path: str) -> Union[list[str], None]:
|
def list_files_ex(self, path: str) -> Union[list[str], None]:
|
||||||
"""list files under a path inside the FTPS server"""
|
"""list files under a path inside the FTPS server"""
|
||||||
@ -208,7 +218,8 @@ class IoTFTPSClient:
|
|||||||
s = f.getvalue()
|
s = f.getvalue()
|
||||||
files = []
|
files = []
|
||||||
for row in s.split("\n"):
|
for row in s.split("\n"):
|
||||||
if len(row) <= 0: continue
|
if len(row) <= 0:
|
||||||
|
continue
|
||||||
|
|
||||||
attribs = row.split(" ")
|
attribs = row.split(" ")
|
||||||
|
|
||||||
|
@ -17,4 +17,4 @@ class PrintJob:
|
|||||||
|
|
||||||
@progress.setter
|
@progress.setter
|
||||||
def progress(self, value):
|
def progress(self, value):
|
||||||
self.file_position = int(self.file_info.size * ((value + 1) / 100))
|
self.file_position = int(self.file_info.size * value / 100)
|
||||||
|
@ -27,7 +27,7 @@ class PrinterSerialIO(threading.Thread, BufferedIOBase):
|
|||||||
write_timeout=10.0,
|
write_timeout=10.0,
|
||||||
) -> None:
|
) -> None:
|
||||||
super().__init__(
|
super().__init__(
|
||||||
name="octoprint.plugins.bambu_printer.serial_io_thread", daemon=True
|
name="octoprint.plugins.bambu_printer.printer_worker", daemon=True
|
||||||
)
|
)
|
||||||
self._handle_command_callback = handle_command_callback
|
self._handle_command_callback = handle_command_callback
|
||||||
self._settings = settings
|
self._settings = settings
|
||||||
@ -115,8 +115,8 @@ class PrinterSerialIO(threading.Thread, BufferedIOBase):
|
|||||||
return 0
|
return 0
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self.input_bytes.put(data, timeout=self._write_timeout)
|
|
||||||
self._log.debug(f"<<< {u_data}")
|
self._log.debug(f"<<< {u_data}")
|
||||||
|
self.input_bytes.put(data, timeout=self._write_timeout)
|
||||||
return len(data)
|
return len(data)
|
||||||
except queue.Full:
|
except queue.Full:
|
||||||
self._log.error(
|
self._log.error(
|
||||||
|
@ -77,18 +77,14 @@ class RemoteSDCardFileList:
|
|||||||
|
|
||||||
yield file_info
|
yield file_info
|
||||||
existing_files.append(file_info.file_name)
|
existing_files.append(file_info.file_name)
|
||||||
|
existing_files.append(file_info.dosname)
|
||||||
|
|
||||||
def _get_existing_files_info(self):
|
def _get_existing_files_info(self):
|
||||||
ftp = self._connect_ftps_server()
|
ftp = self._connect_ftps_server()
|
||||||
|
|
||||||
file_list = []
|
file_list = []
|
||||||
file_list.extend(ftp.list_files("", ".3mf") or [])
|
file_list.extend(ftp.list_files("", ".3mf"))
|
||||||
file_list.extend(
|
file_list.extend(ftp.list_files("cache/", ".3mf"))
|
||||||
[
|
|
||||||
(Path("cache/") / f).as_posix()
|
|
||||||
for f in (ftp.list_files("cache/", ".3mf") or [])
|
|
||||||
]
|
|
||||||
)
|
|
||||||
|
|
||||||
existing_files = []
|
existing_files = []
|
||||||
return list(self._scan_ftp_file_list(ftp, file_list, existing_files))
|
return list(self._scan_ftp_file_list(ftp, file_list, existing_files))
|
||||||
@ -112,7 +108,7 @@ class RemoteSDCardFileList:
|
|||||||
|
|
||||||
def get_all_files(self):
|
def get_all_files(self):
|
||||||
self._update_existing_files_info()
|
self._update_existing_files_info()
|
||||||
self._logger.debug(f"_getSdFiles return: {self._file_data_cache}")
|
self._logger.debug(f"get_all_files return: {self._file_data_cache}")
|
||||||
return list(self._file_data_cache.values())
|
return list(self._file_data_cache.values())
|
||||||
|
|
||||||
def _update_existing_files_info(self):
|
def _update_existing_files_info(self):
|
||||||
@ -124,7 +120,9 @@ class RemoteSDCardFileList:
|
|||||||
|
|
||||||
def _get_cached_data_by_suffix(self, file_stem: str, allowed_suffixes: list[str]):
|
def _get_cached_data_by_suffix(self, file_stem: str, allowed_suffixes: list[str]):
|
||||||
for suffix in allowed_suffixes:
|
for suffix in allowed_suffixes:
|
||||||
file_data = self._get_cached_file_data(f"{file_stem}{suffix}")
|
file_data = self._get_cached_file_data(
|
||||||
|
Path(file_stem).with_suffix(suffix).as_posix()
|
||||||
|
)
|
||||||
if file_data is not None:
|
if file_data is not None:
|
||||||
return file_data
|
return file_data
|
||||||
return None
|
return None
|
||||||
@ -154,9 +152,6 @@ class RemoteSDCardFileList:
|
|||||||
return True
|
return True
|
||||||
|
|
||||||
self._selected_file_info = file_info
|
self._selected_file_info = file_info
|
||||||
self._logger.info(
|
|
||||||
f"File opened: {self._selected_file_info.file_name} Size: {self._selected_file_info.size}"
|
|
||||||
)
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def delete_file(self, file_path: str) -> None:
|
def delete_file(self, file_path: str) -> None:
|
||||||
|
@ -36,7 +36,6 @@ class PrintingState(APrinterState):
|
|||||||
self._start_worker_thread()
|
self._start_worker_thread()
|
||||||
|
|
||||||
def finalize(self):
|
def finalize(self):
|
||||||
|
|
||||||
if self._sd_printing_thread is not None and self._sd_printing_thread.is_alive():
|
if self._sd_printing_thread is not None and self._sd_printing_thread.is_alive():
|
||||||
self._is_printing = False
|
self._is_printing = False
|
||||||
self._sd_printing_thread.join()
|
self._sd_printing_thread.join()
|
||||||
@ -82,7 +81,8 @@ class PrintingState(APrinterState):
|
|||||||
if self._printer.bambu_client.connected:
|
if self._printer.bambu_client.connected:
|
||||||
if self._printer.bambu_client.publish(pybambu.commands.STOP):
|
if self._printer.bambu_client.publish(pybambu.commands.STOP):
|
||||||
self._log.info("print cancelled")
|
self._log.info("print cancelled")
|
||||||
self._printer.change_state(self._printer._state_finished)
|
self._finish_print()
|
||||||
|
self._printer.change_state(self._printer._state_idle)
|
||||||
else:
|
else:
|
||||||
self._log.info("print cancel failed")
|
self._log.info("print cancel failed")
|
||||||
|
|
||||||
@ -91,5 +91,6 @@ class PrintingState(APrinterState):
|
|||||||
self._log.debug(
|
self._log.debug(
|
||||||
f"SD File Print finishing: {self._printer.current_print_job.file_info.file_name}"
|
f"SD File Print finishing: {self._printer.current_print_job.file_info.file_name}"
|
||||||
)
|
)
|
||||||
|
self._printer.sendIO("Done printing file")
|
||||||
|
|
||||||
self._printer.change_state(self._printer._state_idle)
|
self._printer.change_state(self._printer._state_idle)
|
||||||
|
@ -107,7 +107,10 @@ def ftps_session_mock(files_info_ftp):
|
|||||||
filter(lambda f: Path(f).parent == Path("."), all_files)
|
filter(lambda f: Path(f).parent == Path("."), all_files)
|
||||||
),
|
),
|
||||||
("cache/", ".3mf"): list(
|
("cache/", ".3mf"): list(
|
||||||
filter(lambda f: Path(f).parent == Path("cache/"), all_files)
|
map(
|
||||||
|
lambda f: Path(f).name,
|
||||||
|
filter(lambda f: Path(f).parent == Path("cache/"), all_files),
|
||||||
|
)
|
||||||
),
|
),
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
@ -164,7 +167,7 @@ def printer(
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
BambuVirtualPrinter._create_client_connection_async = _mock_connection
|
BambuVirtualPrinter._create_client_connection_async = _mock_connection
|
||||||
serial_obj = BambuVirtualPrinter(
|
printer_test = BambuVirtualPrinter(
|
||||||
settings,
|
settings,
|
||||||
profile_manager,
|
profile_manager,
|
||||||
data_folder=output_test_folder,
|
data_folder=output_test_folder,
|
||||||
@ -172,9 +175,11 @@ def printer(
|
|||||||
read_timeout=0.01,
|
read_timeout=0.01,
|
||||||
faked_baudrate=115200,
|
faked_baudrate=115200,
|
||||||
)
|
)
|
||||||
serial_obj._bambu_client = bambu_client_mock
|
printer_test._bambu_client = bambu_client_mock
|
||||||
yield serial_obj
|
printer_test.flush()
|
||||||
serial_obj.close()
|
printer_test.readlines()
|
||||||
|
yield printer_test
|
||||||
|
printer_test.close()
|
||||||
|
|
||||||
|
|
||||||
def test_initial_state(printer: BambuVirtualPrinter):
|
def test_initial_state(printer: BambuVirtualPrinter):
|
||||||
@ -206,7 +211,8 @@ def test_non_existing_file_not_selected(printer: BambuVirtualPrinter):
|
|||||||
printer.write(b"M23 non_existing.3mf\n")
|
printer.write(b"M23 non_existing.3mf\n")
|
||||||
printer.flush()
|
printer.flush()
|
||||||
result = printer.readlines()
|
result = printer.readlines()
|
||||||
assert result[0] == b"ok"
|
assert result[-2] != b"File selected"
|
||||||
|
assert result[-1] == b"ok"
|
||||||
assert printer.file_system.selected_file is None
|
assert printer.file_system.selected_file is None
|
||||||
|
|
||||||
|
|
||||||
@ -220,7 +226,8 @@ def test_print_started_with_selected_file(printer: BambuVirtualPrinter, print_jo
|
|||||||
printer.write(b"M23 print.3mf\n")
|
printer.write(b"M23 print.3mf\n")
|
||||||
printer.flush()
|
printer.flush()
|
||||||
result = printer.readlines()
|
result = printer.readlines()
|
||||||
assert result[0] == b"ok"
|
assert result[-2] == b"File selected"
|
||||||
|
assert result[-1] == b"ok"
|
||||||
|
|
||||||
assert printer.file_system.selected_file is not None
|
assert printer.file_system.selected_file is not None
|
||||||
assert printer.file_system.selected_file.file_name == "print.3mf"
|
assert printer.file_system.selected_file.file_name == "print.3mf"
|
||||||
|
Loading…
x
Reference in New Issue
Block a user