feat(channels): add Linq channel for iMessage/RCS/SMS support

The existing iMessage channel relies on AppleScript and only works on macOS.
Linq provides a REST API for iMessage, RCS, and SMS — this gives ZeroClaw
native iMessage support on any platform via webhooks.

Implements LinqChannel following the same patterns as WhatsAppChannel:
- Channel trait impl (send, listen, health_check, typing indicators)
- Webhook handler with HMAC-SHA256 signature verification
- Sender allowlist filtering
- Onboarding wizard step with connection testing
- 18 unit tests covering parsing, auth, and signature verification

Resolves #656 — the prior issue was closed without a merged PR, so this
is the actual implementation.
This commit is contained in:
George McCain 2026-02-18 11:04:45 -05:00 committed by Chummy
parent e23edde44b
commit 361e750576
5 changed files with 1003 additions and 5 deletions

691
src/channels/linq.rs Normal file
View file

@ -0,0 +1,691 @@
use super::traits::{Channel, ChannelMessage, SendMessage};
use async_trait::async_trait;
use uuid::Uuid;
/// Linq channel — uses the Linq Partner V3 API for iMessage, RCS, and SMS.
///
/// This channel operates in webhook mode (push-based) rather than polling.
/// Messages are received via the gateway's `/linq` webhook endpoint.
/// The `listen` method here is a keepalive placeholder; actual message handling
/// happens in the gateway when Linq sends webhook events.
pub struct LinqChannel {
api_token: String,
from_phone: String,
allowed_senders: Vec<String>,
client: reqwest::Client,
}
const LINQ_API_BASE: &str = "https://api.linqapp.com/api/partner/v3";
impl LinqChannel {
pub fn new(api_token: String, from_phone: String, allowed_senders: Vec<String>) -> Self {
Self {
api_token,
from_phone,
allowed_senders,
client: reqwest::Client::new(),
}
}
/// Check if a sender phone number is allowed (E.164 format: +1234567890)
fn is_sender_allowed(&self, phone: &str) -> bool {
self.allowed_senders.iter().any(|n| n == "*" || n == phone)
}
/// Get the bot's phone number
pub fn phone_number(&self) -> &str {
&self.from_phone
}
/// Parse an incoming webhook payload from Linq and extract messages.
///
/// Linq webhook envelope:
/// ```json
/// {
/// "api_version": "v3",
/// "event_type": "message.received",
/// "event_id": "...",
/// "created_at": "...",
/// "trace_id": "...",
/// "data": {
/// "chat_id": "...",
/// "from": "+1...",
/// "recipient_phone": "+1...",
/// "is_from_me": false,
/// "service": "iMessage",
/// "message": {
/// "id": "...",
/// "parts": [{ "type": "text", "value": "..." }]
/// }
/// }
/// }
/// ```
pub fn parse_webhook_payload(&self, payload: &serde_json::Value) -> Vec<ChannelMessage> {
let mut messages = Vec::new();
// Only handle message.received events
let event_type = payload
.get("event_type")
.and_then(|e| e.as_str())
.unwrap_or("");
if event_type != "message.received" {
tracing::debug!("Linq: skipping non-message event: {event_type}");
return messages;
}
let Some(data) = payload.get("data") else {
return messages;
};
// Skip messages sent by the bot itself
if data
.get("is_from_me")
.and_then(|v| v.as_bool())
.unwrap_or(false)
{
tracing::debug!("Linq: skipping is_from_me message");
return messages;
}
// Get sender phone number
let Some(from) = data.get("from").and_then(|f| f.as_str()) else {
return messages;
};
// Normalize to E.164 format
let normalized_from = if from.starts_with('+') {
from.to_string()
} else {
format!("+{from}")
};
// Check allowlist
if !self.is_sender_allowed(&normalized_from) {
tracing::warn!(
"Linq: ignoring message from unauthorized sender: {normalized_from}. \
Add to allowed_senders in config.toml."
);
return messages;
}
// Get chat_id for reply routing
let chat_id = data
.get("chat_id")
.and_then(|c| c.as_str())
.unwrap_or("")
.to_string();
// Extract text from message parts
let Some(message) = data.get("message") else {
return messages;
};
let Some(parts) = message.get("parts").and_then(|p| p.as_array()) else {
return messages;
};
let text_parts: Vec<&str> = parts
.iter()
.filter_map(|part| {
let part_type = part.get("type").and_then(|t| t.as_str())?;
if part_type == "text" {
part.get("value").and_then(|v| v.as_str())
} else {
// Skip media parts for now
tracing::debug!("Linq: skipping {part_type} part");
None
}
})
.collect();
if text_parts.is_empty() {
return messages;
}
let content = text_parts.join("\n");
if content.is_empty() {
return messages;
}
// Get timestamp from created_at or use current time
let timestamp = payload
.get("created_at")
.and_then(|t| t.as_str())
.and_then(|t| {
chrono::DateTime::parse_from_rfc3339(t)
.ok()
.map(|dt| dt.timestamp().cast_unsigned())
})
.unwrap_or_else(|| {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs()
});
// Use chat_id as reply_target so replies go to the right conversation
let reply_target = if chat_id.is_empty() {
normalized_from.clone()
} else {
chat_id
};
messages.push(ChannelMessage {
id: Uuid::new_v4().to_string(),
reply_target,
sender: normalized_from,
content,
channel: "linq".to_string(),
timestamp,
});
messages
}
}
#[async_trait]
impl Channel for LinqChannel {
fn name(&self) -> &str {
"linq"
}
async fn send(&self, message: &SendMessage) -> anyhow::Result<()> {
// If reply_target looks like a chat_id, send to existing chat.
// Otherwise create a new chat with the recipient phone number.
let recipient = &message.recipient;
let body = serde_json::json!({
"message": {
"parts": [{
"type": "text",
"value": message.content
}]
}
});
// Try sending to existing chat (recipient is chat_id)
let url = format!("{LINQ_API_BASE}/chats/{recipient}/messages");
let resp = self
.client
.post(&url)
.bearer_auth(&self.api_token)
.header("Content-Type", "application/json")
.json(&body)
.send()
.await?;
if resp.status().is_success() {
return Ok(());
}
// If the chat_id-based send failed with 404, try creating a new chat
if resp.status() == reqwest::StatusCode::NOT_FOUND {
let new_chat_body = serde_json::json!({
"from": self.from_phone,
"to": [recipient],
"message": {
"parts": [{
"type": "text",
"value": message.content
}]
}
});
let create_resp = self
.client
.post(format!("{LINQ_API_BASE}/chats"))
.bearer_auth(&self.api_token)
.header("Content-Type", "application/json")
.json(&new_chat_body)
.send()
.await?;
if !create_resp.status().is_success() {
let status = create_resp.status();
let error_body = create_resp.text().await.unwrap_or_default();
tracing::error!("Linq create chat failed: {status} — {error_body}");
anyhow::bail!("Linq API error: {status}");
}
return Ok(());
}
let status = resp.status();
let error_body = resp.text().await.unwrap_or_default();
tracing::error!("Linq send failed: {status} — {error_body}");
anyhow::bail!("Linq API error: {status}");
}
async fn listen(&self, _tx: tokio::sync::mpsc::Sender<ChannelMessage>) -> anyhow::Result<()> {
// Linq uses webhooks (push-based), not polling.
// Messages are received via the gateway's /linq endpoint.
tracing::info!(
"Linq channel active (webhook mode). \
Configure Linq webhook to POST to your gateway's /linq endpoint."
);
// Keep the task alive — it will be cancelled when the channel shuts down
loop {
tokio::time::sleep(std::time::Duration::from_secs(3600)).await;
}
}
async fn health_check(&self) -> bool {
// Check if we can reach the Linq API
let url = format!("{LINQ_API_BASE}/phonenumbers");
self.client
.get(&url)
.bearer_auth(&self.api_token)
.send()
.await
.map(|r| r.status().is_success())
.unwrap_or(false)
}
async fn start_typing(&self, recipient: &str) -> anyhow::Result<()> {
let url = format!("{LINQ_API_BASE}/chats/{recipient}/typing");
let resp = self
.client
.post(&url)
.bearer_auth(&self.api_token)
.send()
.await?;
if !resp.status().is_success() {
tracing::debug!("Linq start_typing failed: {}", resp.status());
}
Ok(())
}
async fn stop_typing(&self, recipient: &str) -> anyhow::Result<()> {
let url = format!("{LINQ_API_BASE}/chats/{recipient}/typing");
let resp = self
.client
.delete(&url)
.bearer_auth(&self.api_token)
.send()
.await?;
if !resp.status().is_success() {
tracing::debug!("Linq stop_typing failed: {}", resp.status());
}
Ok(())
}
}
/// Verify a Linq webhook signature.
///
/// Linq signs webhooks with HMAC-SHA256 over `"{timestamp}.{body}"`.
/// The signature is sent in `X-Webhook-Signature` (hex-encoded) and the
/// timestamp in `X-Webhook-Timestamp`. Reject timestamps older than 300s.
pub fn verify_linq_signature(secret: &str, body: &str, timestamp: &str, signature: &str) -> bool {
use hmac::{Hmac, Mac};
use sha2::Sha256;
// Reject stale timestamps (>300s old)
if let Ok(ts) = timestamp.parse::<i64>() {
let now = chrono::Utc::now().timestamp();
if (now - ts).unsigned_abs() > 300 {
tracing::warn!("Linq: rejecting stale webhook timestamp ({ts}, now={now})");
return false;
}
} else {
tracing::warn!("Linq: invalid webhook timestamp: {timestamp}");
return false;
}
// Compute HMAC-SHA256 over "{timestamp}.{body}"
let message = format!("{timestamp}.{body}");
let Ok(mut mac) = Hmac::<Sha256>::new_from_slice(secret.as_bytes()) else {
return false;
};
mac.update(message.as_bytes());
let expected = hex::encode(mac.finalize().into_bytes());
// Constant-time comparison
crate::security::pairing::constant_time_eq(&expected, signature)
}
#[cfg(test)]
mod tests {
use super::*;
fn make_channel() -> LinqChannel {
LinqChannel::new(
"test-token".into(),
"+15551234567".into(),
vec!["+1234567890".into()],
)
}
#[test]
fn linq_channel_name() {
let ch = make_channel();
assert_eq!(ch.name(), "linq");
}
#[test]
fn linq_sender_allowed_exact() {
let ch = make_channel();
assert!(ch.is_sender_allowed("+1234567890"));
assert!(!ch.is_sender_allowed("+9876543210"));
}
#[test]
fn linq_sender_allowed_wildcard() {
let ch = LinqChannel::new("tok".into(), "+15551234567".into(), vec!["*".into()]);
assert!(ch.is_sender_allowed("+1234567890"));
assert!(ch.is_sender_allowed("+9999999999"));
}
#[test]
fn linq_sender_allowed_empty() {
let ch = LinqChannel::new("tok".into(), "+15551234567".into(), vec![]);
assert!(!ch.is_sender_allowed("+1234567890"));
}
#[test]
fn linq_parse_valid_text_message() {
let ch = make_channel();
let payload = serde_json::json!({
"api_version": "v3",
"event_type": "message.received",
"event_id": "evt-123",
"created_at": "2025-01-15T12:00:00Z",
"trace_id": "trace-456",
"data": {
"chat_id": "chat-789",
"from": "+1234567890",
"recipient_phone": "+15551234567",
"is_from_me": false,
"service": "iMessage",
"message": {
"id": "msg-abc",
"parts": [{
"type": "text",
"value": "Hello ZeroClaw!"
}]
}
}
});
let msgs = ch.parse_webhook_payload(&payload);
assert_eq!(msgs.len(), 1);
assert_eq!(msgs[0].sender, "+1234567890");
assert_eq!(msgs[0].content, "Hello ZeroClaw!");
assert_eq!(msgs[0].channel, "linq");
assert_eq!(msgs[0].reply_target, "chat-789");
}
#[test]
fn linq_parse_skip_is_from_me() {
let ch = LinqChannel::new("tok".into(), "+15551234567".into(), vec!["*".into()]);
let payload = serde_json::json!({
"event_type": "message.received",
"data": {
"chat_id": "chat-789",
"from": "+1234567890",
"is_from_me": true,
"message": {
"id": "msg-abc",
"parts": [{ "type": "text", "value": "My own message" }]
}
}
});
let msgs = ch.parse_webhook_payload(&payload);
assert!(msgs.is_empty(), "is_from_me messages should be skipped");
}
#[test]
fn linq_parse_skip_non_message_event() {
let ch = make_channel();
let payload = serde_json::json!({
"event_type": "message.delivered",
"data": {
"chat_id": "chat-789",
"message_id": "msg-abc"
}
});
let msgs = ch.parse_webhook_payload(&payload);
assert!(msgs.is_empty(), "Non-message events should be skipped");
}
#[test]
fn linq_parse_unauthorized_sender() {
let ch = make_channel();
let payload = serde_json::json!({
"event_type": "message.received",
"data": {
"chat_id": "chat-789",
"from": "+9999999999",
"is_from_me": false,
"message": {
"id": "msg-abc",
"parts": [{ "type": "text", "value": "Spam" }]
}
}
});
let msgs = ch.parse_webhook_payload(&payload);
assert!(msgs.is_empty(), "Unauthorized senders should be filtered");
}
#[test]
fn linq_parse_empty_payload() {
let ch = make_channel();
let payload = serde_json::json!({});
let msgs = ch.parse_webhook_payload(&payload);
assert!(msgs.is_empty());
}
#[test]
fn linq_parse_media_only_skipped() {
let ch = LinqChannel::new("tok".into(), "+15551234567".into(), vec!["*".into()]);
let payload = serde_json::json!({
"event_type": "message.received",
"data": {
"chat_id": "chat-789",
"from": "+1234567890",
"is_from_me": false,
"message": {
"id": "msg-abc",
"parts": [{
"type": "media",
"url": "https://example.com/image.jpg",
"mime_type": "image/jpeg"
}]
}
}
});
let msgs = ch.parse_webhook_payload(&payload);
assert!(msgs.is_empty(), "Media-only messages should be skipped");
}
#[test]
fn linq_parse_multiple_text_parts() {
let ch = LinqChannel::new("tok".into(), "+15551234567".into(), vec!["*".into()]);
let payload = serde_json::json!({
"event_type": "message.received",
"data": {
"chat_id": "chat-789",
"from": "+1234567890",
"is_from_me": false,
"message": {
"id": "msg-abc",
"parts": [
{ "type": "text", "value": "First part" },
{ "type": "text", "value": "Second part" }
]
}
}
});
let msgs = ch.parse_webhook_payload(&payload);
assert_eq!(msgs.len(), 1);
assert_eq!(msgs[0].content, "First part\nSecond part");
}
#[test]
fn linq_signature_verification_valid() {
let secret = "test_webhook_secret";
let body = r#"{"event_type":"message.received"}"#;
let now = chrono::Utc::now().timestamp().to_string();
// Compute expected signature
use hmac::{Hmac, Mac};
use sha2::Sha256;
let message = format!("{now}.{body}");
let mut mac = Hmac::<Sha256>::new_from_slice(secret.as_bytes()).unwrap();
mac.update(message.as_bytes());
let signature = hex::encode(mac.finalize().into_bytes());
assert!(verify_linq_signature(secret, body, &now, &signature));
}
#[test]
fn linq_signature_verification_invalid() {
let secret = "test_webhook_secret";
let body = r#"{"event_type":"message.received"}"#;
let now = chrono::Utc::now().timestamp().to_string();
assert!(!verify_linq_signature(
secret,
body,
&now,
"deadbeefdeadbeefdeadbeef"
));
}
#[test]
fn linq_signature_verification_stale_timestamp() {
let secret = "test_webhook_secret";
let body = r#"{"event_type":"message.received"}"#;
// 10 minutes ago — stale
let stale_ts = (chrono::Utc::now().timestamp() - 600).to_string();
// Even with correct signature, stale timestamp should fail
use hmac::{Hmac, Mac};
use sha2::Sha256;
let message = format!("{stale_ts}.{body}");
let mut mac = Hmac::<Sha256>::new_from_slice(secret.as_bytes()).unwrap();
mac.update(message.as_bytes());
let signature = hex::encode(mac.finalize().into_bytes());
assert!(
!verify_linq_signature(secret, body, &stale_ts, &signature),
"Stale timestamps (>300s) should be rejected"
);
}
#[test]
fn linq_parse_normalizes_phone_with_plus() {
let ch = LinqChannel::new(
"tok".into(),
"+15551234567".into(),
vec!["+1234567890".into()],
);
// API sends without +, normalize to +
let payload = serde_json::json!({
"event_type": "message.received",
"data": {
"chat_id": "chat-789",
"from": "1234567890",
"is_from_me": false,
"message": {
"id": "msg-abc",
"parts": [{ "type": "text", "value": "Hi" }]
}
}
});
let msgs = ch.parse_webhook_payload(&payload);
assert_eq!(msgs.len(), 1);
assert_eq!(msgs[0].sender, "+1234567890");
}
#[test]
fn linq_parse_missing_data() {
let ch = make_channel();
let payload = serde_json::json!({
"event_type": "message.received"
});
let msgs = ch.parse_webhook_payload(&payload);
assert!(msgs.is_empty());
}
#[test]
fn linq_parse_missing_message_parts() {
let ch = LinqChannel::new("tok".into(), "+15551234567".into(), vec!["*".into()]);
let payload = serde_json::json!({
"event_type": "message.received",
"data": {
"chat_id": "chat-789",
"from": "+1234567890",
"is_from_me": false,
"message": {
"id": "msg-abc"
}
}
});
let msgs = ch.parse_webhook_payload(&payload);
assert!(msgs.is_empty());
}
#[test]
fn linq_parse_empty_text_value() {
let ch = LinqChannel::new("tok".into(), "+15551234567".into(), vec!["*".into()]);
let payload = serde_json::json!({
"event_type": "message.received",
"data": {
"chat_id": "chat-789",
"from": "+1234567890",
"is_from_me": false,
"message": {
"id": "msg-abc",
"parts": [{ "type": "text", "value": "" }]
}
}
});
let msgs = ch.parse_webhook_payload(&payload);
assert!(msgs.is_empty(), "Empty text should be skipped");
}
#[test]
fn linq_parse_fallback_reply_target_when_no_chat_id() {
let ch = LinqChannel::new("tok".into(), "+15551234567".into(), vec!["*".into()]);
let payload = serde_json::json!({
"event_type": "message.received",
"data": {
"from": "+1234567890",
"is_from_me": false,
"message": {
"id": "msg-abc",
"parts": [{ "type": "text", "value": "Hi" }]
}
}
});
let msgs = ch.parse_webhook_payload(&payload);
assert_eq!(msgs.len(), 1);
// Falls back to sender phone number when no chat_id
assert_eq!(msgs[0].reply_target, "+1234567890");
}
#[test]
fn linq_phone_number_accessor() {
let ch = make_channel();
assert_eq!(ch.phone_number(), "+15551234567");
}
}

