Refactor plugin. Add typing

This commit is contained in:
Anton Skrypnyk 2024-07-24 17:15:45 +03:00
parent ad862d5ebd
commit 527ec9ef3c
4 changed files with 566 additions and 401 deletions

View File

@ -1,260 +1,10 @@
# coding=utf-8
from __future__ import absolute_import
import os
import threading
import time
import flask
import datetime
import octoprint.plugin
from octoprint.events import Events
from octoprint.util import get_formatted_size, get_formatted_datetime, is_hidden_path
from octoprint.server.util.flask import no_firstrun_access
from octoprint.server.util.tornado import LargeResponseHandler, UrlProxyHandler, path_validation_factory
from octoprint.access.permissions import Permissions
from urllib.parse import quote as urlquote
from .ftpsclient import IoTFTPSClient
class BambuPrintPlugin(octoprint.plugin.SettingsPlugin,
octoprint.plugin.TemplatePlugin,
octoprint.plugin.AssetPlugin,
octoprint.plugin.EventHandlerPlugin,
octoprint.plugin.SimpleApiPlugin,
octoprint.plugin.BlueprintPlugin):
def get_assets(self):
return {'js': ["js/bambu_printer.js"]}
def get_template_configs(self):
return [{"type": "settings", "custom_bindings": True},
{"type": "generic", "custom_bindings": True, "template": "bambu_timelapse.jinja2"}] #, {"type": "generic", "custom_bindings": True, "template": "bambu_printer.jinja2"}]
def get_settings_defaults(self):
return {"device_type": "X1C",
"serial": "",
"host": "",
"access_code": "",
"username": "bblp",
"timelapse": False,
"bed_leveling": True,
"flow_cali": False,
"vibration_cali": True,
"layer_inspect": True,
"use_ams": False,
"local_mqtt": True,
"region": "",
"email": "",
"auth_token": "",
"always_use_default_options": False
}
def is_api_adminonly(self):
return True
def get_api_commands(self):
return {"register": ["email", "password", "region", "auth_token"]}
def on_api_command(self, command, data):
if command == "register":
if "email" in data and "password" in data and "region" in data and "auth_token" in data:
self._logger.info(f"Registering user {data['email']}")
from pybambu import BambuCloud
bambu_cloud = BambuCloud(data["region"], data["email"], data["password"], data["auth_token"])
bambu_cloud.login(data["region"], data["email"], data["password"])
return flask.jsonify({"auth_token": bambu_cloud.auth_token, "username": bambu_cloud.username})
def on_event(self, event, payload):
if event == Events.TRANSFER_DONE:
self._printer.commands("M20 L T", force=True)
def support_3mf_files(self):
return {'machinecode': {'3mf': ["3mf"]}}
def upload_to_sd(self, printer, filename, path, sd_upload_started, sd_upload_succeeded, sd_upload_failed, *args, **kwargs):
self._logger.debug(f"Starting upload from {filename} to {filename}")
sd_upload_started(filename, filename)
def process():
host = self._settings.get(["host"])
access_code = self._settings.get(["access_code"])
elapsed = time.monotonic()
try:
ftp = IoTFTPSClient(f"{host}", 990, "bblp", f"{access_code}", ssl_implicit=True)
if ftp.upload_file(path, f"{filename}"):
elapsed = time.monotonic() - elapsed
sd_upload_succeeded(filename, filename, elapsed)
# remove local file after successful upload to Bambu
# self._file_manager.remove_file("local", filename)
else:
raise Exception("upload failed")
except Exception as e:
elapsed = time.monotonic() - elapsed
sd_upload_failed(filename, filename, elapsed)
self._logger.debug(f"Error uploading file {filename}")
thread = threading.Thread(target=process)
thread.daemon = True
thread.start()
return filename
def get_template_vars(self):
return {"plugin_version": self._plugin_version}
def virtual_printer_factory(self, comm_instance, port, baudrate, read_timeout):
if not port == "BAMBU":
return None
if self._settings.get(["serial"]) == "" or self._settings.get(["host"]) == "" or self._settings.get(["access_code"]) == "":
return None
import logging.handlers
from octoprint.logging.handlers import CleaningTimedRotatingFileHandler
seriallog_handler = CleaningTimedRotatingFileHandler(
self._settings.get_plugin_logfile_path(postfix="serial"),
when="D",
backupCount=3,
)
seriallog_handler.setFormatter(logging.Formatter("%(asctime)s %(message)s"))
seriallog_handler.setLevel(logging.DEBUG)
from . import virtual
serial_obj = virtual.BambuPrinter(
self._settings,
self._printer_profile_manager,
data_folder=self.get_plugin_data_folder(),
seriallog_handler=seriallog_handler,
read_timeout=float(read_timeout),
faked_baudrate=baudrate,
)
return serial_obj
def get_additional_port_names(self, *args, **kwargs):
if self._settings.get(["serial"]) != "" and self._settings.get(["host"]) != "" and self._settings.get(["access_code"]) != "":
return ["BAMBU"]
else:
return []
def get_timelapse_file_list(self):
if flask.request.path.startswith('/api/timelapse'):
def process():
host = self._settings.get(["host"])
access_code = self._settings.get(["access_code"])
return_file_list = []
try:
ftp = IoTFTPSClient(f"{host}", 990, "bblp", f"{access_code}", ssl_implicit=True)
if self._settings.get(["device_type"]) in ["X1", "X1C"]:
timelapse_file_list = ftp.list_files("timelapse/", ".mp4") or []
else:
timelapse_file_list = ftp.list_files("timelapse/", ".avi") or []
for entry in timelapse_file_list:
if entry.startswith("/"):
filename = entry[1:].replace("timelapse/", "")
else:
filename = entry.replace("timelapse/", "")
filesize = ftp.ftps_session.size(f"timelapse/{filename}")
date_str = ftp.ftps_session.sendcmd(f"MDTM timelapse/{filename}").replace("213 ", "")
filedate = datetime.datetime.strptime(date_str, "%Y%m%d%H%M%S").replace(tzinfo=datetime.timezone.utc).timestamp()
return_file_list.append(
{
"bytes": filesize,
"date": get_formatted_datetime(datetime.datetime.fromtimestamp(filedate)),
"name": filename,
"size": get_formatted_size(filesize),
"thumbnail": "/plugin/bambu_printer/thumbnail/" + filename.replace(".mp4", ".jpg").replace(".avi", ".jpg"),
"timestamp": filedate,
"url": f"/plugin/bambu_printer/timelapse/{filename}"
})
self._plugin_manager.send_plugin_message(self._identifier, {'files': return_file_list})
except Exception as e:
self._logger.debug(f"Error getting timelapse files: {e}")
thread = threading.Thread(target=process)
thread.daemon = True
thread.start()
def _hook_octoprint_server_api_before_request(self, *args, **kwargs):
return [self.get_timelapse_file_list]
@octoprint.plugin.BlueprintPlugin.route("/timelapse/<filename>", methods=["GET"])
@octoprint.server.util.flask.restricted_access
@no_firstrun_access
@Permissions.TIMELAPSE_DOWNLOAD.require(403)
def downloadTimelapse(self, filename):
dest_filename = os.path.join(self.get_plugin_data_folder(), filename)
host = self._settings.get(["host"])
access_code = self._settings.get(["access_code"])
if not os.path.exists(dest_filename):
ftp = IoTFTPSClient(f"{host}", 990, "bblp", f"{access_code}", ssl_implicit=True)
download_result = ftp.download_file(
source=f"timelapse/{filename}",
dest=dest_filename,
)
return flask.redirect("/plugin/bambu_printer/download/timelapse/" + urlquote(filename), code=302)
@octoprint.plugin.BlueprintPlugin.route("/thumbnail/<filename>", methods=["GET"])
@octoprint.server.util.flask.restricted_access
@no_firstrun_access
@Permissions.TIMELAPSE_DOWNLOAD.require(403)
def downloadThumbnail(self, filename):
dest_filename = os.path.join(self.get_plugin_data_folder(), filename)
host = self._settings.get(["host"])
access_code = self._settings.get(["access_code"])
if not os.path.exists(dest_filename):
ftp = IoTFTPSClient(f"{host}", 990, "bblp", f"{access_code}", ssl_implicit=True)
download_result = ftp.download_file(
source=f"timelapse/thumbnail/{filename}",
dest=dest_filename,
)
return flask.redirect("/plugin/bambu_printer/download/thumbnail/" + urlquote(filename), code=302)
def is_blueprint_csrf_protected(self):
return True
def route_hook(self, server_routes, *args, **kwargs):
return [
(r"/download/timelapse/(.*)", LargeResponseHandler,
{'path': self.get_plugin_data_folder(), 'as_attachment': True, 'path_validation': path_validation_factory(
lambda path: not is_hidden_path(path), status_code=404)}),
(r"/download/thumbnail/(.*)", LargeResponseHandler,
{'path': self.get_plugin_data_folder(), 'as_attachment': True, 'path_validation': path_validation_factory(
lambda path: not is_hidden_path(path), status_code=404)})
]
def get_update_information(self):
return {'bambu_printer': {'displayName': "Bambu Printer",
'displayVersion': self._plugin_version,
'type': "github_release",
'user': "jneilliii",
'repo': "OctoPrint-BambuPrinter",
'current': self._plugin_version,
'stable_branch': {'name': "Stable",
'branch': "master",
'comittish': ["master"]},
'prerelease_branches': [
{'name': "Release Candidate",
'branch': "rc",
'comittish': ["rc", "master"]}
],
'pip': "https://github.com/jneilliii/OctoPrint-BambuPrinter/archive/{target_version}.zip"}}
__plugin_name__ = "Bambu Printer"
__plugin_pythoncompat__ = ">=3.7,<4"
from .bambu_print_plugin import BambuPrintPlugin
def __plugin_load__():
plugin = BambuPrintPlugin()
@ -270,5 +20,5 @@ def __plugin_load__():
"octoprint.printer.sdcardupload": __plugin_implementation__.upload_to_sd,
"octoprint.plugin.softwareupdate.check_config": __plugin_implementation__.get_update_information,
"octoprint.server.api.before_request": __plugin_implementation__._hook_octoprint_server_api_before_request,
"octoprint.server.http.routes": __plugin_implementation__.route_hook
"octoprint.server.http.routes": __plugin_implementation__.route_hook,
}

