#!/usr/bin/env python3
"""
Nextcloud Talk OpenCode Bot

Receives webhooks from Nextcloud Talk and responds using opencode CLI
against a local model exposed via the `halo-8000` provider.
"""

import asyncio
import hashlib
import hmac
import json
import logging
import os
import re
import secrets
from datetime import datetime
from typing import Optional

import httpx
from fastapi import FastAPI, Request, HTTPException, Header
from fastapi.responses import JSONResponse

# ---------------------------------------------------------------------------
# Configuration -- read once from the environment at import time.
# ---------------------------------------------------------------------------
NEXTCLOUD_URL = os.environ.get("NEXTCLOUD_URL", "").rstrip("/")
OPENCODE_PATH = os.environ.get("OPENCODE_PATH", "opencode")
OPENCODE_MODEL = os.environ.get("OPENCODE_MODEL", "halo-8000/halo-8000")
# Comma-separated user ids; an empty list means "everyone may use the bot".
ALLOWED_USERS = [
    u.strip()
    for u in os.environ.get("ALLOWED_USERS", "").split(",")
    if u.strip()
]
# Seconds to wait for the opencode CLI before giving up.
TIMEOUT = int(os.environ.get("TIMEOUT", "120"))
# Optional extra instructions appended to BOT_SYSTEM_PROMPT.
SYSTEM_PROMPT = os.environ.get("SYSTEM_PROMPT", "")
BOT_NAME = os.environ.get("BOT_NAME", "Halo")


def get_bot_secret() -> str:
    """Return the shared bot secret.

    The file ``bot-secret`` inside the directory named by the
    ``CREDENTIALS_DIRECTORY`` environment variable takes precedence; if that
    variable is unset or the file does not exist, the ``BOT_SECRET``
    environment variable is used. Returns ``""`` when neither is available.
    """
    cred_path = os.environ.get("CREDENTIALS_DIRECTORY", "")
    if cred_path:
        secret_file = os.path.join(cred_path, "bot-secret")
        # EAFP: avoids the exists()/open() race of a check-then-open.
        try:
            with open(secret_file, encoding="utf-8") as f:
                return f.read().strip()
        except FileNotFoundError:
            pass
    return os.environ.get("BOT_SECRET", "")


BOT_SECRET = get_bot_secret()

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s"
)
log = logging.getLogger(__name__)

app = FastAPI(title="Nextcloud OpenCode Bot")

# In-memory chat history per conversation token: list of (role, message).
# Lost on restart.
conversations: dict[str, list[tuple[str, str]]] = {}
MAX_HISTORY = int(os.environ.get("CONTEXT_MESSAGES", "6"))


def generate_bot_auth_headers(body: str = "") -> dict:
    """Build the authentication headers for an outgoing bot request.

    A fresh random value is generated and an HMAC-SHA256 over
    ``random + body`` keyed with ``BOT_SECRET`` is attached as the signature.
    """
    nonce = secrets.token_hex(32)
    digest = hmac.new(
        BOT_SECRET.encode(),
        (nonce + body).encode(),
        hashlib.sha256
    ).hexdigest()
    return {
        "X-Nextcloud-Talk-Bot-Random": nonce,
        "X-Nextcloud-Talk-Bot-Signature": digest,
        "OCS-APIRequest": "true",
    }


def verify_signature(body: bytes, signature: str, random: Optional[str] = None) -> bool:
    """Check an incoming webhook signature against ``BOT_SECRET``.

    Accepts either HMAC-SHA256(body) or, when ``random`` is given,
    HMAC-SHA256(random + body). An optional ``sha256=`` prefix on the
    signature is ignored.

    Returns ``True`` when the signature matches, and also when no secret is
    configured (verification is then skipped with a warning). An empty
    signature never matches a configured secret.
    """
    if not BOT_SECRET:
        log.warning("No bot secret configured, skipping signature verification")
        return True

    if not signature:
        return False

    if signature.startswith("sha256="):
        signature = signature[7:]

    key = BOT_SECRET.encode()
    # Compare as bytes: hmac.compare_digest raises TypeError for str
    # arguments containing non-ASCII characters, which a crafted header
    # could otherwise turn into a 500 response.
    provided = signature.encode("utf-8", "replace")

    candidates = [hmac.new(key, body, hashlib.sha256).hexdigest()]
    if random:
        candidates.append(
            hmac.new(key, random.encode() + body, hashlib.sha256).hexdigest()
        )

    return any(
        hmac.compare_digest(candidate.encode(), provided)
        for candidate in candidates
    )


