jneilliii 5c8a9787d4 0.1.8rc8
* updates to pybambu module
* update cloud login workflow, #59
2024-11-09 20:01:41 -05:00

1457 lines
54 KiB
Python

import math
from dataclasses import dataclass, field
from datetime import datetime
from dateutil import parser, tz
from packaging import version
from .utils import (
search,
fan_percentage,
fan_percentage_to_gcode,
get_current_stage,
get_filament_name,
get_printer_type,
get_speed_name,
get_hw_version,
get_sw_version,
get_start_time,
get_end_time,
get_HMS_error_text,
get_print_error_text,
get_generic_AMS_HMS_error_code,
get_HMS_severity,
get_HMS_module,
)
from .const import (
LOGGER,
Features,
FansEnum,
Home_Flag_Values,
SdcardState,
SPEED_PROFILE,
GCODE_STATE_OPTIONS,
PRINT_TYPE_OPTIONS,
)
from .commands import (
CHAMBER_LIGHT_ON,
CHAMBER_LIGHT_OFF,
SPEED_PROFILE_TEMPLATE,
)
class Device:
    """Top-level model of one printer.

    Owns one sub-model per area of printer state (temperature, lights, fans,
    print job, AMS, ...) and hands incoming payloads to each of them.
    """

    # Printer model families used by the checks in supports_feature().
    _X1_MODELS = ("X1", "X1C", "X1E")
    _P1_MODELS = ("P1P", "P1S")
    _A1_MODELS = ("A1", "A1MINI")

    def __init__(self, client):
        """Create every sub-model for the printer served by *client*."""
        self._client = client
        self.temperature = Temperature()
        self.lights = Lights(client = client)
        self.info = Info(client = client)
        self.print_job = PrintJob(client = client)
        self.fans = Fans(client = client)
        self.speed = Speed(client = client)
        self.stage = StageAction()
        self.ams = AMSList(client = client)
        self.external_spool = ExternalSpool(client = client)
        self.hms = HMSList(client = client)
        self.print_error = PrintErrorList(client = client)
        self.camera = Camera()
        self.home_flag = HomeFlag(client=client)
        # Raw payloads remembered by print_update() / info_update().
        self.push_all_data = None
        self.get_version_data = None
        # The chamber image model only exists when camera images are supported.
        if self.supports_feature(Features.CAMERA_IMAGE):
            self.chamber_image = ChamberImage(client = client)
        self.cover_image = CoverImage(client = client)

    def print_update(self, data) -> bool:
        """Feed a print payload to every sub-model.

        Fires "event_printer_data_update" on the client callback when at least
        one sub-model reports a change, and stores *data* as ``push_all_data``
        when its "msg" value is 0 or missing.

        Returns:
            True when at least one sub-model reported a change.
        """
        components = (
            self.info,
            self.print_job,
            self.temperature,
            self.lights,
            self.fans,
            self.speed,
            self.stage,
            self.ams,
            self.external_spool,
            self.hms,
            self.print_error,
            self.camera,
            self.home_flag,
        )
        send_event = False
        for component in components:
            # Non-short-circuit "|" on purpose: every sub-model must see the payload.
            send_event = send_event | component.print_update(data = data)

        if send_event and self._client.callback is not None:
            self._client.callback("event_printer_data_update")

        if data.get("msg", 0) == 0:
            self.push_all_data = data

        # The signature promises a bool; previously nothing was returned.
        return send_event

    def info_update(self, data):
        """Feed an info payload to the sub-models that consume it.

        The payload is also stored as ``get_version_data`` when its "command"
        is "get_version".
        """
        self.info.info_update(data = data)
        self.home_flag.info_update(data = data)
        self.ams.info_update(data = data)
        if data.get("command") == "get_version":
            self.get_version_data = data

    def supports_feature(self, feature):
        """Return whether this printer supports *feature* (a ``Features`` value).

        Most answers depend only on ``info.device_type``; unknown features
        yield False.
        """
        device_type = self.info.device_type
        is_x1 = device_type in self._X1_MODELS
        is_p1 = device_type in self._P1_MODELS
        is_a1 = device_type in self._A1_MODELS

        if feature == Features.AUX_FAN:
            return not is_a1
        elif feature == Features.CHAMBER_LIGHT:
            return True
        elif feature == Features.CHAMBER_FAN:
            return is_x1 or is_p1
        elif feature == Features.CHAMBER_TEMPERATURE:
            return is_x1
        elif feature == Features.CURRENT_STAGE:
            return True
        elif feature == Features.PRINT_LAYERS:
            return True
        elif feature == Features.AMS:
            # ams.data is a fixed-size list pre-filled with None, so its length
            # says nothing; an AMS is present once any slot has been populated.
            return any(ams is not None for ams in self.ams.data)
        elif feature == Features.EXTERNAL_SPOOL:
            return True
        elif feature == Features.K_VALUE:
            return is_p1 or is_a1
        elif feature == Features.START_TIME:
            return False
        elif feature == Features.START_TIME_GENERATED:
            return True
        elif feature == Features.AMS_TEMPERATURE:
            return is_x1
        elif feature == Features.CAMERA_RTSP:
            return is_x1
        elif feature == Features.CAMERA_IMAGE:
            # Needs a reachable host and an access code in addition to the right model.
            return (self._client.host != "") and (self._client._access_code != "") and (is_p1 or is_a1)
        elif feature == Features.DOOR_SENSOR:
            return is_x1
        elif feature == Features.MANUAL_MODE:
            return is_p1 or is_a1
        elif feature == Features.AMS_FILAMENT_REMAINING:
            # Technically this is not the AMS Lite but that's currently tied to only these printer types.
            return not is_a1
        return False

    def get_active_tray(self):
        """Return the tray currently selected by ``ams.tray_now``.

        Returns the external spool when no AMS is present or ``tray_now`` is
        254, and None when ``tray_now`` is 255, points outside the known AMS
        units, or points at an AMS unit that has not been seen yet.
        """
        if not self.supports_feature(Features.AMS):
            return self.external_spool

        tray_now = self.ams.tray_now
        if tray_now == 255:
            return None
        if tray_now == 254:
            return self.external_spool

        # Four trays per AMS unit: the quotient picks the unit, the remainder the tray.
        ams_index, tray_index = divmod(tray_now, 4)
        if not 0 <= ams_index < len(self.ams.data):
            # Unexpected tray number; report "no tray" instead of raising IndexError.
            return None
        active_ams = self.ams.data[ams_index]
        return None if active_ams is None else active_ams.tray[tray_index]
