diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ebd768b --- /dev/null +++ b/.gitignore @@ -0,0 +1,134 @@ + +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +pip-wheel-metadata/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +.hypothesis/ +.pytest_cache/ + +# Translations +*.mo +*.pot + +# Flask stuff +instance/ +.webassets-cache + +# Scrapy stuff +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +.python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# IDE files +.idea/ + +config.py diff --git a/app.py b/app.py new file mode 100644 index 0000000..155c19e --- /dev/null +++ b/app.py @@ -0,0 +1,222 @@ +import json +import uuid + +import requests +from flask import Flask, request, render_template_string + +from config import BASE_URL, PRINTER_ID, SPOOLMAN_API_URL +from filament import generate_filament_brand_code, generate_filament_temperatures +from messages import AMS_FILAMENT_SETTING +from mqtt_bambulab import fetchSpools, getLastAMSConfig, publish, getMqttClient + +app = Flask(__name__) + +@app.route("/spool_info") +def spool_info(): + tag_id = request.args.get("tag_id") + + last_ams_config = getLastAMSConfig() + ams_data = last_ams_config.get("ams", []) + vt_tray_data = last_ams_config.get("vt_tray", {}) + + print(ams_data) + print(vt_tray_data) + + if not tag_id: + return "TAG ID is required as a query parameter (e.g., ?tagid=RFID123)" + + spools = fetchSpools() + current_spool = None + for spool in spools: + if not spool.get("extra", {}).get("tag"): + continue + tag = json.loads(spool["extra"]["tag"]) + if tag != tag_id: + continue + current_spool = spool + + # Generate HTML for AMS selection + html = f""" +

Spool information

+ """ + if current_spool: + html += f"

Current Spool: {current_spool}

" + html += """ +

AMS

+ " + html += f""" +

External Spool

+ + """ + + return render_template_string(html) + +@app.route("/tray_load") +def tray_load(): + tag_id = request.args.get("tag_id") + ams_id = request.args.get("ams") + tray_id = request.args.get("tray") + spool_id = request.args.get("spool_id") + + if not all([tag_id, ams_id, tray_id, spool_id]): + return "Missing RFID, AMS ID, or Tray ID or spool_id." + + try: + # Update Spoolman with the selected tray + response = requests.patch(f"{SPOOLMAN_API_URL}/spool/{spool_id}", json={ + "extra": { + "tag": json.dumps(tag_id), + "active_tray": json.dumps(f"{PRINTER_ID}_{ams_id}_{tray_id}"), + } + }) + print(response.status_code) + print(response.text) + + response = requests.get(f"{SPOOLMAN_API_URL}/spool/{spool_id}") + print(response.status_code) + print(response.text) + + spool_data = json.loads(response.text) + ams_message = AMS_FILAMENT_SETTING + ams_message["print"]["sequence_id"] = 0 + ams_message["print"]["ams_id"] = int(ams_id) + ams_message["print"]["tray_id"] = int(tray_id) + ams_message["print"]["tray_color"] = spool_data["filament"]["color_hex"].upper()+"FF" + + if "nozzle_temperature" in spool_data["filament"]["extra"]: + nozzle_temperature_range = spool_data["filament"]["extra"]["nozzle_temperature"].strip("[]").split(",") + ams_message["print"]["nozzle_temp_min"] = int(nozzle_temperature_range[0]) + ams_message["print"]["nozzle_temp_max"] = int(nozzle_temperature_range[1]) + else: + nozzle_temperature_range_obj = generate_filament_temperatures(spool_data["filament"]["material"], + spool_data["filament"]["vendor"]["name"]) + ams_message["print"]["nozzle_temp_min"] = int(nozzle_temperature_range_obj["filament_min_temp"]) + ams_message["print"]["nozzle_temp_max"] = int(nozzle_temperature_range_obj["filament_max_temp"]) + + ams_message["print"]["tray_type"] = spool_data["filament"]["material"] + filament_brand_code = generate_filament_brand_code(spool_data["filament"]["material"], + spool_data["filament"]["vendor"]["name"], + spool_data["filament"]["extra"].get("type", "")) + ams_message["print"]["tray_info_idx"] = filament_brand_code["brand_code"] + + #ams_message["print"]["tray_sub_brands"] = filament_brand_code["sub_brand_code"] + ams_message["print"]["tray_sub_brands"] = "" + + print(ams_message) + publish(getMqttClient(), ams_message) + + return f""" +