# NOTE(review): in the reviewed copy the two section labels below were empty
# ("- : ..."), leaving the prompt sections unnamed. The label names used here
# and in build_prompt() are a reconstruction -- confirm against the intended
# prompt format.
BOT_SYSTEM_PROMPT = """\
Du bist ein KI-Assistent im Nextcloud Talk Chat. Deine Antworten werden direkt in den Chatraum gepostet.
Halte deine Antworten kurz und prägnant, da es ein Chat ist.
Nutze Markdown für Formatierung wenn sinnvoll.

Du erhältst:
- <chat_history>: Die letzten Nachrichten im Chatraum (User und deine Antworten)
- <current_message>: Die aktuelle Nachricht, auf die du antworten sollst"""


def build_system_prompt() -> str:
    """Return the built-in system prompt plus the optional ``SYSTEM_PROMPT``."""
    if SYSTEM_PROMPT:
        return f"{BOT_SYSTEM_PROMPT}\n\n{SYSTEM_PROMPT.strip()}"
    return BOT_SYSTEM_PROMPT


def _tail(items: list, limit: int) -> list:
    """Return the last ``limit`` items; an empty list when ``limit`` <= 0.

    A plain ``items[-limit:]`` would return the *whole* list for limit == 0.
    """
    return items[-limit:] if limit > 0 else []


def build_prompt(conversation_token: str, current_message: str, current_user: str) -> str:
    """Build the full prompt.

    opencode run has no system-prompt flag, so we inline the system
    instructions at the top, followed by the most recent ``MAX_HISTORY``
    history entries of the conversation (if any) and the current message.

    Args:
        conversation_token: Key into ``conversations`` for the chat room.
        current_message: The message text to answer.
        current_user: Id of the user who wrote ``current_message``.

    Returns:
        The prompt as a single newline-joined string.
    """
    parts = [
        "<system>",
        build_system_prompt(),
        "</system>",
        "",
    ]

    history = _tail(conversations.get(conversation_token, []), MAX_HISTORY)
    if history:
        parts.append("<chat_history>")
        parts.extend(f"{role}: {msg}" for role, msg in history)
        parts.append("</chat_history>")
        parts.append("")

    parts.append(f'<current_message user="{current_user}">')
    parts.append(current_message)
    parts.append("</current_message>")

    return "\n".join(parts)


async def _kill_process(proc) -> None:
    """Kill a still-running subprocess and reap it (best effort)."""
    if proc is None or proc.returncode is not None:
        return
    try:
        proc.kill()
    except ProcessLookupError:
        # Process exited between the returncode check and kill().
        return
    try:
        # Bounded wait so a stuck child can never block the request forever.
        await asyncio.wait_for(proc.wait(), timeout=5)
    except asyncio.TimeoutError:
        log.warning("opencode process did not exit after kill")


async def call_opencode(prompt: str) -> str:
    """Call opencode CLI and return response.

    Runs ``OPENCODE_PATH run -m OPENCODE_MODEL <prompt>`` and returns its
    stripped stdout. Failures are never raised: a non-zero exit status, a
    timeout after ``TIMEOUT`` seconds, or any other error is returned as a
    user-facing error string instead.
    """
    cmd = [OPENCODE_PATH, "run", "-m", OPENCODE_MODEL, prompt]

    log.info("Calling opencode: %s run -m %s ...", OPENCODE_PATH, OPENCODE_MODEL)

    proc = None
    try:
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )

        stdout, stderr = await asyncio.wait_for(
            proc.communicate(),
            timeout=TIMEOUT
        )

        if proc.returncode != 0:
            # errors="replace": CLI output is not guaranteed to be valid UTF-8.
            err_text = stderr.decode(errors="replace")
            log.error("opencode CLI error: %s", err_text)
            return f"❌ Fehler beim Aufruf von opencode: {err_text[:200]}"

        return stdout.decode(errors="replace").strip()

    except asyncio.TimeoutError:
        log.error("opencode CLI timeout after %ss", TIMEOUT)
        # wait_for() only cancels communicate(); without an explicit kill the
        # child process would keep running after every timeout.
        await _kill_process(proc)
        return f"⏱️ Timeout: opencode hat nicht innerhalb von {TIMEOUT}s geantwortet."
    except Exception as e:
        log.exception("Error calling opencode")
        return f"❌ Fehler: {str(e)}"


