nixcfg/systems/x86_64-linux/mx/nextcloud-opencode-bot/bot.py
Harald Hoyer 42c52bd87f refactor(mx): drive opencode bot via direct chat-completions API
The bot no longer shells out to `opencode run`. Instead it POSTs to the
OpenAI-compatible /chat/completions endpoint exposed by llama-server on
halo.hoyer.tail:8000 directly. This removes the Bun/sqlite cold-start
overhead per request, drops the pkgs.opencode runtime dependency, and
eliminates the ExecStartPre dance that materialized config.json into the
service's $HOME.

Conversation history is now stored as a proper OpenAI `messages` list
with system/user/assistant roles, instead of the XML blob that was
inlined into a single `opencode run` argument. The interactive opencode
setup (config/opencode/config.json) is unchanged — only the bot stops
depending on it.

The module gains a `modelBaseUrl` option; `model` is now the bare model
name (`halo-8000`) without the provider/ prefix that the opencode CLI
required.
2026-05-13 16:38:58 +02:00

320 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Nextcloud Talk OpenCode Bot
Receives webhooks from Nextcloud Talk and forwards the conversation to an
OpenAI-compatible chat-completions endpoint (e.g. llama-server) running on
the local LLM host.
"""
import hashlib
import hmac
import json
import logging
import os
import re
import secrets
from datetime import datetime
from typing import Optional
import httpx
from fastapi import FastAPI, Request, HTTPException, Header
from fastapi.responses import JSONResponse
# --- Runtime configuration (read once from the environment, typically set
# --- by the systemd unit that runs this service) ---
NEXTCLOUD_URL = os.environ.get("NEXTCLOUD_URL", "").rstrip("/")  # base URL of the Nextcloud instance (no trailing slash)
MODEL_BASE_URL = os.environ.get("MODEL_BASE_URL", "").rstrip("/")  # OpenAI-compatible API root, e.g. http://host:8000/v1
MODEL_NAME = os.environ.get("MODEL_NAME", "halo-8000")  # bare model name sent in the request payload
MODEL_API_KEY = os.environ.get("MODEL_API_KEY", "")  # optional bearer token for the model endpoint
# Comma-separated allow-list of Nextcloud user ids; an empty list means all users are allowed.
ALLOWED_USERS = [u.strip() for u in os.environ.get("ALLOWED_USERS", "").split(",") if u.strip()]
TIMEOUT = int(os.environ.get("TIMEOUT", "120"))  # model request timeout in seconds
SYSTEM_PROMPT = os.environ.get("SYSTEM_PROMPT", "")  # operator-supplied text appended to the built-in system prompt
BOT_NAME = os.environ.get("BOT_NAME", "Halo")  # mention name the bot reacts to in group chats
def get_bot_secret() -> str:
    """Load the Talk bot shared secret.

    Prefers the systemd credentials file ``$CREDENTIALS_DIRECTORY/bot-secret``;
    falls back to the ``BOT_SECRET`` environment variable (empty string if
    neither is present).
    """
    cred_dir = os.environ.get("CREDENTIALS_DIRECTORY", "")
    secret_path = os.path.join(cred_dir, "bot-secret") if cred_dir else ""
    if secret_path and os.path.exists(secret_path):
        with open(secret_path) as fh:
            return fh.read().strip()
    return os.environ.get("BOT_SECRET", "")
# Resolved once at import time; used for both inbound verification and
# outbound signing.
BOT_SECRET = get_bot_secret()
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s"
)
log = logging.getLogger(__name__)
app = FastAPI(title="Nextcloud OpenCode Bot")
# Conversation history per room: list of OpenAI-style message dicts
# (role: "user"|"assistant", content: str). In-memory only, so history
# is lost on restart.
conversations: dict[str, list[dict]] = {}
# Number of user/assistant exchanges kept per room (each exchange is two
# entries in the history list).
MAX_HISTORY = int(os.environ.get("CONTEXT_MESSAGES", "6"))
def generate_bot_auth_headers(body: str = "") -> dict:
    """Build the auth headers Nextcloud Talk expects from a bot.

    Talk verifies outgoing bot messages with a fresh random nonce plus
    HMAC-SHA256(secret, nonce + body).
    """
    nonce = secrets.token_hex(32)
    mac = hmac.new(
        BOT_SECRET.encode(),
        msg=(nonce + body).encode(),
        digestmod=hashlib.sha256,
    )
    return {
        "X-Nextcloud-Talk-Bot-Random": nonce,
        "X-Nextcloud-Talk-Bot-Signature": mac.hexdigest(),
        "OCS-APIRequest": "true",
    }
def verify_signature(body: bytes, signature: str, random: Optional[str] = None) -> bool:
    """Check an inbound webhook signature.

    Accepts either HMAC-SHA256(secret, body) or, when a random/nonce header
    is present, HMAC-SHA256(secret, random + body). Returns True without
    checking anything when no secret is configured.
    """
    if not BOT_SECRET:
        log.warning("No bot secret configured, skipping signature verification")
        return True
    # Tolerate a "sha256=" prefix on the header value.
    provided = signature[7:] if signature.startswith("sha256=") else signature
    key = BOT_SECRET.encode()
    candidates = [hmac.new(key, body, hashlib.sha256).hexdigest()]
    if random:
        candidates.append(hmac.new(key, random.encode() + body, hashlib.sha256).hexdigest())
    return any(hmac.compare_digest(expected, provided) for expected in candidates)
# Built-in base system prompt (German, matching the chat audience). This is
# a runtime string sent to the model — deliberately left untranslated. The
# operator-supplied SYSTEM_PROMPT is appended by build_system_prompt().
BOT_SYSTEM_PROMPT = """\
Du bist ein KI-Assistent im Nextcloud Talk Chat.
Deine Antworten werden direkt in den Chatraum gepostet.
Halte deine Antworten kurz und prägnant, da es ein Chat ist.
Nutze Markdown für Formatierung wenn sinnvoll."""
def build_system_prompt() -> str:
    """Return the effective system prompt.

    The built-in prompt, optionally followed by the stripped operator
    SYSTEM_PROMPT separated by a blank line.
    """
    if not SYSTEM_PROMPT:
        return BOT_SYSTEM_PROMPT
    return f"{BOT_SYSTEM_PROMPT}\n\n{SYSTEM_PROMPT.strip()}"
def build_messages(conversation_token: str, current_message: str, current_user: str) -> list[dict]:
    """Assemble the OpenAI-style message list for one request.

    Layout: system prompt, then the most recent MAX_HISTORY exchanges from
    the room's history, then the new user turn tagged with the sender's id.
    """
    recent = conversations.get(conversation_token, [])[-MAX_HISTORY * 2:]
    return (
        [{"role": "system", "content": build_system_prompt()}]
        + recent
        + [{"role": "user", "content": f"[{current_user}] {current_message}"}]
    )
async def call_model(messages: list[dict]) -> str:
    """POST to /chat/completions and return the assistant content.

    This function never raises: every failure mode is mapped to a
    user-facing (German) error string that can be posted to the chat as-is.
    """
    if not MODEL_BASE_URL:
        return "❌ Fehler: MODEL_BASE_URL ist nicht konfiguriert."
    url = f"{MODEL_BASE_URL}/chat/completions"
    request_headers = {"Content-Type": "application/json"}
    if MODEL_API_KEY:
        request_headers["Authorization"] = f"Bearer {MODEL_API_KEY}"
    log.info(f"Calling model {MODEL_NAME} at {url} ({len(messages)} messages)")
    try:
        async with httpx.AsyncClient(timeout=TIMEOUT) as client:
            resp = await client.post(
                url,
                json={"model": MODEL_NAME, "messages": messages, "stream": False},
                headers=request_headers,
            )
            if resp.status_code != 200:
                log.error(f"Model API error: {resp.status_code} {resp.text[:500]}")
                return f"❌ Fehler vom Modell: HTTP {resp.status_code}"
            parsed = resp.json()
            choice_list = parsed.get("choices") or []
            if not choice_list:
                log.error(f"Model returned no choices: {parsed}")
                return "❌ Fehler: Modell hat keine Antwort geliefert."
            content = choice_list[0].get("message", {}).get("content", "")
            if not content:
                log.error(f"Model returned empty content: {choice_list[0]}")
                return "❌ Fehler: leere Antwort vom Modell."
            return content.strip()
    except httpx.TimeoutException:
        log.error(f"Model API timeout after {TIMEOUT}s")
        return f"⏱️ Timeout: Das Modell hat nicht innerhalb von {TIMEOUT}s geantwortet."
    except Exception as e:
        log.exception("Error calling model")
        return f"❌ Fehler: {str(e)}"
async def send_reply(conversation_token: str, message: str, reply_to: Optional[int] = None):
    """Post *message* into a Talk conversation via the bot API.

    Args:
        conversation_token: Talk room token to post into.
        message: Markdown message body.
        reply_to: Optional message id to thread the reply under.

    Best-effort: failures are logged, never raised to the caller.
    """
    if not NEXTCLOUD_URL:
        log.error("NEXTCLOUD_URL not configured")
        return
    url = f"{NEXTCLOUD_URL}/ocs/v2.php/apps/spreed/api/v1/bot/{conversation_token}/message"
    # Outgoing bot messages are signed over the message text itself.
    headers = generate_bot_auth_headers(message)
    headers["Content-Type"] = "application/json"
    payload = {
        "message": message,
        # referenceId lets Talk deduplicate; derive a unique one from
        # room token + current timestamp.
        "referenceId": hashlib.sha256(
            f"{conversation_token}-{datetime.now().isoformat()}".encode()
        ).hexdigest()[:32],
    }
    if reply_to:
        payload["replyTo"] = reply_to
    # Use the same timeout as the model call for consistency; httpx's
    # default (5 s) is easily exceeded by a slow Nextcloud instance.
    async with httpx.AsyncClient(timeout=TIMEOUT) as client:
        try:
            resp = await client.post(url, json=payload, headers=headers)
            if resp.status_code not in (200, 201):
                log.error(f"Failed to send reply: {resp.status_code} {resp.text}")
            else:
                log.info(f"Reply sent to conversation {conversation_token}")
        except Exception:
            log.exception("Error sending reply to Nextcloud")
@app.post("/webhook")
async def handle_webhook(
request: Request,
x_nextcloud_talk_signature: Optional[str] = Header(None, alias="X-Nextcloud-Talk-Signature"),
x_nextcloud_talk_random: Optional[str] = Header(None, alias="X-Nextcloud-Talk-Random"),
):
body = await request.body()
if x_nextcloud_talk_signature and not verify_signature(body, x_nextcloud_talk_signature, x_nextcloud_talk_random):
log.warning("Invalid webhook signature")
raise HTTPException(status_code=401, detail="Invalid signature")
try:
data = json.loads(body)
except json.JSONDecodeError:
raise HTTPException(status_code=400, detail="Invalid JSON")
log.info(f"Received webhook: {json.dumps(data, indent=2)[:500]}")
actor = data.get("actor", {})
actor_type = actor.get("type", "")
actor_id_full = actor.get("id", "")
if "/" in actor_id_full:
actor_id = actor_id_full.split("/", 1)[1]
else:
actor_id = actor_id_full
obj = data.get("object", {})
message_id = obj.get("id")
content_str = obj.get("content", "{}")
try:
content = json.loads(content_str)
message_text = content.get("message", "")
except json.JSONDecodeError:
message_text = content_str
target = data.get("target", {})
conversation_token = target.get("id", "")
if actor_type not in ("users", "Person"):
log.info(f"Ignoring non-user actor: {actor_type}")
return JSONResponse({"status": "ignored", "reason": "not a user message"})
is_direct_message = False
bot_mentioned = False
clean_message = message_text
escaped = re.escape(BOT_NAME)
mention_patterns = [
rf'@"?{escaped}"?\s*',
]
for pattern in mention_patterns:
if re.search(pattern, message_text, re.IGNORECASE):
bot_mentioned = True
clean_message = re.sub(pattern, '', message_text, flags=re.IGNORECASE).strip()
break
if not is_direct_message and not bot_mentioned:
log.info("Ignoring message in group chat without mention")
return JSONResponse({"status": "ignored", "reason": "not mentioned in group chat"})
if bot_mentioned:
message_text = clean_message
if ALLOWED_USERS and actor_id not in ALLOWED_USERS:
log.warning(f"User {actor_id} not in allowed list")
await send_reply(
conversation_token,
"🚫 Du bist nicht berechtigt, diesen Bot zu nutzen.",
reply_to=message_id
)
return JSONResponse({"status": "rejected", "reason": "user not allowed"})
if not message_text.strip():
return JSONResponse({"status": "ignored", "reason": "empty message"})
log.info(f"Processing message from {actor_id}: {message_text[:100]}")
if message_text.strip().lower() in ("hilfe", "help", "?"):
help_text = f"""🤖 **{BOT_NAME} Bot Hilfe**
Schreib mir einfach eine Nachricht und ich antworte dir.
**Nutzung:**
• In Gruppenchats: @{BOT_NAME} gefolgt von deiner Frage
**Befehle:**
• `hilfe` oder `?` Diese Hilfe anzeigen
Modell: `{MODEL_NAME}` @ `{MODEL_BASE_URL}`
Der Bot merkt sich die letzten Nachrichten pro Raum (bis zum Neustart)."""
await send_reply(conversation_token, help_text, reply_to=message_id)
return JSONResponse({"status": "ok", "action": "help"})
messages = build_messages(conversation_token, message_text, actor_id)
response = await call_model(messages)
history = conversations.setdefault(conversation_token, [])
history.append({"role": "user", "content": f"[{actor_id}] {message_text}"})
history.append({"role": "assistant", "content": response})
if len(history) > MAX_HISTORY * 2:
del history[: len(history) - MAX_HISTORY * 2]
await send_reply(conversation_token, response, reply_to=message_id)
return JSONResponse({"status": "ok"})
@app.get("/health")
async def health():
return {
"status": "ok",
"nextcloud_url": NEXTCLOUD_URL,
"model_base_url": MODEL_BASE_URL,
"model_name": MODEL_NAME,
"bot_name": BOT_NAME,
"allowed_users": ALLOWED_USERS if ALLOWED_USERS else "all",
"max_history": MAX_HISTORY,
}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="127.0.0.1", port=8086)