0.1.0 (#34)
* Add separate class for sftp file system * Add separate serial IO handling class * Replace function name mangling with gcode handler registration system * Add states to virtual Bambu printer that manage state specific interaction * Add synchronization utilities to work with virtual printer as if it is a binary stream * Add unittests with mocked Bambu printer to ensure core functionality works as expected * Fix formatting to be automatically processed by black formatter * Fix python 3.10 type annotations for readability
This commit is contained in:
@@ -0,0 +1,34 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import asdict, dataclass
|
||||
from pathlib import Path
|
||||
|
||||
from .file_info import FileInfo
|
||||
|
||||
from octoprint.util import get_formatted_size, get_formatted_datetime
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class BambuTimelapseFileInfo:
|
||||
bytes: int
|
||||
date: str | None
|
||||
name: str
|
||||
size: str
|
||||
thumbnail: str
|
||||
timestamp: float
|
||||
url: str
|
||||
|
||||
def to_dict(self):
|
||||
return asdict(self)
|
||||
|
||||
@staticmethod
|
||||
def from_file_info(file_info: FileInfo):
|
||||
return BambuTimelapseFileInfo(
|
||||
bytes=file_info.size,
|
||||
date=get_formatted_datetime(file_info.date),
|
||||
name=file_info.file_name,
|
||||
size=get_formatted_size(file_info.size),
|
||||
thumbnail=f"/plugin/bambu_printer/thumbnail/{file_info.path.stem}.jpg",
|
||||
timestamp=file_info.timestamp,
|
||||
url=f"/plugin/bambu_printer/timelapse/{file_info.file_name}",
|
||||
)
|
@@ -0,0 +1,94 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import TYPE_CHECKING, Callable
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from octoprint_bambu_printer.printer.file_system.remote_sd_card_file_list import (
|
||||
RemoteSDCardFileList,
|
||||
)
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from octoprint_bambu_printer.printer.file_system.file_info import FileInfo
|
||||
|
||||
|
||||
@dataclass
|
||||
class CachedFileView:
|
||||
file_system: RemoteSDCardFileList
|
||||
folder_view: dict[tuple[str, str | list[str] | None], None] = field(
|
||||
default_factory=dict
|
||||
) # dict preserves order, but set does not. We use only dict keys as storage
|
||||
on_update: Callable[[], None] | None = None
|
||||
|
||||
def __post_init__(self):
|
||||
self._file_alias_cache: dict[str, str] = {}
|
||||
self._file_data_cache: dict[str, FileInfo] = {}
|
||||
|
||||
def with_filter(
|
||||
self, folder: str, extensions: str | list[str] | None = None
|
||||
) -> "CachedFileView":
|
||||
self.folder_view[(folder, extensions)] = None
|
||||
return self
|
||||
|
||||
def list_all_views(self):
|
||||
existing_files: list[str] = []
|
||||
result: list[FileInfo] = []
|
||||
|
||||
with self.file_system.get_ftps_client() as ftp:
|
||||
for filter in self.folder_view.keys():
|
||||
result.extend(self.file_system.list_files(*filter, ftp, existing_files))
|
||||
return result
|
||||
|
||||
def update(self):
|
||||
file_info_list = self.list_all_views()
|
||||
self._update_file_list_cache(file_info_list)
|
||||
if self.on_update:
|
||||
self.on_update()
|
||||
|
||||
def _update_file_list_cache(self, files: list[FileInfo]):
|
||||
self._file_alias_cache = {info.dosname: info.path.as_posix() for info in files}
|
||||
self._file_data_cache = {info.path.as_posix(): info for info in files}
|
||||
|
||||
def get_all_info(self):
|
||||
self.update()
|
||||
return self.get_all_cached_info()
|
||||
|
||||
def get_all_cached_info(self):
|
||||
return list(self._file_data_cache.values())
|
||||
|
||||
def get_file_data(self, file_path: str | Path) -> FileInfo | None:
|
||||
file_data = self.get_file_data_cached(file_path)
|
||||
if file_data is None:
|
||||
self.update()
|
||||
file_data = self.get_file_data_cached(file_path)
|
||||
return file_data
|
||||
|
||||
def get_file_data_cached(self, file_path: str | Path) -> FileInfo | None:
|
||||
if isinstance(file_path, str):
|
||||
file_path = Path(file_path).as_posix().strip("/")
|
||||
else:
|
||||
file_path = file_path.as_posix().strip("/")
|
||||
|
||||
if file_path not in self._file_data_cache:
|
||||
file_path = self._file_alias_cache.get(file_path, file_path)
|
||||
return self._file_data_cache.get(file_path, None)
|
||||
|
||||
def get_file_by_stem(self, file_stem: str, allowed_suffixes: list[str]):
|
||||
if file_stem == "":
|
||||
return None
|
||||
|
||||
file_stem = Path(file_stem).with_suffix("").stem
|
||||
file_data = self._get_file_by_stem_cached(file_stem, allowed_suffixes)
|
||||
if file_data is None:
|
||||
self.update()
|
||||
file_data = self._get_file_by_stem_cached(file_stem, allowed_suffixes)
|
||||
return file_data
|
||||
|
||||
def _get_file_by_stem_cached(self, file_stem: str, allowed_suffixes: list[str]):
|
||||
for file_path_str in self._file_data_cache.keys():
|
||||
file_path = Path(file_path_str)
|
||||
if file_stem == file_path.with_suffix("").stem and all(
|
||||
suffix in allowed_suffixes for suffix in file_path.suffixes
|
||||
):
|
||||
return self.get_file_data_cached(file_path)
|
||||
return None
|
33
octoprint_bambu_printer/printer/file_system/file_info.py
Normal file
33
octoprint_bambu_printer/printer/file_system/file_info.py
Normal file
@@ -0,0 +1,33 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import asdict, dataclass
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
|
||||
from octoprint.util.files import unix_timestamp_to_m20_timestamp
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class FileInfo:
|
||||
dosname: str
|
||||
path: Path
|
||||
size: int
|
||||
date: datetime
|
||||
|
||||
@property
|
||||
def file_name(self):
|
||||
return self.path.name
|
||||
|
||||
@property
|
||||
def timestamp(self) -> float:
|
||||
return self.date.timestamp()
|
||||
|
||||
@property
|
||||
def timestamp_m20(self) -> str:
|
||||
return unix_timestamp_to_m20_timestamp(int(self.timestamp))
|
||||
|
||||
def get_gcode_info(self) -> str:
|
||||
return f'{self.dosname} {self.size} {self.timestamp_m20} "{self.file_name}"'
|
||||
|
||||
def to_dict(self):
|
||||
return asdict(self)
|
256
octoprint_bambu_printer/printer/file_system/ftps_client.py
Normal file
256
octoprint_bambu_printer/printer/file_system/ftps_client.py
Normal file
@@ -0,0 +1,256 @@
|
||||
"""
|
||||
Based on: <https://github.com/dgonzo27/py-iot-utils>
|
||||
|
||||
MIT License
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
||||
|
||||
wrapper for FTPS server interactions
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime, timezone
|
||||
import ftplib
|
||||
import os
|
||||
from pathlib import Path
|
||||
import socket
|
||||
import ssl
|
||||
from typing import Generator, Union
|
||||
|
||||
from contextlib import redirect_stdout
|
||||
import io
|
||||
import re
|
||||
|
||||
|
||||
class ImplicitTLS(ftplib.FTP_TLS):
|
||||
"""ftplib.FTP_TLS sub-class to support implicit SSL FTPS"""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self._sock = None
|
||||
|
||||
@property
|
||||
def sock(self):
|
||||
"""return socket"""
|
||||
return self._sock
|
||||
|
||||
@sock.setter
|
||||
def sock(self, value):
|
||||
"""wrap and set SSL socket"""
|
||||
if value is not None and not isinstance(value, ssl.SSLSocket):
|
||||
value = self.context.wrap_socket(value)
|
||||
self._sock = value
|
||||
|
||||
def ntransfercmd(self, cmd, rest=None):
|
||||
conn, size = ftplib.FTP.ntransfercmd(self, cmd, rest)
|
||||
|
||||
if self._prot_p:
|
||||
conn = self.context.wrap_socket(
|
||||
conn, server_hostname=self.host, session=self.sock.session
|
||||
) # this is the fix
|
||||
return conn, size
|
||||
|
||||
|
||||
@dataclass
|
||||
class IoTFTPSConnection:
|
||||
"""iot ftps ftpsclient"""
|
||||
|
||||
ftps_session: ftplib.FTP | ImplicitTLS
|
||||
|
||||
def close(self) -> None:
|
||||
"""close the current session from the ftps server"""
|
||||
self.ftps_session.close()
|
||||
|
||||
def download_file(self, source: str, dest: str):
|
||||
"""download a file to a path on the local filesystem"""
|
||||
with open(dest, "wb") as file:
|
||||
self.ftps_session.retrbinary(f"RETR {source}", file.write)
|
||||
|
||||
def upload_file(self, source: str, dest: str, callback=None) -> bool:
|
||||
"""upload a file to a path inside the FTPS server"""
|
||||
|
||||
file_size = os.path.getsize(source)
|
||||
|
||||
block_size = max(file_size // 100, 8192)
|
||||
rest = None
|
||||
|
||||
try:
|
||||
# Taken from ftplib.storbinary but with custom ssl handling
|
||||
# due to the shitty bambu p1p ftps server TODO fix properly.
|
||||
with open(source, "rb") as fp:
|
||||
self.ftps_session.voidcmd("TYPE I")
|
||||
|
||||
with self.ftps_session.transfercmd(f"STOR {dest}", rest) as conn:
|
||||
while 1:
|
||||
buf = fp.read(block_size)
|
||||
|
||||
if not buf:
|
||||
break
|
||||
|
||||
conn.sendall(buf)
|
||||
|
||||
if callback:
|
||||
callback(buf)
|
||||
|
||||
# shutdown ssl layer
|
||||
if ftplib._SSLSocket is not None and isinstance(
|
||||
conn, ftplib._SSLSocket
|
||||
):
|
||||
# Yeah this is suposed to be conn.unwrap
|
||||
# But since we operate in prot p mode
|
||||
# we can close the connection always.
|
||||
# This is cursed but it works.
|
||||
if "vsFTPd" in self.welcome:
|
||||
conn.unwrap()
|
||||
else:
|
||||
conn.shutdown(socket.SHUT_RDWR)
|
||||
|
||||
return True
|
||||
except Exception as ex:
|
||||
print(f"unexpected exception occurred: {ex}")
|
||||
pass
|
||||
return False
|
||||
|
||||
def delete_file(self, path: str) -> bool:
|
||||
"""delete a file from under a path inside the FTPS server"""
|
||||
try:
|
||||
self.ftps_session.delete(path)
|
||||
return True
|
||||
except Exception as ex:
|
||||
print(f"unexpected exception occurred: {ex}")
|
||||
pass
|
||||
return False
|
||||
|
||||
def move_file(self, source: str, dest: str):
|
||||
"""move a file inside the FTPS server to another path inside the FTPS server"""
|
||||
self.ftps_session.rename(source, dest)
|
||||
|
||||
def mkdir(self, path: str) -> str:
|
||||
return self.ftps_session.mkd(path)
|
||||
|
||||
def list_files(
|
||||
self, list_path: str, extensions: str | list[str] | None = None
|
||||
) -> Generator[Path]:
|
||||
"""list files under a path inside the FTPS server"""
|
||||
|
||||
if extensions is None:
|
||||
_extension_acceptable = lambda p: True
|
||||
else:
|
||||
if isinstance(extensions, str):
|
||||
extensions = [extensions]
|
||||
_extension_acceptable = lambda p: any(s in p.suffixes for s in extensions)
|
||||
|
||||
try:
|
||||
list_result = self.ftps_session.nlst(list_path) or []
|
||||
for file_list_entry in list_result:
|
||||
path = Path(list_path) / Path(file_list_entry).name
|
||||
if _extension_acceptable(path):
|
||||
yield path
|
||||
except Exception as ex:
|
||||
print(f"unexpected exception occurred: {ex}")
|
||||
|
||||
def list_files_ex(self, path: str) -> Union[list[str], None]:
|
||||
"""list files under a path inside the FTPS server"""
|
||||
try:
|
||||
f = io.StringIO()
|
||||
with redirect_stdout(f):
|
||||
self.ftps_session.dir(path)
|
||||
s = f.getvalue()
|
||||
files = []
|
||||
for row in s.split("\n"):
|
||||
if len(row) <= 0:
|
||||
continue
|
||||
|
||||
attribs = row.split(" ")
|
||||
|
||||
match = re.search(r".*\ (\d\d\:\d\d|\d\d\d\d)\ (.*)", row)
|
||||
name = ""
|
||||
if match:
|
||||
name = match.groups(1)[1]
|
||||
else:
|
||||
name = attribs[len(attribs) - 1]
|
||||
|
||||
file = (attribs[0], name)
|
||||
files.append(file)
|
||||
return files
|
||||
except Exception as ex:
|
||||
print(f"unexpected exception occurred: [{ex}]")
|
||||
pass
|
||||
return
|
||||
|
||||
def get_file_size(self, file_path: str):
|
||||
try:
|
||||
return self.ftps_session.size(file_path)
|
||||
except Exception as e:
|
||||
raise RuntimeError(
|
||||
f'Cannot get file size for "{file_path}" due to error: {str(e)}'
|
||||
)
|
||||
|
||||
def get_file_date(self, file_path: str) -> datetime:
|
||||
try:
|
||||
date_response = self.ftps_session.sendcmd(f"MDTM {file_path}").replace(
|
||||
"213 ", ""
|
||||
)
|
||||
date = datetime.strptime(date_response, "%Y%m%d%H%M%S").replace(
|
||||
tzinfo=timezone.utc
|
||||
)
|
||||
return date
|
||||
except Exception as e:
|
||||
raise RuntimeError(
|
||||
f'Cannot get file date for "{file_path}" due to error: {str(e)}'
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class IoTFTPSClient:
|
||||
ftps_host: str
|
||||
ftps_port: int = 21
|
||||
ftps_user: str = ""
|
||||
ftps_pass: str = ""
|
||||
ssl_implicit: bool = False
|
||||
welcome: str = ""
|
||||
_connection: IoTFTPSConnection | None = None
|
||||
|
||||
def __enter__(self):
|
||||
session = self.open_ftps_session()
|
||||
self._connection = IoTFTPSConnection(session)
|
||||
return self._connection
|
||||
|
||||
def __exit__(self, type, value, traceback):
|
||||
if self._connection is not None:
|
||||
self._connection.close()
|
||||
self._connection = None
|
||||
|
||||
def open_ftps_session(self) -> ftplib.FTP | ImplicitTLS:
|
||||
"""init ftps_session based on input params"""
|
||||
ftps_session = ImplicitTLS() if self.ssl_implicit else ftplib.FTP()
|
||||
ftps_session.set_debuglevel(0)
|
||||
|
||||
self.welcome = ftps_session.connect(host=self.ftps_host, port=self.ftps_port)
|
||||
|
||||
if self.ftps_user and self.ftps_pass:
|
||||
ftps_session.login(user=self.ftps_user, passwd=self.ftps_pass)
|
||||
else:
|
||||
ftps_session.login()
|
||||
|
||||
if self.ssl_implicit:
|
||||
ftps_session.prot_p()
|
||||
|
||||
return ftps_session
|
@@ -0,0 +1,87 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import datetime
|
||||
from pathlib import Path
|
||||
from typing import Iterable, Iterator
|
||||
import logging.handlers
|
||||
|
||||
from octoprint.util import get_dos_filename
|
||||
|
||||
from .ftps_client import IoTFTPSClient, IoTFTPSConnection
|
||||
from .file_info import FileInfo
|
||||
|
||||
|
||||
class RemoteSDCardFileList:
|
||||
|
||||
def __init__(self, settings) -> None:
|
||||
self._settings = settings
|
||||
self._selected_project_file: FileInfo | None = None
|
||||
self._logger = logging.getLogger("octoprint.plugins.bambu_printer.BambuPrinter")
|
||||
|
||||
def delete_file(self, file_path: Path) -> None:
|
||||
try:
|
||||
with self.get_ftps_client() as ftp:
|
||||
if ftp.delete_file(file_path.as_posix()):
|
||||
self._logger.debug(f"{file_path} deleted")
|
||||
else:
|
||||
raise RuntimeError(f"Deleting file {file_path} failed")
|
||||
except Exception as e:
|
||||
self._logger.exception(e)
|
||||
|
||||
def list_files(
|
||||
self,
|
||||
folder: str,
|
||||
extensions: str | list[str] | None,
|
||||
ftp: IoTFTPSConnection,
|
||||
existing_files=None,
|
||||
):
|
||||
if existing_files is None:
|
||||
existing_files = []
|
||||
|
||||
return list(
|
||||
self.get_file_info_for_names(
|
||||
ftp, ftp.list_files(folder, extensions), existing_files
|
||||
)
|
||||
)
|
||||
|
||||
def _get_ftp_file_info(
|
||||
self,
|
||||
ftp: IoTFTPSConnection,
|
||||
file_path: Path,
|
||||
existing_files: list[str] | None = None,
|
||||
):
|
||||
file_size = ftp.get_file_size(file_path.as_posix())
|
||||
date = ftp.get_file_date(file_path.as_posix())
|
||||
file_name = file_path.name.lower()
|
||||
dosname = get_dos_filename(file_name, existing_filenames=existing_files).lower()
|
||||
return FileInfo(
|
||||
dosname,
|
||||
file_path,
|
||||
file_size if file_size is not None else 0,
|
||||
date,
|
||||
)
|
||||
|
||||
def get_file_info_for_names(
|
||||
self,
|
||||
ftp: IoTFTPSConnection,
|
||||
files: Iterable[Path],
|
||||
existing_files: list[str] | None = None,
|
||||
) -> Iterator[FileInfo]:
|
||||
if existing_files is None:
|
||||
existing_files = []
|
||||
|
||||
for entry in files:
|
||||
try:
|
||||
file_info = self._get_ftp_file_info(ftp, entry, existing_files)
|
||||
yield file_info
|
||||
existing_files.append(file_info.file_name)
|
||||
existing_files.append(file_info.dosname)
|
||||
except Exception as e:
|
||||
self._logger.exception(e, exc_info=False)
|
||||
|
||||
def get_ftps_client(self):
|
||||
host = self._settings.get(["host"])
|
||||
access_code = self._settings.get(["access_code"])
|
||||
return IoTFTPSClient(
|
||||
f"{host}", 990, "bblp", f"{access_code}", ssl_implicit=True
|
||||
)
|
Reference in New Issue
Block a user