View file

@ -5,6 +5,7 @@ pub mod email_channel;
pub mod imessage; pub mod imessage;
pub mod irc; pub mod irc;
pub mod lark; pub mod lark;
pub mod linq;
pub mod matrix; pub mod matrix;
pub mod mattermost; pub mod mattermost;
pub mod qq; pub mod qq;
@ -21,6 +22,7 @@ pub use email_channel::EmailChannel;
pub use imessage::IMessageChannel; pub use imessage::IMessageChannel;
pub use irc::IrcChannel; pub use irc::IrcChannel;
pub use lark::LarkChannel; pub use lark::LarkChannel;
pub use linq::LinqChannel;
pub use matrix::MatrixChannel; pub use matrix::MatrixChannel;
pub use mattermost::MattermostChannel; pub use mattermost::MattermostChannel;
pub use qq::QQChannel; pub use qq::QQChannel;
@ -1255,6 +1257,7 @@ pub fn handle_command(command: crate::ChannelCommands, config: &Config) -> Resul
("Matrix", config.channels_config.matrix.is_some()), ("Matrix", config.channels_config.matrix.is_some()),
("Signal", config.channels_config.signal.is_some()), ("Signal", config.channels_config.signal.is_some()),
("WhatsApp", config.channels_config.whatsapp.is_some()), ("WhatsApp", config.channels_config.whatsapp.is_some()),
("Linq", config.channels_config.linq.is_some()),
("Email", config.channels_config.email.is_some()), ("Email", config.channels_config.email.is_some()),
("IRC", config.channels_config.irc.is_some()), ("IRC", config.channels_config.irc.is_some()),
("Lark", config.channels_config.lark.is_some()), ("Lark", config.channels_config.lark.is_some()),
@ -1391,6 +1394,17 @@ pub async fn doctor_channels(config: Config) -> Result<()> {
)); ));
} }
if let Some(ref lq) = config.channels_config.linq {
channels.push((
"Linq",
Arc::new(LinqChannel::new(
lq.api_token.clone(),
lq.from_phone.clone(),
lq.allowed_senders.clone(),
)),
));
}
if let Some(ref email_cfg) = config.channels_config.email { if let Some(ref email_cfg) = config.channels_config.email {
channels.push(("Email", Arc::new(EmailChannel::new(email_cfg.clone())))); channels.push(("Email", Arc::new(EmailChannel::new(email_cfg.clone()))));
} }
@ -1711,6 +1725,14 @@ pub async fn start_channels(config: Config) -> Result<()> {
))); )));
} }
if let Some(ref lq) = config.channels_config.linq {
channels.push(Arc::new(LinqChannel::new(
lq.api_token.clone(),
lq.from_phone.clone(),
lq.allowed_senders.clone(),
)));
}
if let Some(ref email_cfg) = config.channels_config.email { if let Some(ref email_cfg) = config.channels_config.email {
channels.push(Arc::new(EmailChannel::new(email_cfg.clone()))); channels.push(Arc::new(EmailChannel::new(email_cfg.clone())));
} }

