Initial commit

This commit is contained in:
Filip Bednárik 2024-12-12 23:05:19 +01:00
parent 9d964901f7
commit c036365600
9 changed files with 797 additions and 0 deletions

134
.gitignore vendored Normal file
View File

@ -0,0 +1,134 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/
.pytest_cache/
# Translations
*.mo
*.pot
# Flask stuff
instance/
.webassets-cache
# Scrapy stuff
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
.python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# IDE files
.idea/
config.py

222
app.py Normal file
View File

@ -0,0 +1,222 @@
import json
import uuid
import requests
from flask import Flask, request, render_template_string
from config import BASE_URL, PRINTER_ID, SPOOLMAN_API_URL
from filament import generate_filament_brand_code, generate_filament_temperatures
from messages import AMS_FILAMENT_SETTING
from mqtt_bambulab import fetchSpools, getLastAMSConfig, publish, getMqttClient
app = Flask(__name__)
@app.route("/spool_info")
def spool_info():
tag_id = request.args.get("tag_id")
last_ams_config = getLastAMSConfig()
ams_data = last_ams_config.get("ams", [])
vt_tray_data = last_ams_config.get("vt_tray", {})
print(ams_data)
print(vt_tray_data)
if not tag_id:
return "TAG ID is required as a query parameter (e.g., ?tagid=RFID123)"
spools = fetchSpools()
current_spool = None
for spool in spools:
if not spool.get("extra", {}).get("tag"):
continue
tag = json.loads(spool["extra"]["tag"])
if tag != tag_id:
continue
current_spool = spool
# Generate HTML for AMS selection
html = f"""
<h1>Spool information</h1>
"""
if current_spool:
html += f"<p>Current Spool: {current_spool}</p>"
html += """
<h1>AMS</h1>
<ul>
"""
for ams in ams_data:
html += f"<li>AMS {ams['id']} (Humidity: {ams['humidity']}%, Temp: {ams['temp']}°C)<ul>"
for tray in ams["tray"]:
tray_status = f"[{tray['tray_sub_brands']} {tray['tray_color']}]"
html += f"""
<li>
Tray {tray['id']} {tray_status} - Remaining: {tray['remain']}%
<a href="/tray_load?spool_id={current_spool['id']}&tag_id={tag_id}&ams={ams['id']}&tray={tray['id']}">Pick this tray</a>
</li>
"""
html += "</ul></li>"
html += "</ul>"
html += f"""
<h1>External Spool</h1>
<ul>
<li>Tray {vt_tray_data['id']} [{vt_tray_data['tray_sub_brands']} {vt_tray_data['tray_color']}] - Remaining: {vt_tray_data['remain']}%
<a href="/tray_load?spool_id={current_spool['id']}&tag_id={tag_id}&ams={vt_tray_data['id']}&tray=255">Pick this tray</a></li>
</ul>
"""
return render_template_string(html)
@app.route("/tray_load")
def tray_load():
tag_id = request.args.get("tag_id")
ams_id = request.args.get("ams")
tray_id = request.args.get("tray")
spool_id = request.args.get("spool_id")
if not all([tag_id, ams_id, tray_id, spool_id]):
return "Missing RFID, AMS ID, or Tray ID or spool_id."
try:
# Update Spoolman with the selected tray
response = requests.patch(f"{SPOOLMAN_API_URL}/spool/{spool_id}", json={
"extra": {
"tag": json.dumps(tag_id),
"active_tray": json.dumps(f"{PRINTER_ID}_{ams_id}_{tray_id}"),
}
})
print(response.status_code)
print(response.text)
response = requests.get(f"{SPOOLMAN_API_URL}/spool/{spool_id}")
print(response.status_code)
print(response.text)
spool_data = json.loads(response.text)
ams_message = AMS_FILAMENT_SETTING
ams_message["print"]["sequence_id"] = 0
ams_message["print"]["ams_id"] = int(ams_id)
ams_message["print"]["tray_id"] = int(tray_id)
ams_message["print"]["tray_color"] = spool_data["filament"]["color_hex"].upper()+"FF"
if "nozzle_temperature" in spool_data["filament"]["extra"]:
nozzle_temperature_range = spool_data["filament"]["extra"]["nozzle_temperature"].strip("[]").split(",")
ams_message["print"]["nozzle_temp_min"] = int(nozzle_temperature_range[0])
ams_message["print"]["nozzle_temp_max"] = int(nozzle_temperature_range[1])
else:
nozzle_temperature_range_obj = generate_filament_temperatures(spool_data["filament"]["material"],
spool_data["filament"]["vendor"]["name"])
ams_message["print"]["nozzle_temp_min"] = int(nozzle_temperature_range_obj["filament_min_temp"])
ams_message["print"]["nozzle_temp_max"] = int(nozzle_temperature_range_obj["filament_max_temp"])
ams_message["print"]["tray_type"] = spool_data["filament"]["material"]
filament_brand_code = generate_filament_brand_code(spool_data["filament"]["material"],
spool_data["filament"]["vendor"]["name"],
spool_data["filament"]["extra"].get("type", ""))
ams_message["print"]["tray_info_idx"] = filament_brand_code["brand_code"]
#ams_message["print"]["tray_sub_brands"] = filament_brand_code["sub_brand_code"]
ams_message["print"]["tray_sub_brands"] = ""
print(ams_message)
publish(getMqttClient(), ams_message)
return f"""
<h1>Success</h1>
<p>Updated Spool ID {spool_id} with TAG id {tag_id} to AMS {ams_id}, Tray {tray_id}.</p>
"""
except Exception as e:
return f"<h1>Error</h1><p>{str(e)}</p>"
@app.route("/")
def home():
try:
# Update Spoolman with the selected tray
spools = fetchSpools()
last_ams_config = getLastAMSConfig()
ams_data = last_ams_config.get("ams", [])
vt_tray_data = last_ams_config.get("vt_tray", {})
html = """
<h1>Current AMS Configuration</h1>
"""
html += """
<h1>AMS</h1>
<ul>
"""
for ams in ams_data:
html += f"<li>AMS {ams['id']} (Humidity: {ams['humidity']}%, Temp: {ams['temp']}°C)<ul>"
for tray in ams["tray"]:
tray_status = f"[{tray['tray_sub_brands']} {tray['tray_color']}]"
html += f"""
<li>
Tray {tray['id']} {tray_status} - Remaining: {tray['remain']}%
</li>
"""
html += "</ul></li>"
html += "</ul>"
html += f"""
<h1>External Spool</h1>
<ul>
<li>Tray {vt_tray_data['id']} [{vt_tray_data['tray_sub_brands']} {vt_tray_data['tray_color']}] - Remaining: {vt_tray_data['remain']}%</li>
</ul>
"""
html += """
<h1>Add new TAG</h1>
<ul>
"""
for spool in spools:
if not spool.get("extra", {}).get("tag"):
html += f"<li><a href='/assign_tag?spool_id={spool.get('id')}'>Spool {spool.get('filament').get('vendor').get('name')} - {spool.get('filament').get('name')}</a></li>"
html += "</ul>"
return html
except Exception as e:
return f"<h1>Error</h1><p>{str(e)}</p>"
@app.route("/assign_tag")
def assign_tag():
spool_id = request.args.get("spool_id")
if not spool_id:
return "spool ID is required as a query parameter (e.g., ?spool_id=1)"
myuuid = str(uuid.uuid4())
resp = requests.patch(f"{SPOOLMAN_API_URL}/spool/{spool_id}", json={
"extra": {
"tag": json.dumps(myuuid),
}
})
print(resp.status_code)
print(resp.raw)
return f"""
<html>
<header>
<script type="text/javascript">
function writeNFC(){{
const ndef = new NDEFReader();
ndef.write({{
records: [{{ recordType: "url", data: "{BASE_URL}/spool_info?tag_id={myuuid}" }}],
}}).then(() => {{
alert("Message written.");
}}).catch(error => {{
alert(`Write failed :-( try again: ${{error}}.`);
}});
}};
</script>
</header>
<body>
NFC Write
<button id="write" onclick="writeNFC()">Write</button>
</body>
</html>
"""
@app.route('/', methods=['GET'])
def health():
return "OK", 200

