nixcfg/systems/x86_64-linux/mx/nextcloud-claude-bot/bot.py
Harald Hoyer b35373b0ec feat(bot): replace maxTokens with contextMessages option
- Switched `maxTokens` to `contextMessages` to set chat history length instead of token limit.
- Updated environment variables, NixOS module, and prompt building logic for consistency.
- Removed in-memory conversation history, now fetching from Nextcloud for better scalability.
2026-02-03 17:00:50 +01:00

362 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Nextcloud Talk Claude Bot
Receives webhooks from Nextcloud Talk and responds using Claude CLI.
"""
import asyncio
import hashlib
import hmac
import json
import logging
import os
import re
import secrets
from datetime import datetime
from typing import Optional
import httpx
from fastapi import FastAPI, Request, HTTPException, Header
from fastapi.responses import JSONResponse
# --- Runtime configuration, read once from the process environment ---

# Base URL of the Nextcloud instance; trailing slashes are removed so API
# paths can be appended with a leading "/".
NEXTCLOUD_URL = os.getenv("NEXTCLOUD_URL", "").rstrip("/")

# Executable used to invoke the Claude CLI.
CLAUDE_PATH = os.getenv("CLAUDE_PATH", "claude")

# Comma-separated list of user ids; surrounding whitespace is trimmed and
# blank entries are discarded. An empty list means "no restriction".
ALLOWED_USERS = [
    name
    for name in (raw.strip() for raw in os.getenv("ALLOWED_USERS", "").split(","))
    if name
]

# Maximum number of seconds to wait for the Claude CLI.
TIMEOUT = int(os.getenv("TIMEOUT", "120"))

# Optional text placed at the top of every prompt.
SYSTEM_PROMPT = os.getenv("SYSTEM_PROMPT", "")
# Bot secret from systemd credential
def get_bot_secret() -> str:
    """Return the shared bot secret.

    The secret is read from the file ``bot-secret`` inside the directory named
    by the ``CREDENTIALS_DIRECTORY`` environment variable when that variable
    is set and the file exists; surrounding whitespace is stripped. Otherwise
    the ``BOT_SECRET`` environment variable is used (development fallback),
    defaulting to an empty string.
    """
    credentials_dir = os.environ.get("CREDENTIALS_DIRECTORY", "")
    candidate = os.path.join(credentials_dir, "bot-secret") if credentials_dir else None

    if candidate is not None and os.path.exists(candidate):
        with open(candidate) as handle:
            return handle.read().strip()

    # Fallback for development
    return os.environ.get("BOT_SECRET", "")
# Shared secret used to sign outgoing bot requests and to verify incoming
# webhooks; resolved once at import time.
BOT_SECRET = get_bot_secret()

# Logging: INFO and above, timestamped, via the root logger configuration.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
)
log = logging.getLogger(__name__)

# The ASGI application that hosts the webhook and health endpoints.
app = FastAPI(title="Nextcloud Claude Bot")

# Number of recent messages to fetch for context
CONTEXT_MESSAGES = int(os.getenv("CONTEXT_MESSAGES", "6"))
def generate_bot_auth_headers(body: str = "") -> dict:
    """Build the headers that authenticate a bot request to Nextcloud.

    A fresh 32-byte random value (hex encoded) is generated on every call. The
    signature is the hex HMAC-SHA256, keyed with ``BOT_SECRET``, over that
    random value concatenated with ``body``.

    Args:
        body: Text the signature should cover; empty by default.

    Returns:
        A dict with the random value, the signature and the OCS request flag.
    """
    nonce = secrets.token_hex(32)

    signer = hmac.new(BOT_SECRET.encode(), digestmod=hashlib.sha256)
    signer.update((nonce + body).encode())

    return {
        "X-Nextcloud-Talk-Bot-Random": nonce,
        "X-Nextcloud-Talk-Bot-Signature": signer.hexdigest(),
        "OCS-APIRequest": "true",
    }
async def fetch_chat_history(conversation_token: str, limit: int = CONTEXT_MESSAGES) -> list[dict]:
    """Fetch the most recent messages of a Nextcloud Talk conversation.

    Args:
        conversation_token: Token identifying the conversation.
        limit: Maximum number of messages to request.

    Returns:
        The messages found under ``ocs.data`` of the JSON response, in
        reversed order (the code assumes the API delivers newest first, so
        the result is chronological). An empty list is returned when
        ``NEXTCLOUD_URL`` is unset, the response status is not 200, or any
        exception occurs; failures are logged, never raised.
    """
    if not NEXTCLOUD_URL:
        log.warning("NEXTCLOUD_URL not configured, cannot fetch history")
        return []

    endpoint = f"{NEXTCLOUD_URL}/ocs/v2.php/apps/spreed/api/v1/chat/{conversation_token}"
    query = {"limit": limit, "lookIntoFuture": 0}
    request_headers = {**generate_bot_auth_headers(), "Accept": "application/json"}

    async with httpx.AsyncClient() as client:
        try:
            resp = await client.get(endpoint, params=query, headers=request_headers)
            if resp.status_code != 200:
                log.warning(f"Failed to fetch chat history: {resp.status_code} {resp.text[:200]}")
                return []

            payload = resp.json()
            newest_first = payload.get("ocs", {}).get("data", [])
            # Flip the order so the oldest message comes first.
            return list(reversed(newest_first))
        except Exception:
            # Best effort: history is optional context, so never propagate.
            log.exception("Error fetching chat history")
            return []
def verify_signature(body: bytes, signature: str, random: Optional[str] = None) -> bool:
    """Verify the HMAC-SHA256 signature of an incoming Nextcloud webhook.

    Two computations are accepted, both keyed with ``BOT_SECRET``:
    the HMAC over ``body`` alone, and (when ``random`` is given) the HMAC over
    ``random`` followed by ``body``.

    Args:
        body: Raw request body bytes.
        signature: Hex signature from the request, optionally prefixed with
            ``sha256=``.
        random: Value of the random header, if the request carried one.

    Returns:
        True when the signature matches one of the accepted computations, or
        when no bot secret is configured (verification is skipped with a
        warning). False otherwise, including for an empty/missing signature.
    """
    if not BOT_SECRET:
        log.warning("No bot secret configured, skipping signature verification")
        return True

    # A secret is configured, so a missing signature can never be valid.
    if not signature:
        return False

    # Nextcloud sends: sha256=<hex>
    if signature.startswith("sha256="):
        signature = signature[7:]

    # Compare as bytes: hmac.compare_digest() raises TypeError for str
    # arguments containing non-ASCII characters, which would turn a malformed
    # header into an unhandled server error instead of a clean rejection.
    provided = signature.encode("utf-8", "replace")
    key = BOT_SECRET.encode()

    # Method 1: just the body
    candidates = [hmac.new(key, body, hashlib.sha256).hexdigest()]
    # Method 2: random + body (only if the random header is present)
    if random:
        candidates.append(hmac.new(key, random.encode() + body, hashlib.sha256).hexdigest())

    return any(
        hmac.compare_digest(expected.encode("ascii"), provided)
        for expected in candidates
    )
def _render_history_entry(entry: dict) -> Optional[str]:
    """Format one chat-history entry as a prompt line, or None to skip it."""
    # Skip system messages
    if entry.get("messageType", "") == "system":
        return None

    sender_kind = entry.get("actorType", "")
    text = entry.get("message", "")
    if sender_kind == "bots":
        return f"Assistant: {text}"
    if sender_kind == "users":
        return f"User ({entry.get('actorId', '')}): {text}"
    # Any other actor type is left out of the prompt.
    return None


async def build_prompt(conversation_token: str, current_message: str, current_user: str) -> str:
    """Assemble the prompt text from the system prompt, chat history and the
    current message.

    The result consists of, in order and separated by blank lines: an optional
    ``System:`` section (when ``SYSTEM_PROMPT`` is set), one line per history
    entry written by a bot (``Assistant:``) or a user (``User (<id>):``), and
    finally the current message attributed to ``current_user``.

    NOTE(review): the fetched history is not filtered for the message that
    triggered this call; if Nextcloud already returns it, it appears twice.
    Confirm against the API behaviour.
    """
    segments = [f"System: {SYSTEM_PROMPT}\n"] if SYSTEM_PROMPT else []

    # Fetch recent history from Nextcloud
    history = await fetch_chat_history(conversation_token)
    segments.extend(
        line for line in map(_render_history_entry, history) if line is not None
    )

    # The message being answered always comes last.
    segments.append(f"User ({current_user}): {current_message}")
    return "\n\n".join(segments)
async def call_claude(prompt: str) -> str:
    """Run the Claude CLI with ``prompt`` on stdin and return its answer.

    The CLI is started as ``CLAUDE_PATH --print`` and given at most ``TIMEOUT``
    seconds to finish.

    Args:
        prompt: Full prompt text, sent UTF-8 encoded to the process' stdin.

    Returns:
        The stripped stdout of the CLI on success. On a non-zero exit code, a
        timeout, or any other error, a user-facing error message is returned
        instead; this function does not raise for those cases.
    """
    cmd = [CLAUDE_PATH, "--print"]
    log.info(f"Calling Claude: {' '.join(cmd)}")

    proc = None
    try:
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await asyncio.wait_for(
            proc.communicate(prompt.encode()),
            timeout=TIMEOUT,
        )

        if proc.returncode != 0:
            # errors="replace": invalid bytes in the output must not turn an
            # otherwise reportable CLI failure into a decoding exception.
            error_text = stderr.decode(errors="replace")
            log.error(f"Claude CLI error: {error_text}")
            return f"❌ Fehler beim Aufruf von Claude: {error_text[:200]}"

        return stdout.decode(errors="replace").strip()

    except asyncio.TimeoutError:
        log.error(f"Claude CLI timeout after {TIMEOUT}s")
        return f"⏱️ Timeout: Claude hat nicht innerhalb von {TIMEOUT}s geantwortet."
    except Exception as e:
        log.exception("Error calling Claude")
        return f"❌ Fehler: {str(e)}"
    finally:
        # wait_for() only cancels communicate(); the child itself keeps
        # running. Kill and reap it so timeouts, cancellations and errors do
        # not leave stray processes behind.
        if proc is not None and proc.returncode is None:
            try:
                proc.kill()
            except ProcessLookupError:
                # The process exited between the check and the kill.
                pass
            await proc.wait()
async def send_reply(conversation_token: str, message: str, reply_to: int = None):
    """Post ``message`` to a Nextcloud Talk conversation as the bot.

    Args:
        conversation_token: Token identifying the target conversation.
        message: Text to send; the authentication signature covers this text.
        reply_to: Id of the message being answered; added to the payload as
            ``replyTo`` only when truthy.

    Nothing is returned and nothing is raised: a missing ``NEXTCLOUD_URL``,
    an unexpected status code and transport errors are only logged.
    """
    if not NEXTCLOUD_URL:
        log.error("NEXTCLOUD_URL not configured")
        return

    endpoint = f"{NEXTCLOUD_URL}/ocs/v2.php/apps/spreed/api/v1/bot/{conversation_token}/message"

    # Bot authentication - signature is over the message being sent
    request_headers = {
        **generate_bot_auth_headers(message),
        "Content-Type": "application/json",
    }

    # Reference id: first 32 hex chars of a hash over token + current time.
    reference_seed = f"{conversation_token}-{datetime.now().isoformat()}"
    outgoing = {
        "message": message,
        "referenceId": hashlib.sha256(reference_seed.encode()).hexdigest()[:32],
    }
    if reply_to:
        outgoing["replyTo"] = reply_to

    async with httpx.AsyncClient() as client:
        try:
            resp = await client.post(endpoint, json=outgoing, headers=request_headers)
            if resp.status_code in (200, 201):
                log.info(f"Reply sent to conversation {conversation_token}")
            else:
                log.error(f"Failed to send reply: {resp.status_code} {resp.text}")
        except Exception:
            log.exception("Error sending reply to Nextcloud")
# Matches a bot mention such as @Claude or @"Claude" in any letter case,
# together with trailing whitespace, so it can be removed from the text.
_MENTION_RE = re.compile(r'@"?Claude"?\s*', re.IGNORECASE)


def _as_dict(value) -> dict:
    """Return ``value`` if it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def _extract_message_text(raw_content) -> str:
    """Pull the message text out of the webhook's ``object.content`` field.

    The field is expected to be a JSON string holding an object with a
    ``message`` key. A string that is not valid JSON, or that decodes to
    something other than an object, is used verbatim as the text. Any other
    unusable value yields an empty string.
    """
    if isinstance(raw_content, dict):
        content = raw_content
    elif isinstance(raw_content, str):
        try:
            content = json.loads(raw_content)
        except json.JSONDecodeError:
            return raw_content
        if not isinstance(content, dict):
            return raw_content
    else:
        return ""

    text = content.get("message", "")
    return text if isinstance(text, str) else ""


