0.1.8rc1 (#56)
* bring latest pybambu module in from home assistant integration * add onBeforePrintStart callback to prevent starting "local" files and display print options dialog.* add onBeforePrintStart callback to prevent starting "local" files and display print options dialog (with AMS mapping support) * add AMS display in sidebar
This commit is contained in:
4
octoprint_bambu_printer/printer/pybambu/__init__.py
Normal file
4
octoprint_bambu_printer/printer/pybambu/__init__.py
Normal file
@ -0,0 +1,4 @@
|
||||
"""Initialise the Bambu Client"""
|
||||
# TODO: Once complete, move pybambu to PyPi
|
||||
from .bambu_client import BambuClient
|
||||
from .bambu_cloud import BambuCloud
|
552
octoprint_bambu_printer/printer/pybambu/bambu_client.py
Normal file
552
octoprint_bambu_printer/printer/pybambu/bambu_client.py
Normal file
@ -0,0 +1,552 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import queue
|
||||
import json
|
||||
import math
|
||||
import re
|
||||
import socket
|
||||
import ssl
|
||||
import struct
|
||||
import threading
|
||||
import time
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import Any
|
||||
|
||||
import paho.mqtt.client as mqtt
|
||||
|
||||
from .bambu_cloud import BambuCloud
|
||||
from .const import (
|
||||
LOGGER,
|
||||
Features,
|
||||
)
|
||||
from .models import Device, SlicerSettings
|
||||
from .commands import (
|
||||
GET_VERSION,
|
||||
PUSH_ALL,
|
||||
START_PUSH,
|
||||
)
|
||||
|
||||
|
||||
class WatchdogThread(threading.Thread):
|
||||
|
||||
def __init__(self, client):
|
||||
self._client = client
|
||||
self._watchdog_fired = False
|
||||
self._stop_event = threading.Event()
|
||||
self._last_received_data = time.time()
|
||||
super().__init__()
|
||||
self.setName(f"{self._client._device.info.device_type}-Watchdog-{threading.get_native_id()}")
|
||||
|
||||
def stop(self):
|
||||
self._stop_event.set()
|
||||
|
||||
def received_data(self):
|
||||
self._last_received_data = time.time()
|
||||
|
||||
def run(self):
|
||||
LOGGER.info("Watchdog thread started.")
|
||||
WATCHDOG_TIMER = 30
|
||||
while True:
|
||||
# Wait out the remainder of the watchdog delay or 1s, whichever is higher.
|
||||
interval = time.time() - self._last_received_data
|
||||
wait_time = max(1, WATCHDOG_TIMER - interval)
|
||||
if self._stop_event.wait(wait_time):
|
||||
# Stop event has been set. Exit thread.
|
||||
break
|
||||
interval = time.time() - self._last_received_data
|
||||
if not self._watchdog_fired and (interval > WATCHDOG_TIMER):
|
||||
LOGGER.debug(f"Watchdog fired. No data received for {math.floor(interval)} seconds for {self._client._serial}.")
|
||||
self._watchdog_fired = True
|
||||
self._client._on_watchdog_fired()
|
||||
elif interval < WATCHDOG_TIMER:
|
||||
self._watchdog_fired = False
|
||||
|
||||
LOGGER.info("Watchdog thread exited.")
|
||||
|
||||
|
||||
class ChamberImageThread(threading.Thread):
|
||||
def __init__(self, client):
|
||||
self._client = client
|
||||
self._stop_event = threading.Event()
|
||||
super().__init__()
|
||||
self.setName(f"{self._client._device.info.device_type}-Chamber-{threading.get_native_id()}")
|
||||
|
||||
def stop(self):
|
||||
self._stop_event.set()
|
||||
|
||||
def run(self):
|
||||
LOGGER.debug("Chamber image thread started.")
|
||||
|
||||
auth_data = bytearray()
|
||||
|
||||
username = 'bblp'
|
||||
access_code = self._client._access_code
|
||||
hostname = self._client.host
|
||||
port = 6000
|
||||
MAX_CONNECT_ATTEMPTS = 12
|
||||
connect_attempts = 0
|
||||
|
||||
auth_data += struct.pack("<I", 0x40) # '@'\0\0\0
|
||||
auth_data += struct.pack("<I", 0x3000) # \0'0'\0\0
|
||||
auth_data += struct.pack("<I", 0) # \0\0\0\0
|
||||
auth_data += struct.pack("<I", 0) # \0\0\0\0
|
||||
for i in range(0, len(username)):
|
||||
auth_data += struct.pack("<c", username[i].encode('ascii'))
|
||||
for i in range(0, 32 - len(username)):
|
||||
auth_data += struct.pack("<x")
|
||||
for i in range(0, len(access_code)):
|
||||
auth_data += struct.pack("<c", access_code[i].encode('ascii'))
|
||||
for i in range(0, 32 - len(access_code)):
|
||||
auth_data += struct.pack("<x")
|
||||
|
||||
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
||||
ctx.check_hostname = False
|
||||
ctx.verify_mode = ssl.CERT_NONE
|
||||
|
||||
jpeg_start = bytearray([0xff, 0xd8, 0xff, 0xe0])
|
||||
jpeg_end = bytearray([0xff, 0xd9])
|
||||
|
||||
read_chunk_size = 4096 # 4096 is the max we'll get even if we increase this.
|
||||
|
||||
# Payload format for each image is:
|
||||
# 16 byte header:
|
||||
# Bytes 0:3 = little endian payload size for the jpeg image (does not include this header).
|
||||
# Bytes 4:7 = 0x00000000
|
||||
# Bytes 8:11 = 0x00000001
|
||||
# Bytes 12:15 = 0x00000000
|
||||
# These first 16 bytes are always delivered by themselves.
|
||||
#
|
||||
# Bytes 16:19 = jpeg_start magic bytes
|
||||
# Bytes 20:payload_size-2 = jpeg image bytes
|
||||
# Bytes payload_size-2:payload_size = jpeg_end magic bytes
|
||||
#
|
||||
# Further attempts to receive data will get SSLWantReadError until a new image is ready (1-2 seconds later)
|
||||
while connect_attempts < MAX_CONNECT_ATTEMPTS and not self._stop_event.is_set():
|
||||
connect_attempts += 1
|
||||
try:
|
||||
with socket.create_connection((hostname, port)) as sock:
|
||||
try:
|
||||
sslSock = ctx.wrap_socket(sock, server_hostname=hostname)
|
||||
sslSock.write(auth_data)
|
||||
img = None
|
||||
payload_size = 0
|
||||
|
||||
status = sslSock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
|
||||
LOGGER.debug(f"SOCKET STATUS: {status}")
|
||||
if status != 0:
|
||||
LOGGER.error(f"Socket error: {status}")
|
||||
except socket.error as e:
|
||||
LOGGER.error(f"Socket error: {e}")
|
||||
# Sleep to allow printer to stabilize during boot when it may fail these connection attempts repeatedly.
|
||||
time.sleep(1)
|
||||
continue
|
||||
|
||||
sslSock.setblocking(False)
|
||||
while not self._stop_event.is_set():
|
||||
try:
|
||||
dr = sslSock.recv(read_chunk_size)
|
||||
#LOGGER.debug(f"Received {len(dr)} bytes.")
|
||||
|
||||
except ssl.SSLWantReadError:
|
||||
#LOGGER.debug("SSLWantReadError")
|
||||
time.sleep(1)
|
||||
continue
|
||||
|
||||
except Exception as e:
|
||||
LOGGER.error("A Chamber Image thread inner exception occurred:")
|
||||
LOGGER.error(f"Exception. Type: {type(e)} Args: {e}")
|
||||
time.sleep(1)
|
||||
continue
|
||||
|
||||
if img is not None and len(dr) > 0:
|
||||
img += dr
|
||||
if len(img) > payload_size:
|
||||
# We got more data than we expected.
|
||||
LOGGER.error(f"Unexpected image payload received: {len(img)} > {payload_size}")
|
||||
# Reset buffer
|
||||
img = None
|
||||
elif len(img) == payload_size:
|
||||
# We should have the full image now.
|
||||
if img[:4] != jpeg_start:
|
||||
LOGGER.error("JPEG start magic bytes missing.")
|
||||
elif img[-2:] != jpeg_end:
|
||||
LOGGER.error("JPEG end magic bytes missing.")
|
||||
else:
|
||||
# Content is as expected. Send it.
|
||||
self._client.on_jpeg_received(img)
|
||||
|
||||
# Reset buffer
|
||||
img = None
|
||||
# else:
|
||||
# Otherwise we need to continue looping without reseting the buffer to receive the remaining data
|
||||
# and without delaying.
|
||||
|
||||
elif len(dr) == 16:
|
||||
# We got the header bytes. Get the expected payload size from it and create the image buffer bytearray.
|
||||
# Reset connect_attempts now we know the connect was successful.
|
||||
connect_attempts = 0
|
||||
img = bytearray()
|
||||
payload_size = int.from_bytes(dr[0:3], byteorder='little')
|
||||
|
||||
elif len(dr) == 0:
|
||||
# This occurs if the wrong access code was provided.
|
||||
LOGGER.error("Chamber image connection rejected by the printer. Check provided access code and IP address.")
|
||||
# Sleep for a short while and then re-attempt the connection.
|
||||
time.sleep(5)
|
||||
break
|
||||
|
||||
else:
|
||||
LOGGER.error(f"UNEXPECTED DATA RECEIVED: {len(dr)}")
|
||||
time.sleep(1)
|
||||
|
||||
except OSError as e:
|
||||
if e.errno == 113:
|
||||
LOGGER.debug("Host is unreachable")
|
||||
else:
|
||||
LOGGER.error("A Chamber Image thread outer exception occurred:")
|
||||
LOGGER.error(f"Exception. Type: {type(e)} Args: {e}")
|
||||
if not self._stop_event.is_set():
|
||||
time.sleep(1) # Avoid a tight loop if this is a persistent error.
|
||||
|
||||
except Exception as e:
|
||||
LOGGER.error(f"A Chamber Image thread outer exception occurred:")
|
||||
LOGGER.error(f"Exception. Type: {type(e)} Args: {e}")
|
||||
if not self._stop_event.is_set():
|
||||
time.sleep(1) # Avoid a tight loop if this is a persistent error.
|
||||
|
||||
LOGGER.debug("Chamber image thread exited.")
|
||||
|
||||
|
||||
class MqttThread(threading.Thread):
|
||||
def __init__(self, client):
|
||||
self._client = client
|
||||
self._stop_event = threading.Event()
|
||||
super().__init__()
|
||||
self.setName(f"{self._client._device.info.device_type}-Mqtt-{threading.get_native_id()}")
|
||||
|
||||
def stop(self):
|
||||
self._stop_event.set()
|
||||
|
||||
def run(self):
|
||||
LOGGER.info("MQTT listener thread started.")
|
||||
exceptionSeen = ""
|
||||
while True:
|
||||
try:
|
||||
host = self._client.host if self._client._local_mqtt else self._client.bambu_cloud.cloud_mqtt_host
|
||||
LOGGER.debug(f"Connect: Attempting Connection to {host}")
|
||||
self._client.client.connect(host, self._client._port, keepalive=5)
|
||||
|
||||
LOGGER.debug("Starting listen loop")
|
||||
self._client.client.loop_forever()
|
||||
LOGGER.debug("Ended listen loop.")
|
||||
break
|
||||
except TimeoutError as e:
|
||||
if exceptionSeen != "TimeoutError":
|
||||
LOGGER.debug(f"TimeoutError: {e}.")
|
||||
exceptionSeen = "TimeoutError"
|
||||
time.sleep(5)
|
||||
except ConnectionError as e:
|
||||
if exceptionSeen != "ConnectionError":
|
||||
LOGGER.debug(f"ConnectionError: {e}.")
|
||||
exceptionSeen = "ConnectionError"
|
||||
time.sleep(5)
|
||||
except OSError as e:
|
||||
if e.errno == 113:
|
||||
if exceptionSeen != "OSError113":
|
||||
LOGGER.debug(f"OSError: {e}.")
|
||||
exceptionSeen = "OSError113"
|
||||
time.sleep(5)
|
||||
else:
|
||||
LOGGER.error("A listener loop thread exception occurred:")
|
||||
LOGGER.error(f"Exception. Type: {type(e)} Args: {e}")
|
||||
time.sleep(1) # Avoid a tight loop if this is a persistent error.
|
||||
except Exception as e:
|
||||
LOGGER.error("A listener loop thread exception occurred:")
|
||||
LOGGER.error(f"Exception. Type: {type(e)} Args: {e}")
|
||||
time.sleep(1) # Avoid a tight loop if this is a persistent error.
|
||||
|
||||
if self._client.client is None:
|
||||
break
|
||||
|
||||
self._client.client.disconnect()
|
||||
|
||||
LOGGER.info("MQTT listener thread exited.")
|
||||
|
||||
|
||||
@dataclass
|
||||
class BambuClient:
|
||||
"""Initialize Bambu Client to connect to MQTT Broker"""
|
||||
_watchdog = None
|
||||
_camera = None
|
||||
_usage_hours: float
|
||||
|
||||
def __init__(self, device_type: str, serial: str, host: str, local_mqtt: bool, region: str, email: str,
|
||||
username: str, auth_token: str, access_code: str, usage_hours: float = 0, manual_refresh_mode: bool = False):
|
||||
self.callback = None
|
||||
self.host = host
|
||||
self._local_mqtt = local_mqtt
|
||||
self._serial = serial
|
||||
self._auth_token = auth_token
|
||||
self._access_code = access_code
|
||||
self._username = username
|
||||
self._connected = False
|
||||
self._device_type = device_type
|
||||
self._usage_hours = usage_hours
|
||||
self._port = 1883
|
||||
self._refreshed = False
|
||||
self._manual_refresh_mode = manual_refresh_mode
|
||||
self._device = Device(self)
|
||||
self.bambu_cloud = BambuCloud(region, email, username, auth_token)
|
||||
self.slicer_settings = SlicerSettings(self)
|
||||
|
||||
@property
|
||||
def connected(self):
|
||||
"""Return if connected to server"""
|
||||
return self._connected
|
||||
|
||||
@property
|
||||
def manual_refresh_mode(self):
|
||||
"""Return if the integration is running in poll mode"""
|
||||
return self._manual_refresh_mode
|
||||
|
||||
async def set_manual_refresh_mode(self, on):
|
||||
self._manual_refresh_mode = on
|
||||
if self._manual_refresh_mode:
|
||||
# Disconnect from the server. User must manually hit the refresh button to connect to refresh and then it will immediately disconnect.
|
||||
self.disconnect()
|
||||
else:
|
||||
# Reconnect normally
|
||||
self.connect(self.callback)
|
||||
|
||||
def connect(self, callback):
|
||||
"""Connect to the MQTT Broker"""
|
||||
self.client = mqtt.Client()
|
||||
self.callback = callback
|
||||
self.client.on_connect = self.on_connect
|
||||
self.client.on_disconnect = self.on_disconnect
|
||||
self.client.on_message = self.on_message
|
||||
# Set aggressive reconnect polling.
|
||||
self.client.reconnect_delay_set(min_delay=1, max_delay=1)
|
||||
|
||||
self.client.tls_set(tls_version=ssl.PROTOCOL_TLS, cert_reqs=ssl.CERT_NONE)
|
||||
self.client.tls_insecure_set(True)
|
||||
self._port = 8883
|
||||
if self._local_mqtt:
|
||||
self.client.username_pw_set("bblp", password=self._access_code)
|
||||
else:
|
||||
self.client.username_pw_set(self._username, password=self._auth_token)
|
||||
|
||||
LOGGER.debug("Starting MQTT listener thread")
|
||||
self._mqtt = MqttThread(self)
|
||||
self._mqtt.start()
|
||||
|
||||
def subscribe_and_request_info(self):
|
||||
LOGGER.debug("Loading slicer settings...")
|
||||
self.slicer_settings.update()
|
||||
LOGGER.debug("Now subscribing...")
|
||||
self.subscribe()
|
||||
LOGGER.debug("On Connect: Getting version info")
|
||||
self.publish(GET_VERSION)
|
||||
LOGGER.debug("On Connect: Request push all")
|
||||
self.publish(PUSH_ALL)
|
||||
|
||||
def on_connect(self,
|
||||
client_: mqtt.Client,
|
||||
userdata: None,
|
||||
flags: dict[str, Any],
|
||||
result_code: int,
|
||||
properties: mqtt.Properties | None = None, ):
|
||||
"""Handle connection"""
|
||||
LOGGER.info("On Connect: Connected to printer")
|
||||
self._on_connect()
|
||||
|
||||
def _on_connect(self):
|
||||
self._connected = True
|
||||
self.subscribe_and_request_info()
|
||||
|
||||
LOGGER.debug("Starting watchdog thread")
|
||||
self._watchdog = WatchdogThread(self)
|
||||
self._watchdog.start()
|
||||
|
||||
if self._device.supports_feature(Features.CAMERA_IMAGE):
|
||||
LOGGER.debug("Starting Chamber Image thread")
|
||||
self._camera = ChamberImageThread(self)
|
||||
self._camera.start()
|
||||
|
||||
def try_on_connect(self,
|
||||
client_: mqtt.Client,
|
||||
userdata: None,
|
||||
flags: dict[str, Any],
|
||||
result_code: int,
|
||||
properties: mqtt.Properties | None = None, ):
|
||||
"""Handle connection"""
|
||||
LOGGER.info("On Connect: Connected to printer")
|
||||
self._connected = True
|
||||
LOGGER.debug("Now test subscribing...")
|
||||
self.subscribe()
|
||||
# For the initial configuration connection attempt, we just need version info.
|
||||
LOGGER.debug("On Connect: Getting version info")
|
||||
self.publish(GET_VERSION)
|
||||
|
||||
def on_disconnect(self,
|
||||
client_: mqtt.Client,
|
||||
userdata: None,
|
||||
result_code: int):
|
||||
"""Called when MQTT Disconnects"""
|
||||
LOGGER.warn(f"On Disconnect: Printer disconnected with error code: {result_code}")
|
||||
self._on_disconnect()
|
||||
|
||||
def _on_disconnect(self):
|
||||
LOGGER.debug("_on_disconnect: Lost connection to the printer")
|
||||
self._connected = False
|
||||
self._device.info.set_online(False)
|
||||
if self._watchdog is not None:
|
||||
LOGGER.debug("Stopping watchdog thread")
|
||||
self._watchdog.stop()
|
||||
self._watchdog.join()
|
||||
if self._camera is not None:
|
||||
LOGGER.debug("Stopping camera thread")
|
||||
self._camera.stop()
|
||||
self._camera.join()
|
||||
|
||||
def _on_watchdog_fired(self):
|
||||
LOGGER.info("Watch dog fired")
|
||||
self._device.info.set_online(False)
|
||||
self.publish(START_PUSH)
|
||||
|
||||
def on_jpeg_received(self, bytes):
|
||||
self._device.chamber_image.set_jpeg(bytes)
|
||||
|
||||
def on_message(self, client, userdata, message):
|
||||
"""Return the payload when received"""
|
||||
try:
|
||||
# X1 mqtt payload is inconsistent. Adjust it for consistent logging.
|
||||
clean_msg = re.sub(r"\\n *", "", str(message.payload))
|
||||
if self._refreshed:
|
||||
LOGGER.debug(f"Received data: {clean_msg}")
|
||||
|
||||
json_data = json.loads(message.payload)
|
||||
if json_data.get("event"):
|
||||
# These are events from the bambu cloud mqtt feed and allow us to detect when a local
|
||||
# device has connected/disconnected (e.g. turned on/off)
|
||||
if json_data.get("event").get("event") == "client.connected":
|
||||
LOGGER.debug("Client connected event received.")
|
||||
self._on_disconnect() # We aren't guaranteed to recieve a client.disconnected event.
|
||||
self._on_connect()
|
||||
elif json_data.get("event").get("event") == "client.disconnected":
|
||||
LOGGER.debug("Client disconnected event received.")
|
||||
self._on_disconnect()
|
||||
else:
|
||||
self._device.info.set_online(True)
|
||||
self._watchdog.received_data()
|
||||
if json_data.get("print"):
|
||||
self._device.print_update(data=json_data.get("print"))
|
||||
# Once we receive data, if in manual refresh mode, we disconnect again.
|
||||
if self._manual_refresh_mode:
|
||||
self.disconnect()
|
||||
if json_data.get("print").get("msg", 0) == 0:
|
||||
self._refreshed= False
|
||||
elif json_data.get("info") and json_data.get("info").get("command") == "get_version":
|
||||
LOGGER.debug("Got Version Data")
|
||||
self._device.info_update(data=json_data.get("info"))
|
||||
except Exception as e:
|
||||
LOGGER.error("An exception occurred processing a message:")
|
||||
LOGGER.error(f"Exception type: {type(e)}")
|
||||
LOGGER.error(f"Exception data: {e}")
|
||||
|
||||
def subscribe(self):
|
||||
"""Subscribe to report topic"""
|
||||
LOGGER.debug(f"Subscribing: device/{self._serial}/report")
|
||||
self.client.subscribe(f"device/{self._serial}/report")
|
||||
|
||||
def publish(self, msg):
|
||||
"""Publish a custom message"""
|
||||
result = self.client.publish(f"device/{self._serial}/request", json.dumps(msg))
|
||||
status = result[0]
|
||||
if status == 0:
|
||||
LOGGER.debug(f"Sent {msg} to topic device/{self._serial}/request")
|
||||
return True
|
||||
|
||||
LOGGER.error(f"Failed to send message to topic device/{self._serial}/request")
|
||||
return False
|
||||
|
||||
async def refresh(self):
|
||||
"""Force refresh data"""
|
||||
|
||||
if self._manual_refresh_mode:
|
||||
self.connect(self.callback)
|
||||
else:
|
||||
LOGGER.debug("Force Refresh: Getting Version Info")
|
||||
self._refreshed = True
|
||||
self.publish(GET_VERSION)
|
||||
LOGGER.debug("Force Refresh: Request Push All")
|
||||
self._refreshed = True
|
||||
self.publish(PUSH_ALL)
|
||||
|
||||
self.slicer_settings.update()
|
||||
|
||||
def get_device(self):
|
||||
"""Return device"""
|
||||
return self._device
|
||||
|
||||
def disconnect(self):
|
||||
"""Disconnect the Bambu Client from server"""
|
||||
LOGGER.debug(" Disconnect: Client Disconnecting")
|
||||
if self.client is not None:
|
||||
self.client.disconnect()
|
||||
self.client = None
|
||||
|
||||
async def try_connection(self):
|
||||
"""Test if we can connect to an MQTT broker."""
|
||||
LOGGER.debug("Try Connection")
|
||||
|
||||
result: queue.Queue[bool] = queue.Queue(maxsize=1)
|
||||
|
||||
def on_message(client, userdata, message):
|
||||
json_data = json.loads(message.payload)
|
||||
LOGGER.debug(f"Try Connection: Got '{json_data}'")
|
||||
if json_data.get("info") and json_data.get("info").get("command") == "get_version":
|
||||
LOGGER.debug("Got Version Command Data")
|
||||
self._device.info_update(data=json_data.get("info"))
|
||||
result.put(True)
|
||||
|
||||
self.client = mqtt.Client()
|
||||
self.client.on_connect = self.try_on_connect
|
||||
self.client.on_disconnect = self.on_disconnect
|
||||
self.client.on_message = on_message
|
||||
|
||||
self.client.tls_set(tls_version=ssl.PROTOCOL_TLS, cert_reqs=ssl.CERT_NONE)
|
||||
self.client.tls_insecure_set(True)
|
||||
if self._local_mqtt:
|
||||
self.client.username_pw_set("bblp", password=self._access_code)
|
||||
else:
|
||||
self.client.username_pw_set(self._username, password=self._auth_token)
|
||||
self._port = 8883
|
||||
|
||||
LOGGER.debug("Test connection: Connecting to %s", self.host)
|
||||
try:
|
||||
self.client.connect(self.host, self._port)
|
||||
self.client.loop_start()
|
||||
if result.get(timeout=10):
|
||||
return True
|
||||
except OSError as e:
|
||||
return False
|
||||
except queue.Empty:
|
||||
return False
|
||||
finally:
|
||||
self.disconnect()
|
||||
|
||||
async def __aenter__(self):
|
||||
"""Async enter.
|
||||
Returns:
|
||||
The BambuLab object.
|
||||
"""
|
||||
return self
|
||||
|
||||
async def __aexit__(self, *_exc_info):
|
||||
"""Async exit.
|
||||
Args:
|
||||
_exc_info: Exec type.
|
||||
"""
|
||||
self.disconnect()
|
293
octoprint_bambu_printer/printer/pybambu/bambu_cloud.py
Normal file
293
octoprint_bambu_printer/printer/pybambu/bambu_cloud.py
Normal file
@ -0,0 +1,293 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import base64
|
||||
import json
|
||||
import httpx
|
||||
|
||||
from dataclasses import dataclass
|
||||
|
||||
from .const import LOGGER
|
||||
|
||||
@dataclass
|
||||
class BambuCloud:
|
||||
|
||||
def __init__(self, region: str, email: str, username: str, auth_token: str):
|
||||
self._region = region
|
||||
self._email = email
|
||||
self._username = username
|
||||
self._auth_token = auth_token
|
||||
|
||||
def _get_authentication_token(self) -> dict:
|
||||
LOGGER.debug("Getting accessToken from Bambu Cloud")
|
||||
if self._region == "China":
|
||||
url = 'https://api.bambulab.cn/v1/user-service/user/login'
|
||||
else:
|
||||
url = 'https://api.bambulab.com/v1/user-service/user/login'
|
||||
headers = {'User-Agent' : "HA Bambulab"}
|
||||
data = {'account': self._email, 'password': self._password}
|
||||
with httpx.Client(http2=True) as client:
|
||||
response = client.post(url, headers=headers, json=data, timeout=10)
|
||||
if response.status_code >= 400:
|
||||
LOGGER.debug(f"Received error: {response.status_code}")
|
||||
raise ValueError(response.status_code)
|
||||
return response.json()['accessToken']
|
||||
|
||||
def _get_username_from_authentication_token(self) -> str:
|
||||
# User name is in 2nd portion of the auth token (delimited with periods)
|
||||
b64_string = self._auth_token.split(".")[1]
|
||||
# String must be multiples of 4 chars in length. For decode pad with = character
|
||||
b64_string += "=" * ((4 - len(b64_string) % 4) % 4)
|
||||
jsonAuthToken = json.loads(base64.b64decode(b64_string))
|
||||
# Gives json payload with "username":"u_<digits>" within it
|
||||
return jsonAuthToken['username']
|
||||
|
||||
# Retrieves json description of devices in the form:
|
||||
# {
|
||||
# 'message': 'success',
|
||||
# 'code': None,
|
||||
# 'error': None,
|
||||
# 'devices': [
|
||||
# {
|
||||
# 'dev_id': 'REDACTED',
|
||||
# 'name': 'Bambu P1S',
|
||||
# 'online': True,
|
||||
# 'print_status': 'SUCCESS',
|
||||
# 'dev_model_name': 'C12',
|
||||
# 'dev_product_name': 'P1S',
|
||||
# 'dev_access_code': 'REDACTED',
|
||||
# 'nozzle_diameter': 0.4
|
||||
# },
|
||||
# {
|
||||
# 'dev_id': 'REDACTED',
|
||||
# 'name': 'Bambu P1P',
|
||||
# 'online': True,
|
||||
# 'print_status': 'RUNNING',
|
||||
# 'dev_model_name': 'C11',
|
||||
# 'dev_product_name': 'P1P',
|
||||
# 'dev_access_code': 'REDACTED',
|
||||
# 'nozzle_diameter': 0.4
|
||||
# },
|
||||
# {
|
||||
# 'dev_id': 'REDACTED',
|
||||
# 'name': 'Bambu X1C',
|
||||
# 'online': True,
|
||||
# 'print_status': 'RUNNING',
|
||||
# 'dev_model_name': 'BL-P001',
|
||||
# 'dev_product_name': 'X1 Carbon',
|
||||
# 'dev_access_code': 'REDACTED',
|
||||
# 'nozzle_diameter': 0.4
|
||||
# }
|
||||
# ]
|
||||
# }
|
||||
|
||||
def test_authentication(self, region: str, email: str, username: str, auth_token: str) -> bool:
|
||||
self._region = region
|
||||
self._email = email
|
||||
self._username = username
|
||||
self._auth_token = auth_token
|
||||
try:
|
||||
self.get_device_list()
|
||||
except:
|
||||
return False
|
||||
return True
|
||||
|
||||
def login(self, region: str, email: str, password: str):
|
||||
self._region = region
|
||||
self._email = email
|
||||
self._password = password
|
||||
|
||||
self._auth_token = self._get_authentication_token()
|
||||
self._username = self._get_username_from_authentication_token()
|
||||
|
||||
def get_device_list(self) -> dict:
|
||||
LOGGER.debug("Getting device list from Bambu Cloud")
|
||||
if self._region == "China":
|
||||
url = 'https://api.bambulab.cn/v1/iot-service/api/user/bind'
|
||||
else:
|
||||
url = 'https://api.bambulab.com/v1/iot-service/api/user/bind'
|
||||
headers = {'Authorization': 'Bearer ' + self._auth_token, 'User-Agent' : "HA Bambulab"}
|
||||
with httpx.Client(http2=True) as client:
|
||||
response = client.get(url, headers=headers, timeout=10)
|
||||
if response.status_code >= 400:
|
||||
LOGGER.debug(f"Received error: {response.status_code}")
|
||||
raise ValueError(response.status_code)
|
||||
return response.json()['devices']
|
||||
|
||||
# The slicer settings are of the following form:
|
||||
#
|
||||
# {
|
||||
# "message": "success",
|
||||
# "code": null,
|
||||
# "error": null,
|
||||
# "print": {
|
||||
# "public": [
|
||||
# {
|
||||
# "setting_id": "GP004",
|
||||
# "version": "01.09.00.15",
|
||||
# "name": "0.20mm Standard @BBL X1C",
|
||||
# "update_time": "2024-07-04 11:27:08",
|
||||
# "nickname": null
|
||||
# },
|
||||
# ...
|
||||
# }
|
||||
# "private": []
|
||||
# },
|
||||
# "printer": {
|
||||
# "public": [
|
||||
# {
|
||||
# "setting_id": "GM001",
|
||||
# "version": "01.09.00.15",
|
||||
# "name": "Bambu Lab X1 Carbon 0.4 nozzle",
|
||||
# "update_time": "2024-07-04 11:25:07",
|
||||
# "nickname": null
|
||||
# },
|
||||
# ...
|
||||
# ],
|
||||
# "private": []
|
||||
# },
|
||||
# "filament": {
|
||||
# "public": [
|
||||
# {
|
||||
# "setting_id": "GFSA01",
|
||||
# "version": "01.09.00.15",
|
||||
# "name": "Bambu PLA Matte @BBL X1C",
|
||||
# "update_time": "2024-07-04 11:29:21",
|
||||
# "nickname": null,
|
||||
# "filament_id": "GFA01"
|
||||
# },
|
||||
# ...
|
||||
# ],
|
||||
# "private": [
|
||||
# {
|
||||
# "setting_id": "PFUS46ea5c221cabe5",
|
||||
# "version": "1.9.0.14",
|
||||
# "name": "Fillamentum PLA Extrafill @Bambu Lab X1 Carbon 0.4 nozzle",
|
||||
# "update_time": "2024-07-10 06:48:17",
|
||||
# "base_id": null,
|
||||
# "filament_id": "Pc628b24",
|
||||
# "filament_type": "PLA",
|
||||
# "filament_is_support": "0",
|
||||
# "nozzle_temperature": [
|
||||
# 190,
|
||||
# 240
|
||||
# ],
|
||||
# "nozzle_hrc": "3",
|
||||
# "filament_vendor": "Fillamentum"
|
||||
# },
|
||||
# ...
|
||||
# ]
|
||||
# },
|
||||
# "settings": {}
|
||||
# }
|
||||
|
||||
def get_slicer_settings(self) -> dict:
|
||||
LOGGER.debug("Getting slicer settings from Bambu Cloud")
|
||||
if self._region == "China":
|
||||
url = 'https://api.bambulab.cn/v1/iot-service/api/slicer/setting?version=undefined'
|
||||
else:
|
||||
url = 'https://api.bambulab.com/v1/iot-service/api/slicer/setting?version=undefined'
|
||||
headers = {'Authorization': 'Bearer ' + self._auth_token, 'User-Agent' : "HA Bambulab"}
|
||||
with httpx.Client(http2=True) as client:
|
||||
response = client.get(url, headers=headers, timeout=10)
|
||||
if response.status_code >= 400:
|
||||
LOGGER.error(f"Slicer settings load failed: {response.status_code}")
|
||||
return None
|
||||
return response.json()
|
||||
|
||||
# The task list is of the following form with a 'hits' array with typical 20 entries.
|
||||
#
|
||||
# "total": 531,
|
||||
# "hits": [
|
||||
# {
|
||||
# "id": 35237965,
|
||||
# "designId": 0,
|
||||
# "designTitle": "",
|
||||
# "instanceId": 0,
|
||||
# "modelId": "REDACTED",
|
||||
# "title": "REDACTED",
|
||||
# "cover": "REDACTED",
|
||||
# "status": 4,
|
||||
# "feedbackStatus": 0,
|
||||
# "startTime": "2023-12-21T19:02:16Z",
|
||||
# "endTime": "2023-12-21T19:02:35Z",
|
||||
# "weight": 34.62,
|
||||
# "length": 1161,
|
||||
# "costTime": 10346,
|
||||
# "profileId": 35276233,
|
||||
# "plateIndex": 1,
|
||||
# "plateName": "",
|
||||
# "deviceId": "REDACTED",
|
||||
# "amsDetailMapping": [
|
||||
# {
|
||||
# "ams": 4,
|
||||
# "sourceColor": "F4D976FF",
|
||||
# "targetColor": "F4D976FF",
|
||||
# "filamentId": "GFL99",
|
||||
# "filamentType": "PLA",
|
||||
# "targetFilamentType": "",
|
||||
# "weight": 34.62
|
||||
# }
|
||||
# ],
|
||||
# "mode": "cloud_file",
|
||||
# "isPublicProfile": false,
|
||||
# "isPrintable": true,
|
||||
# "deviceModel": "P1P",
|
||||
# "deviceName": "Bambu P1P",
|
||||
# "bedType": "textured_plate"
|
||||
# },
|
||||
|
||||
def get_tasklist(self) -> dict:
|
||||
if self._region == "China":
|
||||
url = 'https://api.bambulab.cn/v1/user-service/my/tasks'
|
||||
else:
|
||||
url = 'https://api.bambulab.com/v1/user-service/my/tasks'
|
||||
headers = {'Authorization': 'Bearer ' + self._auth_token, 'User-Agent' : "HA Bambulab"}
|
||||
with httpx.Client(http2=True) as client:
|
||||
response = client.get(url, headers=headers, timeout=10)
|
||||
if response.status_code >= 400:
|
||||
LOGGER.debug(f"Received error: {response.status_code}")
|
||||
raise ValueError(response.status_code)
|
||||
return response.json()
|
||||
|
||||
def get_latest_task_for_printer(self, deviceId: str) -> dict:
|
||||
LOGGER.debug(f"Getting latest task from Bambu Cloud for Printer: {deviceId}")
|
||||
data = self.get_tasklist_for_printer(deviceId)
|
||||
if len(data) != 0:
|
||||
return data[0]
|
||||
LOGGER.debug("No tasks found for printer")
|
||||
return None
|
||||
|
||||
def get_tasklist_for_printer(self, deviceId: str) -> dict:
|
||||
LOGGER.debug(f"Getting task list from Bambu Cloud for Printer: {deviceId}")
|
||||
tasks = []
|
||||
data = self.get_tasklist()
|
||||
for task in data['hits']:
|
||||
if task['deviceId'] == deviceId:
|
||||
tasks.append(task)
|
||||
return tasks
|
||||
|
||||
def get_device_type_from_device_product_name(self, device_product_name: str):
|
||||
if device_product_name == "X1 Carbon":
|
||||
return "X1C"
|
||||
return device_product_name.replace(" ", "")
|
||||
|
||||
def download(self, url: str) -> bytearray:
|
||||
LOGGER.debug(f"Downloading cover image: {url}")
|
||||
with httpx.Client(http2=True) as client:
|
||||
response = client.get(url, timeout=10)
|
||||
if response.status_code >= 400:
|
||||
LOGGER.debug(f"Received error: {response.status_code}")
|
||||
raise ValueError(response.status_code)
|
||||
return response.content
|
||||
|
||||
@property
|
||||
def username(self):
|
||||
return self._username
|
||||
|
||||
@property
|
||||
def auth_token(self):
|
||||
return self._auth_token
|
||||
|
||||
@property
|
||||
def cloud_mqtt_host(self):
|
||||
return "cn.mqtt.bambulab.com" if self._region == "China" else "us.mqtt.bambulab.com"
|
24
octoprint_bambu_printer/printer/pybambu/commands.py
Normal file
24
octoprint_bambu_printer/printer/pybambu/commands.py
Normal file
@ -0,0 +1,24 @@
|
||||
"""MQTT Commands"""
|
||||
CHAMBER_LIGHT_ON = {
|
||||
"system": {"sequence_id": "0", "command": "ledctrl", "led_node": "chamber_light", "led_mode": "on",
|
||||
"led_on_time": 500, "led_off_time": 500, "loop_times": 0, "interval_time": 0}}
|
||||
CHAMBER_LIGHT_OFF = {
|
||||
"system": {"sequence_id": "0", "command": "ledctrl", "led_node": "chamber_light", "led_mode": "off",
|
||||
"led_on_time": 500, "led_off_time": 500, "loop_times": 0, "interval_time": 0}}
|
||||
|
||||
SPEED_PROFILE_TEMPLATE = {"print": {"sequence_id": "0", "command": "print_speed", "param": ""}}
|
||||
|
||||
GET_VERSION = {"info": {"sequence_id": "0", "command": "get_version"}}
|
||||
|
||||
PAUSE = {"print": {"sequence_id": "0", "command": "pause"}}
|
||||
RESUME = {"print": {"sequence_id": "0", "command": "resume"}}
|
||||
STOP = {"print": {"sequence_id": "0", "command": "stop"}}
|
||||
|
||||
PUSH_ALL = {"pushing": {"sequence_id": "0", "command": "pushall"}}
|
||||
|
||||
START_PUSH = { "pushing": {"sequence_id": "0", "command": "start"}}
|
||||
|
||||
SEND_GCODE_TEMPLATE = {"print": {"sequence_id": "0", "command": "gcode_line", "param": ""}} # param = GCODE_EACH_LINE_SEPARATED_BY_\n
|
||||
|
||||
# X1 only currently
|
||||
GET_ACCESSORIES = {"system": {"sequence_id": "0", "command": "get_accessories", "accessory_type": "none"}}
|
1222
octoprint_bambu_printer/printer/pybambu/const.py
Normal file
1222
octoprint_bambu_printer/printer/pybambu/const.py
Normal file
File diff suppressed because it is too large
Load Diff
72
octoprint_bambu_printer/printer/pybambu/filaments.json
Normal file
72
octoprint_bambu_printer/printer/pybambu/filaments.json
Normal file
@ -0,0 +1,72 @@
|
||||
{
|
||||
"GFA00": "Bambu PLA Basic",
|
||||
"GFA01": "Bambu PLA Matte",
|
||||
"GFA02": "Bambu PLA Metal",
|
||||
"GFA05": "Bambu PLA Silk",
|
||||
"GFA07": "Bambu PLA Marble",
|
||||
"GFA08": "Bambu PLA Sparkle",
|
||||
"GFA09": "Bambu PLA Tough",
|
||||
"GFA11": "Bambu PLA Aero",
|
||||
"GFA12": "Bambu PLA Glow",
|
||||
"GFA13": "Bambu PLA Dynamic",
|
||||
"GFA15": "Bambu PLA Galaxy",
|
||||
"GFA50": "Bambu PLA-CF",
|
||||
"GFB00": "Bambu ABS",
|
||||
"GFB01": "Bambu ASA",
|
||||
"GFB02": "Bambu ASA-Aero",
|
||||
"GFB50": "Bambu ABS-GF",
|
||||
"GFB60": "PolyLite ABS",
|
||||
"GFB61": "PolyLite ASA",
|
||||
"GFB98": "Generic ASA",
|
||||
"GFB99": "Generic ABS",
|
||||
"GFC00": "Bambu PC",
|
||||
"GFC99": "Generic PC",
|
||||
"GFG00": "Bambu PETG Basic",
|
||||
"GFG01": "Bambu PETG Translucent",
|
||||
"GFG02": "Bambu PETG HF",
|
||||
"GFG50": "Bambu PETG-CF",
|
||||
"GFG60": "PolyLite PETG",
|
||||
"GFG97": "Generic PCTG",
|
||||
"GFG98": "Generic PETG-CF",
|
||||
"GFG99": "Generic PETG",
|
||||
"GFL00": "PolyLite PLA",
|
||||
"GFL01": "PolyTerra PLA",
|
||||
"GFL03": "eSUN PLA+",
|
||||
"GFL04": "Overture PLA",
|
||||
"GFL05": "Overture Matte PLA",
|
||||
"GFL95": "Generic PLA High Speed",
|
||||
"GFL96": "Generic PLA Silk",
|
||||
"GFL98": "Generic PLA-CF",
|
||||
"GFL99": "Generic PLA",
|
||||
"GFN03": "Bambu PA-CF",
|
||||
"GFN04": "Bambu PAHT-CF",
|
||||
"GFN05": "Bambu PA6-CF",
|
||||
"GFN08": "Bambu PA6-GF",
|
||||
"GFN96": "Generic PPA-GF",
|
||||
"GFN97": "Generic PPA-CF",
|
||||
"GFN98": "Generic PA-CF",
|
||||
"GFN99": "Generic PA",
|
||||
"GFP95": "Generic PP-GF",
|
||||
"GFP96": "Generic PP-CF",
|
||||
"GFP97": "Generic PP",
|
||||
"GFP98": "Generic PE-CF",
|
||||
"GFP99": "Generic PE",
|
||||
"GFR98": "Generic PHA",
|
||||
"GFR99": "Generic EVA",
|
||||
"GFS00": "Bambu Support W",
|
||||
"GFS01": "Bambu Support G",
|
||||
"GFS02": "Bambu Support For PLA",
|
||||
"GFS03": "Bambu Support For PA/PET",
|
||||
"GFS04": "Bambu PVA",
|
||||
"GFS05": "Bambu Support For PLA/PETG",
|
||||
"GFS06": "Bambu Support for ABS",
|
||||
"GFS97": "Generic BVOH",
|
||||
"GFS98": "Generic HIPS",
|
||||
"GFS99": "Generic PVA",
|
||||
"GFT01": "Bambu PET-CF",
|
||||
"GFT97": "Generic PPS",
|
||||
"GFT98": "Generic PPS-CF",
|
||||
"GFU00": "Bambu TPU 95A HF",
|
||||
"GFU01": "Bambu TPU 95A",
|
||||
"GFU99": "Generic TPU"
|
||||
}
|
1424
octoprint_bambu_printer/printer/pybambu/models.py
Normal file
1424
octoprint_bambu_printer/printer/pybambu/models.py
Normal file
File diff suppressed because it is too large
Load Diff
227
octoprint_bambu_printer/printer/pybambu/utils.py
Normal file
227
octoprint_bambu_printer/printer/pybambu/utils.py
Normal file
@ -0,0 +1,227 @@
|
||||
import math
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
from .const import (
|
||||
CURRENT_STAGE_IDS,
|
||||
SPEED_PROFILE,
|
||||
FILAMENT_NAMES,
|
||||
HMS_ERRORS,
|
||||
HMS_AMS_ERRORS,
|
||||
PRINT_ERROR_ERRORS,
|
||||
HMS_SEVERITY_LEVELS,
|
||||
HMS_MODULES,
|
||||
LOGGER,
|
||||
FansEnum,
|
||||
)
|
||||
from .commands import SEND_GCODE_TEMPLATE
|
||||
|
||||
|
||||
def search(lst, predicate, default={}):
|
||||
"""Search an array for a string"""
|
||||
for item in lst:
|
||||
if predicate(item):
|
||||
return item
|
||||
return default
|
||||
|
||||
|
||||
def fan_percentage(speed):
|
||||
"""Converts a fan speed to percentage"""
|
||||
if not speed:
|
||||
return 0
|
||||
percentage = (int(speed) / 15) * 100
|
||||
return round(percentage / 10) * 10
|
||||
|
||||
|
||||
def fan_percentage_to_gcode(fan: FansEnum, percentage: int):
|
||||
"""Converts a fan speed percentage to the gcode command to set that"""
|
||||
if fan == FansEnum.PART_COOLING:
|
||||
fanString = "P1"
|
||||
elif fan == FansEnum.AUXILIARY:
|
||||
fanString = "P2"
|
||||
elif fan == FansEnum.CHAMBER:
|
||||
fanString = "P3"
|
||||
|
||||
percentage = round(percentage / 10) * 10
|
||||
speed = math.ceil(255 * percentage / 100)
|
||||
command = SEND_GCODE_TEMPLATE
|
||||
command['print']['param'] = f"M106 {fanString} S{speed}\n"
|
||||
return command
|
||||
|
||||
|
||||
def to_whole(number):
|
||||
if not number:
|
||||
return 0
|
||||
return round(number)
|
||||
|
||||
|
||||
def get_filament_name(idx, custom_filaments: dict):
|
||||
"""Converts a filament idx to a human-readable name"""
|
||||
result = FILAMENT_NAMES.get(idx, "unknown")
|
||||
if result == "unknown" and idx != "":
|
||||
result = custom_filaments.get(idx, "unknown")
|
||||
if result == "unknown" and idx != "":
|
||||
LOGGER.debug(f"UNKNOWN FILAMENT IDX: '{idx}'")
|
||||
return result
|
||||
|
||||
|
||||
def get_speed_name(id):
|
||||
"""Return the human-readable name for a speed id"""
|
||||
return SPEED_PROFILE.get(int(id), "standard")
|
||||
|
||||
|
||||
def get_current_stage(id) -> str:
|
||||
"""Return the human-readable description for a stage action"""
|
||||
return CURRENT_STAGE_IDS.get(int(id), "unknown")
|
||||
|
||||
|
||||
def get_HMS_error_text(hms_code: str):
|
||||
"""Return the human-readable description for an HMS error"""
|
||||
|
||||
ams_code = get_generic_AMS_HMS_error_code(hms_code)
|
||||
ams_error = HMS_AMS_ERRORS.get(ams_code, "")
|
||||
if ams_error != "":
|
||||
# 070X_xYxx_xxxx_xxxx = AMS X (0 based index) Slot Y (0 based index) has the error
|
||||
ams_index = int(hms_code[3:4], 16) + 1
|
||||
ams_slot = int(hms_code[6:7], 16) + 1
|
||||
ams_error = ams_error.replace('AMS1', f"AMS{ams_index}")
|
||||
ams_error = ams_error.replace('slot 1', f"slot {ams_slot}")
|
||||
return ams_error
|
||||
|
||||
return HMS_ERRORS.get(hms_code, "unknown")
|
||||
|
||||
|
||||
def get_print_error_text(print_error_code: str):
|
||||
"""Return the human-readable description for a print error"""
|
||||
|
||||
hex_conversion = f'0{int(print_error_code):x}'
|
||||
print_error_code = hex_conversion[slice(0,4,1)] + "_" + hex_conversion[slice(4,8,1)]
|
||||
print_error = PRINT_ERROR_ERRORS.get(print_error_code.upper(), "")
|
||||
if print_error != "":
|
||||
return print_error
|
||||
|
||||
return PRINT_ERROR_ERRORS.get(print_error_code, "unknown")
|
||||
|
||||
|
||||
def get_HMS_severity(code: int) -> str:
|
||||
uint_code = code >> 16
|
||||
if code > 0 and uint_code in HMS_SEVERITY_LEVELS:
|
||||
return HMS_SEVERITY_LEVELS[uint_code]
|
||||
return HMS_SEVERITY_LEVELS["default"]
|
||||
|
||||
|
||||
def get_HMS_module(attr: int) -> str:
|
||||
uint_attr = (attr >> 24) & 0xFF
|
||||
if attr > 0 and uint_attr in HMS_MODULES:
|
||||
return HMS_MODULES[uint_attr]
|
||||
return HMS_MODULES["default"]
|
||||
|
||||
|
||||
def get_generic_AMS_HMS_error_code(hms_code: str):
|
||||
code1 = int(hms_code[0:4], 16)
|
||||
code2 = int(hms_code[5:9], 16)
|
||||
code3 = int(hms_code[10:14], 16)
|
||||
code4 = int(hms_code[15:19], 16)
|
||||
|
||||
# 070X_xYxx_xxxx_xxxx = AMS X (0 based index) Slot Y (0 based index) has the error
|
||||
ams_code = f"{code1 & 0xFFF8:0>4X}_{code2 & 0xF8FF:0>4X}_{code3:0>4X}_{code4:0>4X}"
|
||||
ams_error = HMS_AMS_ERRORS.get(ams_code, "")
|
||||
if ams_error != "":
|
||||
return ams_code
|
||||
|
||||
return f"{code1:0>4X}_{code2:0>4X}_{code3:0>4X}_{code4:0>4X}"
|
||||
|
||||
|
||||
def get_printer_type(modules, default):
|
||||
# Known possible values:
|
||||
#
|
||||
# A1/P1 printers are of the form:
|
||||
# {
|
||||
# "name": "esp32",
|
||||
# "project_name": "C11",
|
||||
# "sw_ver": "01.07.23.47",
|
||||
# "hw_ver": "AP04",
|
||||
# "sn": "**REDACTED**",
|
||||
# "flag": 0
|
||||
# },
|
||||
# P1P = AP04 / C11
|
||||
# P1S = AP04 / C12
|
||||
# A1Mini = AP05 / N1 or AP04 / N1 or AP07 / N1
|
||||
# A1 = AP05 / N2S
|
||||
#
|
||||
# X1C printers are of the form:
|
||||
# {
|
||||
# "hw_ver": "AP05",
|
||||
# "name": "rv1126",
|
||||
# "sn": "**REDACTED**",
|
||||
# "sw_ver": "00.00.28.55"
|
||||
# },
|
||||
# X1C = AP05
|
||||
#
|
||||
# X1E printers are of the form:
|
||||
# {
|
||||
# "flag": 0,
|
||||
# "hw_ver": "AP02",
|
||||
# "name": "ap",
|
||||
# "sn": "**REDACTED**",
|
||||
# "sw_ver": "00.00.32.14"
|
||||
# }
|
||||
# X1E = AP02
|
||||
|
||||
apNode = search(modules, lambda x: x.get('hw_ver', "").find("AP0") == 0)
|
||||
if len(apNode.keys()) > 1:
|
||||
hw_ver = apNode['hw_ver']
|
||||
project_name = apNode.get('project_name', '')
|
||||
if hw_ver == 'AP02':
|
||||
return 'X1E'
|
||||
elif project_name == 'N1':
|
||||
return 'A1MINI'
|
||||
elif hw_ver == 'AP04':
|
||||
if project_name == 'C11':
|
||||
return 'P1P'
|
||||
if project_name == 'C12':
|
||||
return 'P1S'
|
||||
elif hw_ver == 'AP05':
|
||||
if project_name == 'N2S':
|
||||
return 'A1'
|
||||
if project_name == '':
|
||||
return 'X1C'
|
||||
LOGGER.debug(f"UNKNOWN DEVICE: hw_ver='{hw_ver}' / project_name='{project_name}'")
|
||||
return default
|
||||
|
||||
|
||||
def get_hw_version(modules, default):
|
||||
"""Retrieve hardware version of printer"""
|
||||
apNode = search(modules, lambda x: x.get('hw_ver', "").find("AP0") == 0)
|
||||
if len(apNode.keys()) > 1:
|
||||
return apNode.get("hw_ver")
|
||||
return default
|
||||
|
||||
|
||||
def get_sw_version(modules, default):
|
||||
"""Retrieve software version of printer"""
|
||||
ota = search(modules, lambda x: x.get('name', "") == "ota")
|
||||
if len(ota.keys()) > 1:
|
||||
return ota.get("sw_ver")
|
||||
return default
|
||||
|
||||
|
||||
def get_start_time(timestamp):
|
||||
"""Return start time of a print"""
|
||||
if timestamp == 0:
|
||||
return None
|
||||
return datetime.fromtimestamp(timestamp)
|
||||
|
||||
|
||||
def get_end_time(remaining_time):
|
||||
"""Calculate the end time of a print"""
|
||||
end_time = round_minute(datetime.now() + timedelta(minutes=remaining_time))
|
||||
return end_time
|
||||
|
||||
|
||||
def round_minute(date: datetime = None, round_to: int = 1):
|
||||
""" Round datetime object to minutes"""
|
||||
if not date:
|
||||
date = datetime.now()
|
||||
date = date.replace(second=0, microsecond=0)
|
||||
delta = date.minute % round_to
|
||||
return date.replace(minute=date.minute - delta)
|
Reference in New Issue
Block a user