8
config.template Normal file
View File

@ -0,0 +1,8 @@
BASE_URL = "" # Where will this app be accessible
PRINTER_ID = "" # Printer serial number - Run init_bambulab.py
PRINTER_CODE = "" # Printer access code - Run init_bambulab.py
PRINTER_IP = "" # Printer local IP address - Check wireless on printer
SPOOLMAN_PROTO = ""
SPOOLMAN_IP = "" # Spoolman IP address
SPOOLMAN_PORT = "" # Spoolman port
SPOOLMAN_API_URL = f"{SPOOLMAN_PROTO}://{SPOOLMAN_IP}:{SPOOLMAN_PORT}/api/v1"

179
filament.py Normal file
View File

@ -0,0 +1,179 @@
# mapping adapted from https://github.com/spuder/OpenSpool/blob/main/firmware/conf.d/automation.yaml
def generate_filament_brand_code(filament_type, filament_brand, filament_variant):
filament_sub_brand = ""
filament_brand_code = ""
if filament_type == "TPU":
if filament_brand == "Bambu":
filament_brand_code = "GFU01"
filament_sub_brand = "TPU 95A"
else:
filament_brand_code = "GFU99"
filament_sub_brand = "TPU"
elif filament_type == "PLA":
if filament_brand == "PolyTerra":
filament_brand_code = "GFL01"
filament_sub_brand = "PolyTerra PLA"
elif filament_brand == "PolyLite":
filament_brand_code = "GFL00"
filament_sub_brand = "PolyLite PLA"
elif filament_brand == "Bambu":
if filament_variant == "Basic":
filament_brand_code = "GFA00"
filament_sub_brand = "PLA Basic"
elif filament_variant == "Matte":
filament_brand_code = "GFA01"
filament_sub_brand = "PLA Matte"
elif filament_variant == "Metal":
filament_brand_code = "GFA02"
filament_sub_brand = "PLA Metal"
elif filament_variant == "Impact":
filament_brand_code = "GFA03"
filament_sub_brand = "PLA Impact"
else:
filament_brand_code = "GFA00"
filament_sub_brand = "PLA Basic"
else:
filament_brand_code = "GFL99"
filament_sub_brand = "PLA"
elif filament_type == "PETG":
if filament_brand == "Overture":
filament_brand_code = "GFG99" # Placeholder code
filament_sub_brand = "PETG"
else:
filament_brand_code = "GFG99"
filament_sub_brand = "PETG"
elif filament_type == "PET-CF":
if filament_brand == "Bambu":
filament_brand_code = "GFT00"
filament_sub_brand = "PET-CF"
else:
filament_brand_code = "GFG99"
filament_sub_brand = "PET-CF"
elif filament_type == "ASA":
filament_brand_code = "GFB98"
filament_sub_brand = "ASA"
elif filament_type == "ABS":
if filament_brand == "Bambu":
filament_brand_code = "GFB00"
filament_sub_brand = "ABS"
else:
filament_brand_code = "GFB99"
filament_sub_brand = "ABS"
elif filament_type == "PC":
if filament_brand == "Bambu":
filament_brand_code = "GFC00"
filament_sub_brand = "PC"
else:
filament_brand_code = "GFC99"
filament_sub_brand = "PC"
elif filament_type == "PA":
filament_brand_code = "GFN99"
filament_sub_brand = "PA"
elif filament_type == "PA-CF":
if filament_brand == "Bambu":
filament_brand_code = "GFN03"
filament_sub_brand = "PA-CF"
else:
filament_brand_code = "GFN98"
filament_sub_brand = "PA-CF"
elif filament_type == "PLA-CF":
filament_brand_code = "GFL98"
filament_sub_brand = "PLA-CF"
elif filament_type == "PVA":
filament_brand_code = "GFS99"
filament_sub_brand = "PVA"
elif filament_type == "Support":
if filament_variant == "G":
filament_brand_code = "GFS01"
filament_sub_brand = "Support G"
elif filament_variant == "W":
filament_brand_code = "GFS00"
filament_sub_brand = "Support W"
else:
filament_brand_code = "GFS00"
filament_sub_brand = "Support W"
else:
print(f"Unknown filament type: {filament_type}")
return {"brand_code": filament_brand_code,
"sub_brand_code": filament_sub_brand
}
def generate_filament_temperatures(filament_type, filament_brand):
filament_min_temp = 150
filament_max_temp = 300
if not filament_type:
print("Skipping temperature generation as filament_type is empty.")
return
if filament_type == "TPU":
if filament_brand == "Generic":
filament_min_temp = 200
filament_max_temp = 250
else:
print(f"Unknown temperatures for TPU brand: {filament_brand}")
filament_min_temp = 200
filament_max_temp = 250
elif filament_type == "PLA":
if filament_brand == "Generic":
filament_min_temp = 190
filament_max_temp = 240
else:
print(f"Unknown temperatures for PLA brand: {filament_brand}")
filament_min_temp = 190
filament_max_temp = 240
elif filament_type == "PETG":
if filament_brand == "Generic":
filament_min_temp = 220
filament_max_temp = 270
else:
print(f"Unknown temperatures for PETG brand: {filament_brand}")
filament_min_temp = 220
filament_max_temp = 270
elif filament_type == "ASA":
if filament_brand == "Generic":
filament_min_temp = 240
filament_max_temp = 280
else:
print(f"Unknown temperatures for ASA brand: {filament_brand}")
filament_min_temp = 240
filament_max_temp = 280
elif filament_type == "PC":
if filament_brand == "Generic":
filament_min_temp = 250
filament_max_temp = 300
else:
print(f"Unknown temperatures for PC brand: {filament_brand}")
filament_min_temp = 250
filament_max_temp = 300
elif filament_type == "PA":
if filament_brand == "Generic":
filament_min_temp = 260
filament_max_temp = 300
else:
print(f"Unknown temperatures for PA brand: {filament_brand}")
filament_min_temp = 260
filament_max_temp = 300
else:
print(f"Unknown filament type: {filament_type}")
return {"filament_min_temp": filament_min_temp,
"filament_max_temp": filament_max_temp
}