@dataclass
class Lights:
    """State of the printer lights, plus on/off commands for the chamber light."""
    chamber_light: str
    chamber_light_override: str
    work_light: str

    def __init__(self, client):
        """Start with unknown light modes and no pending override."""
        self._client = client
        self.chamber_light = "unknown"
        self.work_light = "unknown"
        self.chamber_light_override = ""

    def print_update(self, data) -> bool:
        """Update the light modes from a print payload; return True if anything changed."""
        before = f"{self.__dict__}"

        # "lights_report": [
        #     {
        #         "node": "chamber_light",
        #         "mode": "on"
        #     },
        #     {
        #         "node": "work_light",  # X1 only
        #         "mode": "flashing"
        #     }
        # ],
        reported_chamber = search(
            data.get("lights_report", []),
            lambda entry: entry.get('node', "") == "chamber_light",
            {"mode": self.chamber_light},
        ).get("mode")

        if self.chamber_light_override == "":
            self.chamber_light = reported_chamber
        elif self.chamber_light_override == reported_chamber:
            # The report now matches the locally requested mode, so the
            # override is dropped; until then the reported value is ignored.
            self.chamber_light_override = ""

        reported_work = search(
            data.get("lights_report", []),
            lambda entry: entry.get('node', "") == "work_light",
            {"mode": self.work_light},
        )
        self.work_light = reported_work.get("mode")

        after = f"{self.__dict__}"
        return before != after

    def _request_chamber_light(self, mode, command):
        """Record *mode* locally, notify the callback, then publish *command*."""
        self.chamber_light = mode
        self.chamber_light_override = mode
        notify = self._client.callback
        if notify is not None:
            notify("event_light_update")
        self._client.publish(command)

    def TurnChamberLightOn(self):
        """Switch the chamber light on."""
        self._request_chamber_light("on", CHAMBER_LIGHT_ON)

    def TurnChamberLightOff(self):
        """Switch the chamber light off."""
        self._request_chamber_light("off", CHAMBER_LIGHT_OFF)
@dataclass
class Camera:
    """Camera settings taken from the "ipcam" section of a print payload."""
    recording: str
    resolution: str
    rtsp_url: str
    timelapse: str

    def __init__(self):
        """Start with empty settings and no RTSP url."""
        self.recording = ''
        self.resolution = ''
        self.rtsp_url = None
        self.timelapse = ''

    def print_update(self, data) -> bool:
        """Copy the reported camera settings; return True if anything changed."""
        before = f"{self.__dict__}"

        # "ipcam": {
        #   "ipcam_dev": "1",
        #   "ipcam_record": "enable",
        #   "mode_bits": 2,
        #   "resolution": "1080p",
        #   "rtsp_url": "rtsps://192.168.1.64/streaming/live/1",
        #   "timelapse": "disable",
        #   "tutk_server": "disable"
        # }
        ipcam = data.get("ipcam", {})
        # (attribute, payload key); keys missing from the payload keep the current value.
        for attribute, key in (
            ("timelapse", "timelapse"),
            ("recording", "ipcam_record"),
            ("resolution", "resolution"),
            ("rtsp_url", "rtsp_url"),
        ):
            setattr(self, attribute, ipcam.get(key, getattr(self, attribute)))

        after = f"{self.__dict__}"
        return before != after
@dataclass
class Temperature:
    """Current and target temperatures, stored rounded to whole numbers."""
    bed_temp: int
    target_bed_temp: int
    chamber_temp: int
    nozzle_temp: int
    target_nozzle_temp: int

    def __init__(self):
        """Start with every temperature at 0."""
        self.bed_temp = 0
        self.target_bed_temp = 0
        self.chamber_temp = 0
        self.nozzle_temp = 0
        self.target_nozzle_temp = 0

    def print_update(self, data) -> bool:
        """Read the temperatures from a print payload; return True if anything changed."""
        before = f"{self.__dict__}"

        # (attribute, payload key); keys missing from the payload keep the current value.
        readings = (
            ("bed_temp", "bed_temper"),
            ("target_bed_temp", "bed_target_temper"),
            ("chamber_temp", "chamber_temper"),
            ("nozzle_temp", "nozzle_temper"),
            ("target_nozzle_temp", "nozzle_target_temper"),
        )
        for attribute, key in readings:
            current = getattr(self, attribute)
            setattr(self, attribute, round(data.get(key, current)))

        after = f"{self.__dict__}"
        return before != after
@dataclass
class Fans:
    """Fan speeds reported by the printer, plus short-lived local overrides.

    After set_fan_speed() the requested percentage is reported by
    get_fan_speed() for a few seconds (auxiliary, chamber and part-cooling
    fans), until print_update() lets the override expire.
    """
    _aux_fan_speed_percentage: int
    _aux_fan_speed: int
    _aux_fan_speed_override: int
    _aux_fan_speed_override_time: datetime
    _chamber_fan_speed_percentage: int
    _chamber_fan_speed: int
    _chamber_fan_speed_override: int
    _chamber_fan_speed_override_time: datetime
    _cooling_fan_speed_percentage: int
    _cooling_fan_speed: int
    _cooling_fan_speed_override: int
    _cooling_fan_speed_override_time: datetime
    _heatbreak_fan_speed_percentage: int
    _heatbreak_fan_speed: int

    # Whole seconds a locally requested speed stays in effect.
    _OVERRIDE_SECONDS = 5

    def __init__(self, client):
        """Start with all speeds at 0 and no active override."""
        self._client = client
        self._aux_fan_speed_percentage = 0
        self._aux_fan_speed = 0
        self._aux_fan_speed_override = 0
        self._aux_fan_speed_override_time = None
        self._chamber_fan_speed_percentage = 0
        self._chamber_fan_speed = 0
        self._chamber_fan_speed_override = 0
        self._chamber_fan_speed_override_time = None
        self._cooling_fan_speed_percentage = 0
        self._cooling_fan_speed = 0
        self._cooling_fan_speed_override = 0
        self._cooling_fan_speed_override_time = None
        self._heatbreak_fan_speed_percentage = 0
        self._heatbreak_fan_speed = 0

    @classmethod
    def _active_override_time(cls, override_time):
        """Return *override_time* while the override is still in effect, else None."""
        if override_time is None:
            return None
        # total_seconds() rather than .seconds: .seconds ignores whole days and
        # wraps around for negative deltas (clock adjusted backwards).
        elapsed = (datetime.now() - override_time).total_seconds()
        # "< N + 1" keeps the original whole-second comparison (expired once
        # more than N full seconds have passed).
        if 0 <= elapsed < cls._OVERRIDE_SECONDS + 1:
            return override_time
        return None

    def print_update(self, data) -> bool:
        """Read fan speeds from a print payload and expire stale overrides.

        Returns True if any stored value changed.
        """
        old_data = f"{self.__dict__}"

        self._aux_fan_speed = data.get("big_fan1_speed", self._aux_fan_speed)
        self._aux_fan_speed_percentage = fan_percentage(self._aux_fan_speed)
        self._aux_fan_speed_override_time = self._active_override_time(self._aux_fan_speed_override_time)

        self._chamber_fan_speed = data.get("big_fan2_speed", self._chamber_fan_speed)
        self._chamber_fan_speed_percentage = fan_percentage(self._chamber_fan_speed)
        self._chamber_fan_speed_override_time = self._active_override_time(self._chamber_fan_speed_override_time)

        self._cooling_fan_speed = data.get("cooling_fan_speed", self._cooling_fan_speed)
        self._cooling_fan_speed_percentage = fan_percentage(self._cooling_fan_speed)
        self._cooling_fan_speed_override_time = self._active_override_time(self._cooling_fan_speed_override_time)

        # The heatbreak fan has no override.
        self._heatbreak_fan_speed = data.get("heatbreak_fan_speed", self._heatbreak_fan_speed)
        self._heatbreak_fan_speed_percentage = fan_percentage(self._heatbreak_fan_speed)

        return (old_data != f"{self.__dict__}")

    def set_fan_speed(self, fan: FansEnum, percentage: int):
        """Set fan speed

        *percentage* is rounded to the nearest multiple of 10, recorded as the
        fan's override, published to the printer as a gcode command, and
        "event_printer_data_update" is fired on the client callback.
        """
        percentage = round(percentage / 10) * 10
        command = fan_percentage_to_gcode(fan, percentage)

        if fan == FansEnum.PART_COOLING:
            # Store the request in the override slot that get_fan_speed() reads,
            # not in the raw speed reported by the printer.
            self._cooling_fan_speed_override = percentage
            self._cooling_fan_speed_override_time = datetime.now()
        elif fan == FansEnum.AUXILIARY:
            self._aux_fan_speed_override = percentage
            self._aux_fan_speed_override_time = datetime.now()
        elif fan == FansEnum.CHAMBER:
            self._chamber_fan_speed_override = percentage
            self._chamber_fan_speed_override_time = datetime.now()

        LOGGER.debug(command)
        self._client.publish(command)

        if self._client.callback is not None:
            self._client.callback("event_printer_data_update")

    def get_fan_speed(self, fan: FansEnum) -> int:
        """Return the speed percentage for *fan*.

        While an override is active the locally requested value is returned,
        otherwise the percentage derived from the printer's report. Returns
        None for a fan this class does not know.
        """
        if fan == FansEnum.PART_COOLING:
            if self._cooling_fan_speed_override_time is not None:
                return self._cooling_fan_speed_override
            return self._cooling_fan_speed_percentage
        elif fan == FansEnum.AUXILIARY:
            if self._aux_fan_speed_override_time is not None:
                return self._aux_fan_speed_override
            return self._aux_fan_speed_percentage
        elif fan == FansEnum.CHAMBER:
            if self._chamber_fan_speed_override_time is not None:
                return self._chamber_fan_speed_override
            return self._chamber_fan_speed_percentage
        elif fan == FansEnum.HEATBREAK:
            return self._heatbreak_fan_speed_percentage
        return None
