#!/usr/bin/env python3
"""
Nextcloud Talk Claude Bot

Receives webhooks from Nextcloud Talk and responds using Claude CLI.
"""

import asyncio
import contextlib
import hashlib
import hmac
import json
import logging
import os
import re
import secrets
import subprocess
from datetime import datetime
from typing import Optional

import httpx
from fastapi import FastAPI, Request, HTTPException, Header
from fastapi.responses import JSONResponse

# Configuration from environment
NEXTCLOUD_URL = os.environ.get("NEXTCLOUD_URL", "").rstrip("/")
CLAUDE_PATH = os.environ.get("CLAUDE_PATH", "claude")
ALLOWED_USERS = [
    u.strip() for u in os.environ.get("ALLOWED_USERS", "").split(",") if u.strip()
]
MAX_TOKENS = int(os.environ.get("MAX_TOKENS", "4096"))
TIMEOUT = int(os.environ.get("TIMEOUT", "120"))
SYSTEM_PROMPT = os.environ.get("SYSTEM_PROMPT", "")


# Bot secret from systemd credential
def get_bot_secret() -> str:
    """Return the shared bot secret.

    Reads ``$CREDENTIALS_DIRECTORY/bot-secret`` when that file exists and
    otherwise falls back to the ``BOT_SECRET`` environment variable (empty
    string when neither is set).
    """
    cred_path = os.environ.get("CREDENTIALS_DIRECTORY", "")
    if cred_path:
        secret_file = os.path.join(cred_path, "bot-secret")
        try:
            with open(secret_file, encoding="utf-8") as f:
                return f.read().strip()
        except FileNotFoundError:
            pass
    # Fallback for development
    return os.environ.get("BOT_SECRET", "")


BOT_SECRET = get_bot_secret()

# Logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)
log = logging.getLogger(__name__)

app = FastAPI(title="Nextcloud Claude Bot")

# Simple in-memory conversation history (per user)
# Format: {user_id: [(timestamp, role, message), ...]}
conversations: dict[str, list[tuple[datetime, str, str]]] = {}
MAX_HISTORY = 10  # Keep last N exchanges per user (one exchange = 2 entries)

# Mention patterns: @Claude or @"Claude", matched case-insensitively.
MENTION_RE = re.compile(r'@"?Claude"?\s*', re.IGNORECASE)

HELP_TEXT = """🤖 **Claude Bot Hilfe**

Schreib mir einfach eine Nachricht und ich antworte dir.

**Nutzung:**
• In Direktnachrichten: Einfach schreiben
• In Gruppenchats: @Claude gefolgt von deiner Frage

**Befehle:**
• `/clear` oder `/reset` – Konversation zurücksetzen
• `/help` oder `/hilfe` – Diese Hilfe anzeigen

Der Bot merkt sich die letzten Nachrichten für Kontext."""