16
messages.py Normal file
View File

@ -0,0 +1,16 @@
GET_VERSION = {"info": {"sequence_id": "0", "command": "get_version"}}
PUSH_ALL = {"pushing": {"sequence_id": "0", "command": "pushall"}}
AMS_FILAMENT_SETTING = {"print":
{
"sequence_id": "0",
"command": "ams_filament_setting",
"ams_id": None,
"tray_id": None,
"tray_color": None,
"nozzle_temp_min": None,
"nozzle_temp_max": None,
"tray_type": None,
"setting_id": "",
"tray_info_idx": None
}
}

118
mqtt_bambulab.py Normal file
View File

@ -0,0 +1,118 @@
import json
import ssl
import traceback
from threading import Thread
import paho.mqtt.client as mqtt
import requests
from config import PRINTER_ID, PRINTER_CODE, PRINTER_IP, SPOOLMAN_API_URL
from messages import GET_VERSION, PUSH_ALL
def num2letter(num):
return chr(ord("A") + int(num))
def publish(client, msg):
result = client.publish(f"device/{PRINTER_ID}/request", json.dumps(msg))
status = result[0]
if status == 0:
print(f"Sent {msg} to topic device/{PRINTER_ID}/request")
return True
print(f"Failed to send message to topic device/{PRINTER_ID}/request")
return False
def on_message(client, userdata, msg):
global LAST_AMS_CONFIG
# TODO: Consume spool
try:
data = json.loads(msg.payload.decode())
if "print" in data and "vt_tray" in data["print"]:
print(data)
LAST_AMS_CONFIG["vt_tray"] = data["print"]["vt_tray"]
if "print" in data and "ams" in data["print"] and "ams" in data["print"]["ams"]:
print(data)
LAST_AMS_CONFIG["ams"] = data["print"]["ams"]["ams"]
print(LAST_AMS_CONFIG)
for ams in data["print"]["ams"]["ams"]:
print(f"AMS [{num2letter(ams['id'])}] (hum: {ams['humidity']}, temp: {ams['temp']}ºC)")
for tray in ams["tray"]:
if "tray_sub_brands" in tray:
print(f" - [{num2letter(ams['id'])}{tray['id']}] {tray['tray_sub_brands']} {tray['tray_color']} ({str(tray['remain']).zfill(3)}%) [[ {tray['tag_uid']} ]]")
found = False
for spool in SPOOLS:
if not spool.get("extra", {}).get("tag"):
continue
tag = json.loads(spool["extra"]["tag"])
if tag != tray["tag_uid"]:
continue
found = True
resp = requests.patch(f"{SPOOLMAN_API_URL}/spool/{spool['id']}", json={
"extra": {
"tag": spool["extra"]["tag"],
"active_tray": json.dumps(f"{PRINTER_ID}_{ams['id']}_{tray['id']}"),
}
})
print(resp.text)
print(resp.status_code)
#TODO: remove active_tray from inactive spools
#Doesn't work for AMS Lite
#requests.patch(f"http://{SPOOLMAN_IP}:7912/api/v1/spool/{spool['id']}", json={
# "remaining_weight": tray["remain"] / 100 * tray["tray_weight"]
#})
if not found:
print(" - Not found. Update spool tag!")
except Exception as e:
traceback.print_exc()
def on_connect(client, userdata, flags, rc):
print("Connected with result code "+str(rc))
client.subscribe(f"device/{PRINTER_ID}/report")
publish(client, GET_VERSION)
publish(client, PUSH_ALL)
# Fetch spools from spoolman
def fetchSpools():
global SPOOLS
response = requests.get(f"{SPOOLMAN_API_URL}/spool")
SPOOLS = response.json()
return SPOOLS
def async_subscribe():
global MQTT_CLIENT
MQTT_CLIENT = mqtt.Client()
MQTT_CLIENT.username_pw_set("bblp", PRINTER_CODE)
ssl_ctx = ssl.create_default_context()
ssl_ctx.check_hostname = False
ssl_ctx.verify_mode = ssl.CERT_NONE
MQTT_CLIENT.tls_set_context(ssl_ctx)
MQTT_CLIENT.tls_insecure_set(True)
MQTT_CLIENT.on_connect = on_connect
MQTT_CLIENT.on_message = on_message
MQTT_CLIENT.connect(PRINTER_IP, 8883)
MQTT_CLIENT.loop_forever()
# Start the asynchronous processing in a separate thread
thread = Thread(target=async_subscribe)
thread.start()
def getLastAMSConfig():
global LAST_AMS_CONFIG
return LAST_AMS_CONFIG
def getMqttClient():
global MQTT_CLIENT
return MQTT_CLIENT
MQTT_CLIENT = {} # Global variable storing MQTT Client
LAST_AMS_CONFIG = {} # Global variable storing last AMS configuration
SPOOLS = fetchSpools() # Global variable storing latest spool from spoolman

