fix(channel/signal): harden target routing and SSE stability

This commit is contained in:
Chummy 2026-02-17 22:32:05 +08:00
parent 55f2637cfe
commit 767c66f3c8

View file

@ -3,9 +3,18 @@ use async_trait::async_trait;
use futures_util::StreamExt; use futures_util::StreamExt;
use reqwest::Client; use reqwest::Client;
use serde::Deserialize; use serde::Deserialize;
use std::time::Duration;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use uuid::Uuid; use uuid::Uuid;
const GROUP_TARGET_PREFIX: &str = "group:";
#[derive(Debug, Clone, PartialEq, Eq)]
enum RecipientTarget {
Direct(String),
Group(String),
}
/// Signal channel using signal-cli daemon's native JSON-RPC + SSE API. /// Signal channel using signal-cli daemon's native JSON-RPC + SSE API.
/// ///
/// Connects to a running `signal-cli daemon --http <host:port>`. /// Connects to a running `signal-cli daemon --http <host:port>`.
@ -73,7 +82,7 @@ impl SignalChannel {
) -> Self { ) -> Self {
let http_url = http_url.trim_end_matches('/').to_string(); let http_url = http_url.trim_end_matches('/').to_string();
let client = Client::builder() let client = Client::builder()
.timeout(std::time::Duration::from_secs(30)) .connect_timeout(Duration::from_secs(10))
.build() .build()
.expect("Signal HTTP client should build"); .expect("Signal HTTP client should build");
Self { Self {
@ -103,6 +112,25 @@ impl SignalChannel {
self.allowed_from.iter().any(|u| u == sender) self.allowed_from.iter().any(|u| u == sender)
} }
fn is_e164(recipient: &str) -> bool {
let Some(number) = recipient.strip_prefix('+') else {
return false;
};
(2..=15).contains(&number.len()) && number.chars().all(|c| c.is_ascii_digit())
}
fn parse_recipient_target(recipient: &str) -> RecipientTarget {
if let Some(group_id) = recipient.strip_prefix(GROUP_TARGET_PREFIX) {
return RecipientTarget::Group(group_id.to_string());
}
if Self::is_e164(recipient) {
RecipientTarget::Direct(recipient.to_string())
} else {
RecipientTarget::Group(recipient.to_string())
}
}
/// Check whether the message targets the configured group. /// Check whether the message targets the configured group.
/// If no `group_id` is configured (None), all DMs and groups are accepted. /// If no `group_id` is configured (None), all DMs and groups are accepted.
/// Use "dm" to filter DMs only. /// Use "dm" to filter DMs only.
@ -122,11 +150,15 @@ impl SignalChannel {
/// Determine the send target: group id or the sender's number. /// Determine the send target: group id or the sender's number.
fn reply_target(&self, data_msg: &DataMessage, sender: &str) -> String { fn reply_target(&self, data_msg: &DataMessage, sender: &str) -> String {
data_msg if let Some(group_id) = data_msg
.group_info .group_info
.as_ref() .as_ref()
.and_then(|g| g.group_id.clone()) .and_then(|g| g.group_id.as_deref())
.unwrap_or_else(|| sender.to_string()) {
format!("{GROUP_TARGET_PREFIX}{group_id}")
} else {
sender.to_string()
}
} }
/// Send a JSON-RPC request to signal-cli daemon. /// Send a JSON-RPC request to signal-cli daemon.
@ -148,6 +180,7 @@ impl SignalChannel {
let resp = self let resp = self
.client .client
.post(&url) .post(&url)
.timeout(Duration::from_secs(30))
.header("Content-Type", "application/json") .header("Content-Type", "application/json")
.json(&body) .json(&body)
.send() .send()
@ -210,10 +243,13 @@ impl SignalChannel {
.timestamp .timestamp
.or(envelope.timestamp) .or(envelope.timestamp)
.unwrap_or_else(|| { .unwrap_or_else(|| {
u64::try_from(
std::time::SystemTime::now() std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH) .duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default() .unwrap_or_default()
.as_millis() as u64 .as_millis(),
)
.unwrap_or(u64::MAX)
}); });
Some(ChannelMessage { Some(ChannelMessage {
@ -234,20 +270,17 @@ impl Channel for SignalChannel {
} }
async fn send(&self, message: &str, recipient: &str) -> anyhow::Result<()> { async fn send(&self, message: &str, recipient: &str) -> anyhow::Result<()> {
let params = if recipient.starts_with('+') { let params = match Self::parse_recipient_target(recipient) {
// DM RecipientTarget::Direct(number) => serde_json::json!({
serde_json::json!({ "recipient": [number],
"recipient": [recipient],
"message": message, "message": message,
"account": self.account, "account": self.account,
}) }),
} else { RecipientTarget::Group(group_id) => serde_json::json!({
// Group "groupId": group_id,
serde_json::json!({
"groupId": recipient,
"message": message, "message": message,
"account": self.account, "account": self.account,
}) }),
}; };
self.rpc_request("send", params).await?; self.rpc_request("send", params).await?;
@ -258,11 +291,7 @@ impl Channel for SignalChannel {
let mut url = reqwest::Url::parse(&format!("{}/api/v1/events", self.http_url))?; let mut url = reqwest::Url::parse(&format!("{}/api/v1/events", self.http_url))?;
url.query_pairs_mut().append_pair("account", &self.account); url.query_pairs_mut().append_pair("account", &self.account);
tracing::info!( tracing::info!("Signal channel listening via SSE on {}...", self.http_url);
"Signal channel listening via SSE on {} (account {})...",
self.http_url,
self.account
);
let mut retry_delay_secs = 2u64; let mut retry_delay_secs = 2u64;
let max_delay_secs = 60u64; let max_delay_secs = 60u64;
@ -378,17 +407,29 @@ impl Channel for SignalChannel {
async fn health_check(&self) -> bool { async fn health_check(&self) -> bool {
let url = format!("{}/api/v1/check", self.http_url); let url = format!("{}/api/v1/check", self.http_url);
let Ok(resp) = self.client.get(&url).send().await else { let Ok(resp) = self
.client
.get(&url)
.timeout(Duration::from_secs(10))
.send()
.await
else {
return false; return false;
}; };
resp.status().is_success() resp.status().is_success()
} }
async fn start_typing(&self, recipient: &str) -> anyhow::Result<()> { async fn start_typing(&self, recipient: &str) -> anyhow::Result<()> {
let params = serde_json::json!({ let params = match Self::parse_recipient_target(recipient) {
"recipient": [recipient], RecipientTarget::Direct(number) => serde_json::json!({
"recipient": [number],
"account": self.account, "account": self.account,
}); }),
RecipientTarget::Group(group_id) => serde_json::json!({
"groupId": group_id,
"account": self.account,
}),
};
self.rpc_request("sendTyping", params).await?; self.rpc_request("sendTyping", params).await?;
Ok(()) Ok(())
} }
@ -432,12 +473,12 @@ mod tests {
source_number: source_number.map(String::from), source_number: source_number.map(String::from),
data_message: message.map(|m| DataMessage { data_message: message.map(|m| DataMessage {
message: Some(m.to_string()), message: Some(m.to_string()),
timestamp: Some(1700000000000), timestamp: Some(1_700_000_000_000),
group_info: None, group_info: None,
attachments: None, attachments: None,
}), }),
story_message: None, story_message: None,
timestamp: Some(1700000000000), timestamp: Some(1_700_000_000_000),
} }
} }
@ -593,7 +634,31 @@ mod tests {
}), }),
attachments: None, attachments: None,
}; };
assert_eq!(ch.reply_target(&group, "+1111111111"), "group123"); assert_eq!(ch.reply_target(&group, "+1111111111"), "group:group123");
}
#[test]
fn parse_recipient_target_e164_is_direct() {
assert_eq!(
SignalChannel::parse_recipient_target("+1234567890"),
RecipientTarget::Direct("+1234567890".to_string())
);
}
#[test]
fn parse_recipient_target_prefixed_group_is_group() {
assert_eq!(
SignalChannel::parse_recipient_target("group:abc123"),
RecipientTarget::Group("abc123".to_string())
);
}
#[test]
fn parse_recipient_target_non_e164_plus_is_group() {
assert_eq!(
SignalChannel::parse_recipient_target("+abc123"),
RecipientTarget::Group("+abc123".to_string())
);
} }
#[test] #[test]
@ -679,12 +744,12 @@ mod tests {
source_number: Some("+1111111111".to_string()), source_number: Some("+1111111111".to_string()),
data_message: Some(DataMessage { data_message: Some(DataMessage {
message: None, message: None,
timestamp: Some(1700000000000), timestamp: Some(1_700_000_000_000),
group_info: None, group_info: None,
attachments: Some(vec![serde_json::json!({"contentType": "image/png"})]), attachments: Some(vec![serde_json::json!({"contentType": "image/png"})]),
}), }),
story_message: None, story_message: None,
timestamp: Some(1700000000000), timestamp: Some(1_700_000_000_000),
}; };
assert!(ch.process_envelope(&env).is_none()); assert!(ch.process_envelope(&env).is_none());
} }