View file

@ -1974,6 +1974,7 @@ pub struct ChannelsConfig {
pub matrix: Option<MatrixConfig>, pub matrix: Option<MatrixConfig>,
pub signal: Option<SignalConfig>, pub signal: Option<SignalConfig>,
pub whatsapp: Option<WhatsAppConfig>, pub whatsapp: Option<WhatsAppConfig>,
pub linq: Option<LinqConfig>,
pub email: Option<crate::channels::email_channel::EmailConfig>, pub email: Option<crate::channels::email_channel::EmailConfig>,
pub irc: Option<IrcConfig>, pub irc: Option<IrcConfig>,
pub lark: Option<LarkConfig>, pub lark: Option<LarkConfig>,
@ -2002,6 +2003,7 @@ impl Default for ChannelsConfig {
matrix: None, matrix: None,
signal: None, signal: None,
whatsapp: None, whatsapp: None,
linq: None,
email: None, email: None,
irc: None, irc: None,
lark: None, lark: None,
@ -2148,6 +2150,20 @@ pub struct WhatsAppConfig {
pub allowed_numbers: Vec<String>, pub allowed_numbers: Vec<String>,
} }
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LinqConfig {
/// Linq Partner API token (Bearer auth)
pub api_token: String,
/// Phone number to send from (E.164 format)
pub from_phone: String,
/// Webhook signing secret for signature verification
#[serde(default)]
pub signing_secret: Option<String>,
/// Allowed sender handles (phone numbers) or "*" for all
#[serde(default)]
pub allowed_senders: Vec<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IrcConfig { pub struct IrcConfig {
/// IRC server hostname /// IRC server hostname
@ -3246,6 +3262,7 @@ default_temperature = 0.7
matrix: None, matrix: None,
signal: None, signal: None,
whatsapp: None, whatsapp: None,
linq: None,
email: None, email: None,
irc: None, irc: None,
lark: None, lark: None,
@ -3751,6 +3768,7 @@ allowed_users = ["@ops:matrix.org"]
}), }),
signal: None, signal: None,
whatsapp: None, whatsapp: None,
linq: None,
email: None, email: None,
irc: None, irc: None,
lark: None, lark: None,
@ -3915,6 +3933,7 @@ channel_id = "C123"
app_secret: None, app_secret: None,
allowed_numbers: vec!["+1".into()], allowed_numbers: vec!["+1".into()],
}), }),
linq: None,
email: None, email: None,
irc: None, irc: None,
lark: None, lark: None,

