nixcfg/systems/x86_64-linux/mx/nextcloud-opencode-bot/bot.py
Harald Hoyer 5b44e037a1 feat(halo): add song <URL> command to convert via song.link
Resolves the URL through the Odesli public API (api.song.link) and
replies with the canonical song.link page plus per-platform deep links
(Spotify, Apple Music, YouTube/YT Music, Tidal, Deezer, Amazon Music,
SoundCloud). Country is pinned to DE.
2026-05-20 09:42:11 +02:00

388 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Nextcloud Talk OpenCode Bot
Receives webhooks from Nextcloud Talk and forwards the conversation to an
OpenAI-compatible chat-completions endpoint (e.g. llama-server) running on
the local LLM host.
"""
import hashlib
import hmac
import json
import logging
import os
import re
import secrets
from datetime import datetime
from typing import Optional
import httpx
from fastapi import FastAPI, Request, HTTPException, Header
from fastapi.responses import JSONResponse
NEXTCLOUD_URL = os.environ.get("NEXTCLOUD_URL", "").rstrip("/")
MODEL_BASE_URL = os.environ.get("MODEL_BASE_URL", "").rstrip("/")
MODEL_NAME = os.environ.get("MODEL_NAME", "halo-8000")
MODEL_API_KEY = os.environ.get("MODEL_API_KEY", "")
ALLOWED_USERS = [u.strip() for u in os.environ.get("ALLOWED_USERS", "").split(",") if u.strip()]
TIMEOUT = int(os.environ.get("TIMEOUT", "120"))
SYSTEM_PROMPT = os.environ.get("SYSTEM_PROMPT", "")
BOT_NAME = os.environ.get("BOT_NAME", "Halo")
SONGLINK_API = "https://api.song.link/v1-alpha.1/links"
SONGLINK_COUNTRY = "DE"
SONGLINK_PLATFORMS = [
("spotify", "Spotify"),
("appleMusic", "Apple Music"),
("youtube", "YouTube"),
("youtubeMusic", "YouTube Music"),
("tidal", "Tidal"),
("deezer", "Deezer"),
("amazonMusic", "Amazon Music"),
("soundcloud", "SoundCloud"),
]
def get_bot_secret() -> str:
cred_path = os.environ.get("CREDENTIALS_DIRECTORY", "")
if cred_path:
secret_file = os.path.join(cred_path, "bot-secret")
if os.path.exists(secret_file):
with open(secret_file) as f:
return f.read().strip()
return os.environ.get("BOT_SECRET", "")
BOT_SECRET = get_bot_secret()
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s"
)
log = logging.getLogger(__name__)
app = FastAPI(title="Nextcloud OpenCode Bot")
# Conversation history per room: list of OpenAI-style message dicts
# (role: "user"|"assistant", content: str).
conversations: dict[str, list[dict]] = {}
MAX_HISTORY = int(os.environ.get("CONTEXT_MESSAGES", "6"))
def generate_bot_auth_headers(body: str = "") -> dict:
random = secrets.token_hex(32)
digest = hmac.new(
BOT_SECRET.encode(),
(random + body).encode(),
hashlib.sha256
).hexdigest()
return {
"X-Nextcloud-Talk-Bot-Random": random,
"X-Nextcloud-Talk-Bot-Signature": digest,
"OCS-APIRequest": "true",
}
def verify_signature(body: bytes, signature: str, random: Optional[str] = None) -> bool:
if not BOT_SECRET:
log.warning("No bot secret configured, skipping signature verification")
return True
if signature.startswith("sha256="):
signature = signature[7:]
expected1 = hmac.new(BOT_SECRET.encode(), body, hashlib.sha256).hexdigest()
if random:
expected2 = hmac.new(BOT_SECRET.encode(), (random.encode() + body), hashlib.sha256).hexdigest()
else:
expected2 = None
if hmac.compare_digest(expected1, signature):
return True
if expected2 and hmac.compare_digest(expected2, signature):
return True
return False
BOT_SYSTEM_PROMPT = """\
Du bist ein KI-Assistent im Nextcloud Talk Chat.
Deine Antworten werden direkt in den Chatraum gepostet.
Halte deine Antworten kurz und prägnant, da es ein Chat ist.
Nutze Markdown für Formatierung wenn sinnvoll."""
def build_system_prompt() -> str:
if SYSTEM_PROMPT:
return f"{BOT_SYSTEM_PROMPT}\n\n{SYSTEM_PROMPT.strip()}"
return BOT_SYSTEM_PROMPT
def build_messages(conversation_token: str, current_message: str, current_user: str) -> list[dict]:
messages: list[dict] = [{"role": "system", "content": build_system_prompt()}]
history = conversations.get(conversation_token, [])
messages.extend(history[-MAX_HISTORY * 2:])
messages.append({"role": "user", "content": f"[{current_user}] {current_message}"})
return messages
async def call_model(messages: list[dict]) -> str:
"""POST to /chat/completions and return the assistant content."""
if not MODEL_BASE_URL:
return "❌ Fehler: MODEL_BASE_URL ist nicht konfiguriert."
url = f"{MODEL_BASE_URL}/chat/completions"
headers = {"Content-Type": "application/json"}
if MODEL_API_KEY:
headers["Authorization"] = f"Bearer {MODEL_API_KEY}"
payload = {
"model": MODEL_NAME,
"messages": messages,
"stream": False,
}
log.info(f"Calling model {MODEL_NAME} at {url} ({len(messages)} messages)")
try:
async with httpx.AsyncClient(timeout=TIMEOUT) as client:
resp = await client.post(url, json=payload, headers=headers)
if resp.status_code != 200:
log.error(f"Model API error: {resp.status_code} {resp.text[:500]}")
return f"❌ Fehler vom Modell: HTTP {resp.status_code}"
data = resp.json()
choices = data.get("choices") or []
if not choices:
log.error(f"Model returned no choices: {data}")
return "❌ Fehler: Modell hat keine Antwort geliefert."
content = choices[0].get("message", {}).get("content", "")
if not content:
log.error(f"Model returned empty content: {choices[0]}")
return "❌ Fehler: leere Antwort vom Modell."
return content.strip()
except httpx.TimeoutException:
log.error(f"Model API timeout after {TIMEOUT}s")
return f"⏱️ Timeout: Das Modell hat nicht innerhalb von {TIMEOUT}s geantwortet."
except Exception as e:
log.exception("Error calling model")
return f"❌ Fehler: {str(e)}"
async def resolve_song_link(url: str) -> str:
try:
async with httpx.AsyncClient(timeout=30) as client:
resp = await client.get(
SONGLINK_API,
params={"url": url, "userCountry": SONGLINK_COUNTRY},
)
except httpx.TimeoutException:
return "⏱️ Timeout bei der Anfrage an song.link."
except Exception as e:
log.exception("song.link request failed")
return f"❌ Fehler bei song.link: {e}"
if resp.status_code != 200:
log.error(f"song.link error: {resp.status_code} {resp.text[:300]}")
return f"❌ song.link Fehler: HTTP {resp.status_code}"
data = resp.json()
page_url = data.get("pageUrl")
if not page_url:
return "❌ song.link hat keine Seite zurückgegeben."
entity_id = data.get("entityUniqueId")
entity = data.get("entitiesByUniqueId", {}).get(entity_id, {}) if entity_id else {}
title = entity.get("title")
artist = entity.get("artistName")
lines: list[str] = []
if title:
header = f"**{title}**" + (f" {artist}" if artist else "")
lines.append(header)
lines.append(page_url)
links = data.get("linksByPlatform", {})
platform_lines = []
for key, label in SONGLINK_PLATFORMS:
platform_url = links.get(key, {}).get("url")
if platform_url:
platform_lines.append(f"- [{label}]({platform_url})")
if platform_lines:
lines.append("")
lines.extend(platform_lines)
return "\n".join(lines)
async def send_reply(conversation_token: str, message: str, reply_to: int = None):
if not NEXTCLOUD_URL:
log.error("NEXTCLOUD_URL not configured")
return
url = f"{NEXTCLOUD_URL}/ocs/v2.php/apps/spreed/api/v1/bot/{conversation_token}/message"
headers = generate_bot_auth_headers(message)
headers["Content-Type"] = "application/json"
payload = {
"message": message,
"referenceId": hashlib.sha256(
f"{conversation_token}-{datetime.now().isoformat()}".encode()
).hexdigest()[:32],
}
if reply_to:
payload["replyTo"] = reply_to
async with httpx.AsyncClient() as client:
try:
resp = await client.post(url, json=payload, headers=headers)
if resp.status_code not in (200, 201):
log.error(f"Failed to send reply: {resp.status_code} {resp.text}")
else:
log.info(f"Reply sent to conversation {conversation_token}")
except Exception:
log.exception("Error sending reply to Nextcloud")
@app.post("/webhook")
async def handle_webhook(
request: Request,
x_nextcloud_talk_signature: Optional[str] = Header(None, alias="X-Nextcloud-Talk-Signature"),
x_nextcloud_talk_random: Optional[str] = Header(None, alias="X-Nextcloud-Talk-Random"),
):
body = await request.body()
if x_nextcloud_talk_signature and not verify_signature(body, x_nextcloud_talk_signature, x_nextcloud_talk_random):
log.warning("Invalid webhook signature")
raise HTTPException(status_code=401, detail="Invalid signature")
try:
data = json.loads(body)
except json.JSONDecodeError:
raise HTTPException(status_code=400, detail="Invalid JSON")
log.info(f"Received webhook: {json.dumps(data, indent=2)[:500]}")
actor = data.get("actor", {})
actor_type = actor.get("type", "")
actor_id_full = actor.get("id", "")
if "/" in actor_id_full:
actor_id = actor_id_full.split("/", 1)[1]
else:
actor_id = actor_id_full
obj = data.get("object", {})
message_id = obj.get("id")
content_str = obj.get("content", "{}")
try:
content = json.loads(content_str)
message_text = content.get("message", "")
except json.JSONDecodeError:
message_text = content_str
target = data.get("target", {})
conversation_token = target.get("id", "")
if actor_type not in ("users", "Person"):
log.info(f"Ignoring non-user actor: {actor_type}")
return JSONResponse({"status": "ignored", "reason": "not a user message"})
is_direct_message = False
bot_mentioned = False
clean_message = message_text
escaped = re.escape(BOT_NAME)
mention_patterns = [
rf'@"?{escaped}"?\s*',
]
for pattern in mention_patterns:
if re.search(pattern, message_text, re.IGNORECASE):
bot_mentioned = True
clean_message = re.sub(pattern, '', message_text, flags=re.IGNORECASE).strip()
break
if not is_direct_message and not bot_mentioned:
log.info("Ignoring message in group chat without mention")
return JSONResponse({"status": "ignored", "reason": "not mentioned in group chat"})
if bot_mentioned:
message_text = clean_message
if ALLOWED_USERS and actor_id not in ALLOWED_USERS:
log.warning(f"User {actor_id} not in allowed list")
await send_reply(
conversation_token,
"🚫 Du bist nicht berechtigt, diesen Bot zu nutzen.",
reply_to=message_id
)
return JSONResponse({"status": "rejected", "reason": "user not allowed"})
if not message_text.strip():
return JSONResponse({"status": "ignored", "reason": "empty message"})
log.info(f"Processing message from {actor_id}: {message_text[:100]}")
if message_text.strip().lower() in ("hilfe", "help", "?"):
help_text = f"""🤖 **{BOT_NAME} Bot Hilfe**
Schreib mir einfach eine Nachricht und ich antworte dir.
**Nutzung:**
• In Gruppenchats: @{BOT_NAME} gefolgt von deiner Frage
**Befehle:**
• `hilfe` oder `?` Diese Hilfe anzeigen
• `song <URL>` Streaming-Link über song.link konvertieren
Modell: `{MODEL_NAME}` @ `{MODEL_BASE_URL}`
Der Bot merkt sich die letzten Nachrichten pro Raum (bis zum Neustart)."""
await send_reply(conversation_token, help_text, reply_to=message_id)
return JSONResponse({"status": "ok", "action": "help"})
song_match = re.match(r'^\s*song\s+(\S+)', message_text, re.IGNORECASE)
if song_match:
song_url = song_match.group(1).strip()
log.info(f"Resolving song.link for {song_url}")
reply = await resolve_song_link(song_url)
await send_reply(conversation_token, reply, reply_to=message_id)
return JSONResponse({"status": "ok", "action": "song"})
messages = build_messages(conversation_token, message_text, actor_id)
response = await call_model(messages)
history = conversations.setdefault(conversation_token, [])
history.append({"role": "user", "content": f"[{actor_id}] {message_text}"})
history.append({"role": "assistant", "content": response})
if len(history) > MAX_HISTORY * 2:
del history[: len(history) - MAX_HISTORY * 2]
await send_reply(conversation_token, response, reply_to=message_id)
return JSONResponse({"status": "ok"})
@app.get("/health")
async def health():
return {
"status": "ok",
"nextcloud_url": NEXTCLOUD_URL,
"model_base_url": MODEL_BASE_URL,
"model_name": MODEL_NAME,
"bot_name": BOT_NAME,
"allowed_users": ALLOWED_USERS if ALLOWED_USERS else "all",
"max_history": MAX_HISTORY,
}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="127.0.0.1", port=8086)