async def send_reply(conversation_token: str, message: str, reply_to: Optional[int] = None):
    """Post ``message`` into a conversation through the Talk bot endpoint.

    The request is signed over the message text (see
    ``generate_bot_auth_headers``). Errors are logged, never raised; nothing
    is sent when ``NEXTCLOUD_URL`` is not configured.

    Args:
        conversation_token: Token of the target conversation.
        message: Text to post.
        reply_to: Optional id of the message being replied to.
    """
    if not NEXTCLOUD_URL:
        log.error("NEXTCLOUD_URL not configured")
        return

    url = f"{NEXTCLOUD_URL}/ocs/v2.php/apps/spreed/api/v1/bot/{conversation_token}/message"

    headers = generate_bot_auth_headers(message)
    headers["Content-Type"] = "application/json"

    payload = {
        "message": message,
        "referenceId": hashlib.sha256(
            f"{conversation_token}-{datetime.now().isoformat()}".encode()
        ).hexdigest()[:32],
    }

    if reply_to:
        payload["replyTo"] = reply_to

    async with httpx.AsyncClient() as client:
        try:
            resp = await client.post(url, json=payload, headers=headers)
            if resp.status_code not in (200, 201):
                log.error("Failed to send reply: %s %s", resp.status_code, resp.text)
            else:
                log.info("Reply sent to conversation %s", conversation_token)
        except Exception:
            log.exception("Error sending reply to Nextcloud")


def _as_dict(value) -> dict:
    """Return ``value`` if it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def _extract_message_text(obj: dict) -> str:
    """Extract the message text from a webhook ``object`` entry.

    ``content`` is expected to be a JSON string holding an object with a
    ``message`` key. Content that is not JSON, or that decodes to something
    other than an object (e.g. the plain message ``42``), is used verbatim.
    """
    raw = obj.get("content", "{}")
    if isinstance(raw, str):
        try:
            content = json.loads(raw)
        except json.JSONDecodeError:
            return raw
    else:
        content = raw

    if isinstance(content, dict):
        message = content.get("message", "")
        return message if isinstance(message, str) else ""
    return raw if isinstance(raw, str) else ""


def _strip_mention(text: str) -> tuple[bool, str]:
    """Detect an ``@BOT_NAME`` mention (optionally quoted, case-insensitive).

    Returns ``(mentioned, text_without_mentions)``; the text is returned
    unchanged when the bot is not mentioned.
    """
    pattern = rf'@"?{re.escape(BOT_NAME)}"?\s*'
    if re.search(pattern, text, re.IGNORECASE):
        return True, re.sub(pattern, '', text, flags=re.IGNORECASE).strip()
    return False, text


def _help_text() -> str:
    """Return the user-facing help message."""
    return f"""🤖 **{BOT_NAME} Bot Hilfe**

Schreib mir einfach eine Nachricht und ich antworte dir.

**Nutzung:**
• In Gruppenchats: @{BOT_NAME} gefolgt von deiner Frage

**Befehle:**
• `hilfe` oder `?` – Diese Hilfe anzeigen