View file

@ -7,7 +7,7 @@
//! - Request timeouts (30s) to prevent slow-loris attacks //! - Request timeouts (30s) to prevent slow-loris attacks
//! - Header sanitization (handled by axum/hyper) //! - Header sanitization (handled by axum/hyper)
use crate::channels::{Channel, SendMessage, WhatsAppChannel}; use crate::channels::{Channel, LinqChannel, SendMessage, WhatsAppChannel};
use crate::config::Config; use crate::config::Config;
use crate::memory::{self, Memory, MemoryCategory}; use crate::memory::{self, Memory, MemoryCategory};
use crate::providers::{self, Provider}; use crate::providers::{self, Provider};
@ -53,6 +53,10 @@ fn whatsapp_memory_key(msg: &crate::channels::traits::ChannelMessage) -> String
format!("whatsapp_{}_{}", msg.sender, msg.id) format!("whatsapp_{}_{}", msg.sender, msg.id)
} }
fn linq_memory_key(msg: &crate::channels::traits::ChannelMessage) -> String {
format!("linq_{}_{}", msg.sender, msg.id)
}
fn hash_webhook_secret(value: &str) -> String { fn hash_webhook_secret(value: &str) -> String {
use sha2::{Digest, Sha256}; use sha2::{Digest, Sha256};
@ -274,6 +278,9 @@ pub struct AppState {
pub whatsapp: Option<Arc<WhatsAppChannel>>, pub whatsapp: Option<Arc<WhatsAppChannel>>,
/// `WhatsApp` app secret for webhook signature verification (`X-Hub-Signature-256`) /// `WhatsApp` app secret for webhook signature verification (`X-Hub-Signature-256`)
pub whatsapp_app_secret: Option<Arc<str>>, pub whatsapp_app_secret: Option<Arc<str>>,
pub linq: Option<Arc<LinqChannel>>,
/// Linq webhook signing secret for signature verification
pub linq_signing_secret: Option<Arc<str>>,
/// Observability backend for metrics scraping /// Observability backend for metrics scraping
pub observer: Arc<dyn crate::observability::Observer>, pub observer: Arc<dyn crate::observability::Observer>,
} }
@ -389,6 +396,34 @@ pub async fn run_gateway(host: &str, port: u16, config: Config) -> Result<()> {
}) })
.map(Arc::from); .map(Arc::from);
// Linq channel (if configured)
let linq_channel: Option<Arc<LinqChannel>> = config.channels_config.linq.as_ref().map(|lq| {
Arc::new(LinqChannel::new(
lq.api_token.clone(),
lq.from_phone.clone(),
lq.allowed_senders.clone(),
))
});
// Linq signing secret for webhook signature verification
// Priority: environment variable > config file
let linq_signing_secret: Option<Arc<str>> = std::env::var("ZEROCLAW_LINQ_SIGNING_SECRET")
.ok()
.and_then(|secret| {
let secret = secret.trim();
(!secret.is_empty()).then(|| secret.to_owned())
})
.or_else(|| {
config.channels_config.linq.as_ref().and_then(|lq| {
lq.signing_secret
.as_deref()
.map(str::trim)
.filter(|secret| !secret.is_empty())
.map(ToOwned::to_owned)
})
})
.map(Arc::from);
// ── Pairing guard ────────────────────────────────────── // ── Pairing guard ──────────────────────────────────────
let pairing = Arc::new(PairingGuard::new( let pairing = Arc::new(PairingGuard::new(
config.gateway.require_pairing, config.gateway.require_pairing,
@ -440,6 +475,9 @@ pub async fn run_gateway(host: &str, port: u16, config: Config) -> Result<()> {
println!(" GET /whatsapp — Meta webhook verification"); println!(" GET /whatsapp — Meta webhook verification");
println!(" POST /whatsapp — WhatsApp message webhook"); println!(" POST /whatsapp — WhatsApp message webhook");
} }
if linq_channel.is_some() {
println!(" POST /linq — Linq message webhook (iMessage/RCS/SMS)");
}
println!(" GET /health — health check"); println!(" GET /health — health check");
println!(" GET /metrics — Prometheus metrics"); println!(" GET /metrics — Prometheus metrics");
if let Some(code) = pairing.pairing_code() { if let Some(code) = pairing.pairing_code() {
@ -476,6 +514,8 @@ pub async fn run_gateway(host: &str, port: u16, config: Config) -> Result<()> {
idempotency_store, idempotency_store,
whatsapp: whatsapp_channel, whatsapp: whatsapp_channel,
whatsapp_app_secret, whatsapp_app_secret,
linq: linq_channel,
linq_signing_secret,
observer, observer,
}; };
@ -487,6 +527,7 @@ pub async fn run_gateway(host: &str, port: u16, config: Config) -> Result<()> {
.route("/webhook", post(handle_webhook)) .route("/webhook", post(handle_webhook))
.route("/whatsapp", get(handle_whatsapp_verify)) .route("/whatsapp", get(handle_whatsapp_verify))
.route("/whatsapp", post(handle_whatsapp_message)) .route("/whatsapp", post(handle_whatsapp_message))
.route("/linq", post(handle_linq_webhook))
.with_state(state) .with_state(state)
.layer(RequestBodyLimitLayer::new(MAX_BODY_SIZE)) .layer(RequestBodyLimitLayer::new(MAX_BODY_SIZE))
.layer(TimeoutLayer::with_status_code( .layer(TimeoutLayer::with_status_code(
@ -967,6 +1008,118 @@ async fn handle_whatsapp_message(
(StatusCode::OK, Json(serde_json::json!({"status": "ok"}))) (StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
} }
/// POST /linq — incoming message webhook (iMessage/RCS/SMS via Linq)
async fn handle_linq_webhook(
State(state): State<AppState>,
headers: HeaderMap,
body: Bytes,
) -> impl IntoResponse {
let Some(ref linq) = state.linq else {
return (
StatusCode::NOT_FOUND,
Json(serde_json::json!({"error": "Linq not configured"})),
);
};
let body_str = String::from_utf8_lossy(&body);
// ── Security: Verify X-Webhook-Signature if signing_secret is configured ──
if let Some(ref signing_secret) = state.linq_signing_secret {
let timestamp = headers
.get("X-Webhook-Timestamp")
.and_then(|v| v.to_str().ok())
.unwrap_or("");
let signature = headers
.get("X-Webhook-Signature")
.and_then(|v| v.to_str().ok())
.unwrap_or("");
if !crate::channels::linq::verify_linq_signature(
signing_secret,
&body_str,
timestamp,
signature,
) {
tracing::warn!(
"Linq webhook signature verification failed (signature: {})",
if signature.is_empty() {
"missing"
} else {
"invalid"
}
);
return (
StatusCode::UNAUTHORIZED,
Json(serde_json::json!({"error": "Invalid signature"})),
);
}
}
// Parse JSON body
let Ok(payload) = serde_json::from_slice::<serde_json::Value>(&body) else {
return (
StatusCode::BAD_REQUEST,
Json(serde_json::json!({"error": "Invalid JSON payload"})),
);
};
// Parse messages from the webhook payload
let messages = linq.parse_webhook_payload(&payload);
if messages.is_empty() {
// Acknowledge the webhook even if no messages (could be status/delivery events)
return (StatusCode::OK, Json(serde_json::json!({"status": "ok"})));
}
// Process each message
for msg in &messages {
tracing::info!(
"Linq message from {}: {}",
msg.sender,
truncate_with_ellipsis(&msg.content, 50)
);
// Auto-save to memory
if state.auto_save {
let key = linq_memory_key(msg);
let _ = state
.mem
.store(&key, &msg.content, MemoryCategory::Conversation, None)
.await;
}
// Call the LLM
match state
.provider
.simple_chat(&msg.content, &state.model, state.temperature)
.await
{
Ok(response) => {
// Send reply via Linq
if let Err(e) = linq
.send(&SendMessage::new(response, &msg.reply_target))
.await
{
tracing::error!("Failed to send Linq reply: {e}");
}
}
Err(e) => {
tracing::error!("LLM error for Linq message: {e:#}");
let _ = linq
.send(&SendMessage::new(
"Sorry, I couldn't process your message right now.",
&msg.reply_target,
))
.await;
}
}
}
// Acknowledge the webhook
(StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
@ -1433,6 +1586,8 @@ mod tests {
idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)), idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
whatsapp: None, whatsapp: None,
whatsapp_app_secret: None, whatsapp_app_secret: None,
linq: None,
linq_signing_secret: None,
observer: Arc::new(crate::observability::NoopObserver), observer: Arc::new(crate::observability::NoopObserver),
}; };
@ -1489,6 +1644,8 @@ mod tests {
idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)), idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
whatsapp: None, whatsapp: None,
whatsapp_app_secret: None, whatsapp_app_secret: None,
linq: None,
linq_signing_secret: None,
observer: Arc::new(crate::observability::NoopObserver), observer: Arc::new(crate::observability::NoopObserver),
}; };
@ -1557,6 +1714,8 @@ mod tests {
idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)), idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
whatsapp: None, whatsapp: None,
whatsapp_app_secret: None, whatsapp_app_secret: None,
linq: None,
linq_signing_secret: None,
observer: Arc::new(crate::observability::NoopObserver), observer: Arc::new(crate::observability::NoopObserver),
}; };
@ -1597,6 +1756,8 @@ mod tests {
idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)), idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
whatsapp: None, whatsapp: None,
whatsapp_app_secret: None, whatsapp_app_secret: None,
linq: None,
linq_signing_secret: None,
observer: Arc::new(crate::observability::NoopObserver), observer: Arc::new(crate::observability::NoopObserver),
}; };
@ -1642,6 +1803,8 @@ mod tests {
idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)), idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
whatsapp: None, whatsapp: None,
whatsapp_app_secret: None, whatsapp_app_secret: None,
linq: None,
linq_signing_secret: None,
observer: Arc::new(crate::observability::NoopObserver), observer: Arc::new(crate::observability::NoopObserver),
}; };

