This commit is contained in:
jneilliii 2024-01-07 14:54:00 -05:00
commit 0283ed22a4
17 changed files with 1536 additions and 0 deletions

18
.editorconfig Normal file
View File

@ -0,0 +1,18 @@
# This file is for unifying the coding style for different editors and IDEs
# editorconfig.org
root = true
[*]
end_of_line = lf
charset = utf-8
insert_final_newline = true
trim_trailing_whitespace = true
[**.py]
indent_style = space
indent_size = 4
[**.js]
indent_style = space
indent_size = 4

10
.gitignore vendored Normal file
View File

@ -0,0 +1,10 @@
*.pyc
*.swp
.idea
*.iml
build
dist
*.egg*
.DS_Store
*.zip
extras

4
MANIFEST.in Normal file
View File

@ -0,0 +1,4 @@
include README.md
recursive-include octoprint_bambu_printer/templates *
recursive-include octoprint_bambu_printer/translations *
recursive-include octoprint_bambu_printer/static *

17
README.md Normal file
View File

@ -0,0 +1,17 @@
# OctoPrint-BambuPrinter
**TODO:** Describe what your plugin does.
## Setup
Install via the bundled [Plugin Manager](https://docs.octoprint.org/en/master/bundledplugins/pluginmanager.html)
or manually using this URL:
https://github.com/jneilliii/OctoPrint-BambuPrinter/archive/master.zip
**TODO:** Describe how to install your plugin, if more needs to be done than just installing it via pip or through
the plugin manager.
## Configuration
**TODO:** Describe your plugin's configuration options (if any).

8
babel.cfg Normal file
View File

@ -0,0 +1,8 @@
[python: */**.py]
[jinja2: */**.jinja2]
silent=false
extensions=jinja2.ext.do, octoprint.util.jinja.trycatch
[javascript: */**.js]
extract_messages = gettext, ngettext

View File

@ -0,0 +1,135 @@
# coding=utf-8
from __future__ import absolute_import
import threading
import time
import octoprint.plugin
from .ftpsclient import IoTFTPSClient
class BambuPrintPlugin(
octoprint.plugin.SettingsPlugin, octoprint.plugin.TemplatePlugin
):
def get_template_configs(self):
return [{"type": "settings", "custom_bindings": False}]
def get_settings_defaults(self):
return {"device_type": "X1C",
"serial": "",
"host": "",
"access_code": "",
"username": "bblp",
"timelapse": False,
"bed_leveling": True,
"flow_cali": False,
"vibration_cali": True,
"layer_inspect": True,
"use_ams": False}
def support_3mf_files(self):
return {'machinecode': {'3mf': ["3mf"]}}
def upload_to_sd(self, printer, filename, path, sd_upload_started, sd_upload_succeeded, sd_upload_failed, *args, **kwargs):
self._logger.debug(f"Starting upload from {filename} to {filename}")
sd_upload_started(filename, filename)
def process():
host = self._settings.get(["host"])
access_code = self._settings.get(["access_code"])
elapsed = time.monotonic()
try:
ftp = IoTFTPSClient(f"{host}", 990, "bblp", f"{access_code}", ssl_implicit=True)
if ftp.upload_file(path, f"{filename}"):
elapsed = time.monotonic() - elapsed
sd_upload_succeeded(filename, filename, elapsed)
# remove local file after successful upload to Bambu
self._file_manager.remove_file("local", filename)
else:
raise Exception("upload failed")
except Exception as e:
elapsed = time.monotonic() - elapsed
sd_upload_failed(filename, filename, elapsed)
self._logger.debug(f"Error uploading file {filename}")
thread = threading.Thread(target=process)
thread.daemon = True
thread.start()
return filename
def virtual_printer_factory(self, comm_instance, port, baudrate, read_timeout):
if not port == "BAMBU":
return None
if self._settings.get(["serial"]) == "" or self._settings.get(["host"]) == "" or self._settings.get(["access_code"]) == "":
return None
import logging.handlers
from octoprint.logging.handlers import CleaningTimedRotatingFileHandler
seriallog_handler = CleaningTimedRotatingFileHandler(
self._settings.get_plugin_logfile_path(postfix="serial"),
when="D",
backupCount=3,
)
seriallog_handler.setFormatter(logging.Formatter("%(asctime)s %(message)s"))
seriallog_handler.setLevel(logging.DEBUG)
from . import virtual
serial_obj = virtual.BambuPrinter(
self._settings,
self._printer_profile_manager,
data_folder=self.get_plugin_data_folder(),
seriallog_handler=seriallog_handler,
read_timeout=float(read_timeout),
faked_baudrate=baudrate,
)
return serial_obj
def get_additional_port_names(self, *args, **kwargs):
if self._settings.get(["serial"]) != "" and self._settings.get(["host"]) != "" and self._settings.get(["access_code"]) != "":
return ["BAMBU"]
else:
return []
def get_update_information(self):
return {'bambu_printer': {'displayName': "Bambu Printer",
'displayVersion': self._plugin_version,
'type': "github_release",
'user': "jneilliii",
'repo': "OctoPrint-BambuPrinter",
'current': self._plugin_version,
'stable_branch': {'name': "Stable",
'branch': "master",
'comittish': ["master"]},
'prerelease_branches': [
{'name': "Release Candidate",
'branch': "rc",
'comittish': ["rc", "master"]}
],
'pip': "https://github.com/jneilliii/OctoPrint-BambuPrinter/archive/{target_version}.zip"}}
__plugin_name__ = "Bambu Printer"
__plugin_pythoncompat__ = ">=3.7,<4"
def __plugin_load__():
plugin = BambuPrintPlugin()
global __plugin_implementation__
__plugin_implementation__ = plugin
global __plugin_hooks__
__plugin_hooks__ = {
"octoprint.comm.transport.serial.factory": __plugin_implementation__.virtual_printer_factory,
"octoprint.comm.transport.serial.additional_port_names": __plugin_implementation__.get_additional_port_names,
"octoprint.filemanager.extension_tree": __plugin_implementation__.support_3mf_files,
"octoprint.printer.sdcardupload": __plugin_implementation__.upload_to_sd,
"octoprint.plugin.softwareupdate.check_config": __plugin_implementation__.get_update_information,
}

View File

@ -0,0 +1,2 @@
from ._client import IoTFTPSClient
from ._version import __version__

View File

@ -0,0 +1,159 @@
"""wrapper for FTPS server interactions"""
import ftplib
import ssl
from typing import List, Optional, Union
class ImplicitTLS(ftplib.FTP_TLS):
"""ftplib.FTP_TLS sub-class to support implicit SSL FTPS"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._sock = None
@property
def sock(self):
"""return socket"""
return self._sock
@sock.setter
def sock(self, value):
"""wrap and set SSL socket"""
if value is not None and not isinstance(value, ssl.SSLSocket):
value = self.context.wrap_socket(value)
self._sock = value
def ntransfercmd(self, cmd, rest=None):
conn, size = ftplib.FTP.ntransfercmd(self, cmd, rest)
if self._prot_p:
conn = self.context.wrap_socket(conn,
server_hostname=self.host,
session=self.sock.session) # this is the fix
return conn, size
class IoTFTPSClient:
"""iot ftps ftpsclient"""
ftps_host: str
ftps_port: int
ftps_user: str
ftps_pass: str
ssl_implicit: bool
ftps_session: Union[ftplib.FTP, ImplicitTLS]
def __init__(
self,
ftps_host: str,
ftps_port: Optional[int] = 21,
ftps_user: Optional[str] = "",
ftps_pass: Optional[str] = "",
ssl_implicit: Optional[bool] = False,
) -> None:
self.ftps_host = ftps_host
self.ftps_port = ftps_port
self.ftps_user = ftps_user
self.ftps_pass = ftps_pass
self.ssl_implicit = ssl_implicit
self.instantiate_ftps_session()
def __repr__(self) -> str:
return (
"IoT FTPS Client\n"
"--------------------\n"
f"host: {self.ftps_host}\n"
f"port: {self.ftps_port}\n"
f"user: {self.ftps_user}\n"
f"ssl: {self.ssl_implicit}"
)
def instantiate_ftps_session(self) -> None:
"""init ftps_session based on input params"""
try:
if self.ssl_implicit:
self.ftps_session = ImplicitTLS()
else:
self.ftps_session = ftplib.FTP()
self.ftps_session.connect(host=self.ftps_host, port=self.ftps_port)
if self.ftps_user != "" and self.ftps_pass != "":
self.ftps_session.login(user=self.ftps_user, passwd=self.ftps_pass)
else:
self.ftps_session.login()
if self.ssl_implicit:
self.ftps_session.prot_p()
except Exception as ex:
print(f"unexpected exception occurred: {ex}")
pass
return
def disconnect(self) -> None:
"""disconnect the current session from the ftps server"""
try:
self.ftps_session.close()
except Exception as ex:
print(f"unexpected exception occurred: {ex}")
pass
return
def download_file(self, source: str, dest: str) -> bool:
"""download a file to a path on the local filesystem"""
try:
with open(dest, "wb") as file:
self.ftps_session.retrbinary(f"RETR {source}", file.write)
return True
except Exception as ex:
print(f"unexpected exception occurred: {ex}")
pass
return False
def upload_file(self, source: str, dest: str) -> bool:
"""upload a file to a path inside the FTPS server"""
try:
with open(source, "rb") as file:
self.ftps_session.storbinary(f"STOR {dest}", file)
return True
except Exception as ex:
print(f"unexpected exception occurred: {ex}")
pass
return False
def delete_file(self, path: str) -> bool:
"""delete a file from under a path inside the FTPS server"""
try:
self.ftps_session.delete(path)
return True
except Exception as ex:
print(f"unexpected exception occurred: {ex}")
pass
return False
def move_file(self, source: str, dest: str) -> bool:
"""move a file inside the FTPS server to another path inside the FTPS server"""
try:
self.ftps_session.rename(source, dest)
return True
except Exception as ex:
print(f"unexpected exception occurred: {ex}")
pass
return False
def list_files(
self, path: str, file_pattern: Optional[str] = None
) -> Union[List[str], None]:
"""list files under a path inside the FTPS server"""
try:
files = self.ftps_session.nlst(path)
if not files:
return
if file_pattern:
return [f for f in files if file_pattern in f]
return files
except Exception as ex:
print(f"unexpected exception occurred: {ex}")
pass
return

View File

@ -0,0 +1,3 @@
VERSION = "1.1.1"
__version__ = VERSION

View File

@ -0,0 +1,29 @@
/*
* View model for OctoPrint-BambuPrinter
*
* Author: jneilliii
* License: AGPLv3
*/
$(function() {
function Bambu_printerViewModel(parameters) {
var self = this;
// assign the injected parameters, e.g.:
// self.loginStateViewModel = parameters[0];
// self.settingsViewModel = parameters[1];
// TODO: Implement your plugin's view model here.
}
/* view model class, parameters for constructor, container to bind to
* Please see http://docs.octoprint.org/en/master/plugins/viewmodels.html#registering-custom-viewmodels for more details
* and a full list of the available options.
*/
OCTOPRINT_VIEWMODELS.push({
construct: Bambu_printerViewModel,
// ViewModels your plugin depends on, e.g. loginStateViewModel, settingsViewModel, ...
dependencies: [ /* "loginStateViewModel", "settingsViewModel" */ ],
// Elements to bind to, e.g. #settings_plugin_bambu_printer, #tab_plugin_bambu_printer, ...
elements: [ /* ... */ ]
});
});

View File

@ -0,0 +1,40 @@
<h3>Virtual Printer</h3>
<form class="form-horizontal" onsubmit="return false;">
<div class="control-group">
<label class="control-label">{{ _('Device Type') }}</label>
<div class="controls">
<select class="input-block-level" data-bind="options: ['A1', 'A1MINI', 'P1P', 'P1S', 'X1', 'X1C'], value: settings.plugins.bambu_printer.device_type, allowUnset: true">
</select>
</div>
</div>
<div class="control-group">
<label class="control-label">{{ _('IP Address') }}</label>
<div class="controls">
<input type="text" class="input-block-level" data-bind="value: settings.plugins.bambu_printer.host" placeholder="192.168.0.2" title="{{ _('IP address or hostname of the printer') }}"></input>
</div>
</div>
<div class="control-group">
<label class="control-label">{{ _('Serial Number') }}</label>
<div class="controls">
<input type="text" class="input-block-level" data-bind="value: settings.plugins.bambu_printer.serial" title="{{ _('Serial number of printer') }}"></input>
</div>
</div>
<div class="control-group">
<label class="control-label">{{ _('Access Code') }}</label>
<div class="controls">
<input type="text" class="input-block-level" data-bind="value: settings.plugins.bambu_printer.access_code" title="{{ _('Access code of printer') }}"></input>
</div>
</div>
<div class="control-group">
<label class="control-label">{{ _('Print Options') }}</label>
<div class="controls">
<label class="checkbox"><input type="checkbox" data-bind="checked: settings.plugins.bambu_printer.timelapse"> {{ _('Enable timelapse') }}</label>
<label class="checkbox"><input type="checkbox" data-bind="checked: settings.plugins.bambu_printer.bed_leveling"> {{ _('Enable bed leveling') }}</label>
<label class="checkbox"><input type="checkbox" data-bind="checked: settings.plugins.bambu_printer.flow_cali"> {{ _('Enable flow calibration') }}</label>
<label class="checkbox"><input type="checkbox" data-bind="checked: settings.plugins.bambu_printer.vibration_cali"> {{ _('Enable vibration calibration') }}</label>
<label class="checkbox"><input type="checkbox" data-bind="checked: settings.plugins.bambu_printer.layer_inspect"> {{ _('Enable first layer inspection') }}</label>
<label class="checkbox"><input type="checkbox" data-bind="checked: settings.plugins.bambu_printer.use_ams"> {{ _('Use AMS') }}</label>
</div>
</div>
</form>

View File

@ -0,0 +1,970 @@
__author__ = "Gina Häußge <osd@foosel.net>"
__license__ = "GNU Affero General Public License http://www.gnu.org/licenses/agpl.html"
import collections
import os
import queue
import re
import threading
import time
from typing import Any, Dict, List, Optional
import asyncio
from pybambu import BambuClient, commands
from serial import SerialTimeoutException
from octoprint.util import RepeatedTimer, to_bytes, to_unicode, get_dos_filename
from .ftpsclient import IoTFTPSClient
# noinspection PyBroadException
class BambuPrinter:
command_regex = re.compile(r"^([GM])(\d+)")
def __init__(
self,
settings,
printer_profile_manager,
data_folder,
seriallog_handler=None,
read_timeout=5.0,
write_timeout=10.0,
faked_baudrate=115200,
):
self._busyInterval = 2.0
self.tick_rate = 2.0
self._errors = {
"checksum_mismatch": "Checksum mismatch",
"checksum_missing": "Missing checksum",
"lineno_mismatch": "expected line {} got {}",
"lineno_missing": "No Line Number with checksum, Last Line: {}",
"maxtemp": "MAXTEMP triggered!",
"mintemp": "MINTEMP triggered!",
"command_unknown": "Unknown command {}",
}
self._sendBusy = False
self._ambient_temperature = 21.3
self.temp = [self._ambient_temperature ]
self.targetTemp = [0.0]
self.bedTemp = self._ambient_temperature
self.bedTargetTemp = 0.0
self.chamberTemp = self._ambient_temperature
self.chamberTargetTemp = 0.0
self.lastTempAt = time.monotonic()
self._firmwareName = "Bambu"
self._m115FormatString = "FIRMWARE_NAME:{firmware_name} PROTOCOL_VERSION:1.0"
self._received_lines = 0
self.extruderCount = 1
self._waitInterval = 5.0
self._killed = False
self._heatingUp = False
self.current_line = 0
self._writingToSd = False
self._sdCardReady = True
self._sdPrinter = None
self._sdPrinting = False
self._sdPrintingSemaphore = threading.Event()
self._sdPrintingPausedSemaphore = threading.Event()
self._selectedSdFile = None
self._selectedSdFileSize = 0
self._selectedSdFilePos = 0
self._busy = None
self._busy_loop = None
import logging
self._logger = logging.getLogger(
"octoprint.plugins.bambu_printer.BambuPrinter"
)
self._settings = settings
self._printer_profile_manager = printer_profile_manager
self._faked_baudrate = faked_baudrate
self._plugin_data_folder = data_folder
self._seriallog = logging.getLogger(
"octoprint.plugins.bambu_printer.BambuPrinter.serial"
)
self._seriallog.setLevel(logging.CRITICAL)
self._seriallog.propagate = False
if seriallog_handler is not None:
import logging.handlers
self._seriallog.addHandler(seriallog_handler)
self._seriallog.setLevel(logging.INFO)
self._seriallog.debug("-" * 78)
self._read_timeout = read_timeout
self._write_timeout = write_timeout
self._rx_buffer_size = 64
self._incoming_lock = threading.RLock()
self.incoming = CharCountingQueue(self._rx_buffer_size, name="RxBuffer")
self.outgoing = queue.Queue()
self.buffered = queue.Queue(maxsize=4)
self._last_hms_errors = None
self.bambu = None
readThread = threading.Thread(
target=self._processIncoming,
name="octoprint.plugins.bambu_printer.wait_thread",
daemon=True
)
readThread.start()
# bufferThread = threading.Thread(
# target=self._processBuffer,
# name="octoprint.plugins.bambu_printer.buffer_thread",
# daemon=True
# )
# bufferThread.start()
connectionThread = threading.Thread(
target=self._create_connection,
name="octoprint.plugins.bambu_printer.connection_thread",
daemon=True
)
connectionThread.start()
def new_update(self, event_type):
if event_type == "event_hms_errors":
bambu_printer = self.bambu.get_device()
if bambu_printer.hms.errors != self._last_hms_errors and bambu_printer.hms.errors["Count"] > 0:
self._logger.debug(f"HMS Error: {bambu_printer.hms.errors}")
for n in range(1, bambu_printer.hms.errors["Count"]+1):
error = bambu_printer.hms.errors[f"{n}-Error"].strip()
self._send(f"// action:notification {error}")
self._last_hms_errors = bambu_printer.hms.errors
elif event_type == "event_printer_data_update":
device_data = self.bambu.get_device()
ams = device_data.ams.__dict__
info = device_data.info.__dict__
temperatures = device_data.temperature.__dict__
lights = device_data.lights.__dict__
fans = device_data.fans.__dict__
speed = device_data.speed.__dict__
self.temp[0] = temperatures.get("nozzle_temp", 0.0)
self.targetTemp[0] = temperatures.get("target_nozzle_temp", 0.0)
self.bedTemp = temperatures.get("bed_temp", 0.0)
self.bedTargetTemp = temperatures.get("target_bed_temp", 0.0)
self.chamberTemp = temperatures.get("chamber_temp", 0.0)
if info.get("gcode_state") == "RUNNING":
if not self._sdPrintingSemaphore.is_set():
self._sdPrintingSemaphore.set()
if self._sdPrintingPausedSemaphore.is_set():
self._sdPrintingPausedSemaphore.clear()
if not self._sdPrinting:
filename = info.get("subtask_name")
self._selectSdFile(filename)
self._startSdPrint(from_printer=True)
# fuzzy math here to get print percentage to match BambuStudio
self._selectedSdFilePos = int(self._selectedSdFileSize * ((info.get("print_percentage") + 1)/100))
if info.get("gcode_state") == "PAUSE":
if not self._sdPrintingPausedSemaphore.is_set():
self._sdPrintingPausedSemaphore.set()
if self._sdPrintingSemaphore.is_set():
self._sdPrintingSemaphore.clear()
self._send("// action:paused")
self._sendPaused()
if info.get("gcode_state") == "FINISH" and self._sdPrintingSemaphore.is_set():
self._selectedSdFilePos = self._selectedSdFileSize
self._finishSdPrint()
def _create_connection(self):
if (self._settings.get(["device_type"]) != "" and
self._settings.get(["serial"]) != "" and
self._settings.get(["serial"]) != "" and
self._settings.get(["username"]) != "" and
self._settings.get(["access_code"]) != ""
):
asyncio.run(self._create_connection_async())
async def _create_connection_async(self):
self.bambu = BambuClient(device_type=self._settings.get(["device_type"]),
serial=self._settings.get(["serial"]),
host=self._settings.get(["host"]),
username=self._settings.get(["username"]),
access_code=self._settings.get(["access_code"])
)
await self.bambu.connect(callback=self.new_update)
self._logger.info(f"bambu connection status: {self.bambu.connected}")
self._sendOk()
# while True:
# await asyncio.sleep(self.tick_rate)
# self._processTemperatureQuery()
def __str__(self):
return "BAMBU(read_timeout={read_timeout},write_timeout={write_timeout},options={options})".format(
read_timeout=self._read_timeout,
write_timeout=self._write_timeout,
options={"device_type": self._settings.get(["device_type"]), "host": self._settings.get(["host"])},
)
def _calculate_resend_every_n(self, resend_ratio):
self._resend_every_n = (100 // resend_ratio) if resend_ratio else 0
def _reset(self):
with self._incoming_lock:
self._relative = True
self._lastX = 0.0
self._lastY = 0.0
self._lastZ = 0.0
self._lastE = [0.0] * self.extruderCount
self._lastF = 200
self._unitModifier = 1
self._feedrate_multiplier = 100
self._flowrate_multiplier = 100
self._sdCardReady = True
self._sdPrinting = False
if self._sdPrinter:
self._sdPrinting = False
self._sdPrintingSemaphore.clear()
self._sdPrintingPausedSemaphore.clear()
self._sdPrinter = None
self._selectedSdFile = None
self._selectedSdFileSize = None
self._selectedSdFilePos = None
if self._writingToSdHandle:
try:
self._writingToSdHandle.close()
except Exception:
pass
self._writingToSd = False
self._writingToSdHandle = None
self._writingToSdFile = None
self._newSdFilePos = None
self._heatingUp = False
self.current_line = 0
self.lastN = 0
self._debug_awol = False
self._debug_sleep = 0
# self._sleepAfterNext.clear()
# self._sleepAfter.clear()
self._dont_answer = False
self._broken_klipper_connection = False
self._debug_drop_connection = False
self._killed = False
if self._sdstatus_reporter is not None:
self._sdstatus_reporter.cancel()
self._sdstatus_reporter = None
self._clearQueue(self.incoming)
self._clearQueue(self.outgoing)
# self._clearQueue(self.buffered)
if self._settings.get_boolean(["simulateReset"]):
for item in self._settings.get(["resetLines"]):
self._send(item + "\n")
self._locked = self._settings.get_boolean(["locked"])
@property
def timeout(self):
return self._read_timeout
@timeout.setter
def timeout(self, value):
self._logger.debug(f"Setting read timeout to {value}s")
self._read_timeout = value
@property
def write_timeout(self):
return self._write_timeout
@write_timeout.setter
def write_timeout(self, value):
self._logger.debug(f"Setting write timeout to {value}s")
self._write_timeout = value
@property
def port(self):
return "BAMBU"
@property
def baudrate(self):
return self._faked_baudrate
# noinspection PyMethodMayBeStatic
def _clearQueue(self, q):
try:
while q.get(block=False):
q.task_done()
continue
except queue.Empty:
pass
def _processIncoming(self):
linenumber = 0
next_wait_timeout = 0
def recalculate_next_wait_timeout():
nonlocal next_wait_timeout
next_wait_timeout = time.monotonic() + self._waitInterval
recalculate_next_wait_timeout()
data = None
buf = b""
while self.incoming is not None and not self._killed:
try:
data = self.incoming.get(timeout=0.01)
data = to_bytes(data, encoding="ascii", errors="replace")
self.incoming.task_done()
except queue.Empty:
continue
except Exception:
if self.incoming is None:
# just got closed
break
if data is not None:
buf += data
nl = buf.find(b"\n") + 1
if nl > 0:
data = buf[:nl]
buf = buf[nl:]
else:
continue
recalculate_next_wait_timeout()
if data is None:
continue
self._received_lines += 1
# strip checksum
if b"*" in data:
checksum = int(data[data.rfind(b"*") + 1 :])
data = data[: data.rfind(b"*")]
if not checksum == self._calculate_checksum(data):
self._triggerResend(expected=self.current_line + 1)
continue
self.current_line += 1
elif self._settings.get_boolean(["forceChecksum"]):
self._send(self._error("checksum_missing"))
continue
# track N = N + 1
if data.startswith(b"N") and b"M110" in data:
linenumber = int(re.search(b"N([0-9]+)", data).group(1))
self.lastN = linenumber
self.current_line = linenumber
self._sendOk()
continue
elif data.startswith(b"N"):
linenumber = int(re.search(b"N([0-9]+)", data).group(1))
expected = self.lastN + 1
if linenumber != expected:
self._triggerResend(actual=linenumber)
continue
else:
self.lastN = linenumber
data = data.split(None, 1)[1].strip()
data += b"\n"
data = to_unicode(data, encoding="ascii", errors="replace").strip()
# actual command handling
command_match = BambuPrinter.command_regex.match(data)
if command_match is not None:
command = command_match.group(0)
letter = command_match.group(1)
try:
# if we have a method _gcode_G, _gcode_M or _gcode_T, execute that first
letter_handler = f"_gcode_{letter}"
if hasattr(self, letter_handler):
code = command_match.group(2)
handled = getattr(self, letter_handler)(code, data)
if handled:
self._sendOk()
continue
# then look for a method _gcode_<command> and execute that if it exists
command_handler = f"_gcode_{command}"
if hasattr(self, command_handler):
handled = getattr(self, command_handler)(data)
if handled:
self._sendOk()
continue
else:
self._sendOk()
finally:
self._logger.debug(f"{data}")
self._logger.debug("Closing down read loop")
##~~ command implementations
# noinspection PyUnusedLocal
def _gcode_M20(self, data: str) -> bool:
if self._sdCardReady:
self._listSd(incl_long="L" in data, incl_timestamp="T" in data)
return True
# noinspection PyUnusedLocal
def _gcode_M21(self, data: str) -> bool:
self._sdCardReady = True
self._send("SD card ok")
return True
# noinspection PyUnusedLocal
def _gcode_M22(self, data: str) -> bool:
self._logger.debug("ignoring M22 command.")
self._send("M22 disabled for Bambu")
return True
def _gcode_M23(self, data: str) -> bool:
if self._sdCardReady:
filename = data.split(None, 1)[1].strip()
self._selectSdFile(filename)
return True
# noinspection PyUnusedLocal
def _gcode_M24(self, data: str) -> bool:
if self._sdCardReady:
self._startSdPrint()
return True
# noinspection PyUnusedLocal
def _gcode_M25(self, data: str) -> bool:
if self._sdCardReady:
self._pauseSdPrint()
return True
def _gcode_M26(self, data: str) -> bool:
self._logger.debug("ignoring M26 command.")
self._send("M26 disabled for Bambu")
return True
def _gcode_M27(self, data: str) -> bool:
def report():
if self._sdCardReady:
self._reportSdStatus()
matchS = re.search(r"S([0-9]+)", data)
if matchS:
interval = int(matchS.group(1))
if self._sdstatus_reporter is not None:
self._sdstatus_reporter.cancel()
if interval > 0:
self._sdstatus_reporter = RepeatedTimer(interval, report)
self._sdstatus_reporter.start()
else:
self._sdstatus_reporter = None
report()
return True
def _gcode_M28(self, data: str) -> bool:
self._logger.debug("ignoring M28 command.")
self._send("M28 disabled for Bambu")
return True
# noinspection PyUnusedLocal
def _gcode_M29(self, data: str) -> bool:
self._logger.debug("ignoring M28 command.")
self._send("M28 disabled for Bambu")
return True
def _gcode_M30(self, data: str) -> bool:
if self._sdCardReady:
filename = data.split(None, 1)[1].strip()
self._deleteSdFile(filename)
return True
def _gcode_M33(self, data: str) -> bool:
self._logger.debug("ignoring M33 command.")
self._send("M33 disabled for Bambu")
return True
# noinspection PyUnusedLocal
def _gcode_M105(self, data: str) -> bool:
self._processTemperatureQuery()
return True
# noinspection PyUnusedLocal
def _gcode_M115(self, data: str) -> bool:
self._send("Bambu Printer Integration")
self._send("Cap:EXTENDED_M20:1")
self._send("Cap:LFN_WRITE:1")
self._send("Cap:LFN_WRITE:1")
return True
def _gcode_M117(self, data: str) -> bool:
# we'll just use this to echo a message, to allow playing around with pause triggers
result = re.search(r"M117\s+(.*)", data).group(1)
self._send(f"echo:{result}")
return False
def _gcode_M118(self, data: str) -> bool:
match = re.search(r"M118 (?:(?P<parameter>A1|E1|Pn[012])\s)?(?P<text>.*)", data)
if not match:
self._send("Unrecognized command parameters for M118")
else:
result = match.groupdict()
text = result["text"]
parameter = result["parameter"]
if parameter == "A1":
self._send(f"//{text}")
elif parameter == "E1":
self._send(f"echo:{text}")
else:
self._send(text)
return True
# noinspection PyUnusedLocal
def _gcode_M400(self, data: str) -> bool:
return True
@staticmethod
def _check_param_letters(letters, data):
# Checks if any of the params (letters) are included in data
# Purely for saving typing :)
for param in list(letters):
if param in data:
return True
##~~ further helpers
# noinspection PyMethodMayBeStatic
def _calculate_checksum(self, line: bytes) -> int:
checksum = 0
for c in bytearray(line):
checksum ^= c
return checksum
def _kill(self):
self._killed = True
if self.bambu.connected:
self.bambu.disconnect()
self._send("echo:EMERGENCY SHUTDOWN DETECTED. KILLED.")
def _triggerResend(
self, expected: int = None, actual: int = None, checksum: int = None
) -> None:
with self._incoming_lock:
if expected is None:
expected = self.lastN + 1
else:
self.lastN = expected - 1
if actual is None:
if checksum:
self._send(self._error("checksum_mismatch"))
else:
self._send(self._error("checksum_missing"))
else:
self._send(self._error("lineno_mismatch", expected, actual))
def request_resend():
self._send("Resend:%d" % expected)
# if not self._brokenResend:
self._sendOk()
request_resend()
def _listSd(self, incl_long=False, incl_timestamp=False):
line = "{name} {size} \"{name}\""
self._send("Begin file list")
for item in map(lambda x: line.format(**x), self._getSdFiles()):
self._send(item)
self._send("End file list")
def _mappedSdList(self) -> Dict[str, Dict[str, Any]]:
result = {}
host = self._settings.get(["host"])
access_code = self._settings.get(["access_code"])
ftp = IoTFTPSClient(f"{host}", 990, "bblp", f"{access_code}", ssl_implicit=True)
filelist = ftp.list_files("", ".3mf")
for entry in filelist:
if entry.startswith("/"):
filename = entry[1:]
else:
filename = entry
filesize = ftp.ftps_session.size(entry)
dosname = get_dos_filename(filename, existing_filenames=list(result.keys())).lower()
data = {
"dosname": dosname,
"name": filename,
"path": filename,
"size": filesize,
}
result[filename.lower()] = data
# result[dosname.lower()] = filename.lower()
return result
def _getSdFileData(self, filename: str) -> Optional[Dict[str, Any]]:
files = self._mappedSdList()
data = files.get(filename.lower())
if isinstance(data, str):
data = files.get(data.lower())
return data
def _getSdFiles(self) -> List[Dict[str, Any]]:
files = self._mappedSdList()
return [x for x in files.values() if isinstance(x, dict)]
def _selectSdFile(self, filename: str, check_already_open: bool = False) -> None:
if filename.startswith("/"):
filename = filename[1:]
file = self._getSdFileData(filename)
if file is None:
self._send(f"{filename} open failed")
return
if self._selectedSdFile == file["path"] and check_already_open:
return
self._selectedSdFile = file["path"]
self._selectedSdFileSize = file["size"]
self._send(f"File opened: {file['name']} Size: {self._selectedSdFileSize}")
self._send("File selected")
def _startSdPrint(self, from_printer: bool = False) -> None:
if self._selectedSdFile is not None:
if self._sdPrinter is None:
self._sdPrinting = True
self._sdPrinter = threading.Thread(target=self._sdPrintingWorker, kwargs={"from_printer": from_printer})
self._sdPrinter.start()
# self._sdPrintingSemaphore.set()
if self._sdPrinter is not None:
if self.bambu.connected:
if self.bambu.publish(commands.RESUME):
self._logger.info("print resumed")
# if not self._sdPrintingSemaphore.is_set():
# self._sdPrintingSemaphore.set()
else:
self._logger.info("print resume failed")
def _pauseSdPrint(self):
if self.bambu.connected:
if self.bambu.publish(commands.PAUSE):
self._logger.info("print paused")
else:
self._logger.info("print pause failed")
def _setSdPos(self, pos):
self._newSdFilePos = pos
def _reportSdStatus(self):
if self._sdPrinter is not None and (self._sdPrintingSemaphore.is_set() or self._sdPrintingPausedSemaphore.is_set()):
self._send(f"SD printing byte {self._selectedSdFilePos}/{self._selectedSdFileSize}")
else:
self._send("Not SD printing")
def _generateTemperatureOutput(self) -> str:
template = "{heater}:{actual:.2f}/ {target:.2f}"
temps = collections.OrderedDict()
heater = "T"
temps[heater] = (self.temp[0], self.targetTemp[0])
temps["B"] = (self.bedTemp, self.bedTargetTemp)
temps["C"] = (self.chamberTemp, self.chamberTargetTemp)
output = " ".join(
map(
lambda x: template.format(heater=x[0], actual=x[1][0], target=x[1][1]),
temps.items(),
)
)
output += " @:64\n"
return output
def _processTemperatureQuery(self):
# includeOk = not self._okBeforeCommandOutput
output = self._generateTemperatureOutput()
self._send(output)
def _writeSdFile(self, filename: str) -> None:
self._send(f"Writing to file: {filename}")
def _finishSdFile(self):
try:
self._writingToSdHandle.close()
except Exception:
pass
finally:
self._writingToSdHandle = None
self._writingToSd = False
self._selectedSdFile = None
# Most printers don't have RTC and set some ancient date
# by default. Emulate that using 2000-01-01 01:00:00
# (taken from prusa firmware behaviour)
st = os.stat(self._writingToSdFile)
os.utime(self._writingToSdFile, (st.st_atime, 946684800))
self._writingToSdFile = None
self._send("Done saving file")
def _sdPrintingWorker(self, from_printer: bool = False):
self._selectedSdFilePos = 0
try:
if not from_printer and self.bambu.connected:
print_command = {"print": {"sequence_id": 0,
"command": "project_file",
"param": "Metadata/plate_1.gcode",
"subtask_name": f"{self._selectedSdFile}",
"url": f"file:///mnt/sdcard/{self._selectedSdFile}",
"timelapse": self._settings.get_boolean(["timelapse"]),
"bed_leveling": self._settings.get_boolean(["bed_leveling"]),
"flow_cali": self._settings.get_boolean(["flow_cali"]),
"vibration_cali": self._settings.get_boolean(["vibration_cali"]),
"layer_inspect": self._settings.get_boolean(["layer_inspect"]),
"use_ams": self._settings.get_boolean(["use_ams"])
}
}
self.bambu.publish(print_command)
while self._selectedSdFilePos < self._selectedSdFileSize:
if self._killed or not self._sdPrinting:
break
# if we are paused, wait for resuming
self._sdPrintingSemaphore.wait()
self._reportSdStatus()
time.sleep(3)
self._logger.debug(f"SD File Print: {self._selectedSdFile}")
except AttributeError:
if self.outgoing is not None:
raise
self._finishSdPrint()
def _finishSdPrint(self):
if not self._killed:
self._sdPrintingSemaphore.clear()
self._sdPrintingPausedSemaphore.clear()
self._send("Done printing file")
self._selectedSdFilePos = 0
self._selectedSdFileSize = 0
self._sdPrinting = False
self._sdPrinter = None
def _deleteSdFile(self, filename: str) -> None:
host = self._settings.get(["host"])
access_code = self._settings.get(["access_code"])
if filename.startswith("/"):
filename = filename[1:]
file = self._getSdFileData(filename)
if file is not None:
ftp = IoTFTPSClient(f"{host}", 990, "bblp", f"{access_code}", ssl_implicit=True)
try:
if ftp.delete_file(filename):
self._logger.debug(f"{filename} deleted")
else:
raise Exception("delete failed")
except Exception as e:
self._logger.debug(f"Error deleting file {filename}")
def _setBusy(self, reason="processing"):
if not self._sendBusy:
return
def loop():
while self._busy:
self._send(f"echo:busy {self._busy}")
time.sleep(self._busyInterval)
self._sendOk()
self._busy = reason
self._busy_loop = threading.Thread(target=loop)
self._busy_loop.daemon = True
self._busy_loop.start()
def _setUnbusy(self):
self._busy = None
# def _processBuffer(self):
# while self.buffered is not None:
# try:
# line = self.buffered.get(timeout=0.5)
# except queue.Empty:
# continue
#
# if line is None:
# continue
#
# self.buffered.task_done()
#
# self._logger.debug("Closing down buffer loop")
def _showPrompt(self, text, choices):
self._hidePrompt()
self._send(f"//action:prompt_begin {text}")
for choice in choices:
self._send(f"//action:prompt_button {choice}")
self._send("//action:prompt_show")
def _hidePrompt(self):
self._send("//action:prompt_end")
def write(self, data: bytes) -> int:
data = to_bytes(data, errors="replace")
u_data = to_unicode(data, errors="replace")
with self._incoming_lock:
if self.incoming is None or self.outgoing is None:
return 0
if b"M112" in data:
self._seriallog.debug(f"<<< {u_data}")
self._kill()
return len(data)
try:
written = self.incoming.put(data, timeout=self._write_timeout, partial=True)
self._seriallog.debug(f"<<< {u_data}")
return written
except queue.Full:
self._logger.info(
"Incoming queue is full, raising SerialTimeoutException"
)
raise SerialTimeoutException()
def readline(self) -> bytes:
timeout = self._read_timeout
try:
# fetch a line from the queue, wait no longer than timeout
line = to_unicode(self.outgoing.get(timeout=timeout), errors="replace")
self._seriallog.debug(f">>> {line.strip()}")
self.outgoing.task_done()
return to_bytes(line)
except queue.Empty:
# queue empty? return empty line
return b""
def close(self):
if self.bambu.connected:
self.bambu.disconnect()
self._killed = True
self.incoming = None
self.outgoing = None
self.buffered = None
def _sendOk(self):
if self.outgoing is None:
return
ok = self._ok()
if ok:
self._send(ok)
def _isPaused(self):
return self._sdPrintingPausedSemaphore.is_set()
def _sendPaused(self):
paused_timer = RepeatedTimer(interval=3.0, function=self._send, args=[f"SD printing byte {self._selectedSdFilePos}/{self._selectedSdFileSize}"],
daemon=True, run_first=True, condition=self._isPaused)
paused_timer.start()
def _send(self, line: str) -> None:
if self.outgoing is not None:
self.outgoing.put(line)
def _ok(self):
return "ok"
def _error(self, error: str, *args, **kwargs) -> str:
return f"Error: {self._errors.get(error).format(*args, **kwargs)}"
# noinspection PyUnresolvedReferences
class CharCountingQueue(queue.Queue):
def __init__(self, maxsize, name=None):
queue.Queue.__init__(self, maxsize=maxsize)
self._size = 0
self._name = name
def clear(self):
with self.mutex:
self.queue.clear()
def put(self, item, block=True, timeout=None, partial=False) -> int:
self.not_full.acquire()
try:
if not self._will_it_fit(item) and partial:
space_left = self.maxsize - self._qsize()
if space_left:
item = item[:space_left]
if not block:
if not self._will_it_fit(item):
raise queue.Full
elif timeout is None:
while not self._will_it_fit(item):
self.not_full.wait()
elif timeout < 0:
raise ValueError("'timeout' must be a positive number")
else:
endtime = time.monotonic() + timeout
while not self._will_it_fit(item):
remaining = endtime - time.monotonic()
if remaining <= 0:
raise queue.Full
self.not_full.wait(remaining)
self._put(item)
self.unfinished_tasks += 1
self.not_empty.notify()
return self._len(item)
finally:
self.not_full.release()
# noinspection PyMethodMayBeStatic
def _len(self, item):
return len(item)
def _qsize(self, l=len): # noqa: E741
return self._size
# Put a new item in the queue
def _put(self, item):
self.queue.append(item)
self._size += self._len(item)
# Get an item from the queue
def _get(self):
item = self.queue.popleft()
self._size -= self._len(item)
return item
def _will_it_fit(self, item):
return self.maxsize - self._qsize() >= self._len(item)

9
requirements.txt Normal file
View File

@ -0,0 +1,9 @@
###
# This file is only here to make sure that something like
#
# pip install -e .
#
# works as expected. Requirements can be found in setup.py.
###
.

BIN
screenshot.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 128 KiB

2
setup.cfg Normal file
View File

@ -0,0 +1,2 @@
[bdist_wheel]
universal = 1

102
setup.py Normal file
View File

@ -0,0 +1,102 @@
# coding=utf-8
########################################################################################################################
### Do not forget to adjust the following variables to your own plugin.
# The plugin's identifier, has to be unique
plugin_identifier = "bambu_printer"
# The plugin's python package, should be "octoprint_<plugin identifier>", has to be unique
plugin_package = "octoprint_bambu_printer"
# The plugin's human readable name. Can be overwritten within OctoPrint's internal data via __plugin_name__ in the
# plugin module
plugin_name = "OctoPrint-BambuPrinter"
# The plugin's version. Can be overwritten within OctoPrint's internal data via __plugin_version__ in the plugin module
plugin_version = "0.0.1"
# The plugin's description. Can be overwritten within OctoPrint's internal data via __plugin_description__ in the plugin
# module
plugin_description = """Connects OctoPrint to BambuLabs printers."""
# The plugin's author. Can be overwritten within OctoPrint's internal data via __plugin_author__ in the plugin module
plugin_author = "jneilliii"
# The plugin's author's mail address.
plugin_author_email = "jneilliii+github@gmail.com"
# The plugin's homepage URL. Can be overwritten within OctoPrint's internal data via __plugin_url__ in the plugin module
plugin_url = "https://github.com/jneilliii/OctoPrint-BambuPrinter"
# The plugin's license. Can be overwritten within OctoPrint's internal data via __plugin_license__ in the plugin module
plugin_license = "AGPLv3"
# Any additional requirements besides OctoPrint should be listed here
plugin_requires = ["paho-mqtt", "pybambu>=1.0.0"]
### --------------------------------------------------------------------------------------------------------------------
### More advanced options that you usually shouldn't have to touch follow after this point
### --------------------------------------------------------------------------------------------------------------------
# Additional package data to install for this plugin. The subfolders "templates", "static" and "translations" will
# already be installed automatically if they exist. Note that if you add something here you'll also need to update
# MANIFEST.in to match to ensure that python setup.py sdist produces a source distribution that contains all your
# files. This is sadly due to how python's setup.py works, see also http://stackoverflow.com/a/14159430/2028598
plugin_additional_data = []
# Any additional python packages you need to install with your plugin that are not contained in <plugin_package>.*
plugin_additional_packages = []
# Any python packages within <plugin_package>.* you do NOT want to install with your plugin
plugin_ignored_packages = []
# Additional parameters for the call to setuptools.setup. If your plugin wants to register additional entry points,
# define dependency links or other things like that, this is the place to go. Will be merged recursively with the
# default setup parameters as provided by octoprint_setuptools.create_plugin_setup_parameters using
# octoprint.util.dict_merge.
#
# Example:
# plugin_requires = ["someDependency==dev"]
# additional_setup_parameters = {"dependency_links": ["https://github.com/someUser/someRepo/archive/master.zip#egg=someDependency-dev"]}
# "python_requires": ">=3,<4" blocks installation on Python 2 systems, to prevent confused users and provide a helpful error.
# Remove it if you would like to support Python 2 as well as 3 (not recommended).
additional_setup_parameters = {"python_requires": ">=3,<4"}
########################################################################################################################
from setuptools import setup
try:
import octoprint_setuptools
except:
print(
"Could not import OctoPrint's setuptools, are you sure you are running that under "
"the same python installation that OctoPrint is installed under?"
)
import sys
sys.exit(-1)
setup_parameters = octoprint_setuptools.create_plugin_setup_parameters(
identifier=plugin_identifier,
package=plugin_package,
name=plugin_name,
version=plugin_version,
description=plugin_description,
author=plugin_author,
mail=plugin_author_email,
url=plugin_url,
license=plugin_license,
requires=plugin_requires,
additional_packages=plugin_additional_packages,
ignored_packages=plugin_ignored_packages,
additional_data=plugin_additional_data,
)
if len(additional_setup_parameters):
from octoprint.util import dict_merge
setup_parameters = dict_merge(setup_parameters, additional_setup_parameters)
setup(**setup_parameters)

28
translations/README.txt Normal file
View File

@ -0,0 +1,28 @@
Your plugin's translations will reside here. The provided setup.py supports a
couple of additional commands to make managing your translations easier:
babel_extract
Extracts any translateable messages (marked with Jinja's `_("...")` or
JavaScript's `gettext("...")`) and creates the initial `messages.pot` file.
babel_refresh
Reruns extraction and updates the `messages.pot` file.
babel_new --locale=<locale>
Creates a new translation folder for locale `<locale>`.
babel_compile
Compiles the translations into `mo` files, ready to be used within
OctoPrint.
babel_pack --locale=<locale> [ --author=<author> ]
Packs the translation for locale `<locale>` up as an installable
language pack that can be manually installed by your plugin's users. This is
interesting for languages you can not guarantee to keep up to date yourself
with each new release of your plugin and have to depend on contributors for.
If you want to bundle translations with your plugin, create a new folder
`octoprint_bambu_printer/translations`. When that folder exists,
an additional command becomes available:
babel_bundle --locale=<locale>
Moves the translation for locale `<locale>` to octoprint_bambu_printer/translations,
effectively bundling it with your plugin. This is interesting for languages
you can guarantee to keep up to date yourself with each new release of your
plugin.