Move all ftp operations to printer file system.

This commit is contained in:
Anton Skrypnyk 2024-07-25 16:51:15 +03:00
parent 55b78cea05
commit 1f7eed6b23
16 changed files with 439 additions and 428 deletions

View File

@ -1,10 +1,9 @@
from __future__ import absolute_import, annotations
import os
from pathlib import Path
import threading
import time
from time import perf_counter
from contextlib import contextmanager
import flask
import datetime
import logging.handlers
from urllib.parse import quote as urlquote
@ -13,7 +12,7 @@ import octoprint.server
import octoprint.plugin
from octoprint.events import Events
import octoprint.settings
from octoprint.util import get_formatted_size, get_formatted_datetime, is_hidden_path
from octoprint.util import is_hidden_path
from octoprint.server.util.flask import no_firstrun_access
from octoprint.server.util.tornado import (
LargeResponseHandler,
@ -24,10 +23,18 @@ from octoprint.logging.handlers import CleaningTimedRotatingFileHandler
from pybambu import BambuCloud
from .printer.ftpsclient.ftpsclient import IoTFTPSClient
from .printer.file_system.bambu_timelapse_file_info import (
BambuTimelapseFileInfo,
)
from .printer.bambu_virtual_printer import BambuVirtualPrinter
@contextmanager
def measure_elapsed():
start = perf_counter()
yield lambda: perf_counter() - start
class BambuPrintPlugin(
octoprint.plugin.SettingsPlugin,
octoprint.plugin.TemplatePlugin,
@ -36,6 +43,7 @@ class BambuPrintPlugin(
octoprint.plugin.SimpleApiPlugin,
octoprint.plugin.BlueprintPlugin,
):
_printer: BambuVirtualPrinter
_logger: logging.Logger
_plugin_manager: octoprint.plugin.PluginManager
@ -120,24 +128,16 @@ class BambuPrintPlugin(
sd_upload_started(filename, filename)
def process():
host = self._settings.get(["host"])
access_code = self._settings.get(["access_code"])
elapsed = time.monotonic()
try:
ftp = IoTFTPSClient(
f"{host}", 990, "bblp", f"{access_code}", ssl_implicit=True
)
if ftp.upload_file(path, f"{filename}"):
elapsed = time.monotonic() - elapsed
sd_upload_succeeded(filename, filename, elapsed)
# remove local file after successful upload to Bambu
# self._file_manager.remove_file("local", filename)
else:
raise Exception("upload failed")
except Exception as e:
elapsed = time.monotonic() - elapsed
sd_upload_failed(filename, filename, elapsed)
self._logger.debug(f"Error uploading file {filename}")
with measure_elapsed() as get_elapsed:
try:
with self._printer.file_system.get_ftps_client() as ftp:
if ftp.upload_file(path, f"{filename}"):
sd_upload_succeeded(filename, filename, get_elapsed())
else:
raise Exception("upload failed")
except Exception as e:
sd_upload_failed(filename, filename, get_elapsed())
self._logger.exception(e)
thread = threading.Thread(target=process)
thread.daemon = True
@ -188,49 +188,13 @@ class BambuPrintPlugin(
if flask.request.path.startswith("/api/timelapse"):
def process():
host = self._settings.get(["host"])
access_code = self._settings.get(["access_code"])
return_file_list = []
try:
ftp = IoTFTPSClient(
f"{host}", 990, "bblp", f"{access_code}", ssl_implicit=True
)
if self._settings.get(["device_type"]) in ["X1", "X1C"]:
timelapse_file_list = ftp.list_files("timelapse/", ".mp4")
else:
timelapse_file_list = ftp.list_files("timelapse/", ".avi")
for entry in timelapse_file_list:
filename = entry.name
filesize = ftp.ftps_session.size(entry.as_posix())
date_str = ftp.ftps_session.sendcmd(
f"MDTM {entry.as_posix()}"
).replace("213 ", "")
filedate = (
datetime.datetime.strptime(date_str, "%Y%m%d%H%M%S")
.replace(tzinfo=datetime.timezone.utc)
.timestamp()
)
return_file_list.append(
{
"bytes": filesize,
"date": get_formatted_datetime(
datetime.datetime.fromtimestamp(filedate)
),
"name": filename,
"size": get_formatted_size(filesize),
"thumbnail": "/plugin/bambu_printer/thumbnail/"
+ filename.replace(".mp4", ".jpg").replace(
".avi", ".jpg"
),
"timestamp": filedate,
"url": f"/plugin/bambu_printer/timelapse/{filename}",
}
)
self._plugin_manager.send_plugin_message(
self._identifier, {"files": return_file_list}
)
except Exception as e:
self._logger.debug(f"Error getting timelapse files: {e}")
for file_info in self._printer.file_system.get_all_timelapse_files():
timelapse_info = BambuTimelapseFileInfo.from_file_info(file_info)
return_file_list.append(timelapse_info.to_dict())
self._plugin_manager.send_plugin_message(
self._identifier, {"files": return_file_list}
)
thread = threading.Thread(target=process)
thread.daemon = True
@ -239,22 +203,24 @@ class BambuPrintPlugin(
def _hook_octoprint_server_api_before_request(self, *args, **kwargs):
return [self.get_timelapse_file_list]
def _download_file(self, file_name: str, source_path: str):
destination = Path(self.get_plugin_data_folder()) / file_name
if destination.exists():
return destination
with self._printer.file_system.get_ftps_client() as ftp:
ftp.download_file(
source=(Path(source_path) / file_name).as_posix(),
dest=destination.as_posix(),
)
return destination
@octoprint.plugin.BlueprintPlugin.route("/timelapse/<filename>", methods=["GET"])
@octoprint.server.util.flask.restricted_access
@no_firstrun_access
@Permissions.TIMELAPSE_DOWNLOAD.require(403)
def downloadTimelapse(self, filename):
dest_filename = os.path.join(self.get_plugin_data_folder(), filename)
host = self._settings.get(["host"])
access_code = self._settings.get(["access_code"])
if not os.path.exists(dest_filename):
ftp = IoTFTPSClient(
f"{host}", 990, "bblp", f"{access_code}", ssl_implicit=True
)
download_result = ftp.download_file(
source=f"timelapse/{filename}",
dest=dest_filename,
)
self._download_file(filename, "timelapse/")
return flask.redirect(
"/plugin/bambu_printer/download/timelapse/" + urlquote(filename), code=302
)
@ -264,17 +230,7 @@ class BambuPrintPlugin(
@no_firstrun_access
@Permissions.TIMELAPSE_DOWNLOAD.require(403)
def downloadThumbnail(self, filename):
dest_filename = os.path.join(self.get_plugin_data_folder(), filename)
host = self._settings.get(["host"])
access_code = self._settings.get(["access_code"])
if not os.path.exists(dest_filename):
ftp = IoTFTPSClient(
f"{host}", 990, "bblp", f"{access_code}", ssl_implicit=True
)
download_result = ftp.download_file(
source=f"timelapse/thumbnail/{filename}",
dest=dest_filename,
)
self._download_file(filename, "timelapse/thumbnail/")
return flask.redirect(
"/plugin/bambu_printer/download/thumbnail/" + urlquote(filename), code=302
)

View File

@ -22,7 +22,7 @@ from .states.paused_state import PausedState
from .states.printing_state import PrintingState
from .gcode_executor import GCodeExecutor
from .remote_sd_card_file_list import RemoteSDCardFileList
from .file_system.remote_sd_card_file_list import RemoteSDCardFileList
AMBIENT_TEMPERATURE: float = 21.3
@ -293,7 +293,7 @@ class BambuVirtualPrinter:
def _select_sd_file(self, data: str) -> bool:
filename = data.split(maxsplit=1)[1].strip()
self._list_sd()
if not self.file_system.select_file(filename):
if not self.file_system.select_project_file(filename):
return False
assert self.file_system.selected_file is not None
@ -431,7 +431,9 @@ class BambuVirtualPrinter:
@gcode_executor.register("M20")
def _list_sd(self, data: str = ""):
self.sendIO("Begin file list")
for item in map(lambda f: f.get_log_info(), self.file_system.get_all_files()):
for item in map(
lambda f: f.get_log_info(), self.file_system.get_all_project_files()
):
self.sendIO(item)
self.sendIO("End file list")
return True

View File

@ -1,67 +0,0 @@
import queue
import time
class CharCountingQueue(queue.Queue):
def __init__(self, maxsize, name=None):
queue.Queue.__init__(self, maxsize=maxsize)
self._size = 0
self._name = name
def clear(self):
with self.mutex:
self.queue.clear()
def put(self, item, block=True, timeout=None, partial=False) -> int:
self.not_full.acquire()
try:
if not self._will_it_fit(item) and partial:
space_left = self.maxsize - self._qsize()
if space_left:
item = item[:space_left]
if not block:
if not self._will_it_fit(item):
raise queue.Full
elif timeout is None:
while not self._will_it_fit(item):
self.not_full.wait()
elif timeout < 0:
raise ValueError("'timeout' must be a positive number")
else:
endtime = time.monotonic() + timeout
while not self._will_it_fit(item):
remaining = endtime - time.monotonic()
if remaining <= 0:
raise queue.Full
self.not_full.wait(remaining)
self._put(item)
self.unfinished_tasks += 1
self.not_empty.notify()
return self._len(item)
finally:
self.not_full.release()
# noinspection PyMethodMayBeStatic
def _len(self, item):
return len(item)
def _qsize(self, l=len): # noqa: E741
return self._size
# Put a new item in the queue
def _put(self, item):
self.queue.append(item)
self._size += self._len(item)
# Get an item from the queue
def _get(self):
item = self.queue.popleft()
self._size -= self._len(item)
return item
def _will_it_fit(self, item):
return self.maxsize - self._qsize() >= self._len(item)

View File

@ -0,0 +1,34 @@
from __future__ import annotations
from dataclasses import asdict, dataclass
from pathlib import Path
from .file_info import FileInfo
from octoprint.util import get_formatted_size, get_formatted_datetime
@dataclass(frozen=True)
class BambuTimelapseFileInfo:
bytes: int
date: str | None
name: str
size: str
thumbnail: str
timestamp: float
url: str
def to_dict(self):
return asdict(self)
@staticmethod
def from_file_info(file_info: FileInfo):
return BambuTimelapseFileInfo(
bytes=file_info.size,
date=get_formatted_datetime(file_info.date),
name=file_info.file_name,
size=get_formatted_size(file_info.size),
thumbnail=f"/plugin/bambu_printer/thumbnail/{file_info.path.stem}.jpg",
timestamp=file_info.timestamp,
url=f"/plugin/bambu_printer/timelapse/{file_info.file_name}",
)

View File

@ -0,0 +1,71 @@
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from octoprint_bambu_printer.printer.file_system.remote_sd_card_file_list import (
RemoteSDCardFileList,
)
from dataclasses import dataclass, field
from pathlib import Path
from octoprint_bambu_printer.printer.file_system.file_info import FileInfo
@dataclass
class CachedFileView:
file_system: RemoteSDCardFileList
folder_view: set[tuple[str, str | list[str] | None]] = field(default_factory=set)
def __post_init__(self):
self._file_alias_cache = {}
self._file_data_cache = {}
def with_filter(
self, folder: str, extensions: str | list[str] | None = None
) -> "CachedFileView":
self.folder_view.add((folder, extensions))
return self
def list_all_views(self):
existing_files = []
result = []
with self.file_system.get_ftps_client() as ftp:
for filter in self.folder_view:
result.extend(self.file_system.list_files(*filter, ftp, existing_files))
return result
def update(self):
file_info_list = self.list_all_views()
self._file_alias_cache = {
info.dosname: info.file_name for info in file_info_list
}
self._file_data_cache = {info.file_name: info for info in file_info_list}
def get_all_cached_info(self):
return list(self._file_data_cache.values())
def get_file_by_suffix(self, file_stem: str, allowed_suffixes: list[str]):
if file_stem == "":
return None
file_data = self._get_file_by_suffix_cached(file_stem, allowed_suffixes)
if file_data is None:
self.update()
file_data = self._get_file_by_suffix_cached(file_stem, allowed_suffixes)
return file_data
def get_cached_file_data(self, file_name: str) -> FileInfo | None:
file_name = Path(file_name).name
file_name = self._file_alias_cache.get(file_name, file_name)
return self._file_data_cache.get(file_name, None)
def _get_file_by_suffix_cached(self, file_stem: str, allowed_suffixes: list[str]):
for suffix in allowed_suffixes:
file_data = self.get_cached_file_data(
Path(file_stem).with_suffix(suffix).as_posix()
)
if file_data is not None:
return file_data
return None

View File

@ -0,0 +1,33 @@
from __future__ import annotations
from dataclasses import asdict, dataclass
from datetime import datetime
from pathlib import Path
from octoprint.util.files import unix_timestamp_to_m20_timestamp
@dataclass(frozen=True)
class FileInfo:
dosname: str
path: Path
size: int
date: datetime
@property
def file_name(self):
return self.path.name
@property
def timestamp(self) -> float:
return self.date.timestamp()
@property
def timestamp_m20(self) -> str:
return unix_timestamp_to_m20_timestamp(int(self.timestamp))
def get_log_info(self) -> str:
return f'{self.dosname} {self.size} {self.timestamp_m20} "{self.file_name}"'
def to_dict(self):
return asdict(self)

View File

@ -25,12 +25,13 @@ wrapper for FTPS server interactions
"""
from __future__ import annotations
from dataclasses import dataclass
import ftplib
import os
from pathlib import Path
import socket
import ssl
from typing import Optional, Union, List
from typing import Generator, Union
from contextlib import redirect_stdout
import io
@ -66,62 +67,14 @@ class ImplicitTLS(ftplib.FTP_TLS):
return conn, size
class IoTFTPSClient:
@dataclass
class IoTFTPSConnection:
"""iot ftps ftpsclient"""
ftps_host: str
ftps_port: int
ftps_user: str
ftps_pass: str
ssl_implicit: bool
ftps_session: Union[ftplib.FTP, ImplicitTLS]
last_error: Optional[str] = None
welcome: str
ftps_session: ftplib.FTP | ImplicitTLS
def __init__(
self,
ftps_host: str,
ftps_port: Optional[int] = 21,
ftps_user: Optional[str] = "",
ftps_pass: Optional[str] = "",
ssl_implicit: Optional[bool] = False,
) -> None:
self.ftps_host = ftps_host
self.ftps_port = ftps_port
self.ftps_user = ftps_user
self.ftps_pass = ftps_pass
self.ssl_implicit = ssl_implicit
self.instantiate_ftps_session()
def __repr__(self) -> str:
return (
"IoT FTPS Client\n"
"--------------------\n"
f"host: {self.ftps_host}\n"
f"port: {self.ftps_port}\n"
f"user: {self.ftps_user}\n"
f"ssl: {self.ssl_implicit}"
)
def instantiate_ftps_session(self) -> None:
"""init ftps_session based on input params"""
self.ftps_session = ImplicitTLS() if self.ssl_implicit else ftplib.FTP()
self.ftps_session.set_debuglevel(0)
self.welcome = self.ftps_session.connect(
host=self.ftps_host, port=self.ftps_port
)
if self.ftps_user and self.ftps_pass:
self.ftps_session.login(user=self.ftps_user, passwd=self.ftps_pass)
else:
self.ftps_session.login()
if self.ssl_implicit:
self.ftps_session.prot_p()
def disconnect(self) -> None:
"""disconnect the current session from the ftps server"""
def close(self) -> None:
"""close the current session from the ftps server"""
self.ftps_session.close()
def download_file(self, source: str, dest: str):
@ -191,7 +144,9 @@ class IoTFTPSClient:
def mkdir(self, path: str) -> str:
return self.ftps_session.mkd(path)
def list_files(self, list_path: str, extensions: str | list[str] | None = None):
def list_files(
self, list_path: str, extensions: str | list[str] | None = None
) -> Generator[Path]:
"""list files under a path inside the FTPS server"""
if extensions is None:
@ -238,3 +193,41 @@ class IoTFTPSClient:
print(f"unexpected exception occurred: [{ex}]")
pass
return
@dataclass
class IoTFTPSClient:
ftps_host: str
ftps_port: int = 21
ftps_user: str = ""
ftps_pass: str = ""
ssl_implicit: bool = False
welcome: str = ""
_connection: IoTFTPSConnection | None = None
def __enter__(self):
session = self.open_ftps_session()
self._connection = IoTFTPSConnection(session)
return self._connection
def __exit__(self, type, value, traceback):
if self._connection is not None:
self._connection.close()
self._connection = None
def open_ftps_session(self) -> ftplib.FTP | ImplicitTLS:
"""init ftps_session based on input params"""
ftps_session = ImplicitTLS() if self.ssl_implicit else ftplib.FTP()
ftps_session.set_debuglevel(0)
self.welcome = ftps_session.connect(host=self.ftps_host, port=self.ftps_port)
if self.ftps_user and self.ftps_pass:
ftps_session.login(user=self.ftps_user, passwd=self.ftps_pass)
else:
ftps_session.login()
if self.ssl_implicit:
ftps_session.prot_p()
return ftps_session

View File

@ -0,0 +1,141 @@
from __future__ import annotations
import datetime
from pathlib import Path
from typing import Iterable, Iterator
import logging.handlers
from octoprint.util import get_dos_filename
from octoprint_bambu_printer.printer.file_system.cached_file_view import CachedFileView
from .ftps_client import IoTFTPSClient, IoTFTPSConnection
from .file_info import FileInfo
class RemoteSDCardFileList:
def __init__(self, settings) -> None:
self._settings = settings
self._file_alias_cache = {}
self._file_data_cache = {}
self._selected_project_file: FileInfo | None = None
self._logger = logging.getLogger("octoprint.plugins.bambu_printer.BambuPrinter")
self._project_files_view = (
CachedFileView(self).with_filter("", ".3mf").with_filter("cache/", ".3mf")
)
self._timelapse_files_view = CachedFileView(self)
if self._settings.get(["device_type"]) in ["X1", "X1C"]:
self._timelapse_files_view.with_filter("timelapse/", ".mp4")
else:
self._timelapse_files_view.with_filter("timelapse/", ".avi")
@property
def selected_file(self):
return self._selected_project_file
@property
def has_selected_file(self):
return self._selected_project_file is not None
@property
def project_files(self):
return self._project_files_view
def remove_file_selection(self):
self._selected_project_file = None
def get_all_project_files(self):
self._project_files_view.update()
files = self._project_files_view.get_all_cached_info()
self._logger.debug(f"get project files return: {files}")
return files
def get_all_timelapse_files(self):
self._timelapse_files_view.update()
files = self._timelapse_files_view.get_all_cached_info()
self._logger.debug(f"get timelapse files return: {files}")
return files
def select_project_file(self, file_path: str) -> bool:
self._logger.debug(f"_selectSdFile: {file_path}")
file_name = Path(file_path).name
file_info = self._project_files_view.get_cached_file_data(file_name)
if file_info is None:
self._logger.error(f"{file_name} open failed")
return False
self._selected_project_file = file_info
return True
def delete_file(self, file_path: str) -> None:
file_info = self._project_files_view.get_cached_file_data(file_path)
if file_info is not None:
try:
with self.get_ftps_client() as ftp:
if ftp.delete_file(str(file_info.path)):
self._logger.debug(f"{file_path} deleted")
else:
raise RuntimeError(f"Deleting file {file_path} failed")
except Exception as e:
self._logger.exception(e)
def list_files(
self,
folder: str,
extensions: str | list[str] | None,
ftp: IoTFTPSConnection,
existing_files=None,
):
if existing_files is None:
existing_files = []
return list(
self.get_file_info_for_names(
ftp, ftp.list_files(folder, extensions), existing_files
)
)
def _get_ftp_file_info(
self,
ftp: IoTFTPSConnection,
file_path: Path,
existing_files: list[str] | None = None,
):
file_size = ftp.ftps_session.size(file_path.as_posix())
date_str = ftp.ftps_session.sendcmd(f"MDTM {file_path.as_posix()}").replace(
"213 ", ""
)
date = datetime.datetime.strptime(date_str, "%Y%m%d%H%M%S").replace(
tzinfo=datetime.timezone.utc
)
file_name = file_path.name.lower()
dosname = get_dos_filename(file_name, existing_filenames=existing_files).lower()
return FileInfo(
dosname,
file_path,
file_size if file_size is not None else 0,
date,
)
def get_file_info_for_names(
self,
ftp: IoTFTPSConnection,
files: Iterable[Path],
existing_files: list[str] | None = None,
) -> Iterator[FileInfo]:
if existing_files is None:
existing_files = []
for entry in files:
file_info = self._get_ftp_file_info(ftp, entry, existing_files)
yield file_info
existing_files.append(file_info.file_name)
existing_files.append(file_info.dosname)
def get_ftps_client(self):
host = self._settings.get(["host"])
access_code = self._settings.get(["access_code"])
return IoTFTPSClient(
f"{host}", 990, "bblp", f"{access_code}", ssl_implicit=True
)

View File

@ -1 +0,0 @@
from .ftpsclient import IoTFTPSClient

View File

@ -1,7 +1,9 @@
from __future__ import annotations
from dataclasses import dataclass
from octoprint_bambu_printer.printer.remote_sd_card_file_list import FileInfo
from octoprint_bambu_printer.printer.file_system.remote_sd_card_file_list import (
FileInfo,
)
@dataclass

View File

@ -5,15 +5,13 @@ import logging
import queue
import re
import threading
import time
import traceback
from types import TracebackType
from typing import Callable
from octoprint.util import to_bytes, to_unicode
from serial import SerialTimeoutException
from .char_counting_queue import CharCountingQueue
class PrinterSerialIO(threading.Thread, BufferedIOBase):
command_regex = re.compile(r"^([GM])(\d+)")
@ -80,7 +78,10 @@ class PrinterSerialIO(threading.Thread, BufferedIOBase):
self._error_detected = e
self.input_bytes.task_done()
self._clearQueue(self.input_bytes)
self._log.info("\n".join(traceback.format_exception(e)[-50:]))
self._log.info(
"\n".join(traceback.format_exception_only(type(e), e)[-50:])
)
self._running = False
self._log.debug("Closing IO read loop")

View File

@ -1,169 +0,0 @@
from __future__ import annotations
from dataclasses import asdict, dataclass
import datetime
import itertools
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional
import logging.handlers
from octoprint.util import get_dos_filename
from octoprint.util.files import unix_timestamp_to_m20_timestamp
from .ftpsclient import IoTFTPSClient
@dataclass(frozen=True)
class FileInfo:
dosname: str
path: Path
size: int
timestamp: str
@property
def file_name(self):
return self.path.name
def get_log_info(self):
return f'{self.dosname} {self.size} {self.timestamp} "{self.file_name}"'
def to_dict(self):
return asdict(self)
class RemoteSDCardFileList:
def __init__(self, settings) -> None:
self._settings = settings
self._file_alias_cache = {}
self._file_data_cache = {}
self._selected_file_info: FileInfo | None = None
self._logger = logging.getLogger("octoprint.plugins.bambu_printer.BambuPrinter")
@property
def selected_file(self):
return self._selected_file_info
@property
def has_selected_file(self):
return self._selected_file_info is not None
def remove_file_selection(self):
self._selected_file_info = None
def get_all_files(self):
self._update_existing_files_info()
self._logger.debug(f"get_all_files return: {self._file_data_cache}")
return list(self._file_data_cache.values())
def get_data_by_suffix(self, file_stem: str, allowed_suffixes: list[str]):
if file_stem == "":
return None
file_data = self._get_cached_data_by_suffix(file_stem, allowed_suffixes)
if file_data is None:
self._update_existing_files_info()
file_data = self._get_cached_data_by_suffix(file_stem, allowed_suffixes)
return file_data
def select_file(self, file_path: str) -> bool:
self._logger.debug(f"_selectSdFile: {file_path}")
file_name = Path(file_path).name
file_info = self._get_cached_file_data(file_name)
if file_info is None:
self._logger.error(f"{file_name} open failed")
return False
self._selected_file_info = file_info
return True
def delete_file(self, file_path: str) -> None:
host = self._settings.get(["host"])
access_code = self._settings.get(["access_code"])
file_info = self._get_cached_file_data(file_path)
if file_info is not None:
ftp = IoTFTPSClient(
f"{host}", 990, "bblp", f"{access_code}", ssl_implicit=True
)
try:
if ftp.delete_file(str(file_info.path)):
self._logger.debug(f"{file_path} deleted")
else:
raise Exception("delete failed")
except Exception as e:
self._logger.debug(f"Error deleting file {file_path}")
def _get_ftp_file_info(
self, ftp: IoTFTPSClient, file_path: Path, existing_files: list[str]
):
file_size = ftp.ftps_session.size(file_path.as_posix())
date_str = ftp.ftps_session.sendcmd(f"MDTM {file_path.as_posix()}").replace(
"213 ", ""
)
filedate = (
datetime.datetime.strptime(date_str, "%Y%m%d%H%M%S")
.replace(tzinfo=datetime.timezone.utc)
.timestamp()
)
file_name = file_path.name.lower()
dosname = get_dos_filename(file_name, existing_filenames=existing_files).lower()
return FileInfo(
dosname,
file_path,
file_size if file_size is not None else 0,
unix_timestamp_to_m20_timestamp(int(filedate)),
)
def _scan_ftp_file_list(
self, ftp, files: list[str], existing_files: list[str]
) -> Iterator[FileInfo]:
for entry in files:
file_info = self._get_ftp_file_info(ftp, Path(entry), existing_files)
yield file_info
existing_files.append(file_info.file_name)
existing_files.append(file_info.dosname)
def _get_existing_files_info(self):
ftp = self._connect_ftps_server()
file_list = []
file_list.extend(ftp.list_files("", ".3mf"))
file_list.extend(ftp.list_files("cache/", ".3mf"))
existing_files = []
return list(self._scan_ftp_file_list(ftp, file_list, existing_files))
def _connect_ftps_server(self):
host = self._settings.get(["host"])
access_code = self._settings.get(["access_code"])
ftp = IoTFTPSClient(f"{host}", 990, "bblp", f"{access_code}", ssl_implicit=True)
return ftp
def _get_cached_file_data(self, file_name: str) -> FileInfo | None:
self._logger.debug(f"get data for path: {file_name}")
# replace if name is an alias
file_name = Path(file_name).name
file_name = self._file_alias_cache.get(file_name, file_name)
data = self._file_data_cache.get(file_name, None)
self._logger.debug(f"get file data: {data}")
return data
def _update_existing_files_info(self):
file_info_list = self._get_existing_files_info()
self._file_alias_cache = {
info.dosname: info.file_name for info in file_info_list
}
self._file_data_cache = {info.file_name: info for info in file_info_list}
def _get_cached_data_by_suffix(self, file_stem: str, allowed_suffixes: list[str]):
for suffix in allowed_suffixes:
file_data = self._get_cached_file_data(
Path(file_stem).with_suffix(suffix).as_posix()
)
if file_data is not None:
return file_data
return None

View File

@ -64,7 +64,7 @@ class PrintingState(APrinterState):
def update_print_job_info(self):
print_job_info = self._printer.bambu_client.get_device().print_job
task_name: str = print_job_info.subtask_name
project_file_info = self._printer.file_system.get_data_by_suffix(
project_file_info = self._printer.file_system.project_files.get_file_by_suffix(
task_name, [".3mf", ".gcode.3mf"]
)
if project_file_info is None:

View File

@ -0,0 +1,30 @@
from __future__ import annotations
from datetime import datetime
from pathlib import Path
from octoprint.util import get_formatted_size, get_formatted_datetime
from octoprint_bambu_printer.printer.file_system.bambu_timelapse_file_info import (
BambuTimelapseFileInfo,
)
from octoprint_bambu_printer.printer.file_system.file_info import FileInfo
def test_timelapse_info_valid():
file_name = "part.mp4"
file_size = 1000
file_date = datetime(2020, 1, 1)
file_timestamp = file_date.timestamp()
file_info = FileInfo(file_name, Path(file_name), file_size, file_date)
timelapse = BambuTimelapseFileInfo.from_file_info(file_info)
assert timelapse.to_dict() == {
"bytes": file_size,
"date": get_formatted_datetime(datetime.fromtimestamp(file_timestamp)),
"name": file_name,
"size": get_formatted_size(file_size),
"thumbnail": "/plugin/bambu_printer/thumbnail/"
+ file_name.replace(".mp4", ".jpg").replace(".avi", ".jpg"),
"timestamp": file_timestamp,
"url": f"/plugin/bambu_printer/timelapse/{file_name}",
}

View File

@ -1,20 +1,14 @@
from __future__ import annotations
from collections.abc import Callable
from datetime import datetime, timezone
import logging
from pathlib import Path
import time
from typing import Any
import unittest
from unittest.mock import MagicMock, Mock
import unittest.mock
from unittest.mock import MagicMock
import pybambu
import pybambu.commands
from octoprint_bambu_printer.printer.bambu_virtual_printer import BambuVirtualPrinter
from octoprint_bambu_printer.printer.remote_sd_card_file_list import (
RemoteSDCardFileList,
)
from octoprint_bambu_printer.printer.file_system.ftps_client import IoTFTPSClient
from octoprint_bambu_printer.printer.states.idle_state import IdleState
from octoprint_bambu_printer.printer.states.paused_state import PausedState
from octoprint_bambu_printer.printer.states.printing_state import PrintingState
@ -88,40 +82,31 @@ def files_info_ftp():
@fixture
def ftps_session_mock(files_info_ftp):
with unittest.mock.patch(
"octoprint_bambu_printer.printer.ftpsclient.ftpsclient.IoTFTPSClient"
) as ftps_client_mock:
ftps_session = MagicMock()
ftps_session.size.side_effect = DictGetter(
{file: info[0] for file, info in files_info_ftp.items()}
)
ftps_session = MagicMock()
ftps_session.size.side_effect = DictGetter(
{file: info[0] for file, info in files_info_ftp.items()}
)
ftps_session.sendcmd.side_effect = DictGetter(
{f"MDTM {file}": info[1] for file, info in files_info_ftp.items()}
)
ftps_session.sendcmd.side_effect = DictGetter(
{f"MDTM {file}": info[1] for file, info in files_info_ftp.items()}
)
all_files = list(files_info_ftp.keys())
file_registry = DictGetter(
{
("", ".3mf"): list(
filter(lambda f: Path(f).parent == Path("."), all_files)
),
("cache/", ".3mf"): list(
map(
lambda f: Path(f).name,
filter(lambda f: Path(f).parent == Path("cache/"), all_files),
)
),
}
)
ftps_client_mock.list_files.side_effect = lambda folder, ext: file_registry(
(folder, ext)
)
ftps_client_mock.ftps_session = ftps_session
RemoteSDCardFileList._connect_ftps_server = MagicMock(
return_value=ftps_client_mock
)
yield
all_files = list(files_info_ftp.keys())
ftps_session.nlst.side_effect = DictGetter(
{
"": list(filter(lambda f: Path(f).parent == Path("."), all_files))
+ ["Mock folder"],
"cache/": list(
map(
lambda f: Path(f).name,
filter(lambda f: Path(f).parent == Path("cache/"), all_files),
)
)
+ ["Mock folder"],
}
)
IoTFTPSClient.open_ftps_session = MagicMock(return_value=ftps_session)
yield
@fixture(scope="function")