@dataclass
class PrintJob:
    """State of the current (or most recent) print job.

    Values come from print payloads and, when a bambu cloud auth token is
    available, from the latest cloud task for this printer.
    """
    print_percentage: int
    gcode_state: str
    file_type_icon: str
    gcode_file: str
    subtask_name: str
    start_time: datetime
    end_time: datetime
    remaining_time: int
    current_layer: int
    total_layers: int
    print_error: int
    print_weight: float
    print_length: int
    print_bed_type: str
    print_type: str
    _ams_print_weights: list
    _ams_print_lengths: list

    @property
    def get_ams_print_weights(self) -> dict:
        """Map "AMS Slot N" (1-based) to the print weight of every slot with a non-zero value."""
        return {
            f"AMS Slot {slot + 1}": weight
            for slot, weight in enumerate(self._ams_print_weights[:16])
            if weight != 0
        }

    @property
    def get_ams_print_lengths(self) -> dict:
        """Map "AMS Slot N" (1-based) to the print length of every slot with a non-zero value."""
        return {
            f"AMS Slot {slot + 1}": length
            for slot, length in enumerate(self._ams_print_lengths[:16])
            if length != 0
        }

    def __init__(self, client):
        """Start with an unknown job state and empty per-slot usage."""
        self._client = client
        self.print_percentage = 0
        self.gcode_state = "unknown"
        self.gcode_file = ""
        self.subtask_name = ""
        self.start_time = None
        self.end_time = None
        self.remaining_time = 0
        self.current_layer = 0
        self.total_layers = 0
        self.print_error = 0
        self.print_weight = 0
        # One entry per AMS slot (16 slots).
        self._ams_print_weights = [0] * 16
        self._ams_print_lengths = [0] * 16
        self.print_length = 0
        self.print_bed_type = "unknown"
        self.file_type_icon = "mdi:file"
        self.print_type = ""

    def print_update(self, data) -> bool:
        """Update the job state from a print payload and fire job events.

        Fires "event_print_started", "event_print_canceled",
        "event_print_failed" and "event_print_finished" on the client callback
        based on gcode_state transitions and the reported print_error, and
        adds the elapsed print time to the device usage hours when a job ends.

        Returns True if any stored value changed.
        """
        old_data = f"{self.__dict__}"

        # Example payload:
        # {
        #     "print": {
        #         "gcode_start_time": "1681479206",
        #         "gcode_state": "IDLE",
        #         "mc_print_stage": "1",
        #         "mc_percent": 100,
        #         "mc_remaining_time": 0,
        #         "wifi_signal": "-53dBm",
        #         "print_type": "idle",
        #         "ipcam": {
        #             "ipcam_dev": "1",
        #             "ipcam_record": "enable"
        #             "resolution": "1080p",        # X1 only
        #             "timelapse": "disable"
        #         },
        #         "layer_num": 0,
        #         "total_layer_num": 0,

        self.print_percentage = data.get("mc_percent", self.print_percentage)

        previous_gcode_state = self.gcode_state
        self.gcode_state = data.get("gcode_state", self.gcode_state)
        if self.gcode_state.lower() not in GCODE_STATE_OPTIONS:
            LOGGER.error(f"Unknown gcode_state. Please log an issue : '{self.gcode_state}'")
            self.gcode_state = "unknown"
        # Log the transition once, after normalising unknown states.
        if previous_gcode_state != self.gcode_state:
            LOGGER.debug(f"GCODE_STATE: {previous_gcode_state} -> {self.gcode_state}")

        self.gcode_file = data.get("gcode_file", self.gcode_file)
        self.print_type = data.get("print_type", self.print_type)
        if self.print_type.lower() not in PRINT_TYPE_OPTIONS:
            if self.print_type != "":
                LOGGER.debug(f"Unknown print_type. Please log an issue : '{self.print_type}'")
            self.print_type = "unknown"
        self.subtask_name = data.get("subtask_name", self.subtask_name)
        self.file_type_icon = "mdi:file" if self.print_type != "cloud" else "mdi:cloud-outline"
        self.current_layer = data.get("layer_num", self.current_layer)
        self.total_layers = data.get("total_layer_num", self.total_layers)

        # Initialize task data at startup.
        if previous_gcode_state == "unknown" and self.gcode_state != "unknown":
            self._update_task_data()

        # Calculate start / end time after we update task data so we don't stomp on prepopulated values while idle on integration start.
        raw_start_time = data.get("gcode_start_time")
        if raw_start_time is not None:
            reported_start_time = get_start_time(int(raw_start_time))
            if self.start_time != reported_start_time:
                self.start_time = reported_start_time
                LOGGER.debug(f"GCODE START TIME: {self.start_time}")

        # Generate the end_time from the remaining_time mqtt payload value if present.
        if data.get("mc_remaining_time") is not None:
            existing_remaining_time = self.remaining_time
            self.remaining_time = data.get("mc_remaining_time")
            if existing_remaining_time != self.remaining_time:
                self.end_time = get_end_time(self.remaining_time)
                LOGGER.debug(f"END TIME2: {self.end_time}")

        # Handle print start
        idle_states = ("IDLE", "FAILED", "FINISH")
        previously_idle = previous_gcode_state in idle_states
        currently_idle = self.gcode_state in idle_states

        if previously_idle and not currently_idle:
            if self._client.callback is not None:
                self._client.callback("event_print_started")

            # Generate the start_time for P1P/S when printer moves from idle to another state. Original attempt with remaining time
            # becoming non-zero didn't work as it never bounced to zero in at least the scenario where a print was canceled.
            if self._client._device.supports_feature(Features.START_TIME_GENERATED):
                # We can use the existing get_end_time helper to format date.now() as desired by passing 0.
                self.start_time = get_end_time(0)
                # Make sure we don't keep using a stale end time.
                self.end_time = None
                LOGGER.debug(f"GENERATED START TIME: {self.start_time}")

            # Update task data if bambu cloud connected
            self._update_task_data()

        # When a print is canceled by the user, this is the payload that's sent. A couple of seconds later
        # print_error will be reset to zero.
        # {
        #     "print": {
        #         "print_error": 50348044,
        #     }
        # }
        isCanceledPrint = False
        if data.get("print_error") == 50348044 and self.print_error == 0:
            isCanceledPrint = True
            if self._client.callback is not None:
                self._client.callback("event_print_canceled")
        self.print_error = data.get("print_error", self.print_error)

        # Handle print failed
        if previous_gcode_state != "unknown" and previous_gcode_state != "FAILED" and self.gcode_state == "FAILED":
            if not isCanceledPrint:
                if self._client.callback is not None:
                    self._client.callback("event_print_failed")

        # Handle print finish
        if previous_gcode_state != "unknown" and previous_gcode_state != "FINISH" and self.gcode_state == "FINISH":
            if self._client.callback is not None:
                self._client.callback("event_print_finished")

        if currently_idle and not previously_idle and previous_gcode_state != "unknown":
            if self.start_time is not None:
                # self.end_time isn't updated if we hit an AMS retract at print end but the printer does count that entire
                # paused time as usage hours. So we need to use the current time instead of the last recorded end time in
                # our calculation here.
                duration = datetime.now() - self.start_time
                # total_seconds() counts whole days too (timedelta.seconds does not, and wraps to ~24h
                # for a start time slightly in the future); never subtract hours for a negative duration.
                elapsed_seconds = max(duration.total_seconds(), 0)
                # Round usage hours to 2 decimal places (about 1/2 a minute accuracy)
                new_hours = round((elapsed_seconds / 60 / 60) * 100) / 100
                LOGGER.debug(f"NEW USAGE HOURS: {new_hours}")
                self._client._device.info.usage_hours += new_hours

        return (old_data != f"{self.__dict__}")

    # The task list is of the following form with a 'hits' array with typical 20 entries.
    #
    # "total": 531,
    # "hits": [
    #     {
    #     "id": 35237965,
    #     "designId": 0,
    #     "designTitle": "",
    #     "instanceId": 0,
    #     "modelId": "REDACTED",
    #     "title": "REDACTED",
    #     "cover": "REDACTED",
    #     "status": 4,
    #     "feedbackStatus": 0,
    #     "startTime": "2023-12-21T19:02:16Z",
    #     "endTime": "2023-12-21T19:02:35Z",
    #     "weight": 34.62,
    #     "length": 1161,
    #     "costTime": 10346,
    #     "profileId": 35276233,
    #     "plateIndex": 1,
    #     "plateName": "",
    #     "deviceId": "REDACTED",
    #     "amsDetailMapping": [
    #         {
    #         "ams": 4,
    #         "sourceColor": "F4D976FF",
    #         "targetColor": "F4D976FF",
    #         "filamentId": "GFL99",
    #         "filamentType": "PLA",
    #         "targetFilamentType": "",
    #         "weight": 34.62
    #         }
    #     ],
    #     "mode": "cloud_file",
    #     "isPublicProfile": false,
    #     "isPrintable": true,
    #     "deviceModel": "P1P",
    #     "deviceName": "Bambu P1P",
    #     "bedType": "textured_plate"
    #     },

    def _update_task_data(self):
        """Refresh cover image, weights, lengths and times from the latest cloud task.

        Does nothing when no bambu cloud auth token is set. When the cloud
        has no task for this printer all cloud-derived values are reset.
        """
        if self._client.bambu_cloud.auth_token == "":
            return

        self._task_data = self._client.bambu_cloud.get_latest_task_for_printer(self._client._serial)
        if self._task_data is None:
            LOGGER.debug("No bambu cloud task data found for printer.")
            self._client._device.cover_image.set_jpeg(None)
            self.print_weight = 0
            self._ams_print_weights = [0] * 16
            self._ams_print_lengths = [0] * 16
            self.print_length = 0
            self.print_bed_type = "unknown"
            self.start_time = None
            self.end_time = None
            return

        LOGGER.debug("Updating bambu cloud task data found for printer.")
        url = self._task_data.get('cover', '')
        if url != "":
            data = self._client.bambu_cloud.download(url)
            self._client._device.cover_image.set_jpeg(data)

        # The cloud 'length' is divided by 100 before being stored.
        self.print_length = self._task_data.get('length', self.print_length * 100) / 100
        self.print_bed_type = self._task_data.get('bedType', self.print_bed_type)
        self.print_weight = self._task_data.get('weight', self.print_weight)
        ams_print_data = self._task_data.get('amsDetailMapping', [])
        self._ams_print_weights = [0] * 16
        self._ams_print_lengths = [0] * 16
        if self.print_weight != 0:
            for ams_data in ams_print_data:
                index = ams_data['ams']
                weight = ams_data['weight']
                self._ams_print_weights[index] = weight
                # Only weights are reported per slot; split the total length in the same proportion.
                self._ams_print_lengths[index] = self.print_length * weight / self.print_weight

        status = self._task_data['status']
        LOGGER.debug(f"CLOUD PRINT STATUS: {status}")
        if self._client._device.supports_feature(Features.START_TIME_GENERATED) and (status == 4):
            # If we generate the start time (not X1), then rely more heavily on the cloud task data and
            # do so uniformly so we always have matched start/end times.

            # "startTime": "2023-12-21T19:02:16Z"
            cloud_time_str = self._task_data.get('startTime', "")
            LOGGER.debug(f"CLOUD START TIME1: {self.start_time}")
            if cloud_time_str != "":
                local_dt = parser.parse(cloud_time_str).astimezone(tz.tzlocal())
                # Convert it to timestamp and back to get rid of timezone in printed output to match datetime objects created from mqtt timestamps.
                local_dt = datetime.fromtimestamp(local_dt.timestamp())
                self.start_time = local_dt
                LOGGER.debug(f"CLOUD START TIME2: {self.start_time}")

            # "endTime": "2023-12-21T19:02:35Z"
            cloud_time_str = self._task_data.get('endTime', "")
            LOGGER.debug(f"CLOUD END TIME1: {self.end_time}")
            if cloud_time_str != "":
                local_dt = parser.parse(cloud_time_str).astimezone(tz.tzlocal())
                # Convert it to timestamp and back to get rid of timezone in printed output to match datetime objects created from mqtt timestamps.
                local_dt = datetime.fromtimestamp(local_dt.timestamp())
                self.end_time = local_dt
                LOGGER.debug(f"CLOUD END TIME2: {self.end_time}")