View File

@ -0,0 +1,339 @@
from __future__ import absolute_import, annotations
import os
import threading
import time
import flask
import datetime
import octoprint.printer
import octoprint.server
import octoprint.plugin
from octoprint.events import Events
import octoprint.settings
from octoprint.util import get_formatted_size, get_formatted_datetime, is_hidden_path
from octoprint.server.util.flask import no_firstrun_access
from octoprint.server.util.tornado import (
LargeResponseHandler,
path_validation_factory,
)
from octoprint.access.permissions import Permissions
from octoprint.logging.handlers import CleaningTimedRotatingFileHandler
from pybambu import BambuCloud
from urllib.parse import quote as urlquote
import logging.handlers
from octoprint_bambu_printer.bambu_virtual_printer import BambuVirtualPrinter
from .ftpsclient import IoTFTPSClient
class BambuPrintPlugin(
octoprint.plugin.SettingsPlugin,
octoprint.plugin.TemplatePlugin,
octoprint.plugin.AssetPlugin,
octoprint.plugin.EventHandlerPlugin,
octoprint.plugin.SimpleApiPlugin,
octoprint.plugin.BlueprintPlugin,
):
_logger: logging.Logger
_printer: octoprint.printer.PrinterInterface
_settings: octoprint.settings.Settings
_plugin_manager: octoprint.plugin.PluginManager
def get_assets(self):
return {"js": ["js/bambu_printer.js"]}
def get_template_configs(self):
return [
{"type": "settings", "custom_bindings": True},
{
"type": "generic",
"custom_bindings": True,
"template": "bambu_timelapse.jinja2",
},
] # , {"type": "generic", "custom_bindings": True, "template": "bambu_printer.jinja2"}]
def get_settings_defaults(self):
return {
"device_type": "X1C",
"serial": "",
"host": "",
"access_code": "",
"username": "bblp",
"timelapse": False,
"bed_leveling": True,
"flow_cali": False,
"vibration_cali": True,
"layer_inspect": True,
"use_ams": False,
"local_mqtt": True,
"region": "",
"email": "",
"auth_token": "",
"always_use_default_options": False,
}
def is_api_adminonly(self):
return True
def get_api_commands(self):
return {"register": ["email", "password", "region", "auth_token"]}
def on_api_command(self, command, data):
if command == "register":
if (
"email" in data
and "password" in data
and "region" in data
and "auth_token" in data
):
self._logger.info(f"Registering user {data['email']}")
bambu_cloud = BambuCloud(
data["region"], data["email"], data["password"], data["auth_token"]
)
bambu_cloud.login(data["region"], data["email"], data["password"])
return flask.jsonify(
{
"auth_token": bambu_cloud.auth_token,
"username": bambu_cloud.username,
}
)
def on_event(self, event, payload):
if event == Events.TRANSFER_DONE:
self._printer.commands("M20 L T", force=True)
def support_3mf_files(self):
return {"machinecode": {"3mf": ["3mf"]}}
def upload_to_sd(
self,
printer,
filename,
path,
sd_upload_started,
sd_upload_succeeded,
sd_upload_failed,
*args,
**kwargs,
):
self._logger.debug(f"Starting upload from {filename} to {filename}")
sd_upload_started(filename, filename)
def process():
host = self._settings.get(["host"])
access_code = self._settings.get(["access_code"])
elapsed = time.monotonic()
try:
ftp = IoTFTPSClient(
f"{host}", 990, "bblp", f"{access_code}", ssl_implicit=True
)
if ftp.upload_file(path, f"{filename}"):
elapsed = time.monotonic() - elapsed
sd_upload_succeeded(filename, filename, elapsed)
# remove local file after successful upload to Bambu
# self._file_manager.remove_file("local", filename)
else:
raise Exception("upload failed")
except Exception as e:
elapsed = time.monotonic() - elapsed
sd_upload_failed(filename, filename, elapsed)
self._logger.debug(f"Error uploading file {filename}")
thread = threading.Thread(target=process)
thread.daemon = True
thread.start()
return filename
def get_template_vars(self):
return {"plugin_version": self._plugin_version}
def virtual_printer_factory(self, comm_instance, port, baudrate, read_timeout):
if not port == "BAMBU":
return None
if (
self._settings.get(["serial"]) == ""
or self._settings.get(["host"]) == ""
or self._settings.get(["access_code"]) == ""
):
return None
seriallog_handler = CleaningTimedRotatingFileHandler(
self._settings.get_plugin_logfile_path(postfix="serial"),
when="D",
backupCount=3,
)
seriallog_handler.setFormatter(logging.Formatter("%(asctime)s %(message)s"))
seriallog_handler.setLevel(logging.DEBUG)
serial_obj = BambuVirtualPrinter(
self._settings,
self._printer_profile_manager,
data_folder=self.get_plugin_data_folder(),
seriallog_handler=seriallog_handler,
read_timeout=float(read_timeout),
faked_baudrate=baudrate,
)
return serial_obj
def get_additional_port_names(self, *args, **kwargs):
if (
self._settings.get(["serial"]) != ""
and self._settings.get(["host"]) != ""
and self._settings.get(["access_code"]) != ""
):
return ["BAMBU"]
else:
return []
def get_timelapse_file_list(self):
if flask.request.path.startswith("/api/timelapse"):
def process():
host = self._settings.get(["host"])
access_code = self._settings.get(["access_code"])
return_file_list = []
try:
ftp = IoTFTPSClient(
f"{host}", 990, "bblp", f"{access_code}", ssl_implicit=True
)
if self._settings.get(["device_type"]) in ["X1", "X1C"]:
timelapse_file_list = ftp.list_files("timelapse/", ".mp4") or []
else:
timelapse_file_list = ftp.list_files("timelapse/", ".avi") or []
for entry in timelapse_file_list:
if entry.startswith("/"):
filename = entry[1:].replace("timelapse/", "")
else:
filename = entry.replace("timelapse/", "")
filesize = ftp.ftps_session.size(f"timelapse/{filename}")
date_str = ftp.ftps_session.sendcmd(
f"MDTM timelapse/{filename}"
).replace("213 ", "")
filedate = (
datetime.datetime.strptime(date_str, "%Y%m%d%H%M%S")
.replace(tzinfo=datetime.timezone.utc)
.timestamp()
)
return_file_list.append(
{
"bytes": filesize,
"date": get_formatted_datetime(
datetime.datetime.fromtimestamp(filedate)
),
"name": filename,
"size": get_formatted_size(filesize),
"thumbnail": "/plugin/bambu_printer/thumbnail/"
+ filename.replace(".mp4", ".jpg").replace(
".avi", ".jpg"
),
"timestamp": filedate,
"url": f"/plugin/bambu_printer/timelapse/{filename}",
}
)
self._plugin_manager.send_plugin_message(
self._identifier, {"files": return_file_list}
)
except Exception as e:
self._logger.debug(f"Error getting timelapse files: {e}")
thread = threading.Thread(target=process)
thread.daemon = True
thread.start()
def _hook_octoprint_server_api_before_request(self, *args, **kwargs):
return [self.get_timelapse_file_list]
@octoprint.plugin.BlueprintPlugin.route("/timelapse/<filename>", methods=["GET"])
@octoprint.server.util.flask.restricted_access
@no_firstrun_access
@Permissions.TIMELAPSE_DOWNLOAD.require(403)
def downloadTimelapse(self, filename):
dest_filename = os.path.join(self.get_plugin_data_folder(), filename)
host = self._settings.get(["host"])
access_code = self._settings.get(["access_code"])
if not os.path.exists(dest_filename):
ftp = IoTFTPSClient(
f"{host}", 990, "bblp", f"{access_code}", ssl_implicit=True
)
download_result = ftp.download_file(
source=f"timelapse/{filename}",
dest=dest_filename,
)
return flask.redirect(
"/plugin/bambu_printer/download/timelapse/" + urlquote(filename), code=302
)
@octoprint.plugin.BlueprintPlugin.route("/thumbnail/<filename>", methods=["GET"])
@octoprint.server.util.flask.restricted_access
@no_firstrun_access
@Permissions.TIMELAPSE_DOWNLOAD.require(403)
def downloadThumbnail(self, filename):
dest_filename = os.path.join(self.get_plugin_data_folder(), filename)
host = self._settings.get(["host"])
access_code = self._settings.get(["access_code"])
if not os.path.exists(dest_filename):
ftp = IoTFTPSClient(
f"{host}", 990, "bblp", f"{access_code}", ssl_implicit=True
)
download_result = ftp.download_file(
source=f"timelapse/thumbnail/{filename}",
dest=dest_filename,
)
return flask.redirect(
"/plugin/bambu_printer/download/thumbnail/" + urlquote(filename), code=302
)
def is_blueprint_csrf_protected(self):
return True
def route_hook(self, server_routes, *args, **kwargs):
return [
(
r"/download/timelapse/(.*)",
LargeResponseHandler,
{
"path": self.get_plugin_data_folder(),
"as_attachment": True,
"path_validation": path_validation_factory(
lambda path: not is_hidden_path(path), status_code=404
),
},
),
(
r"/download/thumbnail/(.*)",
LargeResponseHandler,
{
"path": self.get_plugin_data_folder(),
"as_attachment": True,
"path_validation": path_validation_factory(
lambda path: not is_hidden_path(path), status_code=404
),
},
),
]
def get_update_information(self):
return {
"bambu_printer": {
"displayName": "Bambu Printer",
"displayVersion": self._plugin_version,
"type": "github_release",
"user": "jneilliii",
"repo": "OctoPrint-BambuPrinter",
"current": self._plugin_version,
"stable_branch": {
"name": "Stable",
"branch": "master",
"comittish": ["master"],
},
"prerelease_branches": [
{
"name": "Release Candidate",
"branch": "rc",
"comittish": ["rc", "master"],
}
],
"pip": "https://github.com/jneilliii/OctoPrint-BambuPrinter/archive/{target_version}.zip",
}
}

