#!/usr/bin/env python3 """ Nextcloud Talk OpenCode Bot Receives webhooks from Nextcloud Talk and forwards the conversation to an OpenAI-compatible chat-completions endpoint (e.g. llama-server) running on the local LLM host. """ import hashlib import hmac import json import logging import os import re import secrets from datetime import datetime from typing import Optional import httpx from fastapi import FastAPI, Request, HTTPException, Header from fastapi.responses import JSONResponse NEXTCLOUD_URL = os.environ.get("NEXTCLOUD_URL", "").rstrip("/") MODEL_BASE_URL = os.environ.get("MODEL_BASE_URL", "").rstrip("/") MODEL_NAME = os.environ.get("MODEL_NAME", "halo-8000") MODEL_API_KEY = os.environ.get("MODEL_API_KEY", "") ALLOWED_USERS = [u.strip() for u in os.environ.get("ALLOWED_USERS", "").split(",") if u.strip()] TIMEOUT = int(os.environ.get("TIMEOUT", "120")) SYSTEM_PROMPT = os.environ.get("SYSTEM_PROMPT", "") BOT_NAME = os.environ.get("BOT_NAME", "Halo") def get_bot_secret() -> str: cred_path = os.environ.get("CREDENTIALS_DIRECTORY", "") if cred_path: secret_file = os.path.join(cred_path, "bot-secret") if os.path.exists(secret_file): with open(secret_file) as f: return f.read().strip() return os.environ.get("BOT_SECRET", "") BOT_SECRET = get_bot_secret() logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s" ) log = logging.getLogger(__name__) app = FastAPI(title="Nextcloud OpenCode Bot") # Conversation history per room: list of OpenAI-style message dicts # (role: "user"|"assistant", content: str). conversations: dict[str, list[dict]] = {} MAX_HISTORY = int(os.environ.get("CONTEXT_MESSAGES", "6")) def generate_bot_auth_headers(body: str = "") -> dict: random = secrets.token_hex(32) digest = hmac.new( BOT_SECRET.encode(), (random + body).encode(), hashlib.sha256 ).hexdigest() return { "X-Nextcloud-Talk-Bot-Random": random, "X-Nextcloud-Talk-Bot-Signature": digest, "OCS-APIRequest": "true", } def verify_signature(body: bytes, signature: str, random: Optional[str] = None) -> bool: if not BOT_SECRET: log.warning("No bot secret configured, skipping signature verification") return True if signature.startswith("sha256="): signature = signature[7:] expected1 = hmac.new(BOT_SECRET.encode(), body, hashlib.sha256).hexdigest() if random: expected2 = hmac.new(BOT_SECRET.encode(), (random.encode() + body), hashlib.sha256).hexdigest() else: expected2 = None if hmac.compare_digest(expected1, signature): return True if expected2 and hmac.compare_digest(expected2, signature): return True return False BOT_SYSTEM_PROMPT = """\ Du bist ein KI-Assistent im Nextcloud Talk Chat. Deine Antworten werden direkt in den Chatraum gepostet. Halte deine Antworten kurz und prägnant, da es ein Chat ist. Nutze Markdown für Formatierung wenn sinnvoll.""" def build_system_prompt() -> str: if SYSTEM_PROMPT: return f"{BOT_SYSTEM_PROMPT}\n\n{SYSTEM_PROMPT.strip()}" return BOT_SYSTEM_PROMPT def build_messages(conversation_token: str, current_message: str, current_user: str) -> list[dict]: messages: list[dict] = [{"role": "system", "content": build_system_prompt()}] history = conversations.get(conversation_token, []) messages.extend(history[-MAX_HISTORY * 2:]) messages.append({"role": "user", "content": f"[{current_user}] {current_message}"}) return messages async def call_model(messages: list[dict]) -> str: """POST to /chat/completions and return the assistant content.""" if not MODEL_BASE_URL: return "❌ Fehler: MODEL_BASE_URL ist nicht konfiguriert." url = f"{MODEL_BASE_URL}/chat/completions" headers = {"Content-Type": "application/json"} if MODEL_API_KEY: headers["Authorization"] = f"Bearer {MODEL_API_KEY}" payload = { "model": MODEL_NAME, "messages": messages, "stream": False, } log.info(f"Calling model {MODEL_NAME} at {url} ({len(messages)} messages)") try: async with httpx.AsyncClient(timeout=TIMEOUT) as client: resp = await client.post(url, json=payload, headers=headers) if resp.status_code != 200: log.error(f"Model API error: {resp.status_code} {resp.text[:500]}") return f"❌ Fehler vom Modell: HTTP {resp.status_code}" data = resp.json() choices = data.get("choices") or [] if not choices: log.error(f"Model returned no choices: {data}") return "❌ Fehler: Modell hat keine Antwort geliefert." content = choices[0].get("message", {}).get("content", "") if not content: log.error(f"Model returned empty content: {choices[0]}") return "❌ Fehler: leere Antwort vom Modell." return content.strip() except httpx.TimeoutException: log.error(f"Model API timeout after {TIMEOUT}s") return f"⏱️ Timeout: Das Modell hat nicht innerhalb von {TIMEOUT}s geantwortet." except Exception as e: log.exception("Error calling model") return f"❌ Fehler: {str(e)}" async def send_reply(conversation_token: str, message: str, reply_to: int = None): if not NEXTCLOUD_URL: log.error("NEXTCLOUD_URL not configured") return url = f"{NEXTCLOUD_URL}/ocs/v2.php/apps/spreed/api/v1/bot/{conversation_token}/message" headers = generate_bot_auth_headers(message) headers["Content-Type"] = "application/json" payload = { "message": message, "referenceId": hashlib.sha256( f"{conversation_token}-{datetime.now().isoformat()}".encode() ).hexdigest()[:32], } if reply_to: payload["replyTo"] = reply_to async with httpx.AsyncClient() as client: try: resp = await client.post(url, json=payload, headers=headers) if resp.status_code not in (200, 201): log.error(f"Failed to send reply: {resp.status_code} {resp.text}") else: log.info(f"Reply sent to conversation {conversation_token}") except Exception: log.exception("Error sending reply to Nextcloud") @app.post("/webhook") async def handle_webhook( request: Request, x_nextcloud_talk_signature: Optional[str] = Header(None, alias="X-Nextcloud-Talk-Signature"), x_nextcloud_talk_random: Optional[str] = Header(None, alias="X-Nextcloud-Talk-Random"), ): body = await request.body() if x_nextcloud_talk_signature and not verify_signature(body, x_nextcloud_talk_signature, x_nextcloud_talk_random): log.warning("Invalid webhook signature") raise HTTPException(status_code=401, detail="Invalid signature") try: data = json.loads(body) except json.JSONDecodeError: raise HTTPException(status_code=400, detail="Invalid JSON") log.info(f"Received webhook: {json.dumps(data, indent=2)[:500]}") actor = data.get("actor", {}) actor_type = actor.get("type", "") actor_id_full = actor.get("id", "") if "/" in actor_id_full: actor_id = actor_id_full.split("/", 1)[1] else: actor_id = actor_id_full obj = data.get("object", {}) message_id = obj.get("id") content_str = obj.get("content", "{}") try: content = json.loads(content_str) message_text = content.get("message", "") except json.JSONDecodeError: message_text = content_str target = data.get("target", {}) conversation_token = target.get("id", "") if actor_type not in ("users", "Person"): log.info(f"Ignoring non-user actor: {actor_type}") return JSONResponse({"status": "ignored", "reason": "not a user message"}) is_direct_message = False bot_mentioned = False clean_message = message_text escaped = re.escape(BOT_NAME) mention_patterns = [ rf'@"?{escaped}"?\s*', ] for pattern in mention_patterns: if re.search(pattern, message_text, re.IGNORECASE): bot_mentioned = True clean_message = re.sub(pattern, '', message_text, flags=re.IGNORECASE).strip() break if not is_direct_message and not bot_mentioned: log.info("Ignoring message in group chat without mention") return JSONResponse({"status": "ignored", "reason": "not mentioned in group chat"}) if bot_mentioned: message_text = clean_message if ALLOWED_USERS and actor_id not in ALLOWED_USERS: log.warning(f"User {actor_id} not in allowed list") await send_reply( conversation_token, "🚫 Du bist nicht berechtigt, diesen Bot zu nutzen.", reply_to=message_id ) return JSONResponse({"status": "rejected", "reason": "user not allowed"}) if not message_text.strip(): return JSONResponse({"status": "ignored", "reason": "empty message"}) log.info(f"Processing message from {actor_id}: {message_text[:100]}") if message_text.strip().lower() in ("hilfe", "help", "?"): help_text = f"""🤖 **{BOT_NAME} Bot Hilfe** Schreib mir einfach eine Nachricht und ich antworte dir. **Nutzung:** • In Gruppenchats: @{BOT_NAME} gefolgt von deiner Frage **Befehle:** • `hilfe` oder `?` – Diese Hilfe anzeigen Modell: `{MODEL_NAME}` @ `{MODEL_BASE_URL}` Der Bot merkt sich die letzten Nachrichten pro Raum (bis zum Neustart).""" await send_reply(conversation_token, help_text, reply_to=message_id) return JSONResponse({"status": "ok", "action": "help"}) messages = build_messages(conversation_token, message_text, actor_id) response = await call_model(messages) history = conversations.setdefault(conversation_token, []) history.append({"role": "user", "content": f"[{actor_id}] {message_text}"}) history.append({"role": "assistant", "content": response}) if len(history) > MAX_HISTORY * 2: del history[: len(history) - MAX_HISTORY * 2] await send_reply(conversation_token, response, reply_to=message_id) return JSONResponse({"status": "ok"}) @app.get("/health") async def health(): return { "status": "ok", "nextcloud_url": NEXTCLOUD_URL, "model_base_url": MODEL_BASE_URL, "model_name": MODEL_NAME, "bot_name": BOT_NAME, "allowed_users": ALLOWED_USERS if ALLOWED_USERS else "all", "max_history": MAX_HISTORY, } if __name__ == "__main__": import uvicorn uvicorn.run(app, host="127.0.0.1", port=8086)