@dataclass
class Info:
    """General device information: identity, versions, connectivity, nozzle and usage."""

    # Device state
    serial: str
    device_type: str
    wifi_signal: int
    hw_ver: str
    sw_ver: str
    online: bool
    new_version_state: int
    mqtt_mode: str
    nozzle_diameter: float
    nozzle_type: str
    usage_hours: float

    def __init__(self, client):
        """Seed identity, mqtt mode and usage hours from *client*; the rest starts unknown."""
        self._client = client

        self.serial = self._client._serial
        self.device_type = self._client._device_type.upper()
        self.wifi_signal = 0
        self.hw_ver = "unknown"
        self.sw_ver = "unknown"
        self.online = False
        self.new_version_state = 0
        if self._client.local_mqtt:
            self.mqtt_mode = "local"
        else:
            self.mqtt_mode = "bambu_cloud"
        self.nozzle_diameter = 0
        self.nozzle_type = "unknown"
        self.usage_hours = client._usage_hours

    def set_online(self, online):
        """Record the online state; fire "event_printer_data_update" only when it changes."""
        if self.online == online:
            return
        self.online = online
        notify = self._client.callback
        if notify is not None:
            notify("event_printer_data_update")

    def info_update(self, data):
        """Refresh device type and hw/sw versions from a version payload, then fire "event_printer_info_update"."""

        # Example payload:
        # {
        # "info": {
        #     "command": "get_version",
        #     "sequence_id": "20004",
        #     "module": [
        #     {
        #         "name": "ota",
        #         "project_name": "C11",
        #         "sw_ver": "01.02.03.00",
        #         "hw_ver": "OTA",
        #         "sn": "..."
        #     },
        #     {
        #         "name": "esp32",
        #         "project_name": "C11",
        #         "sw_ver": "00.03.12.31",
        #         "hw_ver": "AP04",
        #         "sn": "..."
        #     },
        module_list = data.get("module", [])
        self.device_type = get_printer_type(module_list, self.device_type)
        LOGGER.debug(f"Device is {self.device_type}")
        self.hw_ver = get_hw_version(module_list, self.hw_ver)
        self.sw_ver = get_sw_version(module_list, self.sw_ver)

        notify = self._client.callback
        if notify is not None:
            notify("event_printer_info_update")

    def print_update(self, data) -> bool:
        """Read wifi signal, upgrade state and nozzle details from a print payload.

        Returns True if any stored value changed.
        """
        before = f"{self.__dict__}"

        # Example payload:
        # {
        #     "print": {
        #         "gcode_start_time": "1681479206",
        #         "gcode_state": "IDLE",
        #         "mc_print_stage": "1",
        #         "mc_percent": 100,
        #         "mc_remaining_time": 0,
        #         "wifi_signal": "-53dBm",
        #         "print_type": "idle",
        #         "ipcam": {
        #             "ipcam_dev": "1",
        #             "ipcam_record": "enable"
        #             "resolution": "1080p",        # X1 only
        #             "timelapse": "disable"
        #         },
        #         "layer_num": 0,
        #         "total_layer_num": 0,

        # The signal arrives as text such as "-53dBm"; only the number is kept.
        raw_signal = data.get("wifi_signal", str(self.wifi_signal))
        self.wifi_signal = int(raw_signal.replace("dBm", ""))

        # Version data is provided differently for X1 and P1
        # P1P example:
        # "upgrade_state": {
        #   "sequence_id": 0,
        #   "progress": "",
        #   "status": "",
        #   "consistency_request": false,
        #   "dis_state": 1,
        #   "err_code": 0,
        #   "force_upgrade": false,
        #   "message": "",
        #   "module": "",
        #   "new_version_state": 1,
        #   "new_ver_list": [
        #     {
        #       "name": "ota",
        #       "cur_ver": "01.02.03.00",
        #       "new_ver": "01.03.00.00"
        #     },
        #     {
        #       "name": "ams/0",
        #       "cur_ver": "00.00.05.96",
        #       "new_ver": "00.00.06.32"
        #     }
        #   ]
        # },
        # X1 example:
        # "upgrade_state": {
        #     "ahb_new_version_number": "",
        #     "ams_new_version_number": "",
        #     "consistency_request": false,
        #     "dis_state": 0,
        #     "err_code": 0,
        #     "force_upgrade": false,
        #     "message": "",
        #     "module": "null",
        #     "new_version_state": 2,
        #     "ota_new_version_number": "",
        #     "progress": "0",
        #     "sequence_id": 0,
        #     "status": "IDLE"
        # },
        # The 'new_version_state' value is common to indicate a new upgrade is available.
        # Observed values so far are:
        # 1 - upgrade available
        # 2 - no upgrades available
        # And the P1P lists its versions in new_ver_list as a structured set of data with old
        # and new versions provided for each component. While the X1 lists only the new version
        # in separate string properties.
        upgrade_state = data.get("upgrade_state", {})
        self.new_version_state = upgrade_state.get("new_version_state", self.new_version_state)

        # "nozzle_diameter": "0.4",
        # "nozzle_type": "hardened_steel",
        reported_diameter = data.get("nozzle_diameter", self.nozzle_diameter)
        self.nozzle_diameter = float(reported_diameter)
        self.nozzle_type = data.get("nozzle_type", self.nozzle_type)

        after = f"{self.__dict__}"
        return before != after

    @property
    def has_bambu_cloud_connection(self) -> bool:
        """True when the client's bambu cloud auth token is not empty."""
        token = self._client.bambu_cloud.auth_token
        return token != ""