Modell: `{OPENCODE_MODEL}`
Der Bot merkt sich die letzten Nachrichten pro Raum (bis zum Neustart)."""


def _remember_exchange(conversation_token: str, actor_id: str, question: str, answer: str) -> None:
    """Append one question/answer pair to the history and trim it.

    At most ``MAX_HISTORY * 2`` entries are kept per conversation; with a
    non-positive ``MAX_HISTORY`` nothing is retained at all.
    """
    keep = MAX_HISTORY * 2
    if keep <= 0:
        conversations.pop(conversation_token, None)
        return

    history = conversations.setdefault(conversation_token, [])
    history.append((f"User ({actor_id})", question))
    history.append(("Assistant", answer))
    if len(history) > keep:
        conversations[conversation_token] = history[-keep:]


@app.post("/webhook")
async def handle_webhook(
    request: Request,
    x_nextcloud_talk_signature: Optional[str] = Header(None, alias="X-Nextcloud-Talk-Signature"),
    x_nextcloud_talk_random: Optional[str] = Header(None, alias="X-Nextcloud-Talk-Random"),
):
    """Handle a Nextcloud Talk webhook and answer mentions via opencode.

    Flow: verify the signature, parse the payload, ignore non-user actors and
    messages that do not mention the bot, enforce ``ALLOWED_USERS``, answer
    the built-in help command, otherwise forward the message to opencode,
    record the exchange and post the reply.

    Raises:
        HTTPException: 401 for a missing/invalid signature, 400 for a body
            that is not a JSON object.
    """
    body = await request.body()

    # Always verify: checking only when the header is present would let a
    # caller bypass authentication simply by omitting the header.
    # verify_signature() itself still allows everything when no secret is
    # configured.
    if not verify_signature(body, x_nextcloud_talk_signature or "", x_nextcloud_talk_random):
        log.warning("Invalid webhook signature")
        raise HTTPException(status_code=401, detail="Invalid signature")

    try:
        data = json.loads(body)
    except ValueError:
        # Covers JSONDecodeError and UnicodeDecodeError (invalid UTF-8 body).
        raise HTTPException(status_code=400, detail="Invalid JSON") from None
    if not isinstance(data, dict):
        raise HTTPException(status_code=400, detail="Invalid JSON")

    log.info("Received webhook: %s", json.dumps(data, indent=2)[:500])

    actor = _as_dict(data.get("actor"))
    actor_type = actor.get("type", "")
    actor_id_full = str(actor.get("id", "") or "")
    # Ids may arrive as "<type>/<id>"; keep only the part after the first "/".
    if "/" in actor_id_full:
        actor_id = actor_id_full.split("/", 1)[1]
    else:
        actor_id = actor_id_full

    obj = _as_dict(data.get("object"))
    message_id = obj.get("id")
    message_text = _extract_message_text(obj)

    target = _as_dict(data.get("target"))
    conversation_token = target.get("id", "")

    if actor_type not in ("users", "Person"):
        log.info("Ignoring non-user actor: %s", actor_type)
        return JSONResponse({"status": "ignored", "reason": "not a user message"})

    # One-to-one chats are not detected; this flag is never set to True, so
    # the bot currently only reacts to explicit mentions.
    is_direct_message = False
    bot_mentioned, clean_message = _strip_mention(message_text)

    if not is_direct_message and not bot_mentioned:
        log.info("Ignoring message in group chat without mention")
        return JSONResponse({"status": "ignored", "reason": "not mentioned in group chat"})

    if bot_mentioned:
        message_text = clean_message

    if ALLOWED_USERS and actor_id not in ALLOWED_USERS:
        log.warning("User %s not in allowed list", actor_id)
        await send_reply(
            conversation_token,
            "🚫 Du bist nicht berechtigt, diesen Bot zu nutzen.",
            reply_to=message_id
        )
        return JSONResponse({"status": "rejected", "reason": "user not allowed"})

    if not message_text.strip():
        return JSONResponse({"status": "ignored", "reason": "empty message"})

    log.info("Processing message from %s: %s", actor_id, message_text[:100])

    if message_text.strip().lower() in ("hilfe", "help", "?"):
        await send_reply(conversation_token, _help_text(), reply_to=message_id)
        return JSONResponse({"status": "ok", "action": "help"})

    prompt = build_prompt(conversation_token, message_text, actor_id)
    response = await call_opencode(prompt)

    _remember_exchange(conversation_token, actor_id, message_text, response)

    await send_reply(conversation_token, response, reply_to=message_id)

    return JSONResponse({"status": "ok"})


@app.get("/health")
async def health():
    """Return a static status document describing the active configuration."""
    return {
        "status": "ok",
        "nextcloud_url": NEXTCLOUD_URL,
        "opencode_path": OPENCODE_PATH,
        "opencode_model": OPENCODE_MODEL,
        "bot_name": BOT_NAME,
        "allowed_users": ALLOWED_USERS if ALLOWED_USERS else "all",
        "max_history": MAX_HISTORY,
    }


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=8086)