@app.post("/webhook")
async def handle_webhook(
    request: Request,
    x_nextcloud_talk_signature: Optional[str] = Header(None, alias="X-Nextcloud-Talk-Signature"),
    x_nextcloud_talk_random: Optional[str] = Header(None, alias="X-Nextcloud-Talk-Random"),
):
    """Handle an incoming webhook from Nextcloud Talk.

    Flow: verify the signature, parse the payload, ignore anything that is not
    a user message mentioning the bot, enforce ``ALLOWED_USERS``, answer
    ``/help`` / ``/hilfe`` directly, and otherwise build a prompt from the
    chat history, call Claude and post the answer as a reply.

    Returns:
        A JSON response whose ``status`` is ``ok``, ``ignored`` or
        ``rejected``.

    Raises:
        HTTPException: 401 when the signature is missing (while a bot secret
            is configured) or invalid; 400 when the body is not a JSON object.
    """
    body = await request.body()

    # Verify signature. When a secret is configured the header is mandatory;
    # otherwise simply omitting it would bypass authentication entirely.
    if BOT_SECRET and not x_nextcloud_talk_signature:
        log.warning("Missing webhook signature")
        raise HTTPException(status_code=401, detail="Invalid signature")
    if x_nextcloud_talk_signature and not verify_signature(
        body, x_nextcloud_talk_signature, x_nextcloud_talk_random
    ):
        log.warning("Invalid webhook signature")
        raise HTTPException(status_code=401, detail="Invalid signature")

    try:
        data = json.loads(body)
    except (json.JSONDecodeError, UnicodeDecodeError):
        # UnicodeDecodeError: body bytes that are not valid UTF-8/16/32.
        raise HTTPException(status_code=400, detail="Invalid JSON")
    if not isinstance(data, dict):
        # Valid JSON, but not the object shape the fields below are read from.
        raise HTTPException(status_code=400, detail="Invalid JSON")

    log.info(f"Received webhook: {json.dumps(data, indent=2)[:500]}")

    # Extract message info - Nextcloud Talk Bot webhook format
    actor = _as_dict(data.get("actor"))
    actor_type = actor.get("type", "")
    actor_id_full = str(actor.get("id") or "")  # e.g., "users/harald"

    # Extract username from "users/username" format
    _, separator, remainder = actor_id_full.partition("/")
    actor_id = remainder if separator else actor_id_full

    # Message is in object.content as JSON string
    obj = _as_dict(data.get("object"))
    message_id = obj.get("id")
    message_text = _extract_message_text(obj.get("content", "{}"))

    # Conversation info is in target
    target = _as_dict(data.get("target"))
    conversation_token = target.get("id", "")

    # Only respond to user/person messages
    if actor_type not in ("users", "Person"):
        log.info(f"Ignoring non-user actor: {actor_type}")
        return JSONResponse({"status": "ignored", "reason": "not a user message"})

    # For now, treat all conversations the same (respond to mentions)
    is_direct_message = False  # We can't easily determine this from the webhook

    # Check for bot mention in message (Nextcloud uses @"Bot Name" format)
    bot_mentioned = _MENTION_RE.search(message_text) is not None

    # In group chats, only respond if mentioned
    if not is_direct_message and not bot_mentioned:
        log.info("Ignoring message in group chat without mention")
        return JSONResponse({"status": "ignored", "reason": "not mentioned in group chat"})

    # Use clean message (without mention) for processing
    if bot_mentioned:
        message_text = _MENTION_RE.sub("", message_text).strip()

    # Check allowed users
    if ALLOWED_USERS and actor_id not in ALLOWED_USERS:
        log.warning(f"User {actor_id} not in allowed list")
        await send_reply(
            conversation_token,
            "🚫 Du bist nicht berechtigt, diesen Bot zu nutzen.",
            reply_to=message_id,
        )
        return JSONResponse({"status": "rejected", "reason": "user not allowed"})

    if not message_text.strip():
        return JSONResponse({"status": "ignored", "reason": "empty message"})

    log.info(f"Processing message from {actor_id}: {message_text[:100]}")

    if message_text.strip().lower() in ("/help", "/hilfe"):
        help_text = """🤖 **Claude Bot Hilfe**
Schreib mir einfach eine Nachricht und ich antworte dir.
**Nutzung:**
• In Direktnachrichten: Einfach schreiben
• In Gruppenchats: @Claude gefolgt von deiner Frage
**Befehle:**
• `/help` oder `/hilfe` Diese Hilfe anzeigen
Der Bot nutzt die letzten Nachrichten aus dem Chat als Kontext."""
        await send_reply(conversation_token, help_text, reply_to=message_id)
        return JSONResponse({"status": "ok", "action": "help"})

    # Build prompt with chat history and call Claude. The webhook response is
    # only sent after Claude has answered (or timed out).
    prompt = await build_prompt(conversation_token, message_text, actor_id)
    response = await call_claude(prompt)

    # Send response
    await send_reply(conversation_token, response, reply_to=message_id)
    return JSONResponse({"status": "ok"})
@app.get("/health")
async def health():
    """Report liveness together with the effective configuration.

    ``allowed_users`` is the configured list, or the string ``"all"`` when
    the list is empty.
    """
    report = {"status": "ok"}
    report["nextcloud_url"] = NEXTCLOUD_URL
    report["claude_path"] = CLAUDE_PATH
    report["allowed_users"] = ALLOWED_USERS or "all"
    report["context_messages"] = CONTEXT_MESSAGES
    return report
if __name__ == "__main__":
    # Script entry point: serve the app on the loopback interface, port 8085.
    # uvicorn is imported here so it is only needed when run as a script.
    import uvicorn

    uvicorn.run(
        app,
        port=8085,
        host="127.0.0.1",
    )