@dataclass
class AMSInstance:
    """One AMS unit: identity, versions, humidity/temperature readings and its four trays."""
    serial: str
    sw_version: str
    hw_version: str
    humidity_index: int
    temperature: int
    tray: list["AMSTray"]

    def __init__(self, client):
        """Start with blank identity, zeroed readings and four fresh trays."""
        self.serial = ""
        self.sw_version = ""
        self.hw_version = ""
        self.humidity_index = 0
        self.temperature = 0
        # Each tray is its own AMSTray object bound to the same client.
        self.tray = [AMSTray(client) for _ in range(4)]
@dataclass
class AMSList:
    """All AMS units attached to the printer, plus the currently selected tray number."""
    tray_now: int
    data: list[AMSInstance]

    def __init__(self, client):
        """Start with four empty AMS slots and tray 0 selected."""
        self._client = client
        self.tray_now = 0
        self.data = [None] * 4
        self._first_initialization_done = False

    def info_update(self, data):
        """Record AMS serial and version info from a version payload.

        Fires "event_ams_info_update" on the client callback when anything
        changed, including the first time a non-AMS module entry is seen.
        """
        before = f"{self.__dict__}"

        # First determine if this the version info data or the json payload data. We use the version info to determine
        # what devices to add to home assistant and add all the sensors as entities. And then the json payload data
        # to populate the values for all those entities.

        # The module entries are of this form (P1/X1):
        # {
        #     "name": "ams/0",
        #     "project_name": "",
        #     "sw_ver": "00.00.05.96",
        #     "loader_ver": "00.00.00.00",
        #     "ota_ver": "00.00.00.00",
        #     "hw_ver": "AMS08",
        #     "sn": "<SERIAL>"
        # }
        # AMS Lite of the form:
        # {
        #   "name": "ams_f1/0",
        #   "project_name": "",
        #   "sw_ver": "00.00.07.89",
        #   "loader_ver": "00.00.00.00",
        #   "ota_ver": "00.00.00.00",
        #   "hw_ver": "AMS_F102",
        #   "sn": "**REDACTED**"
        # }

        changed = False
        for module in data.get("module", []):
            name = module["name"]
            # The AMS index is the single character right after the name prefix.
            if name.startswith("ams/"):
                index = int(name[4])
            elif name.startswith("ams_f1/"):
                index = int(name[7])
            else:
                index = -1

            if index == -1:
                # Not an AMS module: flag a change once so the first version payload always notifies.
                if not self._first_initialization_done:
                    self._first_initialization_done = True
                    changed = True
                continue

            # Sometimes we get incomplete version data. We have to skip if that occurs since the serial number is
            # required as part of the home assistant device identity.
            if module['sn'] == '':
                continue

            # May get data before info so create entries if necessary
            if self.data[index] is None:
                self.data[index] = AMSInstance(self._client)
            instance = self.data[index]

            for attribute, key in (("serial", 'sn'), ("sw_version", 'sw_ver'), ("hw_version", 'hw_ver')):
                if getattr(instance, attribute) != module[key]:
                    changed = True
                    setattr(instance, attribute, module[key])

        changed = changed or (before != f"{self.__dict__}")

        if changed and self._client.callback is not None:
            self._client.callback("event_ams_info_update")

    def print_update(self, data) -> bool:
        """Update the selected tray, AMS readings and tray contents from a print payload.

        Returns True if any stored value changed.
        """
        before = f"{self.__dict__}"

        # AMS json payload is of the form:
        # "ams": {
        #     "ams": [
        #         {
        #             "id": "0",
        #             "humidity": "4",
        #             "temp": "0.0",
        #             "tray": [
        #                 {
        #                     "id": "0",
        #                     "remain": -1,
        #                     "k": 0.019999999552965164,        # P1P only
        #                     "n": 1.399999976158142,           # P1P only
        #                     "tag_uid": "0000000000000000",
        #                     "tray_id_name": "",
        #                     "tray_info_idx": "GFL99",
        #                     "tray_type": "PLA",
        #                     "tray_sub_brands": "",
        #                     "tray_color": "FFFF00FF",         # RRGGBBAA
        #                     "tray_weight": "0",
        #                     "tray_diameter": "0.00",
        #                     "drying_temp": "0",
        #                     "drying_time": "0",
        #                     "bed_temp_type": "0",
        #                     "bed_temp": "0",
        #                     "nozzle_temp_max": "240",
        #                     "nozzle_temp_min": "190",
        #                     "remain": 100,
        #                     "xcam_info": "000000000000000000000000",
        #                     "tray_uuid": "00000000000000000000000000000000"
        #                 },
        #                 {
        #                     "id": "1",
        #                     ...
        #                 },
        #                 {
        #                     "id": "2",
        #                     ...
        #                 },
        #                 {
        #                     "id": "3",
        #                     ...
        #                 }
        #             ]
        #         }
        #     ],
        #     "ams_exist_bits": "1",
        #     "tray_exist_bits": "f",
        #     "tray_is_bbl_bits": "f",
        #     "tray_now": "255",
        #     "tray_read_done_bits": "f",
        #     "tray_reading_bits": "0",
        #     "tray_tar": "255",
        #     "version": 3,
        #     "insert_flag": true,
        #     "power_on_flag": false
        # },

        ams_section = data.get("ams", [])
        if len(ams_section) != 0:
            self.tray_now = int(ams_section.get('tray_now', self.tray_now))

            for ams in ams_section.get("ams", []):
                index = int(ams['id'])
                # May get data before info so create entry if necessary
                if self.data[index] is None:
                    self.data[index] = AMSInstance(self._client)
                instance = self.data[index]

                # Assign only on a real difference so an equal value of another
                # numeric type (0 vs 0.0) does not register as a change.
                if instance.humidity_index != int(ams['humidity']):
                    instance.humidity_index = int(ams['humidity'])
                if instance.temperature != float(ams['temp']):
                    instance.temperature = float(ams['temp'])

                for tray in ams['tray']:
                    tray_id = int(tray['id'])
                    instance.tray[tray_id].print_update(tray)

        after = f"{self.__dict__}"
        return before != after
