1425 lines
53 KiB
Python
Raw Normal View History

import math
from dataclasses import dataclass, field
from datetime import datetime
from dateutil import parser, tz
from packaging import version
from .utils import (
search,
fan_percentage,
fan_percentage_to_gcode,
get_current_stage,
get_filament_name,
get_printer_type,
get_speed_name,
get_hw_version,
get_sw_version,
get_start_time,
get_end_time,
get_HMS_error_text,
get_print_error_text,
get_generic_AMS_HMS_error_code,
get_HMS_severity,
get_HMS_module,
)
from .const import (
LOGGER,
Features,
FansEnum,
Home_Flag_Values,
SdcardState,
SPEED_PROFILE,
GCODE_STATE_OPTIONS,
PRINT_TYPE_OPTIONS,
)
from .commands import (
CHAMBER_LIGHT_ON,
CHAMBER_LIGHT_OFF,
SPEED_PROFILE_TEMPLATE,
)
class Device:
def __init__(self, client):
self._client = client
self.temperature = Temperature()
self.lights = Lights(client = client)
self.info = Info(client = client)
self.print_job = PrintJob(client = client)
self.fans = Fans(client = client)
self.speed = Speed(client = client)
self.stage = StageAction()
self.ams = AMSList(client = client)
self.external_spool = ExternalSpool(client = client)
self.hms = HMSList(client = client)
self.print_error = PrintErrorList(client = client)
self.camera = Camera()
self.home_flag = HomeFlag(client=client)
self.push_all_data = None
self.get_version_data = None
if self.supports_feature(Features.CAMERA_IMAGE):
self.chamber_image = ChamberImage(client = client)
self.cover_image = CoverImage(client = client)
def print_update(self, data) -> bool:
send_event = False
send_event = send_event | self.info.print_update(data = data)
send_event = send_event | self.print_job.print_update(data = data)
send_event = send_event | self.temperature.print_update(data = data)
send_event = send_event | self.lights.print_update(data = data)
send_event = send_event | self.fans.print_update(data = data)
send_event = send_event | self.speed.print_update(data = data)
send_event = send_event | self.stage.print_update(data = data)
send_event = send_event | self.ams.print_update(data = data)
send_event = send_event | self.external_spool.print_update(data = data)
send_event = send_event | self.hms.print_update(data = data)
send_event = send_event | self.print_error.print_update(data = data)
send_event = send_event | self.camera.print_update(data = data)
send_event = send_event | self.home_flag.print_update(data = data)
if send_event and self._client.callback is not None:
LOGGER.debug("event_printer_data_update")
self._client.callback("event_printer_data_update")
if data.get("msg", 0) == 0:
self.push_all_data = data
def info_update(self, data):
self.info.info_update(data = data)
self.home_flag.info_update(data = data)
self.ams.info_update(data = data)
if data.get("command") == "get_version":
self.get_version_data = data
def supports_feature(self, feature):
if feature == Features.AUX_FAN:
return True
elif feature == Features.CHAMBER_LIGHT:
return True
elif feature == Features.CHAMBER_FAN:
return self.info.device_type == "X1" or self.info.device_type == "X1C" or self.info.device_type == "X1E" or self.info.device_type == "P1P" or self.info.device_type == "P1S"
elif feature == Features.CHAMBER_TEMPERATURE:
return self.info.device_type == "X1" or self.info.device_type == "X1C" or self.info.device_type == "X1E"
elif feature == Features.CURRENT_STAGE:
return True
elif feature == Features.PRINT_LAYERS:
return True
elif feature == Features.AMS:
return len(self.ams.data) != 0
elif feature == Features.EXTERNAL_SPOOL:
return True
elif feature == Features.K_VALUE:
return self.info.device_type == "P1P" or self.info.device_type == "P1S" or self.info.device_type == "A1" or self.info.device_type == "A1MINI"
elif feature == Features.START_TIME:
return False
elif feature == Features.START_TIME_GENERATED:
return True
elif feature == Features.AMS_TEMPERATURE:
return self.info.device_type == "X1" or self.info.device_type == "X1C" or self.info.device_type == "X1E"
elif feature == Features.CAMERA_RTSP:
return self.info.device_type == "X1" or self.info.device_type == "X1C" or self.info.device_type == "X1E"
elif feature == Features.CAMERA_IMAGE:
return (self._client.host != "") and (self._client._access_code != "") and (self.info.device_type == "P1P" or self.info.device_type == "P1S" or self.info.device_type == "A1" or self.info.device_type == "A1MINI")
elif feature == Features.DOOR_SENSOR:
return self.info.device_type == "X1" or self.info.device_type == "X1C" or self.info.device_type == "X1E"
elif feature == Features.MANUAL_MODE:
return self.info.device_type == "P1P" or self.info.device_type == "P1S" or self.info.device_type == "A1" or self.info.device_type == "A1MINI"
return False
def get_active_tray(self):
if self.supports_feature(Features.AMS):
if self.ams.tray_now == 255:
return None
if self.ams.tray_now == 254:
return self.external_spool
active_ams = self.ams.data[math.floor(self.ams.tray_now / 4)]
active_tray = self.ams.tray_now % 4
return None if active_ams is None else active_ams.tray[active_tray]
else:
return self.external_spool
@dataclass
class Lights:
"""Return all light related info"""
chamber_light: str
chamber_light_override: str
work_light: str
def __init__(self, client):
self._client = client
self.chamber_light = "unknown"
self.work_light = "unknown"
self.chamber_light_override = ""
def print_update(self, data) -> bool:
old_data = f"{self.__dict__}"
# "lights_report": [
# {
# "node": "chamber_light",
# "mode": "on"
# },
# {
# "node": "work_light", # X1 only
# "mode": "flashing"
# }
# ],
chamber_light = \
search(data.get("lights_report", []), lambda x: x.get('node', "") == "chamber_light",
{"mode": self.chamber_light}).get("mode")
if self.chamber_light_override != "":
if self.chamber_light_override == chamber_light:
self.chamber_light_override = ""
else:
self.chamber_light = chamber_light
self.work_light = \
search(data.get("lights_report", []), lambda x: x.get('node', "") == "work_light",
{"mode": self.work_light}).get("mode")
return (old_data != f"{self.__dict__}")
def TurnChamberLightOn(self):
self.chamber_light = "on"
self.chamber_light_override = "on"
if self._client.callback is not None:
self._client.callback("event_light_update")
self._client.publish(CHAMBER_LIGHT_ON)
def TurnChamberLightOff(self):
self.chamber_light = "off"
self.chamber_light_override = "off"
if self._client.callback is not None:
self._client.callback("event_light_update")
self._client.publish(CHAMBER_LIGHT_OFF)
@dataclass
class Camera:
"""Return camera related info"""
recording: str
resolution: str
rtsp_url: str
timelapse: str
def __init__(self):
self.recording = ''
self.resolution = ''
self.rtsp_url = None
self.timelapse = ''
def print_update(self, data) -> bool:
old_data = f"{self.__dict__}"
# "ipcam": {
# "ipcam_dev": "1",
# "ipcam_record": "enable",
# "mode_bits": 2,
# "resolution": "1080p",
# "rtsp_url": "rtsps://192.168.1.64/streaming/live/1",
# "timelapse": "disable",
# "tutk_server": "disable"
# }
self.timelapse = data.get("ipcam", {}).get("timelapse", self.timelapse)
self.recording = data.get("ipcam", {}).get("ipcam_record", self.recording)
self.resolution = data.get("ipcam", {}).get("resolution", self.resolution)
self.rtsp_url = data.get("ipcam", {}).get("rtsp_url", self.rtsp_url)
return (old_data != f"{self.__dict__}")
@dataclass
class Temperature:
"""Return all temperature related info"""
bed_temp: int
target_bed_temp: int
chamber_temp: int
nozzle_temp: int
target_nozzle_temp: int
def __init__(self):
self.bed_temp = 0
self.target_bed_temp = 0
self.chamber_temp = 0
self.nozzle_temp = 0
self.target_nozzle_temp = 0
def print_update(self, data) -> bool:
old_data = f"{self.__dict__}"
self.bed_temp = round(data.get("bed_temper", self.bed_temp))
self.target_bed_temp = round(data.get("bed_target_temper", self.target_bed_temp))
self.chamber_temp = round(data.get("chamber_temper", self.chamber_temp))
self.nozzle_temp = round(data.get("nozzle_temper", self.nozzle_temp))
self.target_nozzle_temp = round(data.get("nozzle_target_temper", self.target_nozzle_temp))
return (old_data != f"{self.__dict__}")
@dataclass
class Fans:
"""Return all fan related info"""
_aux_fan_speed_percentage: int
_aux_fan_speed: int
_aux_fan_speed_override: int
_aux_fan_speed_override_time: datetime
_chamber_fan_speed_percentage: int
_chamber_fan_speed: int
_chamber_fan_speed_override: int
_chamber_fan_speed_override_time: datetime
_cooling_fan_speed_percentage: int
_cooling_fan_speed: int
_cooling_fan_speed_override: int
_cooling_fan_speed_override_time: datetime
_heatbreak_fan_speed_percentage: int
_heatbreak_fan_speed: int
def __init__(self, client):
self._client = client
self._aux_fan_speed_percentage = 0
self._aux_fan_speed = 0
self._aux_fan_speed_override = 0
self._aux_fan_speed_override_time = None
self._chamber_fan_speed_percentage = 0
self._chamber_fan_speed = 0
self._chamber_fan_speed_override = 0
self._chamber_fan_speed_override_time = None
self._cooling_fan_speed_percentage = 0
self._cooling_fan_speed = 0
self._cooling_fan_speed_override = 0
self._cooling_fan_speed_override_time = None
self._heatbreak_fan_speed_percentage = 0
self._heatbreak_fan_speed = 0
def print_update(self, data) -> bool:
old_data = f"{self.__dict__}"
self._aux_fan_speed = data.get("big_fan1_speed", self._aux_fan_speed)
self._aux_fan_speed_percentage = fan_percentage(self._aux_fan_speed)
if self._aux_fan_speed_override_time is not None:
delta = datetime.now() - self._aux_fan_speed_override_time
if delta.seconds > 5:
self._aux_fan_speed_override_time = None
self._chamber_fan_speed = data.get("big_fan2_speed", self._chamber_fan_speed)
self._chamber_fan_speed_percentage = fan_percentage(self._chamber_fan_speed)
if self._chamber_fan_speed_override_time is not None:
delta = datetime.now() - self._chamber_fan_speed_override_time
if delta.seconds > 5:
self._chamber_fan_speed_override_time = None
self._cooling_fan_speed = data.get("cooling_fan_speed", self._cooling_fan_speed)
self._cooling_fan_speed_percentage = fan_percentage(self._cooling_fan_speed)
if self._cooling_fan_speed_override_time is not None:
delta = datetime.now() - self._cooling_fan_speed_override_time
if delta.seconds > 5:
self._cooling_fan_speed_override_time = None
self._heatbreak_fan_speed = data.get("heatbreak_fan_speed", self._heatbreak_fan_speed)
self._heatbreak_fan_speed_percentage = fan_percentage(self._heatbreak_fan_speed)
return (old_data != f"{self.__dict__}")
def set_fan_speed(self, fan: FansEnum, percentage: int):
"""Set fan speed"""
percentage = round(percentage / 10) * 10
command = fan_percentage_to_gcode(fan, percentage)
if fan == FansEnum.PART_COOLING:
self._cooling_fan_speed = percentage
self._cooling_fan_speed_override_time = datetime.now()
elif fan == FansEnum.AUXILIARY:
self._aux_fan_speed_override = percentage
self._aux_fan_speed_override_time = datetime.now()
elif fan == FansEnum.CHAMBER:
self._chamber_fan_speed_override = percentage
self._chamber_fan_speed_override_time = datetime.now()
LOGGER.debug(command)
self._client.publish(command)
if self._client.callback is not None:
self._client.callback("event_printer_data_update")
def get_fan_speed(self, fan: FansEnum) -> int:
if fan == FansEnum.PART_COOLING:
if self._cooling_fan_speed_override_time is not None:
return self._cooling_fan_speed_override
else:
return self._cooling_fan_speed_percentage
elif fan == FansEnum.AUXILIARY:
if self._aux_fan_speed_override_time is not None:
return self._aux_fan_speed_override
else:
return self._aux_fan_speed_percentage
elif fan == FansEnum.CHAMBER:
if self._chamber_fan_speed_override_time is not None:
return self._chamber_fan_speed_override
else:
return self._chamber_fan_speed_percentage
elif fan == FansEnum.HEATBREAK:
return self._heatbreak_fan_speed_percentage
@dataclass
class PrintJob:
"""Return all information related content"""
print_percentage: int
gcode_state: str
file_type_icon: str
gcode_file: str
subtask_name: str
start_time: datetime
end_time: datetime
remaining_time: int
current_layer: int
total_layers: int
print_error: int
print_weight: float
print_length: int
print_bed_type: str
print_type: str
_ams_print_weights: float
_ams_print_lengths: float
@property
def get_ams_print_weights(self) -> float:
values = {}
for i in range(16):
if self._ams_print_weights[i] != 0:
values[f"AMS Slot {i}"] = self._ams_print_weights[i]
return values
@property
def get_ams_print_lengths(self) -> float:
values = {}
for i in range(16):
if self._ams_print_lengths[i] != 0:
values[f"AMS Slot {i}"] = self._ams_print_lengths[i]
return values
def __init__(self, client):
self._client = client
self.print_percentage = 0
self.gcode_state = "unknown"
self.gcode_file = ""
self.subtask_name = ""
self.start_time = None
self.end_time = None
self.remaining_time = 0
self.current_layer = 0
self.total_layers = 0
self.print_error = 0
self.print_weight = 0
self._ams_print_weights = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
self._ams_print_lengths = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
self.print_length = 0
self.print_bed_type = "unknown"
self.file_type_icon = "mdi:file"
self.print_type = ""
def print_update(self, data) -> bool:
old_data = f"{self.__dict__}"
# Example payload:
# {
# "print": {
# "gcode_start_time": "1681479206",
# "gcode_state": "IDLE",
# "mc_print_stage": "1",
# "mc_percent": 100,
# "mc_remaining_time": 0,
# "wifi_signal": "-53dBm",
# "print_type": "idle",
# "ipcam": {
# "ipcam_dev": "1",
# "ipcam_record": "enable"
# "resolution": "1080p", # X1 only
# "timelapse": "disable"
# },
# "layer_num": 0,
# "total_layer_num": 0,
self.print_percentage = data.get("mc_percent", self.print_percentage)
previous_gcode_state = self.gcode_state
self.gcode_state = data.get("gcode_state", self.gcode_state)
if previous_gcode_state != self.gcode_state:
LOGGER.debug(f"GCODE_STATE: {previous_gcode_state} -> {self.gcode_state}")
if self.gcode_state.lower() not in GCODE_STATE_OPTIONS:
LOGGER.error(f"Unknown gcode_state. Please log an issue : '{self.gcode_state}'")
self.gcode_state = "unknown"
if previous_gcode_state != self.gcode_state:
LOGGER.debug(f"GCODE_STATE: {previous_gcode_state} -> {self.gcode_state}")
self.gcode_file = data.get("gcode_file", self.gcode_file)
self.print_type = data.get("print_type", self.print_type)
if self.print_type.lower() not in PRINT_TYPE_OPTIONS:
LOGGER.debug(f"Unknown print_type. Please log an issue : '{self.print_type}'")
self.print_type = "unknown"
self.subtask_name = data.get("subtask_name", self.subtask_name)
self.file_type_icon = "mdi:file" if self.print_type != "cloud" else "mdi:cloud-outline"
self.current_layer = data.get("layer_num", self.current_layer)
self.total_layers = data.get("total_layer_num", self.total_layers)
# Initialize task data at startup.
if previous_gcode_state == "unknown" and self.gcode_state != "unknown":
self._update_task_data()
# Calculate start / end time after we update task data so we don't stomp on prepopulated values while idle on integration start.
if data.get("gcode_start_time") is not None:
if self.start_time != get_start_time(int(data.get("gcode_start_time"))):
LOGGER.debug(f"GCODE START TIME: {self.start_time}")
self.start_time = get_start_time(int(data.get("gcode_start_time")))
# Generate the end_time from the remaining_time mqtt payload value if present.
if data.get("mc_remaining_time") is not None:
existing_remaining_time = self.remaining_time
self.remaining_time = data.get("mc_remaining_time")
if self.start_time is None:
self.end_time = None
elif existing_remaining_time != self.remaining_time:
self.end_time = get_end_time(self.remaining_time)
LOGGER.debug(f"END TIME2: {self.end_time}")
# Handle print start
previously_idle = previous_gcode_state == "IDLE" or previous_gcode_state == "FAILED" or previous_gcode_state == "FINISH"
currently_idle = self.gcode_state == "IDLE" or self.gcode_state == "FAILED" or self.gcode_state == "FINISH"
if previously_idle and not currently_idle:
if self._client.callback is not None:
self._client.callback("event_print_started")
# Generate the start_time for P1P/S when printer moves from idle to another state. Original attempt with remaining time
# becoming non-zero didn't work as it never bounced to zero in at least the scenario where a print was canceled.
if self._client._device.supports_feature(Features.START_TIME_GENERATED):
# We can use the existing get_end_time helper to format date.now() as desired by passing 0.
self.start_time = get_end_time(0)
# Make sure we don't keep using a stale end time.
self.end_time = None
LOGGER.debug(f"GENERATED START TIME: {self.start_time}")
# Update task data if bambu cloud connected
self._update_task_data()
# When a print is canceled by the user, this is the payload that's sent. A couple of seconds later
# print_error will be reset to zero.
# {
# "print": {
# "print_error": 50348044,
# }
# }
isCanceledPrint = False
if data.get("print_error") == 50348044 and self.print_error == 0:
isCanceledPrint = True
if self._client.callback is not None:
self._client.callback("event_print_canceled")
self.print_error = data.get("print_error", self.print_error)
# Handle print failed
if previous_gcode_state != "unknown" and previous_gcode_state != "FAILED" and self.gcode_state == "FAILED":
if not isCanceledPrint:
if self._client.callback is not None:
self._client.callback("event_print_failed")
# Handle print finish
if previous_gcode_state != "unknown" and previous_gcode_state != "FINISH" and self.gcode_state == "FINISH":
if self._client.callback is not None:
self._client.callback("event_print_finished")
if currently_idle and not previously_idle and previous_gcode_state != "unknown":
if self.start_time != None:
# self.end_time isn't updated if we hit an AMS retract at print end but the printer does count that entire
# paused time as usage hours. So we need to use the current time instead of the last recorded end time in
# our calculation here.
duration = datetime.now() - self.start_time
# Round usage hours to 2 decimal places (about 1/2 a minute accuracy)
new_hours = round((duration.seconds / 60 / 60) * 100) / 100
LOGGER.debug(f"NEW USAGE HOURS: {new_hours}")
self._client._device.info.usage_hours += new_hours
return (old_data != f"{self.__dict__}")
# The task list is of the following form with a 'hits' array with typical 20 entries.
#
# "total": 531,
# "hits": [
# {
# "id": 35237965,
# "designId": 0,
# "designTitle": "",
# "instanceId": 0,
# "modelId": "REDACTED",
# "title": "REDACTED",
# "cover": "REDACTED",
# "status": 4,
# "feedbackStatus": 0,
# "startTime": "2023-12-21T19:02:16Z",
# "endTime": "2023-12-21T19:02:35Z",
# "weight": 34.62,
# "length": 1161,
# "costTime": 10346,
# "profileId": 35276233,
# "plateIndex": 1,
# "plateName": "",
# "deviceId": "REDACTED",
# "amsDetailMapping": [
# {
# "ams": 4,
# "sourceColor": "F4D976FF",
# "targetColor": "F4D976FF",
# "filamentId": "GFL99",
# "filamentType": "PLA",
# "targetFilamentType": "",
# "weight": 34.62
# }
# ],
# "mode": "cloud_file",
# "isPublicProfile": false,
# "isPrintable": true,
# "deviceModel": "P1P",
# "deviceName": "Bambu P1P",
# "bedType": "textured_plate"
# },
def _update_task_data(self):
if self._client.bambu_cloud.auth_token != "":
self._task_data = self._client.bambu_cloud.get_latest_task_for_printer(self._client._serial)
if self._task_data is None:
LOGGER.debug("No bambu cloud task data found for printer.")
self._client._device.cover_image.set_jpeg(None)
self.print_weight = 0
self._ams_print_weights = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
self._ams_print_lengths = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
self.print_length = 0
self.print_bed_type = "unknown"
self.start_time = None
self.end_time = None
else:
LOGGER.debug("Updating bambu cloud task data found for printer.")
url = self._task_data.get('cover', '')
if url != "":
data = self._client.bambu_cloud.download(url)
self._client._device.cover_image.set_jpeg(data)
self.print_length = self._task_data.get('length', self.print_length * 100) / 100
self.print_bed_type = self._task_data.get('bedType', self.print_bed_type)
self.print_weight = self._task_data.get('weight', self.print_weight)
ams_print_data = self._task_data.get('amsDetailMapping', [])
self._ams_print_weights = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
self._ams_print_lengths = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
if self.print_weight != 0:
for ams_data in ams_print_data:
index = ams_data['ams']
weight = ams_data['weight']
self._ams_print_weights[index] = weight
self._ams_print_lengths[index] = self.print_length * weight / self.print_weight
status = self._task_data['status']
LOGGER.debug(f"CLOUD PRINT STATUS: {status}")
if self._client._device.supports_feature(Features.START_TIME_GENERATED) and (status == 4):
# If we generate the start time (not X1), then rely more heavily on the cloud task data and
# do so uniformly so we always have matched start/end times.
# "startTime": "2023-12-21T19:02:16Z"
cloud_time_str = self._task_data.get('startTime', "")
LOGGER.debug(f"CLOUD START TIME1: {self.start_time}")
if cloud_time_str != "":
local_dt = parser.parse(cloud_time_str).astimezone(tz.tzlocal())
# Convert it to timestamp and back to get rid of timezone in printed output to match datetime objects created from mqtt timestamps.
local_dt = datetime.fromtimestamp(local_dt.timestamp())
self.start_time = local_dt
LOGGER.debug(f"CLOUD START TIME2: {self.start_time}")
# "endTime": "2023-12-21T19:02:35Z"
cloud_time_str = self._task_data.get('endTime', "")
LOGGER.debug(f"CLOUD END TIME1: {self.end_time}")
if cloud_time_str != "":
local_dt = parser.parse(cloud_time_str).astimezone(tz.tzlocal())
# Convert it to timestamp and back to get rid of timezone in printed output to match datetime objects created from mqtt timestamps.
local_dt = datetime.fromtimestamp(local_dt.timestamp())
self.end_time = local_dt
LOGGER.debug(f"CLOUD END TIME2: {self.end_time}")
@dataclass
class Info:
"""Return all device related content"""
# Device state
serial: str
device_type: str
wifi_signal: int
device_type: str
hw_ver: str
sw_ver: str
online: bool
new_version_state: int
mqtt_mode: str
nozzle_diameter: float
nozzle_type: str
usage_hours: float
def __init__(self, client):
self._client = client
self.serial = self._client._serial
self.device_type = self._client._device_type.upper()
self.wifi_signal = 0
self.hw_ver = "unknown"
self.sw_ver = "unknown"
self.online = False
self.new_version_state = 0
self.mqtt_mode = "local" if self._client._local_mqtt else "bambu_cloud"
self.nozzle_diameter = 0
self.nozzle_type = "unknown"
self.usage_hours = client._usage_hours
def set_online(self, online):
if self.online != online:
self.online = online
if self._client.callback is not None:
self._client.callback("event_printer_data_update")
def info_update(self, data):
# Example payload:
# {
# "info": {
# "command": "get_version",
# "sequence_id": "20004",
# "module": [
# {
# "name": "ota",
# "project_name": "C11",
# "sw_ver": "01.02.03.00",
# "hw_ver": "OTA",
# "sn": "..."
# },
# {
# "name": "esp32",
# "project_name": "C11",
# "sw_ver": "00.03.12.31",
# "hw_ver": "AP04",
# "sn": "..."
# },
modules = data.get("module", [])
self.device_type = get_printer_type(modules, self.device_type)
LOGGER.debug(f"Device is {self.device_type}")
self.hw_ver = get_hw_version(modules, self.hw_ver)
self.sw_ver = get_sw_version(modules, self.sw_ver)
if self._client.callback is not None:
self._client.callback("event_printer_info_update")
def print_update(self, data) -> bool:
old_data = f"{self.__dict__}"
# Example payload:
# {
# "print": {
# "gcode_start_time": "1681479206",
# "gcode_state": "IDLE",
# "mc_print_stage": "1",
# "mc_percent": 100,
# "mc_remaining_time": 0,
# "wifi_signal": "-53dBm",
# "print_type": "idle",
# "ipcam": {
# "ipcam_dev": "1",
# "ipcam_record": "enable"
# "resolution": "1080p", # X1 only
# "timelapse": "disable"
# },
# "layer_num": 0,
# "total_layer_num": 0,
self.wifi_signal = int(data.get("wifi_signal", str(self.wifi_signal)).replace("dBm", ""))
# Version data is provided differently for X1 and P1
# P1P example:
# "upgrade_state": {
# "sequence_id": 0,
# "progress": "",
# "status": "",
# "consistency_request": false,
# "dis_state": 1,
# "err_code": 0,
# "force_upgrade": false,
# "message": "",
# "module": "",
# "new_version_state": 1,
# "new_ver_list": [
# {
# "name": "ota",
# "cur_ver": "01.02.03.00",
# "new_ver": "01.03.00.00"
# },
# {
# "name": "ams/0",
# "cur_ver": "00.00.05.96",
# "new_ver": "00.00.06.32"
# }
# ]
# },
# X1 example:
# "upgrade_state": {
# "ahb_new_version_number": "",
# "ams_new_version_number": "",
# "consistency_request": false,
# "dis_state": 0,
# "err_code": 0,
# "force_upgrade": false,
# "message": "",
# "module": "null",
# "new_version_state": 2,
# "ota_new_version_number": "",
# "progress": "0",
# "sequence_id": 0,
# "status": "IDLE"
# },
# The 'new_version_state' value is common to indicate a new upgrade is available.
# Observed values so far are:
# 1 - upgrade available
# 2 - no upgrades available
# And the P1P lists it's versions in new_ver_list as a structured set of data with old
# and new versions provided for each component. While the X1 lists only the new version
# in separate string properties.
self.new_version_state = data.get("upgrade_state", {}).get("new_version_state", self.new_version_state)
# "nozzle_diameter": "0.4",
# "nozzle_type": "hardened_steel",
self.nozzle_diameter = float(data.get("nozzle_diameter", self.nozzle_diameter))
self.nozzle_type = data.get("nozzle_type", self.nozzle_type)
return (old_data != f"{self.__dict__}")
@property
def has_bambu_cloud_connection(self) -> bool:
return self._client.bambu_cloud.auth_token != ""
@dataclass
class AMSInstance:
"""Return all AMS instance related info"""
def __init__(self, client):
self.serial = ""
self.sw_version = ""
self.hw_version = ""
self.humidity_index = 0
self.temperature = 0
self.tray = [None] * 4
self.tray[0] = AMSTray(client)
self.tray[1] = AMSTray(client)
self.tray[2] = AMSTray(client)
self.tray[3] = AMSTray(client)
@dataclass
class AMSList:
"""Return all AMS related info"""
def __init__(self, client):
self._client = client
self.tray_now = 0
self.data = [None] * 4
def info_update(self, data):
old_data = f"{self.__dict__}"
# First determine if this the version info data or the json payload data. We use the version info to determine
# what devices to add to humidity_index assistant and add all the sensors as entities. And then then json payload data
# to populate the values for all those entities.
# The module entries are of this form (P1/X1):
# {
# "name": "ams/0",
# "project_name": "",
# "sw_ver": "00.00.05.96",
# "loader_ver": "00.00.00.00",
# "ota_ver": "00.00.00.00",
# "hw_ver": "AMS08",
# "sn": "<SERIAL>"
# }
# AMS Lite of the form:
# {
# "name": "ams_f1/0",
# "project_name": "",
# "sw_ver": "00.00.07.89",
# "loader_ver": "00.00.00.00",
# "ota_ver": "00.00.00.00",
# "hw_ver": "AMS_F102",
# "sn": "**REDACTED**"
# }
data_changed = False
module_list = data.get("module", [])
for module in module_list:
name = module["name"]
index = -1
if name.startswith("ams/"):
index = int(name[4])
elif name.startswith("ams_f1/"):
index = int(name[7])
if index != -1:
# Sometimes we get incomplete version data. We have to skip if that occurs since the serial number is
# requires as part of the home assistant device identity.
if not module['sn'] == '':
# May get data before info so create entries if necessary
if self.data[index] is None:
self.data[index] = AMSInstance(self._client)
if self.data[index].serial != module['sn']:
data_changed = True
self.data[index].serial = module['sn']
if self.data[index].sw_version != module['sw_ver']:
data_changed = True
self.data[index].sw_version = module['sw_ver']
if self.data[index].hw_version != module['hw_ver']:
data_changed = True
self.data[index].hw_version = module['hw_ver']
data_changed = data_changed or (old_data != f"{self.__dict__}")
if data_changed:
if self._client.callback is not None:
self._client.callback("event_ams_info_update")
def print_update(self, data) -> bool:
old_data = f"{self.__dict__}"
# AMS json payload is of the form:
# "ams": {
# "ams": [
# {
# "id": "0",
# "humidity": "4",
# "temp": "0.0",
# "tray": [
# {
# "id": "0",
# "remain": -1,
# "k": 0.019999999552965164, # P1P only
# "n": 1.399999976158142, # P1P only
# "tag_uid": "0000000000000000",
# "tray_id_name": "",
# "tray_info_idx": "GFL99",
# "tray_type": "PLA",
# "tray_sub_brands": "",
# "tray_color": "FFFF00FF", # RRGGBBAA
# "tray_weight": "0",
# "tray_diameter": "0.00",
# "drying_temp": "0",
# "drying_time": "0",
# "bed_temp_type": "0",
# "bed_temp": "0",
# "nozzle_temp_max": "240",
# "nozzle_temp_min": "190",
# "remain": 100,
# "xcam_info": "000000000000000000000000",
# "tray_uuid": "00000000000000000000000000000000"
# },
# {
# "id": "1",
# ...
# },
# {
# "id": "2",
# ...
# },
# {
# "id": "3",
# ...
# }
# ]
# }
# ],
# "ams_exist_bits": "1",
# "tray_exist_bits": "f",
# "tray_is_bbl_bits": "f",
# "tray_now": "255",
# "tray_read_done_bits": "f",
# "tray_reading_bits": "0",
# "tray_tar": "255",
# "version": 3,
# "insert_flag": true,
# "power_on_flag": false
# },
ams_data = data.get("ams", [])
if len(ams_data) != 0:
self.tray_now = int(ams_data.get('tray_now', self.tray_now))
ams_list = ams_data.get("ams", [])
for ams in ams_list:
index = int(ams['id'])
# May get data before info so create entry if necessary
if self.data[index] is None:
self.data[index] = AMSInstance(self._client)
if self.data[index].humidity_index != int(ams['humidity']):
self.data[index].humidity_index = int(ams['humidity'])
if self.data[index].temperature != float(ams['temp']):
self.data[index].temperature = float(ams['temp'])
tray_list = ams['tray']
for tray in tray_list:
tray_id = int(tray['id'])
self.data[index].tray[tray_id].print_update(tray)
data_changed = (old_data != f"{self.__dict__}")
return data_changed
@dataclass
class AMSTray:
"""Return all AMS tray related info"""
def __init__(self, client):
self._client = client
self.empty = True
self.idx = ""
self.name = ""
self.type = ""
self.sub_brands = ""
self.color = "00000000" # RRGGBBAA
self.nozzle_temp_min = 0
self.nozzle_temp_max = 0
self.remain = 0
self.k = 0
self.tag_uid = "0000000000000000"
def print_update(self, data) -> bool:
old_data = f"{self.__dict__}"
if len(data) == 1:
# If the data is exactly one entry then it's just the ID and the tray is empty.
self.empty = True
self.idx = ""
self.name = "Empty"
self.type = "Empty"
self.sub_brands = ""
self.color = "00000000" # RRGGBBAA
self.nozzle_temp_min = 0
self.nozzle_temp_max = 0
self.remain = 0
self.tag_uid = "0000000000000000"
self.k = 0
else:
self.empty = False
self.idx = data.get('tray_info_idx', self.idx)
self.name = get_filament_name(self.idx, self._client.slicer_settings.custom_filaments)
self.type = data.get('tray_type', self.type)
self.sub_brands = data.get('tray_sub_brands', self.sub_brands)
self.color = data.get('tray_color', self.color)
self.nozzle_temp_min = data.get('nozzle_temp_min', self.nozzle_temp_min)
self.nozzle_temp_max = data.get('nozzle_temp_max', self.nozzle_temp_max)
self.remain = data.get('remain', self.remain)
self.tag_uid = data.get('tag_uid', self.tag_uid)
self.k = data.get('k', self.k)
return (old_data != f"{self.__dict__}")
@dataclass
class ExternalSpool(AMSTray):
"""Return the virtual tray related info"""
def __init__(self, client):
super().__init__(client)
self._client = client
def print_update(self, data) -> bool:
# P1P virtual tray example
# "vt_tray": {
# "id": "254",
# "tag_uid": "0000000000000000",
# "tray_id_name": "",
# "tray_info_idx": "GFB99",
# "tray_type": "ABS",
# "tray_sub_brands": "",
# "tray_color": "000000FF",
# "tray_weight": "0",
# "tray_diameter": "0.00",
# "tray_temp": "0",
# "tray_time": "0",
# "bed_temp_type": "0",
# "bed_temp": "0",
# "nozzle_temp_max": "280",
# "nozzle_temp_min": "240",
# "remain": 100,
# "xcam_info": "000000000000000000000000",
# "tray_uuid": "00000000000000000000000000000000",
# "remain": 0,
# "k": 0.029999999329447746,
# "n": 1.399999976158142
# },
#
# This is exact same data as the AMS exposes so we can just defer to the AMSTray object
# to parse this json.
received_virtual_tray_data = False
tray_data = data.get("vt_tray", {})
if len(tray_data) != 0:
received_virtual_tray_data = super().print_update(tray_data)
return received_virtual_tray_data
@dataclass
class Speed:
"""Return speed profile information"""
_id: int
name: str
modifier: int
def __init__(self, client):
self._client = client
self._id = 2
self.name = get_speed_name(2)
self.modifier = 100
def print_update(self, data) -> bool:
old_data = f"{self.__dict__}"
self._id = int(data.get("spd_lvl", self._id))
self.name = get_speed_name(self._id)
self.modifier = int(data.get("spd_mag", self.modifier))
return (old_data != f"{self.__dict__}")
def SetSpeed(self, option: str):
for id, speed in SPEED_PROFILE.items():
if option == speed:
self._id = id
self.name = speed
command = SPEED_PROFILE_TEMPLATE
command['print']['param'] = f"{id}"
self._client.publish(command)
if self._client.callback is not None:
self._client.callback("event_speed_update")
@dataclass
class StageAction:
"""Return Stage Action information"""
_id: int
_print_type: str
description: str
def __init__(self):
self._id = 255
self._print_type = ""
self.description = get_current_stage(self._id)
def print_update(self, data) -> bool:
old_data = f"{self.__dict__}"
self._print_type = data.get("print_type", self._print_type)
if self._print_type.lower() not in PRINT_TYPE_OPTIONS:
self._print_type = "unknown"
self._id = int(data.get("stg_cur", self._id))
if (self._print_type == "idle") and (self._id == 0):
# On boot the printer reports stg_cur == 0 incorrectly instead of 255. Attempt to correct for this.
self._id = 255
self.description = get_current_stage(self._id)
return (old_data != f"{self.__dict__}")
@dataclass
class HMSList:
"""Return all HMS related info"""
_count: int
_errors: dict
def __init__(self, client):
self._client = client
self._count = 0
self._errors = {}
self._errors["Count"] = 0
def print_update(self, data) -> bool:
# Example payload:
# "hms": [
# {
# "attr": 50331904, # In hex this is 0300 0100
# "code": 65543 # In hex this is 0001 0007
# }
# ],
# So this is HMS_0300_0100_0001_0007:
# https://wiki.bambulab.com/en/x1/troubleshooting/hmscode/0300_0100_0001_0007
# 'The heatbed temperature is abnormal; the sensor may have an open circuit.'
if 'hms' in data.keys():
hmsList = data.get('hms', [])
self._count = len(hmsList)
errors = {}
errors["Count"] = self._count
index: int = 0
for hms in hmsList:
index = index + 1
attr = int(hms['attr'])
code = int(hms['code'])
hms_notif = HMSNotification(attr=attr, code=code)
errors[f"{index}-Error"] = f"HMS_{hms_notif.hms_code}: {get_HMS_error_text(hms_notif.hms_code)}"
errors[f"{index}-Wiki"] = hms_notif.wiki_url
errors[f"{index}-Severity"] = hms_notif.severity
#LOGGER.debug(f"HMS error for '{hms_notif.module}' and severity '{hms_notif.severity}': HMS_{hms_notif.hms_code}")
#errors[f"{index}-Module"] = hms_notif.module # commented out to avoid bloat with current structure
if self._errors != errors:
LOGGER.debug("Updating HMS error list.")
self._errors = errors
if self._count != 0:
LOGGER.warning(f"HMS ERRORS: {errors}")
if self._client.callback is not None:
self._client.callback("event_hms_errors")
return True
return False
@property
def errors(self) -> dict:
#LOGGER.debug(f"PROPERTYCALL: get_hms_errors")
return self._errors
@property
def error_count(self) -> int:
return self._count
@dataclass
class PrintErrorList:
"""Return all print_error related info"""
_error: dict
_count: int
def __init__(self, client):
self._client = client
self._error = {}
def print_update(self, data) -> bool:
# Example payload:
# "print_error": 117473286
# So this is 07008006 which we make more human readable to 0700-8006
# https://e.bambulab.com/query.php?lang=en
# 'Unable to feed filament into the extruder. This could be due to entangled filament or a stuck spool. If not, please check if the AMS PTFE tube is connected.'
if 'print_error' in data.keys():
errors = {}
print_error_code = data.get('print_error')
if print_error_code != 0:
hex_conversion = f'0{int(print_error_code):x}'
print_error_code_hex = hex_conversion[slice(0,4,1)] + "_" + hex_conversion[slice(4,8,1)]
errors = {}
errors[f"Code"] = f"{print_error_code_hex.upper()}"
errors[f"Error"] = f"{print_error_code_hex.upper()}: {get_print_error_text(print_error_code)}"
# LOGGER.warning(f"PRINT ERRORS: {errors}") # This will emit a message to home assistant log every 1 second if enabled
if self._error != errors:
self._error = errors
if self._client.callback is not None:
self._client.callback("event_print_error")
# We send the error event directly so always return False for the general data event.
return False
@property
def error(self) -> dict:
return self._error
@property
def on(self) -> int:
return self._error is not None
@dataclass
class HMSNotification:
"""Return an HMS object and all associated details"""
def __init__(self, attr: int = 0, code: int = 0):
self.attr = attr
self.code = code
@property
def severity(self):
return get_HMS_severity(self.code)
@property
def module(self):
return get_HMS_module(self.attr)
@property
def hms_code(self):
if self.attr > 0 and self.code > 0:
return f'{int(self.attr / 0x10000):0>4X}_{self.attr & 0xFFFF:0>4X}_{int(self.code / 0x10000):0>4X}_{self.code & 0xFFFF:0>4X}' # 0300_0100_0001_0007
return ""
@property
def wiki_url(self):
if self.attr > 0 and self.code > 0:
return f"https://wiki.bambulab.com/en/x1/troubleshooting/hmscode/{get_generic_AMS_HMS_error_code(self.hms_code)}"
return ""
@dataclass
class ChamberImage:
"""Returns the latest jpeg data from the P1P camera"""
def __init__(self, client):
self._client = client
self._bytes = bytearray()
def set_jpeg(self, bytes):
self._bytes = bytes
def get_jpeg(self) -> bytearray:
return self._bytes.copy()
@dataclass
class CoverImage:
"""Returns the cover image from the Bambu API"""
def __init__(self, client):
self._client = client
self._bytes = bytearray()
self._image_last_updated = datetime.now()
if self._client.callback is not None:
self._client.callback("event_printer_cover_image_update")
def set_jpeg(self, bytes):
self._bytes = bytes
self._image_last_updated = datetime.now()
def get_jpeg(self) -> bytearray:
return self._bytes
def get_last_update_time(self) -> datetime:
return self._image_last_updated
@dataclass
class HomeFlag:
"""Contains parsed _values from the homeflag sensor"""
_value: int
_sw_ver: str
_device_type: str
def __init__(self, client):
self._value = 0
self._client = client
self._sw_ver = ""
self._device_type = ""
def info_update(self, data):
modules = data.get("module", [])
self._device_type = get_printer_type(modules, self._device_type)
self._sw_ver = get_sw_version(modules, self._sw_ver)
def print_update(self, data: dict) -> bool:
old_data = f"{self.__dict__}"
self._value = int(data.get("home_flag", str(self._value)))
return (old_data != f"{self.__dict__}")
@property
def door_open(self) -> bool or None:
if not self.door_open_available:
return None
return (self._value & Home_Flag_Values.DOOR_OPEN) != 0
@property
def door_open_available(self) -> bool:
if not self._client._device.supports_feature(Features.DOOR_SENSOR):
return False
if (self._device_type in ["X1", "X1C"] and version.parse(self._sw_ver) < version.parse("01.07.00.00")):
return False
return True
@property
def x_axis_homed(self) -> bool:
return (self._value & Home_Flag_Values.X_AXIS) != 0
@property
def y_axis_homed(self) -> bool:
return (self._value & Home_Flag_Values.Y_AXIS) != 0
@property
def z_axis_homed(self) -> bool:
return (self._value & Home_Flag_Values.Z_AXIS) != 0
@property
def homed(self) -> bool:
return self.x_axis_homed and self.y_axis_homed and self.z_axis_homed
@property
def is_220V(self) -> bool:
return (self._value & Home_Flag_Values.VOLTAGE220) != 0
@property
def xcam_autorecovery_steploss(self) -> bool:
return (self._value & Home_Flag_Values.XCAM_AUTO_RECOVERY_STEP_LOSS) != 0
@property
def camera_recording(self) -> bool:
return (self._value & Home_Flag_Values.CAMERA_RECORDING) != 0
@property
def ams_calibrate_remaining(self) -> bool:
return (self._value & Home_Flag_Values.AMS_CALIBRATE_REMAINING) != 0
@property
def sdcard_present(self) -> bool:
return (self._value & Home_Flag_Values.SD_CARD_STATE) != SdcardState.NO_SDCARD
@property
def sdcard_normal(self) -> bool:
return self.sdcard_present and (self._value & Home_Flag_Values.HAS_SDCARD_ABNORMAL) != SdcardState.HAS_SDCARD_ABNORMAL
@property
def ams_auto_switch_filament(self) -> bool:
return (self._value & Home_Flag_Values.AMS_AUTO_SWITCH) != 0
@property
def wired_network_connection(self):
return (self._value & Home_Flag_Values.WIRED_NETWORK) != 0
@property
def xcam_prompt_sound(self) -> bool:
return (self._value & Home_Flag_Values.XCAM_ALLOW_PROMPT_SOUND) != 0
@property
def supports_motor_noise_calibration(self) -> bool:
return (self._value & Home_Flag_Values.SUPPORTS_MOTOR_CALIBRATION) != 0
@property
def p1s_upgrade_supported(self) -> bool:
return (self._value & Home_Flag_Values.SUPPORTED_PLUS) != 0
@property
def p1s_upgrade_installed(self) -> bool:
return (self._value & Home_Flag_Values.INSTALLED_PLUS) != 0
class SlicerSettings:
custom_filaments: dict = field(default_factory=dict)
def __init__(self, client):
self._client = client
self.custom_filaments = {}
def _load_custom_filaments(self, slicer_settings: dict):
if 'private' in slicer_settings["filament"]:
for filament in slicer_settings['filament']['private']:
name = filament["name"]
if " @" in name:
name = name[:name.index(" @")]
if filament.get("filament_id", "") != "":
self.custom_filaments[filament["filament_id"]] = name
LOGGER.debug("Got custom filaments: %s", self.custom_filaments)
def update(self):
self.custom_filaments = {}
if self._client.bambu_cloud.auth_token != "":
LOGGER.debug("Loading slicer settings")
slicer_settings = self._client.bambu_cloud.get_slicer_settings()
if slicer_settings is not None:
self._load_custom_filaments(slicer_settings)