Success

+

Updated Spool ID {spool_id} with TAG id {tag_id} to AMS {ams_id}, Tray {tray_id}.

+ """ + except Exception as e: + return f"

Error

{str(e)}

" + +@app.route("/") +def home(): + try: + # Update Spoolman with the selected tray + spools = fetchSpools() + + last_ams_config = getLastAMSConfig() + ams_data = last_ams_config.get("ams", []) + vt_tray_data = last_ams_config.get("vt_tray", {}) + + html = """ +

Current AMS Configuration

+ """ + html += """ +

AMS

+ " + html += f""" +

External Spool

+ + """ + html += """ +

Add new TAG

+ " + return html + except Exception as e: + return f"

Error

{str(e)}

" + +@app.route("/assign_tag") +def assign_tag(): + spool_id = request.args.get("spool_id") + + if not spool_id: + return "spool ID is required as a query parameter (e.g., ?spool_id=1)" + + myuuid = str(uuid.uuid4()) + + resp = requests.patch(f"{SPOOLMAN_API_URL}/spool/{spool_id}", json={ + "extra": { + "tag": json.dumps(myuuid), + } + }) + + print(resp.status_code) + print(resp.raw) + + return f""" + +
+ +
+ + NFC Write + + + + """ + +@app.route('/', methods=['GET']) +def health(): + return "OK", 200 diff --git a/config.template b/config.template new file mode 100644 index 0000000..39a3e02 --- /dev/null +++ b/config.template @@ -0,0 +1,8 @@ +BASE_URL = "" # Where will this app be accessible +PRINTER_ID = "" # Printer serial number - Run init_bambulab.py +PRINTER_CODE = "" # Printer access code - Run init_bambulab.py +PRINTER_IP = "" # Printer local IP address - Check wireless on printer +SPOOLMAN_PROTO = "" +SPOOLMAN_IP = "" # Spoolman IP address +SPOOLMAN_PORT = "" # Spoolman port +SPOOLMAN_API_URL = f"{SPOOLMAN_PROTO}://{SPOOLMAN_IP}:{SPOOLMAN_PORT}/api/v1" diff --git a/filament.py b/filament.py new file mode 100644 index 0000000..82f09e4 --- /dev/null +++ b/filament.py @@ -0,0 +1,179 @@ +# mapping adapted from https://github.com/spuder/OpenSpool/blob/main/firmware/conf.d/automation.yaml +def generate_filament_brand_code(filament_type, filament_brand, filament_variant): + filament_sub_brand = "" + filament_brand_code = "" + + if filament_type == "TPU": + if filament_brand == "Bambu": + filament_brand_code = "GFU01" + filament_sub_brand = "TPU 95A" + else: + filament_brand_code = "GFU99" + filament_sub_brand = "TPU" + + elif filament_type == "PLA": + if filament_brand == "PolyTerra": + filament_brand_code = "GFL01" + filament_sub_brand = "PolyTerra PLA" + elif filament_brand == "PolyLite": + filament_brand_code = "GFL00" + filament_sub_brand = "PolyLite PLA" + elif filament_brand == "Bambu": + if filament_variant == "Basic": + filament_brand_code = "GFA00" + filament_sub_brand = "PLA Basic" + elif filament_variant == "Matte": + filament_brand_code = "GFA01" + filament_sub_brand = "PLA Matte" + elif filament_variant == "Metal": + filament_brand_code = "GFA02" + filament_sub_brand = "PLA Metal" + elif filament_variant == "Impact": + filament_brand_code = "GFA03" + filament_sub_brand = "PLA Impact" + else: + filament_brand_code = "GFA00" + filament_sub_brand = "PLA Basic" + else: + filament_brand_code = "GFL99" + filament_sub_brand = "PLA" + + elif filament_type == "PETG": + if filament_brand == "Overture": + filament_brand_code = "GFG99" # Placeholder code + filament_sub_brand = "PETG" + else: + filament_brand_code = "GFG99" + filament_sub_brand = "PETG" + + elif filament_type == "PET-CF": + if filament_brand == "Bambu": + filament_brand_code = "GFT00" + filament_sub_brand = "PET-CF" + else: + filament_brand_code = "GFG99" + filament_sub_brand = "PET-CF" + + elif filament_type == "ASA": + filament_brand_code = "GFB98" + filament_sub_brand = "ASA" + + elif filament_type == "ABS": + if filament_brand == "Bambu": + filament_brand_code = "GFB00" + filament_sub_brand = "ABS" + else: + filament_brand_code = "GFB99" + filament_sub_brand = "ABS" + + elif filament_type == "PC": + if filament_brand == "Bambu": + filament_brand_code = "GFC00" + filament_sub_brand = "PC" + else: + filament_brand_code = "GFC99" + filament_sub_brand = "PC" + + elif filament_type == "PA": + filament_brand_code = "GFN99" + filament_sub_brand = "PA" + + elif filament_type == "PA-CF": + if filament_brand == "Bambu": + filament_brand_code = "GFN03" + filament_sub_brand = "PA-CF" + else: + filament_brand_code = "GFN98" + filament_sub_brand = "PA-CF" + + elif filament_type == "PLA-CF": + filament_brand_code = "GFL98" + filament_sub_brand = "PLA-CF" + + elif filament_type == "PVA": + filament_brand_code = "GFS99" + filament_sub_brand = "PVA" + + elif filament_type == "Support": + if filament_variant == "G": + filament_brand_code = "GFS01" + filament_sub_brand = "Support G" + elif filament_variant == "W": + filament_brand_code = "GFS00" + filament_sub_brand = "Support W" + else: + filament_brand_code = "GFS00" + filament_sub_brand = "Support W" + else: + print(f"Unknown filament type: {filament_type}") + + return {"brand_code": filament_brand_code, + "sub_brand_code": filament_sub_brand + } + + +def generate_filament_temperatures(filament_type, filament_brand): + filament_min_temp = 150 + filament_max_temp = 300 + + if not filament_type: + print("Skipping temperature generation as filament_type is empty.") + return + + if filament_type == "TPU": + if filament_brand == "Generic": + filament_min_temp = 200 + filament_max_temp = 250 + else: + print(f"Unknown temperatures for TPU brand: {filament_brand}") + filament_min_temp = 200 + filament_max_temp = 250 + elif filament_type == "PLA": + if filament_brand == "Generic": + filament_min_temp = 190 + filament_max_temp = 240 + else: + print(f"Unknown temperatures for PLA brand: {filament_brand}") + filament_min_temp = 190 + filament_max_temp = 240 + elif filament_type == "PETG": + if filament_brand == "Generic": + filament_min_temp = 220 + filament_max_temp = 270 + else: + print(f"Unknown temperatures for PETG brand: {filament_brand}") + filament_min_temp = 220 + filament_max_temp = 270 + elif filament_type == "ASA": + if filament_brand == "Generic": + filament_min_temp = 240 + filament_max_temp = 280 + else: + print(f"Unknown temperatures for ASA brand: {filament_brand}") + filament_min_temp = 240 + filament_max_temp = 280 + + elif filament_type == "PC": + if filament_brand == "Generic": + filament_min_temp = 250 + filament_max_temp = 300 + else: + print(f"Unknown temperatures for PC brand: {filament_brand}") + filament_min_temp = 250 + filament_max_temp = 300 + + + elif filament_type == "PA": + if filament_brand == "Generic": + filament_min_temp = 260 + filament_max_temp = 300 + else: + print(f"Unknown temperatures for PA brand: {filament_brand}") + filament_min_temp = 260 + filament_max_temp = 300 + else: + print(f"Unknown filament type: {filament_type}") + + return {"filament_min_temp": filament_min_temp, + "filament_max_temp": filament_max_temp + } diff --git a/messages.py b/messages.py new file mode 100644 index 0000000..5c92b9a --- /dev/null +++ b/messages.py @@ -0,0 +1,16 @@ +GET_VERSION = {"info": {"sequence_id": "0", "command": "get_version"}} +PUSH_ALL = {"pushing": {"sequence_id": "0", "command": "pushall"}} +AMS_FILAMENT_SETTING = {"print": + { + "sequence_id": "0", + "command": "ams_filament_setting", + "ams_id": None, + "tray_id": None, + "tray_color": None, + "nozzle_temp_min": None, + "nozzle_temp_max": None, + "tray_type": None, + "setting_id": "", + "tray_info_idx": None + } +} diff --git a/mqtt_bambulab.py b/mqtt_bambulab.py new file mode 100644 index 0000000..0c2a97a --- /dev/null +++ b/mqtt_bambulab.py @@ -0,0 +1,118 @@ +import json +import ssl +import traceback +from threading import Thread + +import paho.mqtt.client as mqtt +import requests + +from config import PRINTER_ID, PRINTER_CODE, PRINTER_IP, SPOOLMAN_API_URL +from messages import GET_VERSION, PUSH_ALL + + +def num2letter(num): + return chr(ord("A") + int(num)) + +def publish(client, msg): + result = client.publish(f"device/{PRINTER_ID}/request", json.dumps(msg)) + status = result[0] + if status == 0: + print(f"Sent {msg} to topic device/{PRINTER_ID}/request") + return True + + print(f"Failed to send message to topic device/{PRINTER_ID}/request") + return False + +def on_message(client, userdata, msg): + global LAST_AMS_CONFIG + # TODO: Consume spool + try: + data = json.loads(msg.payload.decode()) + if "print" in data and "vt_tray" in data["print"]: + print(data) + LAST_AMS_CONFIG["vt_tray"] = data["print"]["vt_tray"] + + if "print" in data and "ams" in data["print"] and "ams" in data["print"]["ams"]: + print(data) + LAST_AMS_CONFIG["ams"] = data["print"]["ams"]["ams"] + + print(LAST_AMS_CONFIG) + for ams in data["print"]["ams"]["ams"]: + print(f"AMS [{num2letter(ams['id'])}] (hum: {ams['humidity']}, temp: {ams['temp']}ºC)") + for tray in ams["tray"]: + if "tray_sub_brands" in tray: + print(f" - [{num2letter(ams['id'])}{tray['id']}] {tray['tray_sub_brands']} {tray['tray_color']} ({str(tray['remain']).zfill(3)}%) [[ {tray['tag_uid']} ]]") + + found = False + for spool in SPOOLS: + if not spool.get("extra", {}).get("tag"): + continue + tag = json.loads(spool["extra"]["tag"]) + if tag != tray["tag_uid"]: + continue + + found = True + + resp = requests.patch(f"{SPOOLMAN_API_URL}/spool/{spool['id']}", json={ + "extra": { + "tag": spool["extra"]["tag"], + "active_tray": json.dumps(f"{PRINTER_ID}_{ams['id']}_{tray['id']}"), + } + }) + + print(resp.text) + print(resp.status_code) + + #TODO: remove active_tray from inactive spools + #Doesn't work for AMS Lite + #requests.patch(f"http://{SPOOLMAN_IP}:7912/api/v1/spool/{spool['id']}", json={ + # "remaining_weight": tray["remain"] / 100 * tray["tray_weight"] + #}) + + if not found: + print(" - Not found. Update spool tag!") + except Exception as e: + traceback.print_exc() + +def on_connect(client, userdata, flags, rc): + print("Connected with result code "+str(rc)) + client.subscribe(f"device/{PRINTER_ID}/report") + publish(client, GET_VERSION) + publish(client, PUSH_ALL) + +# Fetch spools from spoolman +def fetchSpools(): + global SPOOLS + response = requests.get(f"{SPOOLMAN_API_URL}/spool") + SPOOLS = response.json() + return SPOOLS + +def async_subscribe(): + global MQTT_CLIENT + MQTT_CLIENT = mqtt.Client() + MQTT_CLIENT.username_pw_set("bblp", PRINTER_CODE) + ssl_ctx = ssl.create_default_context() + ssl_ctx.check_hostname = False + ssl_ctx.verify_mode = ssl.CERT_NONE + MQTT_CLIENT.tls_set_context(ssl_ctx) + MQTT_CLIENT.tls_insecure_set(True) + MQTT_CLIENT.on_connect = on_connect + MQTT_CLIENT.on_message = on_message + MQTT_CLIENT.connect(PRINTER_IP, 8883) + MQTT_CLIENT.loop_forever() + +# Start the asynchronous processing in a separate thread +thread = Thread(target=async_subscribe) +thread.start() + +def getLastAMSConfig(): + global LAST_AMS_CONFIG + return LAST_AMS_CONFIG + +def getMqttClient(): + global MQTT_CLIENT + return MQTT_CLIENT + +MQTT_CLIENT = {} # Global variable storing MQTT Client +LAST_AMS_CONFIG = {} # Global variable storing last AMS configuration +SPOOLS = fetchSpools() # Global variable storing latest spool from spoolman diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..d73bc56 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,4 @@ +paho-mqtt==2.1.0 +requests==2.32.3 +flask==3.1.0 +pyopenssl==24.3.0 diff --git a/scripts/init_bambulab.py b/scripts/init_bambulab.py new file mode 100644 index 0000000..dae517e --- /dev/null +++ b/scripts/init_bambulab.py @@ -0,0 +1,111 @@ +import requests +import json + +# Prompt the user for their Bambu Lab username and password +bambuUsername = input("Enter your Bambu Lab username: ") +bambuPassword = input("Enter your Bambu Lab password: ") + +# Define the User-Agent and other common headers +user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36" +headers = { + "Content-Type": "application/json", + "User-Agent": user_agent, + "Accept": "application/json, text/plain, */*", + "Accept-Language": "en-US,en;q=0.9", + "Origin": "https://bambulab.com", + "Referer": "https://bambulab.com/" +} + +# Perform the login request with custom headers +auth_payload = { + "account": bambuUsername, + "password": bambuPassword +} +auth_response = requests.post("https://api.bambulab.com/v1/user-service/user/login", headers=headers, json=auth_payload) + +# Print the auth response for debugging +print("Step 1. Auth login response:", auth_response.text) + +# Check if authentication was successful +auth_json = auth_response.json() +if not auth_json.get("success"): + print("Authentication failed, attempting code verification") + + # Send the verification code request + send_code_payload = { + "email": bambuUsername, + "type": "codeLogin" + } + + if auth_json.get("loginType") == "verifyCode": + send_code_response = requests.post("https://api.bambulab.com/v1/user-service/user/sendemail/code", headers=headers, + json=send_code_payload) + if send_code_response.status_code == 200: + print("Verification code sent successfully. Check your email.") + else: + print("Failed to send verification code.") + print("Response:", send_code_response.text) + exit(1) + + verify_code = input("Enter your access code: ") + verify_payload = { + "account": bambuUsername, + "code": verify_code + } + api_response = requests.post("https://api.bambulab.com/v1/user-service/user/login", headers=headers, + json=verify_payload) + print("Step 2 - API verify code response:", api_response.text) + + api_token = api_response.json() + if api_token: + token = api_token.get("accessToken") + + if not token: + print("Failed to extract token") + exit(1) + + elif auth_json.get("loginType") == "tfa": + tfa_auth = auth_json.get("tfaKey") + tfa_code = input("Enter your MFA access code: ") + verify_payload = { + "tfaKey": tfa_auth, + "tfaCode": tfa_code + } + print("payload: ", verify_payload) + api2_response = requests.post("https://bambulab.com/api-sign-in/tfa", headers=headers, json=verify_payload) + print("Step 2 - API MFA response:", api2_response.text) + + cookies = api2_response.cookies.get_dict() + token_from_tfa = cookies.get("token") + print("tokenFromTFA:", token_from_tfa) + + token = token_from_tfa + + if not token: + print("Failed to extract token") + exit(1) + +else: + # If authentication was successful in the first attempt, get the token from the JSON response + token = auth_json.get("accessToken") + +if not token: + print("Unable to authenticate or verify. Exiting...") + exit(1) + +# Perform the API request to fetch the image with custom headers +headers["Authorization"] = f"Bearer {token}" +api_response = requests.get("https://api.bambulab.com/v1/iot-service/api/user/bind", headers=headers) + +# Print the API response for debugging +print("API to fetch info:", api_response.text) + +# Check if the API request was successful +if api_response.status_code != 200: + print("API request failed") + exit(1) + +# Extract and display the relevant information +api_json = api_response.json() + +print(json.dumps(api_json, indent=2)) diff --git a/wsgi.py b/wsgi.py new file mode 100644 index 0000000..d280c0f --- /dev/null +++ b/wsgi.py @@ -0,0 +1,5 @@ +from app import app +import os + +if __name__ == '__main__': + app.run(debug=os.getenv('DEBUG', False), port=os.getenv('PORT', 443), host=os.getenv('HOST', '0.0.0.0'), ssl_context='adhoc')