From 361e750576c40a913098c23bad6cd4728e48d2f7 Mon Sep 17 00:00:00 2001 From: George McCain Date: Wed, 18 Feb 2026 11:04:45 -0500 Subject: [PATCH] feat(channels): add Linq channel for iMessage/RCS/SMS support MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The existing iMessage channel relies on AppleScript and only works on macOS. Linq provides a REST API for iMessage, RCS, and SMS — this gives ZeroClaw native iMessage support on any platform via webhooks. Implements LinqChannel following the same patterns as WhatsAppChannel: - Channel trait impl (send, listen, health_check, typing indicators) - Webhook handler with HMAC-SHA256 signature verification - Sender allowlist filtering - Onboarding wizard step with connection testing - 18 unit tests covering parsing, auth, and signature verification Resolves #656 — the prior issue was closed without a merged PR, so this is the actual implementation. --- src/channels/linq.rs | 691 ++++++++++++++++++++++++++++++++++++++++++ src/channels/mod.rs | 22 ++ src/config/schema.rs | 19 ++ src/gateway/mod.rs | 165 +++++++++- src/onboard/wizard.rs | 111 ++++++- 5 files changed, 1003 insertions(+), 5 deletions(-) create mode 100644 src/channels/linq.rs diff --git a/src/channels/linq.rs b/src/channels/linq.rs new file mode 100644 index 0000000..636184f --- /dev/null +++ b/src/channels/linq.rs @@ -0,0 +1,691 @@ +use super::traits::{Channel, ChannelMessage, SendMessage}; +use async_trait::async_trait; +use uuid::Uuid; + +/// Linq channel — uses the Linq Partner V3 API for iMessage, RCS, and SMS. +/// +/// This channel operates in webhook mode (push-based) rather than polling. +/// Messages are received via the gateway's `/linq` webhook endpoint. +/// The `listen` method here is a keepalive placeholder; actual message handling +/// happens in the gateway when Linq sends webhook events. +pub struct LinqChannel { + api_token: String, + from_phone: String, + allowed_senders: Vec, + client: reqwest::Client, +} + +const LINQ_API_BASE: &str = "https://api.linqapp.com/api/partner/v3"; + +impl LinqChannel { + pub fn new(api_token: String, from_phone: String, allowed_senders: Vec) -> Self { + Self { + api_token, + from_phone, + allowed_senders, + client: reqwest::Client::new(), + } + } + + /// Check if a sender phone number is allowed (E.164 format: +1234567890) + fn is_sender_allowed(&self, phone: &str) -> bool { + self.allowed_senders.iter().any(|n| n == "*" || n == phone) + } + + /// Get the bot's phone number + pub fn phone_number(&self) -> &str { + &self.from_phone + } + + /// Parse an incoming webhook payload from Linq and extract messages. + /// + /// Linq webhook envelope: + /// ```json + /// { + /// "api_version": "v3", + /// "event_type": "message.received", + /// "event_id": "...", + /// "created_at": "...", + /// "trace_id": "...", + /// "data": { + /// "chat_id": "...", + /// "from": "+1...", + /// "recipient_phone": "+1...", + /// "is_from_me": false, + /// "service": "iMessage", + /// "message": { + /// "id": "...", + /// "parts": [{ "type": "text", "value": "..." }] + /// } + /// } + /// } + /// ``` + pub fn parse_webhook_payload(&self, payload: &serde_json::Value) -> Vec { + let mut messages = Vec::new(); + + // Only handle message.received events + let event_type = payload + .get("event_type") + .and_then(|e| e.as_str()) + .unwrap_or(""); + if event_type != "message.received" { + tracing::debug!("Linq: skipping non-message event: {event_type}"); + return messages; + } + + let Some(data) = payload.get("data") else { + return messages; + }; + + // Skip messages sent by the bot itself + if data + .get("is_from_me") + .and_then(|v| v.as_bool()) + .unwrap_or(false) + { + tracing::debug!("Linq: skipping is_from_me message"); + return messages; + } + + // Get sender phone number + let Some(from) = data.get("from").and_then(|f| f.as_str()) else { + return messages; + }; + + // Normalize to E.164 format + let normalized_from = if from.starts_with('+') { + from.to_string() + } else { + format!("+{from}") + }; + + // Check allowlist + if !self.is_sender_allowed(&normalized_from) { + tracing::warn!( + "Linq: ignoring message from unauthorized sender: {normalized_from}. \ + Add to allowed_senders in config.toml." + ); + return messages; + } + + // Get chat_id for reply routing + let chat_id = data + .get("chat_id") + .and_then(|c| c.as_str()) + .unwrap_or("") + .to_string(); + + // Extract text from message parts + let Some(message) = data.get("message") else { + return messages; + }; + + let Some(parts) = message.get("parts").and_then(|p| p.as_array()) else { + return messages; + }; + + let text_parts: Vec<&str> = parts + .iter() + .filter_map(|part| { + let part_type = part.get("type").and_then(|t| t.as_str())?; + if part_type == "text" { + part.get("value").and_then(|v| v.as_str()) + } else { + // Skip media parts for now + tracing::debug!("Linq: skipping {part_type} part"); + None + } + }) + .collect(); + + if text_parts.is_empty() { + return messages; + } + + let content = text_parts.join("\n"); + + if content.is_empty() { + return messages; + } + + // Get timestamp from created_at or use current time + let timestamp = payload + .get("created_at") + .and_then(|t| t.as_str()) + .and_then(|t| { + chrono::DateTime::parse_from_rfc3339(t) + .ok() + .map(|dt| dt.timestamp().cast_unsigned()) + }) + .unwrap_or_else(|| { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs() + }); + + // Use chat_id as reply_target so replies go to the right conversation + let reply_target = if chat_id.is_empty() { + normalized_from.clone() + } else { + chat_id + }; + + messages.push(ChannelMessage { + id: Uuid::new_v4().to_string(), + reply_target, + sender: normalized_from, + content, + channel: "linq".to_string(), + timestamp, + }); + + messages + } +} + +#[async_trait] +impl Channel for LinqChannel { + fn name(&self) -> &str { + "linq" + } + + async fn send(&self, message: &SendMessage) -> anyhow::Result<()> { + // If reply_target looks like a chat_id, send to existing chat. + // Otherwise create a new chat with the recipient phone number. + let recipient = &message.recipient; + + let body = serde_json::json!({ + "message": { + "parts": [{ + "type": "text", + "value": message.content + }] + } + }); + + // Try sending to existing chat (recipient is chat_id) + let url = format!("{LINQ_API_BASE}/chats/{recipient}/messages"); + + let resp = self + .client + .post(&url) + .bearer_auth(&self.api_token) + .header("Content-Type", "application/json") + .json(&body) + .send() + .await?; + + if resp.status().is_success() { + return Ok(()); + } + + // If the chat_id-based send failed with 404, try creating a new chat + if resp.status() == reqwest::StatusCode::NOT_FOUND { + let new_chat_body = serde_json::json!({ + "from": self.from_phone, + "to": [recipient], + "message": { + "parts": [{ + "type": "text", + "value": message.content + }] + } + }); + + let create_resp = self + .client + .post(format!("{LINQ_API_BASE}/chats")) + .bearer_auth(&self.api_token) + .header("Content-Type", "application/json") + .json(&new_chat_body) + .send() + .await?; + + if !create_resp.status().is_success() { + let status = create_resp.status(); + let error_body = create_resp.text().await.unwrap_or_default(); + tracing::error!("Linq create chat failed: {status} — {error_body}"); + anyhow::bail!("Linq API error: {status}"); + } + + return Ok(()); + } + + let status = resp.status(); + let error_body = resp.text().await.unwrap_or_default(); + tracing::error!("Linq send failed: {status} — {error_body}"); + anyhow::bail!("Linq API error: {status}"); + } + + async fn listen(&self, _tx: tokio::sync::mpsc::Sender) -> anyhow::Result<()> { + // Linq uses webhooks (push-based), not polling. + // Messages are received via the gateway's /linq endpoint. + tracing::info!( + "Linq channel active (webhook mode). \ + Configure Linq webhook to POST to your gateway's /linq endpoint." + ); + + // Keep the task alive — it will be cancelled when the channel shuts down + loop { + tokio::time::sleep(std::time::Duration::from_secs(3600)).await; + } + } + + async fn health_check(&self) -> bool { + // Check if we can reach the Linq API + let url = format!("{LINQ_API_BASE}/phonenumbers"); + + self.client + .get(&url) + .bearer_auth(&self.api_token) + .send() + .await + .map(|r| r.status().is_success()) + .unwrap_or(false) + } + + async fn start_typing(&self, recipient: &str) -> anyhow::Result<()> { + let url = format!("{LINQ_API_BASE}/chats/{recipient}/typing"); + + let resp = self + .client + .post(&url) + .bearer_auth(&self.api_token) + .send() + .await?; + + if !resp.status().is_success() { + tracing::debug!("Linq start_typing failed: {}", resp.status()); + } + + Ok(()) + } + + async fn stop_typing(&self, recipient: &str) -> anyhow::Result<()> { + let url = format!("{LINQ_API_BASE}/chats/{recipient}/typing"); + + let resp = self + .client + .delete(&url) + .bearer_auth(&self.api_token) + .send() + .await?; + + if !resp.status().is_success() { + tracing::debug!("Linq stop_typing failed: {}", resp.status()); + } + + Ok(()) + } +} + +/// Verify a Linq webhook signature. +/// +/// Linq signs webhooks with HMAC-SHA256 over `"{timestamp}.{body}"`. +/// The signature is sent in `X-Webhook-Signature` (hex-encoded) and the +/// timestamp in `X-Webhook-Timestamp`. Reject timestamps older than 300s. +pub fn verify_linq_signature(secret: &str, body: &str, timestamp: &str, signature: &str) -> bool { + use hmac::{Hmac, Mac}; + use sha2::Sha256; + + // Reject stale timestamps (>300s old) + if let Ok(ts) = timestamp.parse::() { + let now = chrono::Utc::now().timestamp(); + if (now - ts).unsigned_abs() > 300 { + tracing::warn!("Linq: rejecting stale webhook timestamp ({ts}, now={now})"); + return false; + } + } else { + tracing::warn!("Linq: invalid webhook timestamp: {timestamp}"); + return false; + } + + // Compute HMAC-SHA256 over "{timestamp}.{body}" + let message = format!("{timestamp}.{body}"); + let Ok(mut mac) = Hmac::::new_from_slice(secret.as_bytes()) else { + return false; + }; + mac.update(message.as_bytes()); + let expected = hex::encode(mac.finalize().into_bytes()); + + // Constant-time comparison + crate::security::pairing::constant_time_eq(&expected, signature) +} + +#[cfg(test)] +mod tests { + use super::*; + + fn make_channel() -> LinqChannel { + LinqChannel::new( + "test-token".into(), + "+15551234567".into(), + vec!["+1234567890".into()], + ) + } + + #[test] + fn linq_channel_name() { + let ch = make_channel(); + assert_eq!(ch.name(), "linq"); + } + + #[test] + fn linq_sender_allowed_exact() { + let ch = make_channel(); + assert!(ch.is_sender_allowed("+1234567890")); + assert!(!ch.is_sender_allowed("+9876543210")); + } + + #[test] + fn linq_sender_allowed_wildcard() { + let ch = LinqChannel::new("tok".into(), "+15551234567".into(), vec!["*".into()]); + assert!(ch.is_sender_allowed("+1234567890")); + assert!(ch.is_sender_allowed("+9999999999")); + } + + #[test] + fn linq_sender_allowed_empty() { + let ch = LinqChannel::new("tok".into(), "+15551234567".into(), vec![]); + assert!(!ch.is_sender_allowed("+1234567890")); + } + + #[test] + fn linq_parse_valid_text_message() { + let ch = make_channel(); + let payload = serde_json::json!({ + "api_version": "v3", + "event_type": "message.received", + "event_id": "evt-123", + "created_at": "2025-01-15T12:00:00Z", + "trace_id": "trace-456", + "data": { + "chat_id": "chat-789", + "from": "+1234567890", + "recipient_phone": "+15551234567", + "is_from_me": false, + "service": "iMessage", + "message": { + "id": "msg-abc", + "parts": [{ + "type": "text", + "value": "Hello ZeroClaw!" + }] + } + } + }); + + let msgs = ch.parse_webhook_payload(&payload); + assert_eq!(msgs.len(), 1); + assert_eq!(msgs[0].sender, "+1234567890"); + assert_eq!(msgs[0].content, "Hello ZeroClaw!"); + assert_eq!(msgs[0].channel, "linq"); + assert_eq!(msgs[0].reply_target, "chat-789"); + } + + #[test] + fn linq_parse_skip_is_from_me() { + let ch = LinqChannel::new("tok".into(), "+15551234567".into(), vec!["*".into()]); + let payload = serde_json::json!({ + "event_type": "message.received", + "data": { + "chat_id": "chat-789", + "from": "+1234567890", + "is_from_me": true, + "message": { + "id": "msg-abc", + "parts": [{ "type": "text", "value": "My own message" }] + } + } + }); + + let msgs = ch.parse_webhook_payload(&payload); + assert!(msgs.is_empty(), "is_from_me messages should be skipped"); + } + + #[test] + fn linq_parse_skip_non_message_event() { + let ch = make_channel(); + let payload = serde_json::json!({ + "event_type": "message.delivered", + "data": { + "chat_id": "chat-789", + "message_id": "msg-abc" + } + }); + + let msgs = ch.parse_webhook_payload(&payload); + assert!(msgs.is_empty(), "Non-message events should be skipped"); + } + + #[test] + fn linq_parse_unauthorized_sender() { + let ch = make_channel(); + let payload = serde_json::json!({ + "event_type": "message.received", + "data": { + "chat_id": "chat-789", + "from": "+9999999999", + "is_from_me": false, + "message": { + "id": "msg-abc", + "parts": [{ "type": "text", "value": "Spam" }] + } + } + }); + + let msgs = ch.parse_webhook_payload(&payload); + assert!(msgs.is_empty(), "Unauthorized senders should be filtered"); + } + + #[test] + fn linq_parse_empty_payload() { + let ch = make_channel(); + let payload = serde_json::json!({}); + let msgs = ch.parse_webhook_payload(&payload); + assert!(msgs.is_empty()); + } + + #[test] + fn linq_parse_media_only_skipped() { + let ch = LinqChannel::new("tok".into(), "+15551234567".into(), vec!["*".into()]); + let payload = serde_json::json!({ + "event_type": "message.received", + "data": { + "chat_id": "chat-789", + "from": "+1234567890", + "is_from_me": false, + "message": { + "id": "msg-abc", + "parts": [{ + "type": "media", + "url": "https://example.com/image.jpg", + "mime_type": "image/jpeg" + }] + } + } + }); + + let msgs = ch.parse_webhook_payload(&payload); + assert!(msgs.is_empty(), "Media-only messages should be skipped"); + } + + #[test] + fn linq_parse_multiple_text_parts() { + let ch = LinqChannel::new("tok".into(), "+15551234567".into(), vec!["*".into()]); + let payload = serde_json::json!({ + "event_type": "message.received", + "data": { + "chat_id": "chat-789", + "from": "+1234567890", + "is_from_me": false, + "message": { + "id": "msg-abc", + "parts": [ + { "type": "text", "value": "First part" }, + { "type": "text", "value": "Second part" } + ] + } + } + }); + + let msgs = ch.parse_webhook_payload(&payload); + assert_eq!(msgs.len(), 1); + assert_eq!(msgs[0].content, "First part\nSecond part"); + } + + #[test] + fn linq_signature_verification_valid() { + let secret = "test_webhook_secret"; + let body = r#"{"event_type":"message.received"}"#; + let now = chrono::Utc::now().timestamp().to_string(); + + // Compute expected signature + use hmac::{Hmac, Mac}; + use sha2::Sha256; + let message = format!("{now}.{body}"); + let mut mac = Hmac::::new_from_slice(secret.as_bytes()).unwrap(); + mac.update(message.as_bytes()); + let signature = hex::encode(mac.finalize().into_bytes()); + + assert!(verify_linq_signature(secret, body, &now, &signature)); + } + + #[test] + fn linq_signature_verification_invalid() { + let secret = "test_webhook_secret"; + let body = r#"{"event_type":"message.received"}"#; + let now = chrono::Utc::now().timestamp().to_string(); + + assert!(!verify_linq_signature( + secret, + body, + &now, + "deadbeefdeadbeefdeadbeef" + )); + } + + #[test] + fn linq_signature_verification_stale_timestamp() { + let secret = "test_webhook_secret"; + let body = r#"{"event_type":"message.received"}"#; + // 10 minutes ago — stale + let stale_ts = (chrono::Utc::now().timestamp() - 600).to_string(); + + // Even with correct signature, stale timestamp should fail + use hmac::{Hmac, Mac}; + use sha2::Sha256; + let message = format!("{stale_ts}.{body}"); + let mut mac = Hmac::::new_from_slice(secret.as_bytes()).unwrap(); + mac.update(message.as_bytes()); + let signature = hex::encode(mac.finalize().into_bytes()); + + assert!( + !verify_linq_signature(secret, body, &stale_ts, &signature), + "Stale timestamps (>300s) should be rejected" + ); + } + + #[test] + fn linq_parse_normalizes_phone_with_plus() { + let ch = LinqChannel::new( + "tok".into(), + "+15551234567".into(), + vec!["+1234567890".into()], + ); + // API sends without +, normalize to + + let payload = serde_json::json!({ + "event_type": "message.received", + "data": { + "chat_id": "chat-789", + "from": "1234567890", + "is_from_me": false, + "message": { + "id": "msg-abc", + "parts": [{ "type": "text", "value": "Hi" }] + } + } + }); + + let msgs = ch.parse_webhook_payload(&payload); + assert_eq!(msgs.len(), 1); + assert_eq!(msgs[0].sender, "+1234567890"); + } + + #[test] + fn linq_parse_missing_data() { + let ch = make_channel(); + let payload = serde_json::json!({ + "event_type": "message.received" + }); + let msgs = ch.parse_webhook_payload(&payload); + assert!(msgs.is_empty()); + } + + #[test] + fn linq_parse_missing_message_parts() { + let ch = LinqChannel::new("tok".into(), "+15551234567".into(), vec!["*".into()]); + let payload = serde_json::json!({ + "event_type": "message.received", + "data": { + "chat_id": "chat-789", + "from": "+1234567890", + "is_from_me": false, + "message": { + "id": "msg-abc" + } + } + }); + + let msgs = ch.parse_webhook_payload(&payload); + assert!(msgs.is_empty()); + } + + #[test] + fn linq_parse_empty_text_value() { + let ch = LinqChannel::new("tok".into(), "+15551234567".into(), vec!["*".into()]); + let payload = serde_json::json!({ + "event_type": "message.received", + "data": { + "chat_id": "chat-789", + "from": "+1234567890", + "is_from_me": false, + "message": { + "id": "msg-abc", + "parts": [{ "type": "text", "value": "" }] + } + } + }); + + let msgs = ch.parse_webhook_payload(&payload); + assert!(msgs.is_empty(), "Empty text should be skipped"); + } + + #[test] + fn linq_parse_fallback_reply_target_when_no_chat_id() { + let ch = LinqChannel::new("tok".into(), "+15551234567".into(), vec!["*".into()]); + let payload = serde_json::json!({ + "event_type": "message.received", + "data": { + "from": "+1234567890", + "is_from_me": false, + "message": { + "id": "msg-abc", + "parts": [{ "type": "text", "value": "Hi" }] + } + } + }); + + let msgs = ch.parse_webhook_payload(&payload); + assert_eq!(msgs.len(), 1); + // Falls back to sender phone number when no chat_id + assert_eq!(msgs[0].reply_target, "+1234567890"); + } + + #[test] + fn linq_phone_number_accessor() { + let ch = make_channel(); + assert_eq!(ch.phone_number(), "+15551234567"); + } +} diff --git a/src/channels/mod.rs b/src/channels/mod.rs index 75544f4..088ad73 100644 --- a/src/channels/mod.rs +++ b/src/channels/mod.rs @@ -5,6 +5,7 @@ pub mod email_channel; pub mod imessage; pub mod irc; pub mod lark; +pub mod linq; pub mod matrix; pub mod mattermost; pub mod qq; @@ -21,6 +22,7 @@ pub use email_channel::EmailChannel; pub use imessage::IMessageChannel; pub use irc::IrcChannel; pub use lark::LarkChannel; +pub use linq::LinqChannel; pub use matrix::MatrixChannel; pub use mattermost::MattermostChannel; pub use qq::QQChannel; @@ -1255,6 +1257,7 @@ pub fn handle_command(command: crate::ChannelCommands, config: &Config) -> Resul ("Matrix", config.channels_config.matrix.is_some()), ("Signal", config.channels_config.signal.is_some()), ("WhatsApp", config.channels_config.whatsapp.is_some()), + ("Linq", config.channels_config.linq.is_some()), ("Email", config.channels_config.email.is_some()), ("IRC", config.channels_config.irc.is_some()), ("Lark", config.channels_config.lark.is_some()), @@ -1391,6 +1394,17 @@ pub async fn doctor_channels(config: Config) -> Result<()> { )); } + if let Some(ref lq) = config.channels_config.linq { + channels.push(( + "Linq", + Arc::new(LinqChannel::new( + lq.api_token.clone(), + lq.from_phone.clone(), + lq.allowed_senders.clone(), + )), + )); + } + if let Some(ref email_cfg) = config.channels_config.email { channels.push(("Email", Arc::new(EmailChannel::new(email_cfg.clone())))); } @@ -1711,6 +1725,14 @@ pub async fn start_channels(config: Config) -> Result<()> { ))); } + if let Some(ref lq) = config.channels_config.linq { + channels.push(Arc::new(LinqChannel::new( + lq.api_token.clone(), + lq.from_phone.clone(), + lq.allowed_senders.clone(), + ))); + } + if let Some(ref email_cfg) = config.channels_config.email { channels.push(Arc::new(EmailChannel::new(email_cfg.clone()))); } diff --git a/src/config/schema.rs b/src/config/schema.rs index 7803da4..c1293ac 100644 --- a/src/config/schema.rs +++ b/src/config/schema.rs @@ -1974,6 +1974,7 @@ pub struct ChannelsConfig { pub matrix: Option, pub signal: Option, pub whatsapp: Option, + pub linq: Option, pub email: Option, pub irc: Option, pub lark: Option, @@ -2002,6 +2003,7 @@ impl Default for ChannelsConfig { matrix: None, signal: None, whatsapp: None, + linq: None, email: None, irc: None, lark: None, @@ -2148,6 +2150,20 @@ pub struct WhatsAppConfig { pub allowed_numbers: Vec, } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LinqConfig { + /// Linq Partner API token (Bearer auth) + pub api_token: String, + /// Phone number to send from (E.164 format) + pub from_phone: String, + /// Webhook signing secret for signature verification + #[serde(default)] + pub signing_secret: Option, + /// Allowed sender handles (phone numbers) or "*" for all + #[serde(default)] + pub allowed_senders: Vec, +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct IrcConfig { /// IRC server hostname @@ -3246,6 +3262,7 @@ default_temperature = 0.7 matrix: None, signal: None, whatsapp: None, + linq: None, email: None, irc: None, lark: None, @@ -3751,6 +3768,7 @@ allowed_users = ["@ops:matrix.org"] }), signal: None, whatsapp: None, + linq: None, email: None, irc: None, lark: None, @@ -3915,6 +3933,7 @@ channel_id = "C123" app_secret: None, allowed_numbers: vec!["+1".into()], }), + linq: None, email: None, irc: None, lark: None, diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs index 68f6400..4f8246c 100644 --- a/src/gateway/mod.rs +++ b/src/gateway/mod.rs @@ -7,7 +7,7 @@ //! - Request timeouts (30s) to prevent slow-loris attacks //! - Header sanitization (handled by axum/hyper) -use crate::channels::{Channel, SendMessage, WhatsAppChannel}; +use crate::channels::{Channel, LinqChannel, SendMessage, WhatsAppChannel}; use crate::config::Config; use crate::memory::{self, Memory, MemoryCategory}; use crate::providers::{self, Provider}; @@ -53,6 +53,10 @@ fn whatsapp_memory_key(msg: &crate::channels::traits::ChannelMessage) -> String format!("whatsapp_{}_{}", msg.sender, msg.id) } +fn linq_memory_key(msg: &crate::channels::traits::ChannelMessage) -> String { + format!("linq_{}_{}", msg.sender, msg.id) +} + fn hash_webhook_secret(value: &str) -> String { use sha2::{Digest, Sha256}; @@ -274,6 +278,9 @@ pub struct AppState { pub whatsapp: Option>, /// `WhatsApp` app secret for webhook signature verification (`X-Hub-Signature-256`) pub whatsapp_app_secret: Option>, + pub linq: Option>, + /// Linq webhook signing secret for signature verification + pub linq_signing_secret: Option>, /// Observability backend for metrics scraping pub observer: Arc, } @@ -389,6 +396,34 @@ pub async fn run_gateway(host: &str, port: u16, config: Config) -> Result<()> { }) .map(Arc::from); + // Linq channel (if configured) + let linq_channel: Option> = config.channels_config.linq.as_ref().map(|lq| { + Arc::new(LinqChannel::new( + lq.api_token.clone(), + lq.from_phone.clone(), + lq.allowed_senders.clone(), + )) + }); + + // Linq signing secret for webhook signature verification + // Priority: environment variable > config file + let linq_signing_secret: Option> = std::env::var("ZEROCLAW_LINQ_SIGNING_SECRET") + .ok() + .and_then(|secret| { + let secret = secret.trim(); + (!secret.is_empty()).then(|| secret.to_owned()) + }) + .or_else(|| { + config.channels_config.linq.as_ref().and_then(|lq| { + lq.signing_secret + .as_deref() + .map(str::trim) + .filter(|secret| !secret.is_empty()) + .map(ToOwned::to_owned) + }) + }) + .map(Arc::from); + // ── Pairing guard ────────────────────────────────────── let pairing = Arc::new(PairingGuard::new( config.gateway.require_pairing, @@ -440,6 +475,9 @@ pub async fn run_gateway(host: &str, port: u16, config: Config) -> Result<()> { println!(" GET /whatsapp — Meta webhook verification"); println!(" POST /whatsapp — WhatsApp message webhook"); } + if linq_channel.is_some() { + println!(" POST /linq — Linq message webhook (iMessage/RCS/SMS)"); + } println!(" GET /health — health check"); println!(" GET /metrics — Prometheus metrics"); if let Some(code) = pairing.pairing_code() { @@ -476,6 +514,8 @@ pub async fn run_gateway(host: &str, port: u16, config: Config) -> Result<()> { idempotency_store, whatsapp: whatsapp_channel, whatsapp_app_secret, + linq: linq_channel, + linq_signing_secret, observer, }; @@ -487,6 +527,7 @@ pub async fn run_gateway(host: &str, port: u16, config: Config) -> Result<()> { .route("/webhook", post(handle_webhook)) .route("/whatsapp", get(handle_whatsapp_verify)) .route("/whatsapp", post(handle_whatsapp_message)) + .route("/linq", post(handle_linq_webhook)) .with_state(state) .layer(RequestBodyLimitLayer::new(MAX_BODY_SIZE)) .layer(TimeoutLayer::with_status_code( @@ -967,6 +1008,118 @@ async fn handle_whatsapp_message( (StatusCode::OK, Json(serde_json::json!({"status": "ok"}))) } +/// POST /linq — incoming message webhook (iMessage/RCS/SMS via Linq) +async fn handle_linq_webhook( + State(state): State, + headers: HeaderMap, + body: Bytes, +) -> impl IntoResponse { + let Some(ref linq) = state.linq else { + return ( + StatusCode::NOT_FOUND, + Json(serde_json::json!({"error": "Linq not configured"})), + ); + }; + + let body_str = String::from_utf8_lossy(&body); + + // ── Security: Verify X-Webhook-Signature if signing_secret is configured ── + if let Some(ref signing_secret) = state.linq_signing_secret { + let timestamp = headers + .get("X-Webhook-Timestamp") + .and_then(|v| v.to_str().ok()) + .unwrap_or(""); + + let signature = headers + .get("X-Webhook-Signature") + .and_then(|v| v.to_str().ok()) + .unwrap_or(""); + + if !crate::channels::linq::verify_linq_signature( + signing_secret, + &body_str, + timestamp, + signature, + ) { + tracing::warn!( + "Linq webhook signature verification failed (signature: {})", + if signature.is_empty() { + "missing" + } else { + "invalid" + } + ); + return ( + StatusCode::UNAUTHORIZED, + Json(serde_json::json!({"error": "Invalid signature"})), + ); + } + } + + // Parse JSON body + let Ok(payload) = serde_json::from_slice::(&body) else { + return ( + StatusCode::BAD_REQUEST, + Json(serde_json::json!({"error": "Invalid JSON payload"})), + ); + }; + + // Parse messages from the webhook payload + let messages = linq.parse_webhook_payload(&payload); + + if messages.is_empty() { + // Acknowledge the webhook even if no messages (could be status/delivery events) + return (StatusCode::OK, Json(serde_json::json!({"status": "ok"}))); + } + + // Process each message + for msg in &messages { + tracing::info!( + "Linq message from {}: {}", + msg.sender, + truncate_with_ellipsis(&msg.content, 50) + ); + + // Auto-save to memory + if state.auto_save { + let key = linq_memory_key(msg); + let _ = state + .mem + .store(&key, &msg.content, MemoryCategory::Conversation, None) + .await; + } + + // Call the LLM + match state + .provider + .simple_chat(&msg.content, &state.model, state.temperature) + .await + { + Ok(response) => { + // Send reply via Linq + if let Err(e) = linq + .send(&SendMessage::new(response, &msg.reply_target)) + .await + { + tracing::error!("Failed to send Linq reply: {e}"); + } + } + Err(e) => { + tracing::error!("LLM error for Linq message: {e:#}"); + let _ = linq + .send(&SendMessage::new( + "Sorry, I couldn't process your message right now.", + &msg.reply_target, + )) + .await; + } + } + } + + // Acknowledge the webhook + (StatusCode::OK, Json(serde_json::json!({"status": "ok"}))) +} + #[cfg(test)] mod tests { use super::*; @@ -1433,6 +1586,8 @@ mod tests { idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)), whatsapp: None, whatsapp_app_secret: None, + linq: None, + linq_signing_secret: None, observer: Arc::new(crate::observability::NoopObserver), }; @@ -1489,6 +1644,8 @@ mod tests { idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)), whatsapp: None, whatsapp_app_secret: None, + linq: None, + linq_signing_secret: None, observer: Arc::new(crate::observability::NoopObserver), }; @@ -1557,6 +1714,8 @@ mod tests { idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)), whatsapp: None, whatsapp_app_secret: None, + linq: None, + linq_signing_secret: None, observer: Arc::new(crate::observability::NoopObserver), }; @@ -1597,6 +1756,8 @@ mod tests { idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)), whatsapp: None, whatsapp_app_secret: None, + linq: None, + linq_signing_secret: None, observer: Arc::new(crate::observability::NoopObserver), }; @@ -1642,6 +1803,8 @@ mod tests { idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)), whatsapp: None, whatsapp_app_secret: None, + linq: None, + linq_signing_secret: None, observer: Arc::new(crate::observability::NoopObserver), }; diff --git a/src/onboard/wizard.rs b/src/onboard/wizard.rs index 99f1860..31df793 100644 --- a/src/onboard/wizard.rs +++ b/src/onboard/wizard.rs @@ -1,5 +1,5 @@ use crate::config::schema::{ - DingTalkConfig, IrcConfig, LarkReceiveMode, QQConfig, StreamMode, WhatsAppConfig, + DingTalkConfig, IrcConfig, LarkReceiveMode, LinqConfig, QQConfig, StreamMode, WhatsAppConfig, }; use crate::config::{ AutonomyConfig, BrowserConfig, ChannelsConfig, ComposioConfig, Config, DiscordConfig, @@ -2504,6 +2504,14 @@ fn setup_channels() -> Result { "— Business Cloud API" } ), + format!( + "Linq {}", + if config.linq.is_some() { + "✅ connected" + } else { + "— iMessage/RCS/SMS via Linq API" + } + ), format!( "IRC {}", if config.irc.is_some() { @@ -3126,6 +3134,98 @@ fn setup_channels() -> Result { }); } 6 => { + // ── Linq ── + println!(); + println!( + " {} {}", + style("Linq Setup").white().bold(), + style("— iMessage/RCS/SMS via Linq API").dim() + ); + print_bullet("1. Sign up at linqapp.com and get your Partner API token"); + print_bullet("2. Note your Linq phone number (E.164 format)"); + print_bullet("3. Configure webhook URL to: https://your-domain/linq"); + println!(); + + let api_token: String = Input::new() + .with_prompt(" API token (Linq Partner API token)") + .interact_text()?; + + if api_token.trim().is_empty() { + println!(" {} Skipped", style("→").dim()); + continue; + } + + let from_phone: String = Input::new() + .with_prompt(" From phone number (E.164 format, e.g. +12223334444)") + .interact_text()?; + + if from_phone.trim().is_empty() { + println!(" {} Skipped — phone number required", style("→").dim()); + continue; + } + + // Test connection + print!(" {} Testing connection... ", style("⏳").dim()); + let api_token_clone = api_token.clone(); + let thread_result = std::thread::spawn(move || { + let client = reqwest::blocking::Client::new(); + let url = "https://api.linqapp.com/api/partner/v3/phonenumbers"; + let resp = client + .get(url) + .header( + "Authorization", + format!("Bearer {}", api_token_clone.trim()), + ) + .send()?; + Ok::<_, reqwest::Error>(resp.status().is_success()) + }) + .join(); + match thread_result { + Ok(Ok(true)) => { + println!( + "\r {} Connected to Linq API ", + style("✅").green().bold() + ); + } + _ => { + println!( + "\r {} Connection failed — check API token", + style("❌").red().bold() + ); + continue; + } + } + + let users_str: String = Input::new() + .with_prompt( + " Allowed sender numbers (comma-separated +1234567890, or * for all)", + ) + .default("*".into()) + .interact_text()?; + + let allowed_senders = if users_str.trim() == "*" { + vec!["*".into()] + } else { + users_str.split(',').map(|s| s.trim().to_string()).collect() + }; + + let signing_secret: String = Input::new() + .with_prompt(" Webhook signing secret (optional, press Enter to skip)") + .allow_empty(true) + .interact_text()?; + + config.linq = Some(LinqConfig { + api_token: api_token.trim().to_string(), + from_phone: from_phone.trim().to_string(), + signing_secret: if signing_secret.trim().is_empty() { + None + } else { + Some(signing_secret.trim().to_string()) + }, + allowed_senders, + }); + } + 7 => { // ── IRC ── println!(); println!( @@ -3264,7 +3364,7 @@ fn setup_channels() -> Result { verify_tls: Some(verify_tls), }); } - 7 => { + 8 => { // ── Webhook ── println!(); println!( @@ -3297,7 +3397,7 @@ fn setup_channels() -> Result { style(&port).cyan() ); } - 8 => { + 9 => { // ── DingTalk ── println!(); println!( @@ -3367,7 +3467,7 @@ fn setup_channels() -> Result { allowed_users, }); } - 9 => { + 10 => { // ── QQ Official ── println!(); println!( @@ -3655,6 +3755,9 @@ fn setup_channels() -> Result { if config.whatsapp.is_some() { active.push("WhatsApp"); } + if config.linq.is_some() { + active.push("Linq"); + } if config.email.is_some() { active.push("Email"); }