diff --git a/src/channels/signal.rs b/src/channels/signal.rs index 62e958e..3bcaf56 100644 --- a/src/channels/signal.rs +++ b/src/channels/signal.rs @@ -3,9 +3,18 @@ use async_trait::async_trait; use futures_util::StreamExt; use reqwest::Client; use serde::Deserialize; +use std::time::Duration; use tokio::sync::mpsc; use uuid::Uuid; +const GROUP_TARGET_PREFIX: &str = "group:"; + +#[derive(Debug, Clone, PartialEq, Eq)] +enum RecipientTarget { + Direct(String), + Group(String), +} + /// Signal channel using signal-cli daemon's native JSON-RPC + SSE API. /// /// Connects to a running `signal-cli daemon --http `. @@ -73,7 +82,7 @@ impl SignalChannel { ) -> Self { let http_url = http_url.trim_end_matches('/').to_string(); let client = Client::builder() - .timeout(std::time::Duration::from_secs(30)) + .connect_timeout(Duration::from_secs(10)) .build() .expect("Signal HTTP client should build"); Self { @@ -103,6 +112,25 @@ impl SignalChannel { self.allowed_from.iter().any(|u| u == sender) } + fn is_e164(recipient: &str) -> bool { + let Some(number) = recipient.strip_prefix('+') else { + return false; + }; + (2..=15).contains(&number.len()) && number.chars().all(|c| c.is_ascii_digit()) + } + + fn parse_recipient_target(recipient: &str) -> RecipientTarget { + if let Some(group_id) = recipient.strip_prefix(GROUP_TARGET_PREFIX) { + return RecipientTarget::Group(group_id.to_string()); + } + + if Self::is_e164(recipient) { + RecipientTarget::Direct(recipient.to_string()) + } else { + RecipientTarget::Group(recipient.to_string()) + } + } + /// Check whether the message targets the configured group. /// If no `group_id` is configured (None), all DMs and groups are accepted. /// Use "dm" to filter DMs only. @@ -122,11 +150,15 @@ impl SignalChannel { /// Determine the send target: group id or the sender's number. fn reply_target(&self, data_msg: &DataMessage, sender: &str) -> String { - data_msg + if let Some(group_id) = data_msg .group_info .as_ref() - .and_then(|g| g.group_id.clone()) - .unwrap_or_else(|| sender.to_string()) + .and_then(|g| g.group_id.as_deref()) + { + format!("{GROUP_TARGET_PREFIX}{group_id}") + } else { + sender.to_string() + } } /// Send a JSON-RPC request to signal-cli daemon. @@ -148,6 +180,7 @@ impl SignalChannel { let resp = self .client .post(&url) + .timeout(Duration::from_secs(30)) .header("Content-Type", "application/json") .json(&body) .send() @@ -210,10 +243,13 @@ impl SignalChannel { .timestamp .or(envelope.timestamp) .unwrap_or_else(|| { - std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap_or_default() - .as_millis() as u64 + u64::try_from( + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_millis(), + ) + .unwrap_or(u64::MAX) }); Some(ChannelMessage { @@ -234,20 +270,17 @@ impl Channel for SignalChannel { } async fn send(&self, message: &str, recipient: &str) -> anyhow::Result<()> { - let params = if recipient.starts_with('+') { - // DM - serde_json::json!({ - "recipient": [recipient], + let params = match Self::parse_recipient_target(recipient) { + RecipientTarget::Direct(number) => serde_json::json!({ + "recipient": [number], "message": message, "account": self.account, - }) - } else { - // Group - serde_json::json!({ - "groupId": recipient, + }), + RecipientTarget::Group(group_id) => serde_json::json!({ + "groupId": group_id, "message": message, "account": self.account, - }) + }), }; self.rpc_request("send", params).await?; @@ -258,11 +291,7 @@ impl Channel for SignalChannel { let mut url = reqwest::Url::parse(&format!("{}/api/v1/events", self.http_url))?; url.query_pairs_mut().append_pair("account", &self.account); - tracing::info!( - "Signal channel listening via SSE on {} (account {})...", - self.http_url, - self.account - ); + tracing::info!("Signal channel listening via SSE on {}...", self.http_url); let mut retry_delay_secs = 2u64; let max_delay_secs = 60u64; @@ -378,17 +407,29 @@ impl Channel for SignalChannel { async fn health_check(&self) -> bool { let url = format!("{}/api/v1/check", self.http_url); - let Ok(resp) = self.client.get(&url).send().await else { + let Ok(resp) = self + .client + .get(&url) + .timeout(Duration::from_secs(10)) + .send() + .await + else { return false; }; resp.status().is_success() } async fn start_typing(&self, recipient: &str) -> anyhow::Result<()> { - let params = serde_json::json!({ - "recipient": [recipient], - "account": self.account, - }); + let params = match Self::parse_recipient_target(recipient) { + RecipientTarget::Direct(number) => serde_json::json!({ + "recipient": [number], + "account": self.account, + }), + RecipientTarget::Group(group_id) => serde_json::json!({ + "groupId": group_id, + "account": self.account, + }), + }; self.rpc_request("sendTyping", params).await?; Ok(()) } @@ -432,12 +473,12 @@ mod tests { source_number: source_number.map(String::from), data_message: message.map(|m| DataMessage { message: Some(m.to_string()), - timestamp: Some(1700000000000), + timestamp: Some(1_700_000_000_000), group_info: None, attachments: None, }), story_message: None, - timestamp: Some(1700000000000), + timestamp: Some(1_700_000_000_000), } } @@ -593,7 +634,31 @@ mod tests { }), attachments: None, }; - assert_eq!(ch.reply_target(&group, "+1111111111"), "group123"); + assert_eq!(ch.reply_target(&group, "+1111111111"), "group:group123"); + } + + #[test] + fn parse_recipient_target_e164_is_direct() { + assert_eq!( + SignalChannel::parse_recipient_target("+1234567890"), + RecipientTarget::Direct("+1234567890".to_string()) + ); + } + + #[test] + fn parse_recipient_target_prefixed_group_is_group() { + assert_eq!( + SignalChannel::parse_recipient_target("group:abc123"), + RecipientTarget::Group("abc123".to_string()) + ); + } + + #[test] + fn parse_recipient_target_non_e164_plus_is_group() { + assert_eq!( + SignalChannel::parse_recipient_target("+abc123"), + RecipientTarget::Group("+abc123".to_string()) + ); } #[test] @@ -679,12 +744,12 @@ mod tests { source_number: Some("+1111111111".to_string()), data_message: Some(DataMessage { message: None, - timestamp: Some(1700000000000), + timestamp: Some(1_700_000_000_000), group_info: None, attachments: Some(vec![serde_json::json!({"contentType": "image/png"})]), }), story_message: None, - timestamp: Some(1700000000000), + timestamp: Some(1_700_000_000_000), }; assert!(ch.process_envelope(&env).is_none()); }