4
requirements.txt Normal file
View File

@ -0,0 +1,4 @@
paho-mqtt==2.1.0
requests==2.32.3
flask==3.1.0
pyopenssl==24.3.0

111
scripts/init_bambulab.py Normal file
View File

@ -0,0 +1,111 @@
import requests
import json
# Prompt the user for their Bambu Lab username and password
bambuUsername = input("Enter your Bambu Lab username: ")
bambuPassword = input("Enter your Bambu Lab password: ")
# Define the User-Agent and other common headers
user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36"
headers = {
"Content-Type": "application/json",
"User-Agent": user_agent,
"Accept": "application/json, text/plain, */*",
"Accept-Language": "en-US,en;q=0.9",
"Origin": "https://bambulab.com",
"Referer": "https://bambulab.com/"
}
# Perform the login request with custom headers
auth_payload = {
"account": bambuUsername,
"password": bambuPassword
}
auth_response = requests.post("https://api.bambulab.com/v1/user-service/user/login", headers=headers, json=auth_payload)
# Print the auth response for debugging
print("Step 1. Auth login response:", auth_response.text)
# Check if authentication was successful
auth_json = auth_response.json()
if not auth_json.get("success"):
print("Authentication failed, attempting code verification")
# Send the verification code request
send_code_payload = {
"email": bambuUsername,
"type": "codeLogin"
}
if auth_json.get("loginType") == "verifyCode":
send_code_response = requests.post("https://api.bambulab.com/v1/user-service/user/sendemail/code", headers=headers,
json=send_code_payload)
if send_code_response.status_code == 200:
print("Verification code sent successfully. Check your email.")
else:
print("Failed to send verification code.")
print("Response:", send_code_response.text)
exit(1)
verify_code = input("Enter your access code: ")
verify_payload = {
"account": bambuUsername,
"code": verify_code
}
api_response = requests.post("https://api.bambulab.com/v1/user-service/user/login", headers=headers,
json=verify_payload)
print("Step 2 - API verify code response:", api_response.text)
api_token = api_response.json()
if api_token:
token = api_token.get("accessToken")
if not token:
print("Failed to extract token")
exit(1)
elif auth_json.get("loginType") == "tfa":
tfa_auth = auth_json.get("tfaKey")
tfa_code = input("Enter your MFA access code: ")
verify_payload = {
"tfaKey": tfa_auth,
"tfaCode": tfa_code
}
print("payload: ", verify_payload)
api2_response = requests.post("https://bambulab.com/api-sign-in/tfa", headers=headers, json=verify_payload)
print("Step 2 - API MFA response:", api2_response.text)
cookies = api2_response.cookies.get_dict()
token_from_tfa = cookies.get("token")
print("tokenFromTFA:", token_from_tfa)
token = token_from_tfa
if not token:
print("Failed to extract token")
exit(1)
else:
# If authentication was successful in the first attempt, get the token from the JSON response
token = auth_json.get("accessToken")
if not token:
print("Unable to authenticate or verify. Exiting...")
exit(1)
# Perform the API request to fetch the image with custom headers
headers["Authorization"] = f"Bearer {token}"
api_response = requests.get("https://api.bambulab.com/v1/iot-service/api/user/bind", headers=headers)
# Print the API response for debugging
print("API to fetch info:", api_response.text)
# Check if the API request was successful
if api_response.status_code != 200:
print("API request failed")
exit(1)
# Extract and display the relevant information
api_json = api_response.json()
print(json.dumps(api_json, indent=2))

5
wsgi.py Normal file
View File

@ -0,0 +1,5 @@
from app import app
import os
if __name__ == '__main__':
app.run(debug=os.getenv('DEBUG', False), port=os.getenv('PORT', 443), host=os.getenv('HOST', '0.0.0.0'), ssl_context='adhoc')