def _as_dict(value: object) -> dict:
    """Return *value* if it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def verify_signature(body: bytes, signature: str, random: Optional[str] = None) -> bool:
    """Verify Nextcloud webhook signature.

    Args:
        body: Raw request body exactly as received.
        signature: Hex HMAC-SHA256 from the request, optionally prefixed
            with ``sha256=``.
        random: Value of the random header, if the request carried one.

    Returns:
        True when the signature matches HMAC(secret, random + body) or
        HMAC(secret, body). Also True (with a warning) when no bot secret
        is configured at all. False otherwise, including when the
        signature is empty.
    """
    if not BOT_SECRET:
        log.warning("No bot secret configured, skipping signature verification")
        return True

    # A configured secret means every request must carry a signature.
    if not signature:
        return False

    # Compare as bytes: compare_digest raises TypeError on non-ASCII str.
    received = signature.strip().removeprefix("sha256=").lower().encode()
    key = BOT_SECRET.encode()

    # Try different signature computation methods
    candidates = []
    if random:
        candidates.append(("random+body", random.encode() + body))
    candidates.append(("body only", body))

    for label, payload in candidates:
        expected = hmac.new(key, payload, hashlib.sha256).hexdigest().encode()
        if hmac.compare_digest(expected, received):
            # Never log the expected digest itself; the method name is enough.
            log.debug("Signature verified (%s)", label)
            return True

    return False


def build_prompt(user_id: str, message: str) -> str:
    """Build prompt with conversation history.

    The prompt is the optional system prompt, the stored history of
    *user_id* (at most ``MAX_HISTORY`` exchanges) and finally *message* as
    the newest user turn. *message* must not yet be part of the stored
    history, otherwise it appears twice.
    """
    history = conversations.get(user_id, [])

    parts = []
    if SYSTEM_PROMPT:
        parts.append(f"System: {SYSTEM_PROMPT}\n")

    # Add recent history; one exchange is a user entry plus an assistant entry.
    for _, role, msg in history[-MAX_HISTORY * 2:]:
        prefix = "User" if role == "user" else "Assistant"
        parts.append(f"{prefix}: {msg}")

    # Add current message
    parts.append(f"User: {message}")

    return "\n\n".join(parts)


async def call_claude(prompt: str) -> str:
    """Call Claude CLI and return response.

    The prompt is written to the CLI's stdin. Failures are not raised;
    they are returned as a user-facing error string instead.
    """
    cmd = [CLAUDE_PATH, "--print"]
    log.info("Calling Claude: %s", " ".join(cmd))

    proc = None
    try:
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await asyncio.wait_for(
            proc.communicate(prompt.encode()),
            timeout=TIMEOUT,
        )

        if proc.returncode != 0:
            err_text = stderr.decode(errors="replace")
            log.error("Claude CLI error: %s", err_text)
            return f"❌ Fehler beim Aufruf von Claude: {err_text[:200]}"

        return stdout.decode(errors="replace").strip()

    except asyncio.TimeoutError:
        # wait_for only cancels communicate(); the child keeps running
        # unless it is killed and reaped explicitly.
        with contextlib.suppress(ProcessLookupError):
            proc.kill()
        await proc.wait()
        log.error("Claude CLI timeout after %ss", TIMEOUT)
        return f"⏱️ Timeout: Claude hat nicht innerhalb von {TIMEOUT}s geantwortet."
    except Exception as e:
        log.exception("Error calling Claude")
        return f"❌ Fehler: {str(e)}"
    finally:
        # Covers cancellation and unexpected errors: don't leave a child behind.
        if proc is not None and proc.returncode is None:
            with contextlib.suppress(ProcessLookupError):
                proc.kill()


async def send_reply(conversation_token: str, message: str, reply_to: Optional[int] = None):
    """Send reply back to Nextcloud Talk.

    Args:
        conversation_token: Token of the conversation to post into.
        message: Text of the reply.
        reply_to: Optional id of the message being answered.

    Errors are logged, never raised.
    """
    if not NEXTCLOUD_URL:
        log.error("NEXTCLOUD_URL not configured")
        return

    url = f"{NEXTCLOUD_URL}/ocs/v2.php/apps/spreed/api/v1/bot/{conversation_token}/message"

    headers = {
        "OCS-APIRequest": "true",
        "Content-Type": "application/json",
    }

    # Bot authentication
    if BOT_SECRET:
        # Generate random string for request; the digest covers random + message.
        random = secrets.token_hex(32)
        digest = hmac.new(
            BOT_SECRET.encode(),
            (random + message).encode(),
            hashlib.sha256,
        ).hexdigest()
        headers["X-Nextcloud-Talk-Bot-Random"] = random
        headers["X-Nextcloud-Talk-Bot-Signature"] = digest

    reference_seed = f"{conversation_token}-{datetime.now().isoformat()}"
    payload = {
        "message": message,
        "referenceId": hashlib.sha256(reference_seed.encode()).hexdigest()[:32],
    }

    if reply_to is not None:
        payload["replyTo"] = reply_to

    async with httpx.AsyncClient() as client:
        try:
            resp = await client.post(url, json=payload, headers=headers)
            if resp.status_code not in (200, 201):
                log.error("Failed to send reply: %s %s", resp.status_code, resp.text)
            else:
                log.info("Reply sent to conversation %s", conversation_token)
        except Exception:
            # Best effort: a failed reply must not crash the webhook handler.
            log.exception("Error sending reply to Nextcloud")


@app.post("/webhook")
async def handle_webhook(
    request: Request,
    x_nextcloud_talk_signature: Optional[str] = Header(None, alias="X-Nextcloud-Talk-Signature"),
    x_nextcloud_talk_random: Optional[str] = Header(None, alias="X-Nextcloud-Talk-Random"),
):
    """Handle incoming webhook from Nextcloud Talk.

    Raises:
        HTTPException: 401 when the signature is missing or invalid while a
            bot secret is configured; 400 when the body is not a JSON object.
    """
    body = await request.body()

    # Raw headers and body are only logged at debug level.
    log.debug(
        "Headers: signature=%s, random=%s",
        x_nextcloud_talk_signature,
        x_nextcloud_talk_random,
    )
    log.debug("Body (first 200): %s", body[:200])

    # Verify signature. A missing header is passed on as an empty signature
    # so that it is rejected instead of skipping verification altogether.
    if not verify_signature(body, x_nextcloud_talk_signature or "", x_nextcloud_talk_random):
        log.warning("Invalid webhook signature")
        raise HTTPException(status_code=401, detail="Invalid signature")

    try:
        data = json.loads(body)
    except json.JSONDecodeError:
        raise HTTPException(status_code=400, detail="Invalid JSON")
    if not isinstance(data, dict):
        raise HTTPException(status_code=400, detail="Invalid JSON")

    log.info("Received webhook: %s", json.dumps(data, indent=2)[:500])

    # Extract message info - Nextcloud Talk Bot webhook format
    actor = _as_dict(data.get("actor"))
    actor_type = actor.get("type", "")
    actor_id_full = str(actor.get("id", ""))  # e.g., "users/harald"

    # Extract username from "users/username" format
    _, sep, tail = actor_id_full.partition("/")
    actor_id = tail if sep else actor_id_full

    # Message is in object.content as JSON string
    obj = _as_dict(data.get("object"))
    message_id = obj.get("id")
    content_str = obj.get("content", "{}")
    try:
        content = json.loads(content_str)
    except (TypeError, json.JSONDecodeError):
        content = None
    if isinstance(content, dict):
        message_text = str(content.get("message", ""))
    else:
        # Not a JSON object: fall back to the raw content when it is text.
        message_text = content_str if isinstance(content_str, str) else ""

    # Conversation info is in target
    target = _as_dict(data.get("target"))
    conversation_token = target.get("id", "")

    # Only respond to user/person messages
    if actor_type not in ("users", "Person"):
        log.info("Ignoring non-user actor: %s", actor_type)
        return JSONResponse({"status": "ignored", "reason": "not a user message"})

    # For now, treat all conversations the same (respond to mentions)
    is_direct_message = False  # We can't easily determine this from the webhook

    # Check for bot mention in message (Nextcloud uses @"Bot Name" format)
    bot_mentioned = MENTION_RE.search(message_text) is not None

    # In group chats, only respond if mentioned
    if not is_direct_message and not bot_mentioned:
        log.info("Ignoring message in group chat without mention")
        return JSONResponse({"status": "ignored", "reason": "not mentioned in group chat"})

    # Use clean message (without mention) for processing
    if bot_mentioned:
        message_text = MENTION_RE.sub("", message_text).strip()

    # Check allowed users
    if ALLOWED_USERS and actor_id not in ALLOWED_USERS:
        log.warning("User %s not in allowed list", actor_id)
        await send_reply(
            conversation_token,
            "🚫 Du bist nicht berechtigt, diesen Bot zu nutzen.",
            reply_to=message_id,
        )
        return JSONResponse({"status": "rejected", "reason": "user not allowed"})

    if not message_text.strip():
        return JSONResponse({"status": "ignored", "reason": "empty message"})

    log.info("Processing message from %s: %s", actor_id, message_text[:100])

    # Handle special commands before touching the history, so that commands
    # never end up in the context sent to Claude.
    command = message_text.strip().lower()

    if command in ("/clear", "/reset", "/neu"):
        conversations[actor_id] = []
        await send_reply(
            conversation_token,
            "🧹 Konversation zurückgesetzt.",
            reply_to=message_id,
        )
        return JSONResponse({"status": "ok", "action": "cleared"})

    if command in ("/help", "/hilfe"):
        await send_reply(conversation_token, HELP_TEXT, reply_to=message_id)
        return JSONResponse({"status": "ok", "action": "help"})

    # Build the prompt BEFORE storing the user message: build_prompt appends
    # the current message itself, so storing first would duplicate it.
    prompt = build_prompt(actor_id, message_text)
    conversations.setdefault(actor_id, []).append((datetime.now(), "user", message_text))

    response = await call_claude(prompt)

    # Store assistant response in history. setdefault re-fetches the list in
    # case the entry was replaced while Claude was running.
    history = conversations.setdefault(actor_id, [])
    history.append((datetime.now(), "assistant", response))

    # Trim history
    if len(history) > MAX_HISTORY * 2:
        conversations[actor_id] = history[-MAX_HISTORY * 2:]

    # Send response
    await send_reply(conversation_token, response, reply_to=message_id)

    return JSONResponse({"status": "ok"})


@app.get("/health")
async def health():
    """Health check endpoint."""
    return {
        "status": "ok",
        "nextcloud_url": NEXTCLOUD_URL,
        "claude_path": CLAUDE_PATH,
        "allowed_users": ALLOWED_USERS if ALLOWED_USERS else "all",
    }


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="127.0.0.1", port=8085)