View file

@ -1,5 +1,5 @@
use crate::config::schema::{ use crate::config::schema::{
DingTalkConfig, IrcConfig, LarkReceiveMode, QQConfig, StreamMode, WhatsAppConfig, DingTalkConfig, IrcConfig, LarkReceiveMode, LinqConfig, QQConfig, StreamMode, WhatsAppConfig,
}; };
use crate::config::{ use crate::config::{
AutonomyConfig, BrowserConfig, ChannelsConfig, ComposioConfig, Config, DiscordConfig, AutonomyConfig, BrowserConfig, ChannelsConfig, ComposioConfig, Config, DiscordConfig,
@ -2504,6 +2504,14 @@ fn setup_channels() -> Result<ChannelsConfig> {
"— Business Cloud API" "— Business Cloud API"
} }
), ),
format!(
"Linq {}",
if config.linq.is_some() {
"✅ connected"
} else {
"— iMessage/RCS/SMS via Linq API"
}
),
format!( format!(
"IRC {}", "IRC {}",
if config.irc.is_some() { if config.irc.is_some() {
@ -3126,6 +3134,98 @@ fn setup_channels() -> Result<ChannelsConfig> {
}); });
} }
6 => { 6 => {
// ── Linq ──
println!();
println!(
" {} {}",
style("Linq Setup").white().bold(),
style("— iMessage/RCS/SMS via Linq API").dim()
);
print_bullet("1. Sign up at linqapp.com and get your Partner API token");
print_bullet("2. Note your Linq phone number (E.164 format)");
print_bullet("3. Configure webhook URL to: https://your-domain/linq");
println!();
let api_token: String = Input::new()
.with_prompt(" API token (Linq Partner API token)")
.interact_text()?;
if api_token.trim().is_empty() {
println!(" {} Skipped", style("").dim());
continue;
}
let from_phone: String = Input::new()
.with_prompt(" From phone number (E.164 format, e.g. +12223334444)")
.interact_text()?;
if from_phone.trim().is_empty() {
println!(" {} Skipped — phone number required", style("").dim());
continue;
}
// Test connection
print!(" {} Testing connection... ", style("").dim());
let api_token_clone = api_token.clone();
let thread_result = std::thread::spawn(move || {
let client = reqwest::blocking::Client::new();
let url = "https://api.linqapp.com/api/partner/v3/phonenumbers";
let resp = client
.get(url)
.header(
"Authorization",
format!("Bearer {}", api_token_clone.trim()),
)
.send()?;
Ok::<_, reqwest::Error>(resp.status().is_success())
})
.join();
match thread_result {
Ok(Ok(true)) => {
println!(
"\r {} Connected to Linq API ",
style("").green().bold()
);
}
_ => {
println!(
"\r {} Connection failed — check API token",
style("").red().bold()
);
continue;
}
}
let users_str: String = Input::new()
.with_prompt(
" Allowed sender numbers (comma-separated +1234567890, or * for all)",
)
.default("*".into())
.interact_text()?;
let allowed_senders = if users_str.trim() == "*" {
vec!["*".into()]
} else {
users_str.split(',').map(|s| s.trim().to_string()).collect()
};
let signing_secret: String = Input::new()
.with_prompt(" Webhook signing secret (optional, press Enter to skip)")
.allow_empty(true)
.interact_text()?;
config.linq = Some(LinqConfig {
api_token: api_token.trim().to_string(),
from_phone: from_phone.trim().to_string(),
signing_secret: if signing_secret.trim().is_empty() {
None
} else {
Some(signing_secret.trim().to_string())
},
allowed_senders,
});
}
7 => {
// ── IRC ── // ── IRC ──
println!(); println!();
println!( println!(
@ -3264,7 +3364,7 @@ fn setup_channels() -> Result<ChannelsConfig> {
verify_tls: Some(verify_tls), verify_tls: Some(verify_tls),
}); });
} }
7 => { 8 => {
// ── Webhook ── // ── Webhook ──
println!(); println!();
println!( println!(
@ -3297,7 +3397,7 @@ fn setup_channels() -> Result<ChannelsConfig> {
style(&port).cyan() style(&port).cyan()
); );
} }
8 => { 9 => {
// ── DingTalk ── // ── DingTalk ──
println!(); println!();
println!( println!(
@ -3367,7 +3467,7 @@ fn setup_channels() -> Result<ChannelsConfig> {
allowed_users, allowed_users,
}); });
} }
9 => { 10 => {
// ── QQ Official ── // ── QQ Official ──
println!(); println!();
println!( println!(
@ -3655,6 +3755,9 @@ fn setup_channels() -> Result<ChannelsConfig> {
if config.whatsapp.is_some() { if config.whatsapp.is_some() {
active.push("WhatsApp"); active.push("WhatsApp");
} }
if config.linq.is_some() {
active.push("Linq");
}
if config.email.is_some() { if config.email.is_some() {
active.push("Email"); active.push("Email");
} }