Files
serman-rss-wrapper-mixcloud/mixcloud_rss_pro.py

360 lines
14 KiB
Python

#!/usr/bin/env python3
"""
Extended Mixcloud RSS feed generator with audio streaming.

Extracts the real audio URLs so that episodes can be played in podcast apps.
"""
import argparse
import json
import os
import threading
import time
import xml.etree.ElementTree as ET
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from email.utils import format_datetime
from urllib.parse import quote

import requests
import yt_dlp
class MixcloudRSSGeneratorPro:
    """Build a podcast-style RSS 2.0 feed from a Mixcloud user's uploads.

    The generator queries the public Mixcloud API for the user's profile and
    cloudcasts, optionally resolves a direct audio URL for every cloudcast
    with yt-dlp, and writes the result as an XML file. All progress and error
    messages are printed to stdout.
    """

    # Seconds to wait for a Mixcloud API response before giving up.
    REQUEST_TIMEOUT = 10

    def __init__(self, username, output_file="mixcloud_feed.xml", extract_audio=True):
        """Store the configuration; no network access happens here.

        Args:
            username: Mixcloud user name whose uploads become feed items.
            output_file: Path of the XML file written by create_rss_feed().
            extract_audio: When True, resolve direct audio URLs with yt-dlp;
                when False, enclosures point at the Mixcloud page URL.
        """
        self.username = username
        self.output_file = output_file
        self.extract_audio = extract_audio
        self.base_url = "https://api.mixcloud.com"
        # Percent-encode the name so unusual characters cannot alter the API path.
        self.user_url = f"{self.base_url}/{quote(username, safe='')}/"
        # Maps Mixcloud page URL -> resolved audio URL; guarded by cache_lock
        # because extract_audio_url() is called from worker threads.
        self.audio_cache = {}
        self.cache_lock = threading.Lock()

    @staticmethod
    def _page_url(cloudcast):
        """Return the public Mixcloud page URL for a cloudcast dict."""
        return f"https://www.mixcloud.com{cloudcast.get('key', '')}"

    def get_user_info(self):
        """Fetch the user's profile from the Mixcloud API.

        Returns:
            The decoded JSON document, or None when the request fails or the
            response is not valid JSON.
        """
        try:
            response = requests.get(self.user_url, timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()
            return response.json()
        except (requests.RequestException, ValueError) as e:
            # ValueError covers JSON decoding errors on older requests versions.
            print(f"Fehler beim Abrufen der Benutzerinformationen: {e}")
            return None

    def get_cloudcasts(self, limit=50):
        """Fetch the user's newest cloudcasts (tracks).

        Args:
            limit: Value sent as the API's ``limit`` query parameter.

        Returns:
            The list stored under the response's ``data`` key, or an empty
            list when the request fails or the response is not valid JSON.
        """
        cloudcasts_url = f"{self.user_url}cloudcasts/"
        try:
            response = requests.get(
                cloudcasts_url,
                params={"limit": limit},
                timeout=self.REQUEST_TIMEOUT,
            )
            response.raise_for_status()
            return response.json().get("data", [])
        except (requests.RequestException, ValueError) as e:
            print(f"Fehler beim Abrufen der Cloudcasts: {e}")
            return []

    def extract_audio_url(self, mixcloud_url):
        """Resolve the direct audio URL behind a Mixcloud page with yt-dlp.

        Successful lookups are cached per page URL. This is best-effort: on
        any failure the page URL itself is returned so the feed can still be
        built.

        Args:
            mixcloud_url: Public Mixcloud page URL of the cloudcast.

        Returns:
            The resolved audio URL, or ``mixcloud_url`` when extraction is
            disabled or fails.
        """
        if not self.extract_audio:
            return mixcloud_url

        with self.cache_lock:
            cached = self.audio_cache.get(mixcloud_url)
        if cached is not None:
            return cached

        ydl_opts = {
            'quiet': True,
            'no_warnings': True,
            # Prefer m4a for better podcast-client compatibility.
            'format': 'best[ext=m4a]/best',
            'extractaudio': False,
            'noplaylist': True,
        }
        try:
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(mixcloud_url, download=False)
            if info and 'url' in info:
                audio_url = info['url']
                with self.cache_lock:
                    self.audio_cache[mixcloud_url] = audio_url
                return audio_url
            print(f"⚠️  Keine Audio-URL gefunden für: {mixcloud_url}")
            return mixcloud_url
        except Exception as e:
            # Deliberately broad: a single failing track must not abort the
            # whole feed, whatever the extractor raises.
            print(f"⚠️  Fehler beim Extrahieren der Audio-URL für {mixcloud_url}: {e}")
            return mixcloud_url

    def extract_audio_urls_parallel(self, cloudcasts, max_workers=3):
        """Resolve audio URLs for all cloudcasts using a thread pool.

        Each cloudcast dict is updated in place with an ``audio_url`` key.

        Args:
            cloudcasts: List of cloudcast dicts as returned by get_cloudcasts().
            max_workers: Number of worker threads.

        Returns:
            The cloudcasts in their original order (not completion order, so
            the feed keeps the ordering delivered by the API). When extraction
            is disabled the input is returned unchanged.
        """
        if not self.extract_audio:
            return cloudcasts

        total = len(cloudcasts)
        print(f"🎵 Extrahiere Audio-URLs für {total} Tracks...")

        def extract_for_cloudcast(cloudcast):
            cloudcast['audio_url'] = self.extract_audio_url(self._page_url(cloudcast))
            return cloudcast

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_cloudcast = {
                executor.submit(extract_for_cloudcast, cloudcast): cloudcast
                for cloudcast in cloudcasts
            }
            # as_completed is used only for progress reporting; the returned
            # list below keeps the input order.
            for i, future in enumerate(as_completed(future_to_cloudcast), 1):
                cloudcast = future_to_cloudcast[future]
                try:
                    future.result()
                    print(f"{i}/{total} - {cloudcast.get('name', 'Unbekannt')}")
                except Exception as e:
                    # Fall back to the page URL so the item still gets an enclosure.
                    cloudcast['audio_url'] = self._page_url(cloudcast)
                    print(f" ⚠️ {i}/{total} - Fehler: {e}")
        return list(cloudcasts)

    def format_duration(self, seconds):
        """Format a duration given in seconds as ``HH:MM:SS``.

        Fractional values are truncated; ``None`` and negative values are
        treated as zero.
        """
        total = max(int(seconds or 0), 0)
        hours, remainder = divmod(total, 3600)
        minutes, secs = divmod(remainder, 60)
        return f"{hours:02d}:{minutes:02d}:{secs:02d}"

    def get_content_type_and_size(self, url):
        """Determine MIME type and size of a remote file via an HTTP HEAD request.

        Args:
            url: URL of the audio file.

        Returns:
            A ``(content_type, content_length)`` tuple. ``content_length`` is
            the raw header string or None when unknown. When the request
            fails, ``('audio/mpeg', None)`` is returned.
        """
        try:
            # Follow redirects so the headers describe the final resource;
            # requests.head() does not follow them by default.
            response = requests.head(url, timeout=10, allow_redirects=True)
        except requests.RequestException:
            return 'audio/mpeg', None

        if response.ok:
            content_type = response.headers.get('content-type', '')
            content_length = response.headers.get('content-length')
        else:
            # Headers of an error page describe the error body, not the audio.
            content_type, content_length = '', None

        # Fall back to a type guessed from the URL when the server did not
        # report an audio type.
        if 'audio' not in content_type.lower():
            if '.m4a' in url or '.aac' in url:
                content_type = 'audio/mp4'
            else:
                content_type = 'audio/mpeg'
        return content_type, content_length

    @staticmethod
    def _format_pub_date(created_time):
        """Convert an ISO 8601 timestamp to an RFC 2822 date, or None if unusable."""
        if not created_time:
            return None
        try:
            parsed = datetime.fromisoformat(created_time.replace('Z', '+00:00'))
        except (AttributeError, ValueError):
            return None
        return format_datetime(parsed)

    def _build_channel(self, user_info):
        """Create the <rss> root and its <channel> with feed-level metadata."""
        rss = ET.Element("rss")
        rss.set("version", "2.0")
        rss.set("xmlns:itunes", "http://www.itunes.com/dtds/podcast-1.0.dtd")
        rss.set("xmlns:content", "http://purl.org/rss/1.0/modules/content/")
        channel = ET.SubElement(rss, "channel")

        # `or` (instead of a .get default) also covers empty/None API values.
        display_name = user_info.get('name') or self.username
        summary = user_info.get('biog') or f"Mixcloud-Feed von {self.username}"
        feed_title = f"{display_name} - Mixcloud Feed"
        feed_link = f"https://www.mixcloud.com/{self.username}/"

        ET.SubElement(channel, "title").text = feed_title
        ET.SubElement(channel, "description").text = summary
        ET.SubElement(channel, "link").text = feed_link
        ET.SubElement(channel, "language").text = "de-DE"

        # iTunes-specific channel tags.
        ET.SubElement(channel, "itunes:author").text = display_name
        ET.SubElement(channel, "itunes:summary").text = summary
        ET.SubElement(channel, "itunes:category").set("text", "Music")
        ET.SubElement(channel, "itunes:explicit").text = "false"

        picture_url = (user_info.get('pictures') or {}).get('large')
        if picture_url:
            image = ET.SubElement(channel, "image")
            ET.SubElement(image, "url").text = picture_url
            ET.SubElement(image, "title").text = feed_title
            ET.SubElement(image, "link").text = feed_link
            ET.SubElement(channel, "itunes:image").set("href", picture_url)
        return rss, channel

    def _append_item(self, channel, cloudcast):
        """Append one <item> (episode) describing ``cloudcast`` to ``channel``."""
        item = ET.SubElement(channel, "item")

        title_text = cloudcast.get('name', 'Unbekannter Titel')
        description_text = cloudcast.get('description') or f"Mix von {self.username}"
        ET.SubElement(item, "title").text = title_text
        ET.SubElement(item, "description").text = description_text
        ET.SubElement(item, "link").text = cloudcast.get('url', '')

        guid = ET.SubElement(item, "guid")
        guid.text = cloudcast.get('key', '')
        guid.set("isPermaLink", "false")

        # Only emit <pubDate> when there is a usable timestamp.
        pub_date = self._format_pub_date(cloudcast.get('created_time'))
        if pub_date:
            ET.SubElement(item, "pubDate").text = pub_date

        page_url = self._page_url(cloudcast)
        audio_url = cloudcast.get('audio_url', page_url)
        content_type, content_length = "audio/mpeg", None
        # Probe the server only when a direct audio URL was resolved.
        if self.extract_audio and audio_url != page_url:
            content_type, content_length = self.get_content_type_and_size(audio_url)
        enclosure = ET.SubElement(item, "enclosure")
        enclosure.set("url", audio_url)
        enclosure.set("type", content_type)
        # RSS 2.0 requires the length attribute; use "0" when it is unknown.
        enclosure.set("length", content_length or "0")

        duration = cloudcast.get('audio_length', 0)
        if duration:
            ET.SubElement(item, "itunes:duration").text = self.format_duration(duration)

        # iTunes-specific item tags.
        ET.SubElement(item, "itunes:title").text = title_text
        ET.SubElement(item, "itunes:summary").text = description_text
        ET.SubElement(item, "itunes:explicit").text = "false"

        # At most the first five tags, skipping entries without a name.
        tags = cloudcast.get('tags') or []
        names = [tag.get('name') for tag in tags[:5] if tag.get('name')]
        if names:
            ET.SubElement(item, "itunes:keywords").text = ", ".join(names)

    def _write_feed(self, rss, cloudcasts):
        """Serialize the feed to ``self.output_file``; return True on success."""
        tree = ET.ElementTree(rss)
        ET.indent(tree, space=" ", level=0)
        try:
            tree.write(self.output_file, encoding='utf-8', xml_declaration=True)
        except (OSError, TypeError, ValueError) as e:
            print(f"❌ Fehler beim Schreiben der XML-Datei: {e}")
            return False

        print(f"✅ RSS-Feed erfolgreich erstellt: {self.output_file}")
        print(f"📊 Anzahl der Episoden: {len(cloudcasts)}")
        if self.extract_audio:
            # An episode counts as extracted when its enclosure differs from
            # the page URL used as fallback.
            audio_count = sum(
                1 for c in cloudcasts
                if c.get('audio_url', self._page_url(c)) != self._page_url(c)
            )
            print(f"🎵 Direkte Audio-URLs extrahiert: {audio_count}/{len(cloudcasts)}")
        return True

    def create_rss_feed(self, limit=50):
        """Build the RSS feed from Mixcloud data and write it to disk.

        Args:
            limit: Number of cloudcasts to request from the API.

        Returns:
            True when the feed file was written; False when the user or the
            cloudcasts could not be fetched or the file could not be written.
        """
        user_info = self.get_user_info()
        if not user_info:
            return False

        cloudcasts = self.get_cloudcasts(limit=limit)
        if not cloudcasts:
            print("Keine Cloudcasts gefunden.")
            return False

        if self.extract_audio:
            cloudcasts = self.extract_audio_urls_parallel(cloudcasts)

        rss, channel = self._build_channel(user_info)
        print(f"📦 Erstelle RSS-Feed mit {len(cloudcasts)} Episoden...")
        for cloudcast in cloudcasts:
            self._append_item(channel, cloudcast)
        return self._write_feed(rss, cloudcasts)

    def serve_feed(self, port=8000):
        """Serve the directory containing the feed file over HTTP until Ctrl+C.

        The server listens on all interfaces and exposes every file in that
        directory, so only use it on a trusted network.

        Args:
            port: TCP port to listen on.
        """
        import functools
        import http.server

        feed_path = os.path.abspath(self.output_file)
        # Bind the handler to the feed directory instead of changing the
        # working directory of the whole process.
        handler = functools.partial(
            http.server.SimpleHTTPRequestHandler,
            directory=os.path.dirname(feed_path),
        )
        try:
            # HTTPServer enables address reuse, so a quick restart on the
            # same port does not fail with "address already in use".
            with http.server.HTTPServer(("", port), handler) as httpd:
                print(f"🌐 Server läuft auf http://localhost:{port}")
                print(f"📡 RSS-Feed: http://localhost:{port}/{os.path.basename(feed_path)}")
                print("⏹️  Drücke Ctrl+C zum Beenden")
                httpd.serve_forever()
        except KeyboardInterrupt:
            print("\n👋 Server beendet.")
        except OSError as e:
            print(f"❌ Fehler beim Starten des Servers: {e}")


def main():
    """Command-line entry point: parse arguments, build the feed, optionally serve it."""
    parser = argparse.ArgumentParser(description="Erstellt einen RSS-Feed aus Mixcloud-Tracks mit echten Audio-URLs")
    parser.add_argument("username", help="Mixcloud-Benutzername (z.B. serman_dj)")
    parser.add_argument("-o", "--output", default="mixcloud_feed.xml",
                        help="Ausgabedatei für den RSS-Feed (Standard: mixcloud_feed.xml)")
    parser.add_argument("-l", "--limit", type=int, default=50,
                        help="Anzahl der zu holenden Tracks (Standard: 50)")
    parser.add_argument("--no-audio", action="store_true",
                        help="Deaktiviert die Audio-URL-Extraktion (nur Mixcloud-Links)")
    parser.add_argument("--serve", action="store_true",
                        help="Startet einen HTTP-Server für den RSS-Feed")
    parser.add_argument("--port", type=int, default=8000,
                        help="Port für den HTTP-Server (Standard: 8000)")
    args = parser.parse_args()
    if args.limit < 1:
        parser.error("--limit muss mindestens 1 sein")

    extract_audio = not args.no_audio
    generator = MixcloudRSSGeneratorPro(args.username, args.output, extract_audio)
    print(f"🎵 Erstelle RSS-Feed für Mixcloud-User: {args.username}")
    if extract_audio:
        print("🔧 Audio-URL-Extraktion aktiviert (kann einige Minuten dauern)")
    else:
        print("⚡ Schnellmodus: Nur Mixcloud-Links (keine Audio-Extraktion)")
    print("-" * 60)

    # Forward --limit; it was previously parsed but never used.
    success = generator.create_rss_feed(limit=args.limit)
    if success and args.serve:
        generator.serve_feed(args.port)
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()