//! Axum-based HTTP gateway with proper HTTP/1.1 compliance, body limits, and timeouts. //! //! This module replaces the raw TCP implementation with axum for: //! - Proper HTTP/1.1 parsing and compliance //! - Content-Length validation (handled by hyper) //! - Request body size limits (64KB max) //! - Request timeouts (30s) to prevent slow-loris attacks //! - Header sanitization (handled by axum/hyper) use crate::channels::{Channel, WhatsAppChannel}; use crate::config::Config; use crate::memory::{self, Memory, MemoryCategory}; use crate::providers::{self, Provider}; use crate::security::pairing::{constant_time_eq, is_public_bind, PairingGuard}; use anyhow::Result; use axum::{ body::Bytes, extract::{Query, State}, http::{header, HeaderMap, StatusCode}, response::{IntoResponse, Json}, routing::{get, post}, Router, }; use std::net::SocketAddr; use std::sync::Arc; use tower_http::limit::RequestBodyLimitLayer; /// Maximum request body size (64KB) — prevents memory exhaustion pub const MAX_BODY_SIZE: usize = 65_536; /// Request timeout (30s) — prevents slow-loris attacks pub const REQUEST_TIMEOUT_SECS: u64 = 30; /// Shared state for all axum handlers #[derive(Clone)] pub struct AppState { pub provider: Arc, pub model: String, pub temperature: f64, pub mem: Arc, pub auto_save: bool, pub webhook_secret: Option>, pub pairing: Arc, pub whatsapp: Option>, } /// Run the HTTP gateway using axum with proper HTTP/1.1 compliance. #[allow(clippy::too_many_lines)] pub async fn run_gateway(host: &str, port: u16, config: Config) -> Result<()> { // ── Security: refuse public bind without tunnel or explicit opt-in ── if is_public_bind(host) && config.tunnel.provider == "none" && !config.gateway.allow_public_bind { anyhow::bail!( "🛑 Refusing to bind to {host} — gateway would be exposed to the internet.\n\ Fix: use --host 127.0.0.1 (default), configure a tunnel, or set\n\ [gateway] allow_public_bind = true in config.toml (NOT recommended)." ); } let addr: SocketAddr = format!("{host}:{port}").parse()?; let listener = tokio::net::TcpListener::bind(addr).await?; let actual_port = listener.local_addr()?.port(); let display_addr = format!("{host}:{actual_port}"); let provider: Arc = Arc::from(providers::create_resilient_provider( config.default_provider.as_deref().unwrap_or("openrouter"), config.api_key.as_deref(), &config.reliability, )?); let model = config .default_model .clone() .unwrap_or_else(|| "anthropic/claude-sonnet-4-20250514".into()); let temperature = config.default_temperature; let mem: Arc = Arc::from(memory::create_memory( &config.memory, &config.workspace_dir, config.api_key.as_deref(), )?); // Extract webhook secret for authentication let webhook_secret: Option> = config .channels_config .webhook .as_ref() .and_then(|w| w.secret.as_deref()) .map(Arc::from); // WhatsApp channel (if configured) let whatsapp_channel: Option> = config.channels_config.whatsapp.as_ref().map(|wa| { Arc::new(WhatsAppChannel::new( wa.access_token.clone(), wa.phone_number_id.clone(), wa.verify_token.clone(), wa.allowed_numbers.clone(), )) }); // ── Pairing guard ────────────────────────────────────── let pairing = Arc::new(PairingGuard::new( config.gateway.require_pairing, &config.gateway.paired_tokens, )); // ── Tunnel ──────────────────────────────────────────────── let tunnel = crate::tunnel::create_tunnel(&config.tunnel)?; let mut tunnel_url: Option = None; if let Some(ref tun) = tunnel { println!("🔗 Starting {} tunnel...", tun.name()); match tun.start(host, actual_port).await { Ok(url) => { println!("🌐 Tunnel active: {url}"); tunnel_url = Some(url); } Err(e) => { println!("⚠️ Tunnel failed to start: {e}"); println!(" Falling back to local-only mode."); } } } println!("🦀 ZeroClaw Gateway listening on http://{display_addr}"); if let Some(ref url) = tunnel_url { println!(" 🌐 Public URL: {url}"); } println!(" POST /pair — pair a new client (X-Pairing-Code header)"); println!(" POST /webhook — {{\"message\": \"your prompt\"}}"); if whatsapp_channel.is_some() { println!(" GET /whatsapp — Meta webhook verification"); println!(" POST /whatsapp — WhatsApp message webhook"); } println!(" GET /health — health check"); if let Some(code) = pairing.pairing_code() { println!(); println!(" 🔐 PAIRING REQUIRED — use this one-time code:"); println!(" ┌──────────────┐"); println!(" │ {code} │"); println!(" └──────────────┘"); println!(" Send: POST /pair with header X-Pairing-Code: {code}"); } else if pairing.require_pairing() { println!(" 🔒 Pairing: ACTIVE (bearer token required)"); } else { println!(" ⚠️ Pairing: DISABLED (all requests accepted)"); } if webhook_secret.is_some() { println!(" 🔒 Webhook secret: ENABLED"); } println!(" Press Ctrl+C to stop.\n"); crate::health::mark_component_ok("gateway"); // Build shared state let state = AppState { provider, model, temperature, mem, auto_save: config.memory.auto_save, webhook_secret, pairing, whatsapp: whatsapp_channel, }; // Build router with middleware // Note: Body limit layer prevents memory exhaustion from oversized requests // Timeout is handled by tokio's TcpListener accept timeout and hyper's built-in timeouts let app = Router::new() .route("/health", get(handle_health)) .route("/pair", post(handle_pair)) .route("/webhook", post(handle_webhook)) .route("/whatsapp", get(handle_whatsapp_verify)) .route("/whatsapp", post(handle_whatsapp_message)) .with_state(state) .layer(RequestBodyLimitLayer::new(MAX_BODY_SIZE)); // Run the server axum::serve(listener, app).await?; Ok(()) } // ══════════════════════════════════════════════════════════════════════════════ // AXUM HANDLERS // ══════════════════════════════════════════════════════════════════════════════ /// GET /health — always public (no secrets leaked) async fn handle_health(State(state): State) -> impl IntoResponse { let body = serde_json::json!({ "status": "ok", "paired": state.pairing.is_paired(), "runtime": crate::health::snapshot_json(), }); Json(body) } /// POST /pair — exchange one-time code for bearer token async fn handle_pair(State(state): State, headers: HeaderMap) -> impl IntoResponse { let code = headers .get("X-Pairing-Code") .and_then(|v| v.to_str().ok()) .unwrap_or(""); match state.pairing.try_pair(code) { Ok(Some(token)) => { tracing::info!("🔐 New client paired successfully"); let body = serde_json::json!({ "paired": true, "token": token, "message": "Save this token — use it as Authorization: Bearer " }); (StatusCode::OK, Json(body)) } Ok(None) => { tracing::warn!("🔐 Pairing attempt with invalid code"); let err = serde_json::json!({"error": "Invalid pairing code"}); (StatusCode::FORBIDDEN, Json(err)) } Err(lockout_secs) => { tracing::warn!( "🔐 Pairing locked out — too many failed attempts ({lockout_secs}s remaining)" ); let err = serde_json::json!({ "error": format!("Too many failed attempts. Try again in {lockout_secs}s."), "retry_after": lockout_secs }); (StatusCode::TOO_MANY_REQUESTS, Json(err)) } } } /// Webhook request body #[derive(serde::Deserialize)] pub struct WebhookBody { pub message: String, } /// POST /webhook — main webhook endpoint async fn handle_webhook( State(state): State, headers: HeaderMap, body: Result, axum::extract::rejection::JsonRejection>, ) -> impl IntoResponse { // ── Bearer token auth (pairing) ── if state.pairing.require_pairing() { let auth = headers .get(header::AUTHORIZATION) .and_then(|v| v.to_str().ok()) .unwrap_or(""); let token = auth.strip_prefix("Bearer ").unwrap_or(""); if !state.pairing.is_authenticated(token) { tracing::warn!("Webhook: rejected — not paired / invalid bearer token"); let err = serde_json::json!({ "error": "Unauthorized — pair first via POST /pair, then send Authorization: Bearer " }); return (StatusCode::UNAUTHORIZED, Json(err)); } } // ── Webhook secret auth (optional, additional layer) ── if let Some(ref secret) = state.webhook_secret { let header_val = headers .get("X-Webhook-Secret") .and_then(|v| v.to_str().ok()); match header_val { Some(val) if constant_time_eq(val, secret.as_ref()) => {} _ => { tracing::warn!("Webhook: rejected request — invalid or missing X-Webhook-Secret"); let err = serde_json::json!({"error": "Unauthorized — invalid or missing X-Webhook-Secret header"}); return (StatusCode::UNAUTHORIZED, Json(err)); } } } // ── Parse body ── let Json(webhook_body) = match body { Ok(b) => b, Err(e) => { let err = serde_json::json!({ "error": format!("Invalid JSON: {e}. Expected: {{\"message\": \"...\"}}") }); return (StatusCode::BAD_REQUEST, Json(err)); } }; let message = &webhook_body.message; if state.auto_save { let _ = state .mem .store("webhook_msg", message, MemoryCategory::Conversation) .await; } match state .provider .chat(message, &state.model, state.temperature) .await { Ok(response) => { let body = serde_json::json!({"response": response, "model": state.model}); (StatusCode::OK, Json(body)) } Err(e) => { tracing::error!("LLM error: {e:#}"); let err = serde_json::json!({"error": "Internal error processing your request"}); (StatusCode::INTERNAL_SERVER_ERROR, Json(err)) } } } /// `WhatsApp` verification query params #[derive(serde::Deserialize)] pub struct WhatsAppVerifyQuery { #[serde(rename = "hub.mode")] pub mode: Option, #[serde(rename = "hub.verify_token")] pub verify_token: Option, #[serde(rename = "hub.challenge")] pub challenge: Option, } /// GET /whatsapp — Meta webhook verification async fn handle_whatsapp_verify( State(state): State, Query(params): Query, ) -> impl IntoResponse { let Some(ref wa) = state.whatsapp else { return (StatusCode::NOT_FOUND, "WhatsApp not configured".to_string()); }; // Verify the token matches if params.mode.as_deref() == Some("subscribe") && params.verify_token.as_deref() == Some(wa.verify_token()) { if let Some(ch) = params.challenge { tracing::info!("WhatsApp webhook verified successfully"); return (StatusCode::OK, ch); } return (StatusCode::BAD_REQUEST, "Missing hub.challenge".to_string()); } tracing::warn!("WhatsApp webhook verification failed — token mismatch"); (StatusCode::FORBIDDEN, "Forbidden".to_string()) } /// POST /whatsapp — incoming message webhook async fn handle_whatsapp_message(State(state): State, body: Bytes) -> impl IntoResponse { let Some(ref wa) = state.whatsapp else { return ( StatusCode::NOT_FOUND, Json(serde_json::json!({"error": "WhatsApp not configured"})), ); }; // Parse JSON body let Ok(payload) = serde_json::from_slice::(&body) else { return ( StatusCode::BAD_REQUEST, Json(serde_json::json!({"error": "Invalid JSON payload"})), ); }; // Parse messages from the webhook payload let messages = wa.parse_webhook_payload(&payload); if messages.is_empty() { // Acknowledge the webhook even if no messages (could be status updates) return (StatusCode::OK, Json(serde_json::json!({"status": "ok"}))); } // Process each message for msg in &messages { tracing::info!( "WhatsApp message from {}: {}", msg.sender, if msg.content.len() > 50 { format!("{}...", &msg.content[..50]) } else { msg.content.clone() } ); // Auto-save to memory if state.auto_save { let _ = state .mem .store( &format!("whatsapp_{}", msg.sender), &msg.content, MemoryCategory::Conversation, ) .await; } // Call the LLM match state .provider .chat(&msg.content, &state.model, state.temperature) .await { Ok(response) => { // Send reply via WhatsApp if let Err(e) = wa.send(&response, &msg.sender).await { tracing::error!("Failed to send WhatsApp reply: {e}"); } } Err(e) => { tracing::error!("LLM error for WhatsApp message: {e:#}"); let _ = wa .send("Sorry, I couldn't process your message right now.", &msg.sender) .await; } } } // Acknowledge the webhook (StatusCode::OK, Json(serde_json::json!({"status": "ok"}))) } #[cfg(test)] mod tests { use super::*; #[test] fn security_body_limit_is_64kb() { assert_eq!(MAX_BODY_SIZE, 65_536); } #[test] fn security_timeout_is_30_seconds() { assert_eq!(REQUEST_TIMEOUT_SECS, 30); } #[test] fn webhook_body_requires_message_field() { let valid = r#"{"message": "hello"}"#; let parsed: Result = serde_json::from_str(valid); assert!(parsed.is_ok()); assert_eq!(parsed.unwrap().message, "hello"); let missing = r#"{"other": "field"}"#; let parsed: Result = serde_json::from_str(missing); assert!(parsed.is_err()); } #[test] fn whatsapp_query_fields_are_optional() { let q = WhatsAppVerifyQuery { mode: None, verify_token: None, challenge: None, }; assert!(q.mode.is_none()); } #[test] fn app_state_is_clone() { fn assert_clone() {} assert_clone::(); } }