@dataclass
class AMSTray:
    """Contents of a single filament tray."""
    empty: bool
    idx: int
    name: str
    type: str
    sub_brands: str
    color: str
    nozzle_temp_min: int
    nozzle_temp_max: int
    remain: int
    k: float
    tag_uid: str
    tray_uuid: str

    def __init__(self, client):
        """Start as an empty tray with blank details."""
        self._client = client
        self.empty = True
        self.idx = ""
        self.name = ""
        self.type = ""
        self.sub_brands = ""
        self.color = "00000000" # RRGGBBAA
        self.nozzle_temp_min = 0
        self.nozzle_temp_max = 0
        self.remain = 0
        self.k = 0
        self.tag_uid = ""
        self.tray_uuid = ""

    def _mark_empty(self):
        """Reset every detail to the values used for an empty tray."""
        empty_values = {
            "empty": True,
            "idx": "",
            "name": "Empty",
            "type": "Empty",
            "sub_brands": "",
            "color": "00000000",  # RRGGBBAA
            "nozzle_temp_min": 0,
            "nozzle_temp_max": 0,
            "remain": 0,
            "tag_uid": "",
            "tray_uuid": "",
            "k": 0,
        }
        for attribute, value in empty_values.items():
            setattr(self, attribute, value)

    def print_update(self, data) -> bool:
        """Update the tray from its payload entry; return True if anything changed."""
        before = f"{self.__dict__}"

        if len(data) == 1:
            # If the data is exactly one entry then it's just the ID and the tray is empty.
            self._mark_empty()
            return before != f"{self.__dict__}"

        self.empty = False
        self.idx = data.get('tray_info_idx', self.idx)
        # The display name is looked up from the filament index just read.
        custom_filaments = self._client.slicer_settings.custom_filaments
        self.name = get_filament_name(self.idx, custom_filaments)
        # (attribute, payload key); keys missing from the payload keep the current value.
        for attribute, key in (
            ("type", 'tray_type'),
            ("sub_brands", 'tray_sub_brands'),
            ("color", 'tray_color'),
            ("nozzle_temp_min", 'nozzle_temp_min'),
            ("nozzle_temp_max", 'nozzle_temp_max'),
            ("remain", 'remain'),
            ("tag_uid", 'tag_uid'),
            ("tray_uuid", 'tray_uuid'),
            ("k", 'k'),
        ):
            setattr(self, attribute, data.get(key, getattr(self, attribute)))

        return before != f"{self.__dict__}"
@dataclass
class ExternalSpool(AMSTray):
    """The external (virtual tray) spool, parsed exactly like an AMS tray."""

    def __init__(self, client):
        """Initialise the underlying tray for *client*."""
        super().__init__(client)
        self._client = client

    def print_update(self, data) -> bool:
        """Update from the "vt_tray" section of a print payload.

        Returns False when the section is missing or empty; otherwise the
        result of the AMSTray update.
        """

        # P1P virtual tray example
        # "vt_tray": {
        #     "id": "254",
        #     "tag_uid": "0000000000000000",
        #     "tray_id_name": "",
        #     "tray_info_idx": "GFB99",
        #     "tray_type": "ABS",
        #     "tray_sub_brands": "",
        #     "tray_color": "000000FF",
        #     "tray_weight": "0",
        #     "tray_diameter": "0.00",
        #     "tray_temp": "0",
        #     "tray_time": "0",
        #     "bed_temp_type": "0",
        #     "bed_temp": "0",
        #     "nozzle_temp_max": "280",
        #     "nozzle_temp_min": "240",
        #     "remain": 100,
        #     "xcam_info": "000000000000000000000000",
        #     "tray_uuid": "00000000000000000000000000000000",
        #     "remain": 0,
        #     "k": 0.029999999329447746,
        #     "n": 1.399999976158142
        # },
        #
        # This is exact same data as the AMS exposes so we can just defer to the AMSTray object
        # to parse this json.

        virtual_tray = data.get("vt_tray", {})
        if len(virtual_tray) == 0:
            return False
        return super().print_update(virtual_tray)