View File

@ -13,16 +13,18 @@ import time
from typing import Any, Dict, List, Optional
import asyncio
from pybambu import BambuClient, commands
import logging
from serial import SerialTimeoutException
from octoprint.util import RepeatedTimer, to_bytes, to_unicode, get_dos_filename
from octoprint.util.files import unix_timestamp_to_m20_timestamp
from .char_counting_queue import CharCountingQueue
from .ftpsclient import IoTFTPSClient
# noinspection PyBroadException
class BambuPrinter:
class BambuVirtualPrinter:
command_regex = re.compile(r"^([GM])(\d+)")
def __init__(
@ -80,13 +82,7 @@ class BambuPrinter:
self._busy = None
self._busy_loop = None
import logging
self._logger = logging.getLogger(
"octoprint.plugins.bambu_printer.BambuPrinter"
)
self._logger = logging.getLogger("octoprint.plugins.bambu_printer.BambuPrinter")
self._settings = settings
self._printer_profile_manager = printer_profile_manager
@ -119,12 +115,12 @@ class BambuPrinter:
self._last_hms_errors = None
self.bambu = None
self._bambu: BambuClient = None
readThread = threading.Thread(
target=self._processIncoming,
name="octoprint.plugins.bambu_printer.wait_thread",
daemon=True
daemon=True,
)
readThread.start()
@ -139,14 +135,23 @@ class BambuPrinter:
connectionThread = threading.Thread(
target=self._create_connection,
name="octoprint.plugins.bambu_printer.connection_thread",
daemon=True
daemon=True,
)
connectionThread.start()
@property
def bambu(self):
if self._bambu is None:
raise ValueError("No connection to Bambulab was established")
return self._bambu
def new_update(self, event_type):
if event_type == "event_hms_errors":
bambu_printer = self.bambu.get_device()
if bambu_printer.hms.errors != self._last_hms_errors and bambu_printer.hms.errors["Count"] > 0:
if (
bambu_printer.hms.errors != self._last_hms_errors
and bambu_printer.hms.errors["Count"] > 0
):
self._logger.debug(f"HMS Error: {bambu_printer.hms.errors}")
for n in range(1, bambu_printer.hms.errors["Count"] + 1):
error = bambu_printer.hms.errors[f"{n}-Error"].strip()
@ -177,7 +182,7 @@ class BambuPrinter:
self._sdPrintingPausedSemaphore.clear()
self._sdPrintStarting = False
if not self._sdPrinting:
filename = print_job.get("subtask_name")
filename: str = print_job.get("subtask_name")
if not self._sdFileListCache.get(filename.lower()):
if self._sdFileListCache.get(f"{filename.lower()}.3mf"):
filename = f"{filename.lower()}.3mf"
@ -192,7 +197,10 @@ class BambuPrinter:
self._startSdPrint(from_printer=True)
# fuzzy math here to get print percentage to match BambuStudio
self._selectedSdFilePos = int(self._selectedSdFileSize * ((print_job.get("print_percentage") + 1)/100))
self._selectedSdFilePos = int(
self._selectedSdFileSize
* ((print_job.get("print_percentage") + 1) / 100)
)
if print_job.get("gcode_state") == "PAUSE":
if not self._sdPrintingPausedSemaphore.is_set():
@ -202,18 +210,23 @@ class BambuPrinter:
self._send("// action:paused")
self._sendPaused()
if print_job.get("gcode_state") == "FINISH" or print_job.get("gcode_state") == "FAILED":
if (
print_job.get("gcode_state") == "FINISH"
or print_job.get("gcode_state") == "FAILED"
):
if self._sdPrintStarting is False:
self._sdPrinting = False
if self._sdPrintingSemaphore.is_set():
self._selectedSdFilePos = self._selectedSdFileSize
self._finishSdPrint()
def _create_connection(self):
if (self._settings.get(["device_type"]) != "" and
self._settings.get(["serial"]) != "" and
self._settings.get(["serial"]) != "" and
self._settings.get(["username"]) != "" and
self._settings.get(["access_code"]) != ""
if (
self._settings.get(["device_type"]) != ""
and self._settings.get(["serial"]) != ""
and self._settings.get(["serial"]) != ""
and self._settings.get(["username"]) != ""
and self._settings.get(["access_code"]) != ""
):
asyncio.run(self._create_connection_async())
@ -226,16 +239,23 @@ class BambuPrinter:
return on_connect
async def _create_connection_async(self):
self._logger.debug(f"connecting via local mqtt: {self._settings.get_boolean(['local_mqtt'])}")
self.bambu = BambuClient(device_type=self._settings.get(["device_type"]),
self._logger.debug(
f"connecting via local mqtt: {self._settings.get_boolean(['local_mqtt'])}"
)
self.bambu = BambuClient(
device_type=self._settings.get(["device_type"]),
serial=self._settings.get(["serial"]),
host=self._settings.get(["host"]),
username="bblp" if self._settings.get_boolean(["local_mqtt"]) else self._settings.get(["username"]),
username=(
"bblp"
if self._settings.get_boolean(["local_mqtt"])
else self._settings.get(["username"])
),
access_code=self._settings.get(["access_code"]),
local_mqtt=self._settings.get_boolean(["local_mqtt"]),
region=self._settings.get(["region"]),
email=self._settings.get(["email"]),
auth_token=self._settings.get(["auth_token"])
auth_token=self._settings.get(["auth_token"]),
)
self.bambu.on_disconnect = self.on_disconnect(self.bambu.on_disconnect)
self.bambu.on_connect = self.on_connect(self.bambu.on_connect)
@ -247,7 +267,10 @@ class BambuPrinter:
return "BAMBU(read_timeout={read_timeout},write_timeout={write_timeout},options={options})".format(
read_timeout=self._read_timeout,
write_timeout=self._write_timeout,
options={"device_type": self._settings.get(["device_type"]), "host": self._settings.get(["host"])},
options={
"device_type": self._settings.get(["device_type"]),
"host": self._settings.get(["host"]),
},
)
def _calculate_resend_every_n(self, resend_ratio):
@ -432,7 +455,7 @@ class BambuPrinter:
data = to_unicode(data, encoding="ascii", errors="replace").strip()
# actual command handling
command_match = BambuPrinter.command_regex.match(data)
command_match = BambuVirtualPrinter.command_regex.match(data)
if command_match is not None:
command = command_match.group(0)
letter = command_match.group(1)
@ -459,7 +482,7 @@ class BambuPrinter:
if self.bambu.connected:
GCODE_COMMAND = commands.SEND_GCODE_TEMPLATE
GCODE_COMMAND['print']['param'] = data + "\n"
GCODE_COMMAND["print"]["param"] = data + "\n"
if self.bambu.publish(GCODE_COMMAND):
self._logger.info("command sent successfully")
self._sendOk()
@ -611,13 +634,20 @@ class BambuPrinter:
speed_fraction = 100 / percent
acceleration = math.exp((speed_fraction - 1.0191) / -0.814)
feed_rate = (2.1645 * (acceleration ** 3) - 5.3247 * (acceleration ** 2) + 4.342 * acceleration - 0.181)
feed_rate = (
2.1645 * (acceleration**3)
- 5.3247 * (acceleration**2)
+ 4.342 * acceleration
- 0.181
)
speed_level = 1.539 * (acceleration**2) - 0.7032 * acceleration + 4.0834
speed_command = f"M204.2 K${acceleration:.2f} \nM220 K${feed_rate:.2f} \nM73.2 R${speed_fraction:.2f} \nM1002 set_gcode_claim_speed_level ${speed_level:.0f}\n"
gcode_command['print']['param'] = speed_command
gcode_command["print"]["param"] = speed_command
if self.bambu.publish(gcode_command):
self._logger.info(f"{percent}% speed adjustment command sent successfully")
self._logger.info(
f"{percent}% speed adjustment command sent successfully"
)
return True
# noinspection PyUnusedLocal
@ -672,7 +702,7 @@ class BambuPrinter:
request_resend()
def _listSd(self, incl_long=False, incl_timestamp=False):
line = "{dosname} {size} {timestamp} \"{name}\""
line = '{dosname} {size} {timestamp} "{name}"'
self._send("Begin file list")
for item in map(lambda x: line.format(**x), self._getSdFiles()):
@ -694,14 +724,20 @@ class BambuPrinter:
filename = entry
filesize = ftp.ftps_session.size(entry)
date_str = ftp.ftps_session.sendcmd(f"MDTM {entry}").replace("213 ", "")
filedate = datetime.datetime.strptime(date_str, "%Y%m%d%H%M%S").replace(tzinfo=datetime.timezone.utc).timestamp()
dosname = get_dos_filename(filename, existing_filenames=list(result.keys())).lower()
filedate = (
datetime.datetime.strptime(date_str, "%Y%m%d%H%M%S")
.replace(tzinfo=datetime.timezone.utc)
.timestamp()
)
dosname = get_dos_filename(
filename, existing_filenames=list(result.keys())
).lower()
data = {
"dosname": dosname,
"name": filename,
"path": filename,
"size": filesize,
"timestamp": unix_timestamp_to_m20_timestamp(int(filedate))
"timestamp": unix_timestamp_to_m20_timestamp(int(filedate)),
}
result[dosname.lower()] = filename.lower()
result[filename.lower()] = data
@ -714,15 +750,23 @@ class BambuPrinter:
else:
filename = entry.replace("cache/", "")
filesize = ftp.ftps_session.size(f"cache/{filename}")
date_str = ftp.ftps_session.sendcmd(f"MDTM cache/{filename}").replace("213 ", "")
filedate = datetime.datetime.strptime(date_str, "%Y%m%d%H%M%S").replace(tzinfo=datetime.timezone.utc).timestamp()
dosname = get_dos_filename(filename, existing_filenames=list(result.keys())).lower()
date_str = ftp.ftps_session.sendcmd(f"MDTM cache/{filename}").replace(
"213 ", ""
)
filedate = (
datetime.datetime.strptime(date_str, "%Y%m%d%H%M%S")
.replace(tzinfo=datetime.timezone.utc)
.timestamp()
)
dosname = get_dos_filename(
filename, existing_filenames=list(result.keys())
).lower()
data = {
"dosname": dosname,
"name": filename,
"path": "cache/" + filename,
"size": filesize,
"timestamp": unix_timestamp_to_m20_timestamp(int(filedate))
"timestamp": unix_timestamp_to_m20_timestamp(int(filedate)),
}
result[dosname.lower()] = filename.lower()
result[filename.lower()] = data
@ -743,7 +787,9 @@ class BambuPrinter:
return [x for x in self._sdFileListCache.values() if isinstance(x, dict)]
def _selectSdFile(self, filename: str, check_already_open: bool = False) -> None:
self._logger.debug(f"_selectSdFile: {filename}, check_already_open={check_already_open}")
self._logger.debug(
f"_selectSdFile: {filename}, check_already_open={check_already_open}"
)
if filename.startswith("/"):
filename = filename[1:]
@ -770,7 +816,9 @@ class BambuPrinter:
if self._sdPrinter is None:
self._sdPrinting = True
self._sdPrintStarting = True
self._sdPrinter = threading.Thread(target=self._sdPrintingWorker, kwargs={"from_printer": from_printer})
self._sdPrinter = threading.Thread(
target=self._sdPrintingWorker, kwargs={"from_printer": from_printer}
)
self._sdPrinter.start()
# self._sdPrintingSemaphore.set()
if self._sdPrinter is not None:
@ -803,8 +851,12 @@ class BambuPrinter:
self._newSdFilePos = pos
def _reportSdStatus(self):
if ( self._sdPrinter is not None or self._sdPrintStarting is True ) and self._selectedSdFileSize > 0:
self._send(f"SD printing byte {self._selectedSdFilePos}/{self._selectedSdFileSize}")
if (
self._sdPrinter is not None or self._sdPrintStarting is True
) and self._selectedSdFileSize > 0:
self._send(
f"SD printing byte {self._selectedSdFilePos}/{self._selectedSdFileSize}"
)
else:
self._send("Not SD printing")
@ -858,7 +910,9 @@ class BambuPrinter:
self._selectedSdFilePos = 0
try:
if not from_printer and self.bambu.connected:
print_command = {"print": {"sequence_id": 0,
print_command = {
"print": {
"sequence_id": 0,
"command": "project_file",
"param": "Metadata/plate_1.gcode",
"md5": "",
@ -868,13 +922,20 @@ class BambuPrinter:
"task_id": "0",
"subtask_name": f"{self._selectedSdFile}",
"file": f"{self._selectedSdFile}",
"url": f"file:///mnt/sdcard/{self._selectedSdFile}" if self._settings.get_boolean(["device_type"]) in ["X1", "X1C"] else f"file:///sdcard/{self._selectedSdFile}",
"url": (
f"file:///mnt/sdcard/{self._selectedSdFile}"
if self._settings.get_boolean(["device_type"])
in ["X1", "X1C"]
else f"file:///sdcard/{self._selectedSdFile}"
),
"timelapse": self._settings.get_boolean(["timelapse"]),
"bed_leveling": self._settings.get_boolean(["bed_leveling"]),
"flow_cali": self._settings.get_boolean(["flow_cali"]),
"vibration_cali": self._settings.get_boolean(["vibration_cali"]),
"vibration_cali": self._settings.get_boolean(
["vibration_cali"]
),
"layer_inspect": self._settings.get_boolean(["layer_inspect"]),
"use_ams": self._settings.get_boolean(["use_ams"])
"use_ams": self._settings.get_boolean(["use_ams"]),
}
}
self.bambu.publish(print_command)
@ -913,7 +974,9 @@ class BambuPrinter:
filename = filename[1:]
file = self._getSdFileData(filename)
if file is not None:
ftp = IoTFTPSClient(f"{host}", 990, "bblp", f"{access_code}", ssl_implicit=True)
ftp = IoTFTPSClient(
f"{host}", 990, "bblp", f"{access_code}", ssl_implicit=True
)
try:
if ftp.delete_file(file["path"]):
self._logger.debug(f"{filename} deleted")
@ -978,7 +1041,9 @@ class BambuPrinter:
return len(data)
try:
written = self.incoming.put(data, timeout=self._write_timeout, partial=True)
written = self.incoming.put(
data, timeout=self._write_timeout, partial=True
)
self._seriallog.debug(f"<<< {u_data}")
return written
except queue.Full:
@ -1017,9 +1082,18 @@ class BambuPrinter:
def _isPaused(self):
return self._sdPrintingPausedSemaphore.is_set()
def _sendPaused(self):
paused_timer = RepeatedTimer(interval=3.0, function=self._send, args=[f"SD printing byte {self._selectedSdFilePos}/{self._selectedSdFileSize}"],
daemon=True, run_first=True, condition=self._isPaused)
paused_timer = RepeatedTimer(
interval=3.0,
function=self._send,
args=[
f"SD printing byte {self._selectedSdFilePos}/{self._selectedSdFileSize}"
],
daemon=True,
run_first=True,
condition=self._isPaused,
)
paused_timer.start()
def _send(self, line: str) -> None:
@ -1031,68 +1105,3 @@ class BambuPrinter:
def _error(self, error: str, *args, **kwargs) -> str:
return f"Error: {self._errors.get(error).format(*args, **kwargs)}"
# noinspection PyUnresolvedReferences
class CharCountingQueue(queue.Queue):
def __init__(self, maxsize, name=None):
queue.Queue.__init__(self, maxsize=maxsize)
self._size = 0
self._name = name
def clear(self):
with self.mutex:
self.queue.clear()
def put(self, item, block=True, timeout=None, partial=False) -> int:
self.not_full.acquire()
try:
if not self._will_it_fit(item) and partial:
space_left = self.maxsize - self._qsize()
if space_left:
item = item[:space_left]
if not block:
if not self._will_it_fit(item):
raise queue.Full
elif timeout is None:
while not self._will_it_fit(item):
self.not_full.wait()
elif timeout < 0:
raise ValueError("'timeout' must be a positive number")
else:
endtime = time.monotonic() + timeout
while not self._will_it_fit(item):
remaining = endtime - time.monotonic()
if remaining <= 0:
raise queue.Full
self.not_full.wait(remaining)
self._put(item)
self.unfinished_tasks += 1
self.not_empty.notify()
return self._len(item)
finally:
self.not_full.release()
# noinspection PyMethodMayBeStatic
def _len(self, item):
return len(item)
def _qsize(self, l=len): # noqa: E741
return self._size
# Put a new item in the queue
def _put(self, item):
self.queue.append(item)
self._size += self._len(item)
# Get an item from the queue
def _get(self):
item = self.queue.popleft()
self._size -= self._len(item)
return item
def _will_it_fit(self, item):
return self.maxsize - self._qsize() >= self._len(item)

View File

@ -0,0 +1,67 @@
import queue
import time
class CharCountingQueue(queue.Queue):
def __init__(self, maxsize, name=None):
queue.Queue.__init__(self, maxsize=maxsize)
self._size = 0
self._name = name
def clear(self):
with self.mutex:
self.queue.clear()
def put(self, item, block=True, timeout=None, partial=False) -> int:
self.not_full.acquire()
try:
if not self._will_it_fit(item) and partial:
space_left = self.maxsize - self._qsize()
if space_left:
item = item[:space_left]
if not block:
if not self._will_it_fit(item):
raise queue.Full
elif timeout is None:
while not self._will_it_fit(item):
self.not_full.wait()
elif timeout < 0:
raise ValueError("'timeout' must be a positive number")
else:
endtime = time.monotonic() + timeout
while not self._will_it_fit(item):
remaining = endtime - time.monotonic()
if remaining <= 0:
raise queue.Full
self.not_full.wait(remaining)
self._put(item)
self.unfinished_tasks += 1
self.not_empty.notify()
return self._len(item)
finally:
self.not_full.release()
# noinspection PyMethodMayBeStatic
def _len(self, item):
return len(item)
def _qsize(self, l=len): # noqa: E741
return self._size
# Put a new item in the queue
def _put(self, item):
self.queue.append(item)
self._size += self._len(item)
# Get an item from the queue
def _get(self):
item = self.queue.popleft()
self._size -= self._len(item)
return item
def _will_it_fit(self, item):
return self.maxsize - self._qsize() >= self._len(item)