@dataclass
class Speed:
    """Return speed profile information"""
    _id: int
    name: str
    modifier: int

    def __init__(self, client):
        """Start on profile id 2 with a modifier of 100."""
        self._client = client
        self._id = 2
        self.name = get_speed_name(2)
        self.modifier = 100

    def print_update(self, data) -> bool:
        """Apply ``spd_lvl`` / ``spd_mag`` from a print payload; return True on change.

        Keys missing from *data* leave the current value in place.
        """
        before = f"{self.__dict__}"
        self._id = int(data.get("spd_lvl", self._id))
        self.name = get_speed_name(self._id)
        self.modifier = int(data.get("spd_mag", self.modifier))
        return before != f"{self.__dict__}"

    def SetSpeed(self, option: str):
        """Switch to the speed profile named *option* and publish the change.

        Nothing happens when *option* matches no entry of ``SPEED_PROFILE``.
        """
        # Local import so this class does not depend on a file-level import of ``copy``.
        import copy

        for profile_id, profile_name in SPEED_PROFILE.items():
            if option != profile_name:
                continue
            self._id = profile_id
            self.name = profile_name
            # Work on a private copy: writing into SPEED_PROFILE_TEMPLATE itself
            # would alter the module-level template shared by every caller.
            command = copy.deepcopy(SPEED_PROFILE_TEMPLATE)
            command['print']['param'] = f"{profile_id}"
            self._client.publish(command)
            if self._client.callback is not None:
                self._client.callback("event_speed_update")
@dataclass
class StageAction:
    """Return Stage Action information"""
    _id: int
    _print_type: str
    description: str

    def __init__(self):
        self._id = 255
        self._print_type = ""
        self.description = get_current_stage(self._id)

    def print_update(self, data) -> bool:
        """Apply ``print_type`` / ``stg_cur`` from a print payload; return True on change."""
        snapshot = f"{self.__dict__}"

        reported_type = data.get("print_type", self._print_type)
        self._print_type = reported_type
        if reported_type.lower() not in PRINT_TYPE_OPTIONS:
            # Anything outside the known options is collapsed to "unknown".
            self._print_type = "unknown"

        self._id = int(data.get("stg_cur", self._id))
        idle_with_stage_zero = (self._print_type == "idle") and (self._id == 0)
        if idle_with_stage_zero:
            # On boot the printer reports stg_cur == 0 incorrectly instead of 255. Attempt to correct for this.
            self._id = 255

        self.description = get_current_stage(self._id)
        return snapshot != f"{self.__dict__}"
@dataclass
class HMSList:
    """Return all HMS related info"""
    _count: int
    _errors: dict

    def __init__(self, client):
        self._client = client
        self._count = 0
        self._errors = {"Count": 0}

    def print_update(self, data) -> bool:
        """Rebuild the error dict from ``data["hms"]``; return True only if it changed.

        When the dict changes, the ``event_hms_errors`` callback is fired if the
        client has a callback set.
        """
        # Example payload:
        # "hms": [
        #     {
        #         "attr": 50331904, # In hex this is 0300 0100
        #         "code": 65543     # In hex this is 0001 0007
        #     }
        # ],
        # So this is HMS_0300_0100_0001_0007:
        # https://wiki.bambulab.com/en/x1/troubleshooting/hmscode/0300_0100_0001_0007
        # 'The heatbed temperature is abnormal; the sensor may have an open circuit.'
        if 'hms' not in data:
            return False

        entries = data.get('hms', [])
        self._count = len(entries)
        report = {"Count": self._count}
        for position, entry in enumerate(entries, start=1):
            notification = HMSNotification(attr=int(entry['attr']), code=int(entry['code']))
            hms_code = notification.hms_code
            report[f"{position}-Error"] = f"HMS_{hms_code}: {get_HMS_error_text(hms_code)}"
            report[f"{position}-Wiki"] = notification.wiki_url
            report[f"{position}-Severity"] = notification.severity
            # notification.module is deliberately left out to avoid bloat with the current structure.

        if self._errors == report:
            return False

        LOGGER.debug("Updating HMS error list.")
        self._errors = report
        if self._count != 0:
            LOGGER.warning(f"HMS ERRORS: {report}")
        notify = self._client.callback
        if notify is not None:
            notify("event_hms_errors")
        return True

    @property
    def errors(self) -> dict:
        """The most recently stored error dict (always contains a "Count" key)."""
        return self._errors

    @property
    def error_count(self) -> int:
        """Number of entries in the last ``hms`` list received."""
        return self._count
@dataclass
class PrintErrorList:
    """Return all print_error related info"""
    _error: dict
    _count: int

    def __init__(self, client):
        self._error = None
        self._count = 0
        self._client = client

    @staticmethod
    def _format_code(print_error_code) -> str:
        """Render a numeric code as upper-case hex split as ``XXXX_XXXX``.

        The value is zero-padded to eight hex digits, so codes with seven hex
        digits (e.g. 0x7008006) and eight (e.g. 0x12008006) both split correctly.
        """
        digits = f"{int(print_error_code):08X}"
        return f"{digits[:4]}_{digits[4:]}"

    def print_update(self, data) -> bool:
        """Track ``data["print_error"]`` and fire ``event_print_error`` on change.

        Always returns False: the dedicated event is sent directly, so the
        general data-changed event is never requested from here.
        """
        # Example payload:
        # "print_error": 117473286
        # So this is 07008006 which we make more human readable to 0700_8006
        # https://e.bambulab.com/query.php?lang=en
        # 'Unable to feed filament into the extruder. This could be due to entangled filament or a stuck spool. If not, please check if the AMS PTFE tube is connected.'
        if 'print_error' in data:
            errors = None
            print_error_code = data.get('print_error')
            if print_error_code != 0:
                code_text = self._format_code(print_error_code)
                errors = {
                    "Code": code_text,
                    "Error": f"{code_text}: {get_print_error_text(print_error_code)}",
                }
                # Do not log here at warning level: the same payload can repeat
                # every second and would flood the log.

            if self._error != errors:
                self._error = errors
                if self._client.callback is not None:
                    self._client.callback("event_print_error")

        # We send the error event directly so always return False for the general data event.
        return False

    @property
    def error(self) -> dict:
        """The current error dict ("Code" / "Error"), or None when there is no error."""
        return self._error

    @property
    def on(self) -> bool:
        """True while an error is stored."""
        return self._error is not None
@dataclass
class HMSNotification:
    """Return an HMS object and all associated details"""
    attr: int
    code: int

    def __init__(self, attr: int = 0, code: int = 0):
        self.attr = attr
        self.code = code

    @property
    def severity(self):
        """Severity looked up from ``code`` via ``get_HMS_severity``."""
        return get_HMS_severity(self.code)

    @property
    def module(self):
        """Module looked up from ``attr`` via ``get_HMS_module``."""
        return get_HMS_module(self.attr)

    @property
    def hms_code(self):
        """Four underscore-separated 4-digit hex words, e.g. ``0300_0100_0001_0007``.

        Returns "" unless both ``attr`` and ``code`` are greater than zero.
        """
        if self.attr > 0 and self.code > 0:
            # Split each value into its high and low 16-bit words with integer
            # shifts/masks; no float division, so the result is exact for any int.
            words = (
                self.attr >> 16,
                self.attr & 0xFFFF,
                self.code >> 16,
                self.code & 0xFFFF,
            )
            return "_".join(f"{word:0>4X}" for word in words)
        return ""

    @property
    def wiki_url(self):
        """Wiki troubleshooting URL for this code, or "" when ``hms_code`` would be empty."""
        if self.attr > 0 and self.code > 0:
            return f"https://wiki.bambulab.com/en/x1/troubleshooting/hmscode/{get_generic_AMS_HMS_error_code(self.hms_code)}"
        return ""
@dataclass
class ChamberImage:
    """Returns the latest jpeg data from the P1P camera"""

    def __init__(self, client):
        self._client = client
        self._bytes = bytearray()

    def set_jpeg(self, bytes):
        """Store *bytes* as the latest frame (the object is kept as given, not copied)."""
        self._bytes = bytes

    def get_jpeg(self) -> bytearray:
        """Return a fresh ``bytearray`` copy of the stored frame.

        ``bytearray(...)`` is used instead of ``.copy()`` so this also works when
        ``set_jpeg`` was handed immutable ``bytes``, which has no ``copy`` method.
        """
        return bytearray(self._bytes)
@dataclass
class CoverImage:
    """Returns the cover image from the Bambu API"""

    def __init__(self, client):
        """Start with an empty image and fire the cover-image event if a callback is set."""
        self._client = client
        self._bytes = bytearray()
        self._image_last_updated = datetime.now()
        on_event = self._client.callback
        if on_event is not None:
            on_event("event_printer_cover_image_update")

    def set_jpeg(self, bytes):
        """Store *bytes* as the cover image and stamp the update time."""
        self._bytes, self._image_last_updated = bytes, datetime.now()

    def get_jpeg(self) -> bytearray:
        """Return the stored image object itself (no copy is made)."""
        return self._bytes

    def get_last_update_time(self) -> datetime:
        """Return when the image was last set (or when this object was created)."""
        return self._image_last_updated
@dataclass
class HomeFlag:
    """Contains parsed values from the ``home_flag`` sensor.

    ``home_flag`` is kept as one integer in ``_value``; each property below tests
    it against a ``Home_Flag_Values`` mask.
    """
    _value: int
    _sw_ver: str
    _device_type: str

    def __init__(self, client):
        self._value = 0
        self._client = client
        self._sw_ver = ""
        self._device_type = ""

    def info_update(self, data):
        """Refresh the device type and firmware version from ``data["module"]``."""
        modules = data.get("module", [])
        self._device_type = get_printer_type(modules, self._device_type)
        self._sw_ver = get_sw_version(modules, self._sw_ver)

    def print_update(self, data: dict) -> bool:
        """Store ``data["home_flag"]`` (if present) and return True if it changed."""
        before = f"{self.__dict__}"
        self._value = int(data.get("home_flag", str(self._value)))
        return before != f"{self.__dict__}"

    def _is_set(self, mask) -> bool:
        """Return True when any bit of *mask* is set in the stored flag value."""
        return (self._value & mask) != 0

    @property
    def door_open(self) -> "bool | None":
        """Door state, or None when ``door_open_available`` is False."""
        if not self.door_open_available:
            return None
        return self._is_set(Home_Flag_Values.DOOR_OPEN)

    @property
    def door_open_available(self) -> bool:
        """Whether the door state can be read from ``home_flag`` on this printer.

        False when the device lacks ``Features.DOOR_SENSOR``, or when an X1/X1C
        reports firmware older than 01.07.00.00.
        """
        if not self._client._device.supports_feature(Features.DOOR_SENSOR):
            return False
        if self._device_type in ["X1", "X1C"]:
            try:
                too_old = version.parse(self._sw_ver) < version.parse("01.07.00.00")
            except version.InvalidVersion:
                # The firmware version starts out as "" and may not be parsable.
                # Report the sensor as unavailable instead of raising from a
                # property getter.
                return False
            if too_old:
                return False
        return True

    @property
    def x_axis_homed(self) -> bool:
        return self._is_set(Home_Flag_Values.X_AXIS)

    @property
    def y_axis_homed(self) -> bool:
        return self._is_set(Home_Flag_Values.Y_AXIS)

    @property
    def z_axis_homed(self) -> bool:
        return self._is_set(Home_Flag_Values.Z_AXIS)

    @property
    def homed(self) -> bool:
        """True only when the X, Y and Z axes are all homed."""
        return self.x_axis_homed and self.y_axis_homed and self.z_axis_homed

    @property
    def is_220V(self) -> bool:
        return self._is_set(Home_Flag_Values.VOLTAGE220)

    @property
    def xcam_autorecovery_steploss(self) -> bool:
        return self._is_set(Home_Flag_Values.XCAM_AUTO_RECOVERY_STEP_LOSS)

    @property
    def camera_recording(self) -> bool:
        return self._is_set(Home_Flag_Values.CAMERA_RECORDING)

    @property
    def ams_calibrate_remaining(self) -> bool:
        return self._is_set(Home_Flag_Values.AMS_CALIBRATE_REMAINING)

    @property
    def sdcard_present(self) -> bool:
        """True unless the SD_CARD_STATE bits equal ``SdcardState.NO_SDCARD``."""
        return (self._value & Home_Flag_Values.SD_CARD_STATE) != SdcardState.NO_SDCARD

    @property
    def sdcard_normal(self) -> bool:
        """True when a card is present and the abnormal state is not flagged."""
        return self.sdcard_present and (self._value & Home_Flag_Values.HAS_SDCARD_ABNORMAL) != SdcardState.HAS_SDCARD_ABNORMAL

    @property
    def ams_auto_switch_filament(self) -> bool:
        return self._is_set(Home_Flag_Values.AMS_AUTO_SWITCH)

    @property
    def wired_network_connection(self) -> bool:
        return self._is_set(Home_Flag_Values.WIRED_NETWORK)

    @property
    def xcam_prompt_sound(self) -> bool:
        return self._is_set(Home_Flag_Values.XCAM_ALLOW_PROMPT_SOUND)

    @property
    def supports_motor_noise_calibration(self) -> bool:
        return self._is_set(Home_Flag_Values.SUPPORTS_MOTOR_CALIBRATION)

    @property
    def p1s_upgrade_supported(self) -> bool:
        return self._is_set(Home_Flag_Values.SUPPORTED_PLUS)

    @property
    def p1s_upgrade_installed(self) -> bool:
        return self._is_set(Home_Flag_Values.INSTALLED_PLUS)
class SlicerSettings:
    """Holds slicer settings loaded from the Bambu cloud (the custom filament names)."""
    # Plain annotation only. This class is not a dataclass, so a
    # ``field(default_factory=dict)`` default here would never be processed and
    # would just leave a ``Field`` object as a class attribute. ``__init__``
    # creates the per-instance dict.
    custom_filaments: dict

    def __init__(self, client):
        self._client = client
        self.custom_filaments = {}

    def _load_custom_filaments(self, slicer_settings: dict):
        """Fill ``custom_filaments`` (filament_id -> name) from the 'private' filament list.

        A missing or empty "filament" section is ignored instead of raising
        ``KeyError``. Entries with a blank or missing ``filament_id`` are skipped.
        """
        filament_settings = slicer_settings.get("filament") or {}
        if 'private' not in filament_settings:
            return

        for filament in filament_settings['private']:
            # Keep only the part of the name before " @" (the whole name when
            # the marker is absent).
            name = filament["name"].partition(" @")[0]
            if filament.get("filament_id", "") != "":
                self.custom_filaments[filament["filament_id"]] = name
        LOGGER.debug("Got custom filaments: %s", self.custom_filaments)

    def update(self):
        """Clear the cache and reload it from the cloud when that is possible.

        The reload only happens with a non-empty cloud auth token and when the
        client is not in local MQTT mode; otherwise the cache is left empty.
        """
        self.custom_filaments = {}
        cloud = self._client.bambu_cloud
        if cloud.auth_token != "" and self._client.local_mqtt is False:
            LOGGER.debug("Loading slicer settings")
            slicer_settings = cloud.get_slicer_settings()
            if slicer_settings is not None:
                self._load_custom_filaments(slicer_settings)