feat(matrix): enable e2ee flow and add channel operations docs
This commit is contained in:
parent
e6029e8cec
commit
18b6ea1e79
12 changed files with 2827 additions and 289 deletions
|
|
@ -1,17 +1,34 @@
|
|||
use crate::channels::traits::{Channel, ChannelMessage, SendMessage};
|
||||
use async_trait::async_trait;
|
||||
use matrix_sdk::{
|
||||
authentication::matrix::MatrixSession,
|
||||
config::SyncSettings,
|
||||
ruma::{
|
||||
events::room::message::{
|
||||
MessageType, OriginalSyncRoomMessageEvent, RoomMessageEventContent,
|
||||
},
|
||||
OwnedRoomId, OwnedUserId,
|
||||
},
|
||||
Client as MatrixSdkClient, LoopCtrl, Room, RoomState, SessionMeta, SessionTokens,
|
||||
};
|
||||
use reqwest::Client;
|
||||
use serde::Deserialize;
|
||||
use tokio::sync::mpsc;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::{mpsc, Mutex, OnceCell, RwLock};
|
||||
|
||||
/// Matrix channel using the Client-Server API (no SDK needed).
|
||||
/// Connects to any Matrix homeserver (Element, Synapse, etc.).
|
||||
/// Matrix channel for Matrix Client-Server API.
|
||||
/// Uses matrix-sdk for reliable sync and encrypted-room decryption.
|
||||
#[derive(Clone)]
|
||||
pub struct MatrixChannel {
|
||||
homeserver: String,
|
||||
access_token: String,
|
||||
room_id: String,
|
||||
allowed_users: Vec<String>,
|
||||
session_user_id_hint: Option<String>,
|
||||
session_device_id_hint: Option<String>,
|
||||
resolved_room_id_cache: Arc<RwLock<Option<String>>>,
|
||||
sdk_client: Arc<OnceCell<MatrixSdkClient>>,
|
||||
http_client: Client,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
|
|
@ -45,6 +62,8 @@ struct TimelineEvent {
|
|||
event_type: String,
|
||||
sender: String,
|
||||
#[serde(default)]
|
||||
event_id: Option<String>,
|
||||
#[serde(default)]
|
||||
content: EventContent,
|
||||
}
|
||||
|
||||
|
|
@ -59,47 +78,150 @@ struct EventContent {
|
|||
#[derive(Debug, Deserialize)]
|
||||
struct WhoAmIResponse {
|
||||
user_id: String,
|
||||
#[serde(default)]
|
||||
device_id: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct RoomAliasResponse {
|
||||
room_id: String,
|
||||
}
|
||||
|
||||
impl MatrixChannel {
|
||||
fn normalize_optional_field(value: Option<String>) -> Option<String> {
|
||||
value
|
||||
.map(|entry| entry.trim().to_string())
|
||||
.filter(|entry| !entry.is_empty())
|
||||
}
|
||||
|
||||
pub fn new(
|
||||
homeserver: String,
|
||||
access_token: String,
|
||||
room_id: String,
|
||||
allowed_users: Vec<String>,
|
||||
) -> Self {
|
||||
let homeserver = if homeserver.ends_with('/') {
|
||||
homeserver[..homeserver.len() - 1].to_string()
|
||||
} else {
|
||||
homeserver
|
||||
};
|
||||
Self::new_with_session_hint(homeserver, access_token, room_id, allowed_users, None, None)
|
||||
}
|
||||
|
||||
pub fn new_with_session_hint(
|
||||
homeserver: String,
|
||||
access_token: String,
|
||||
room_id: String,
|
||||
allowed_users: Vec<String>,
|
||||
user_id_hint: Option<String>,
|
||||
device_id_hint: Option<String>,
|
||||
) -> Self {
|
||||
let homeserver = homeserver.trim_end_matches('/').to_string();
|
||||
let access_token = access_token.trim().to_string();
|
||||
let room_id = room_id.trim().to_string();
|
||||
let allowed_users = allowed_users
|
||||
.into_iter()
|
||||
.map(|user| user.trim().to_string())
|
||||
.filter(|user| !user.is_empty())
|
||||
.collect();
|
||||
|
||||
Self {
|
||||
homeserver,
|
||||
access_token,
|
||||
room_id,
|
||||
allowed_users,
|
||||
session_user_id_hint: Self::normalize_optional_field(user_id_hint),
|
||||
session_device_id_hint: Self::normalize_optional_field(device_id_hint),
|
||||
resolved_room_id_cache: Arc::new(RwLock::new(None)),
|
||||
sdk_client: Arc::new(OnceCell::new()),
|
||||
http_client: Client::new(),
|
||||
}
|
||||
}
|
||||
|
||||
fn http_client(&self) -> Client {
|
||||
crate::config::build_runtime_proxy_client("channel.matrix")
|
||||
fn encode_path_segment(value: &str) -> String {
|
||||
fn should_encode(byte: u8) -> bool {
|
||||
!matches!(
|
||||
byte,
|
||||
b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'.' | b'_' | b'~'
|
||||
)
|
||||
}
|
||||
|
||||
let mut encoded = String::with_capacity(value.len());
|
||||
for byte in value.bytes() {
|
||||
if should_encode(byte) {
|
||||
use std::fmt::Write;
|
||||
let _ = write!(&mut encoded, "%{byte:02X}");
|
||||
} else {
|
||||
encoded.push(byte as char);
|
||||
}
|
||||
}
|
||||
|
||||
encoded
|
||||
}
|
||||
|
||||
fn auth_header_value(&self) -> String {
|
||||
format!("Bearer {}", self.access_token)
|
||||
}
|
||||
|
||||
fn is_user_allowed(&self, sender: &str) -> bool {
|
||||
if self.allowed_users.iter().any(|u| u == "*") {
|
||||
return true;
|
||||
}
|
||||
self.allowed_users
|
||||
.iter()
|
||||
.any(|u| u.eq_ignore_ascii_case(sender))
|
||||
Self::is_sender_allowed(&self.allowed_users, sender)
|
||||
}
|
||||
|
||||
async fn get_my_user_id(&self) -> anyhow::Result<String> {
|
||||
fn is_sender_allowed(allowed_users: &[String], sender: &str) -> bool {
|
||||
if allowed_users.iter().any(|u| u == "*") {
|
||||
return true;
|
||||
}
|
||||
|
||||
allowed_users.iter().any(|u| u.eq_ignore_ascii_case(sender))
|
||||
}
|
||||
|
||||
fn is_supported_message_type(msgtype: &str) -> bool {
|
||||
matches!(msgtype, "m.text" | "m.notice")
|
||||
}
|
||||
|
||||
fn has_non_empty_body(body: &str) -> bool {
|
||||
!body.trim().is_empty()
|
||||
}
|
||||
|
||||
fn cache_event_id(
|
||||
event_id: &str,
|
||||
recent_order: &mut std::collections::VecDeque<String>,
|
||||
recent_lookup: &mut std::collections::HashSet<String>,
|
||||
) -> bool {
|
||||
const MAX_RECENT_EVENT_IDS: usize = 2048;
|
||||
|
||||
if recent_lookup.contains(event_id) {
|
||||
return true;
|
||||
}
|
||||
|
||||
let event_id_owned = event_id.to_string();
|
||||
recent_lookup.insert(event_id_owned.clone());
|
||||
recent_order.push_back(event_id_owned);
|
||||
|
||||
if recent_order.len() > MAX_RECENT_EVENT_IDS {
|
||||
if let Some(evicted) = recent_order.pop_front() {
|
||||
recent_lookup.remove(&evicted);
|
||||
}
|
||||
}
|
||||
|
||||
false
|
||||
}
|
||||
|
||||
async fn target_room_id(&self) -> anyhow::Result<String> {
|
||||
if self.room_id.starts_with('!') {
|
||||
return Ok(self.room_id.clone());
|
||||
}
|
||||
|
||||
if let Some(cached) = self.resolved_room_id_cache.read().await.clone() {
|
||||
return Ok(cached);
|
||||
}
|
||||
|
||||
let resolved = self.resolve_room_id().await?;
|
||||
*self.resolved_room_id_cache.write().await = Some(resolved.clone());
|
||||
Ok(resolved)
|
||||
}
|
||||
|
||||
async fn get_my_identity(&self) -> anyhow::Result<WhoAmIResponse> {
|
||||
let url = format!("{}/_matrix/client/v3/account/whoami", self.homeserver);
|
||||
let resp = self
|
||||
.http_client()
|
||||
.http_client
|
||||
.get(&url)
|
||||
.header("Authorization", format!("Bearer {}", self.access_token))
|
||||
.header("Authorization", self.auth_header_value())
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
|
|
@ -108,8 +230,213 @@ impl MatrixChannel {
|
|||
anyhow::bail!("Matrix whoami failed: {err}");
|
||||
}
|
||||
|
||||
let who: WhoAmIResponse = resp.json().await?;
|
||||
Ok(who.user_id)
|
||||
Ok(resp.json().await?)
|
||||
}
|
||||
|
||||
async fn get_my_user_id(&self) -> anyhow::Result<String> {
|
||||
Ok(self.get_my_identity().await?.user_id)
|
||||
}
|
||||
|
||||
async fn matrix_client(&self) -> anyhow::Result<MatrixSdkClient> {
|
||||
let client = self
|
||||
.sdk_client
|
||||
.get_or_try_init(|| async {
|
||||
let identity = self.get_my_identity().await;
|
||||
let whoami = match identity {
|
||||
Ok(whoami) => Some(whoami),
|
||||
Err(error) => {
|
||||
if self.session_user_id_hint.is_some() && self.session_device_id_hint.is_some()
|
||||
{
|
||||
tracing::warn!(
|
||||
"Matrix whoami failed; falling back to configured session hints for E2EE session restore: {error}"
|
||||
);
|
||||
None
|
||||
} else {
|
||||
return Err(error);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let resolved_user_id = if let Some(whoami) = whoami.as_ref() {
|
||||
if let Some(hinted) = self.session_user_id_hint.as_ref() {
|
||||
if hinted != &whoami.user_id {
|
||||
tracing::warn!(
|
||||
"Matrix configured user_id '{}' does not match whoami '{}'; using whoami.",
|
||||
hinted,
|
||||
whoami.user_id
|
||||
);
|
||||
}
|
||||
}
|
||||
whoami.user_id.clone()
|
||||
} else {
|
||||
self.session_user_id_hint.clone().ok_or_else(|| {
|
||||
anyhow::anyhow!(
|
||||
"Matrix session restore requires user_id when whoami is unavailable"
|
||||
)
|
||||
})?
|
||||
};
|
||||
|
||||
let resolved_device_id = match (whoami.as_ref(), self.session_device_id_hint.as_ref()) {
|
||||
(Some(whoami), Some(hinted)) => {
|
||||
if let Some(whoami_device_id) = whoami.device_id.as_ref() {
|
||||
if whoami_device_id != hinted {
|
||||
tracing::warn!(
|
||||
"Matrix configured device_id '{}' does not match whoami '{}'; using whoami.",
|
||||
hinted,
|
||||
whoami_device_id
|
||||
);
|
||||
}
|
||||
whoami_device_id.clone()
|
||||
} else {
|
||||
hinted.clone()
|
||||
}
|
||||
}
|
||||
(Some(whoami), None) => whoami.device_id.clone().ok_or_else(|| {
|
||||
anyhow::anyhow!(
|
||||
"Matrix whoami response did not include device_id. Set channels.matrix.device_id to enable E2EE session restore."
|
||||
)
|
||||
})?,
|
||||
(None, Some(hinted)) => hinted.clone(),
|
||||
(None, None) => {
|
||||
return Err(anyhow::anyhow!(
|
||||
"Matrix E2EE session restore requires device_id when whoami is unavailable"
|
||||
));
|
||||
}
|
||||
};
|
||||
|
||||
let client = MatrixSdkClient::builder()
|
||||
.homeserver_url(&self.homeserver)
|
||||
.build()
|
||||
.await?;
|
||||
|
||||
let user_id: OwnedUserId = resolved_user_id.parse()?;
|
||||
let session = MatrixSession {
|
||||
meta: SessionMeta {
|
||||
user_id,
|
||||
device_id: resolved_device_id.into(),
|
||||
},
|
||||
tokens: SessionTokens {
|
||||
access_token: self.access_token.clone(),
|
||||
refresh_token: None,
|
||||
},
|
||||
};
|
||||
|
||||
client.restore_session(session).await?;
|
||||
|
||||
Ok::<MatrixSdkClient, anyhow::Error>(client)
|
||||
})
|
||||
.await?;
|
||||
|
||||
Ok(client.clone())
|
||||
}
|
||||
|
||||
async fn resolve_room_id(&self) -> anyhow::Result<String> {
|
||||
let configured = self.room_id.trim();
|
||||
|
||||
if configured.starts_with('!') {
|
||||
return Ok(configured.to_string());
|
||||
}
|
||||
|
||||
if configured.starts_with('#') {
|
||||
let encoded_alias = Self::encode_path_segment(configured);
|
||||
let url = format!(
|
||||
"{}/_matrix/client/v3/directory/room/{}",
|
||||
self.homeserver, encoded_alias
|
||||
);
|
||||
|
||||
let resp = self
|
||||
.http_client
|
||||
.get(&url)
|
||||
.header("Authorization", self.auth_header_value())
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
if !resp.status().is_success() {
|
||||
let err = resp.text().await.unwrap_or_default();
|
||||
anyhow::bail!("Matrix room alias resolution failed for '{configured}': {err}");
|
||||
}
|
||||
|
||||
let resolved: RoomAliasResponse = resp.json().await?;
|
||||
return Ok(resolved.room_id);
|
||||
}
|
||||
|
||||
anyhow::bail!(
|
||||
"Matrix room reference must start with '!' (room ID) or '#' (room alias), got: {configured}"
|
||||
)
|
||||
}
|
||||
|
||||
async fn ensure_room_accessible(&self, room_id: &str) -> anyhow::Result<()> {
|
||||
let encoded_room = Self::encode_path_segment(room_id);
|
||||
let url = format!(
|
||||
"{}/_matrix/client/v3/rooms/{}/joined_members",
|
||||
self.homeserver, encoded_room
|
||||
);
|
||||
|
||||
let resp = self
|
||||
.http_client
|
||||
.get(&url)
|
||||
.header("Authorization", self.auth_header_value())
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
if !resp.status().is_success() {
|
||||
let err = resp.text().await.unwrap_or_default();
|
||||
anyhow::bail!("Matrix room access check failed for '{room_id}': {err}");
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn room_is_encrypted(&self, room_id: &str) -> anyhow::Result<bool> {
|
||||
let encoded_room = Self::encode_path_segment(room_id);
|
||||
let url = format!(
|
||||
"{}/_matrix/client/v3/rooms/{}/state/m.room.encryption",
|
||||
self.homeserver, encoded_room
|
||||
);
|
||||
|
||||
let resp = self
|
||||
.http_client
|
||||
.get(&url)
|
||||
.header("Authorization", self.auth_header_value())
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
if resp.status().is_success() {
|
||||
return Ok(true);
|
||||
}
|
||||
|
||||
if resp.status() == reqwest::StatusCode::NOT_FOUND {
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
let err = resp.text().await.unwrap_or_default();
|
||||
anyhow::bail!("Matrix room encryption check failed for '{room_id}': {err}");
|
||||
}
|
||||
|
||||
async fn ensure_room_supported(&self, room_id: &str) -> anyhow::Result<()> {
|
||||
self.ensure_room_accessible(room_id).await?;
|
||||
|
||||
if self.room_is_encrypted(room_id).await? {
|
||||
tracing::info!(
|
||||
"Matrix room {} is encrypted; E2EE decryption is enabled via matrix-sdk.",
|
||||
room_id
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn sync_filter_for_room(room_id: &str, timeline_limit: usize) -> String {
|
||||
let timeline_limit = timeline_limit.max(1);
|
||||
serde_json::json!({
|
||||
"room": {
|
||||
"rooms": [room_id],
|
||||
"timeline": {
|
||||
"limit": timeline_limit
|
||||
}
|
||||
}
|
||||
})
|
||||
.to_string()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -120,148 +447,157 @@ impl Channel for MatrixChannel {
|
|||
}
|
||||
|
||||
async fn send(&self, message: &SendMessage) -> anyhow::Result<()> {
|
||||
let txn_id = format!("zc_{}", chrono::Utc::now().timestamp_millis());
|
||||
let url = format!(
|
||||
"{}/_matrix/client/v3/rooms/{}/send/m.room.message/{}",
|
||||
self.homeserver, self.room_id, txn_id
|
||||
);
|
||||
let client = self.matrix_client().await?;
|
||||
let target_room_id = self.target_room_id().await?;
|
||||
let target_room: OwnedRoomId = target_room_id.parse()?;
|
||||
|
||||
let body = serde_json::json!({
|
||||
"msgtype": "m.text",
|
||||
"body": message.content
|
||||
});
|
||||
|
||||
let resp = self
|
||||
.http_client()
|
||||
.put(&url)
|
||||
.header("Authorization", format!("Bearer {}", self.access_token))
|
||||
.json(&body)
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
if !resp.status().is_success() {
|
||||
let err = resp.text().await?;
|
||||
anyhow::bail!("Matrix send failed: {err}");
|
||||
let mut room = client.get_room(&target_room);
|
||||
if room.is_none() {
|
||||
let _ = client.sync_once(SyncSettings::new()).await;
|
||||
room = client.get_room(&target_room);
|
||||
}
|
||||
|
||||
let Some(room) = room else {
|
||||
anyhow::bail!("Matrix room '{}' not found in joined rooms", target_room_id);
|
||||
};
|
||||
|
||||
if room.state() != RoomState::Joined {
|
||||
anyhow::bail!("Matrix room '{}' is not in joined state", target_room_id);
|
||||
}
|
||||
|
||||
room.send(RoomMessageEventContent::text_plain(&message.content))
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn listen(&self, tx: mpsc::Sender<ChannelMessage>) -> anyhow::Result<()> {
|
||||
tracing::info!("Matrix channel listening on room {}...", self.room_id);
|
||||
let target_room_id = self.target_room_id().await?;
|
||||
self.ensure_room_supported(&target_room_id).await?;
|
||||
|
||||
let my_user_id = self.get_my_user_id().await?;
|
||||
let target_room: OwnedRoomId = target_room_id.parse()?;
|
||||
let my_user_id: OwnedUserId = match self.get_my_user_id().await {
|
||||
Ok(user_id) => user_id.parse()?,
|
||||
Err(error) => {
|
||||
if let Some(hinted) = self.session_user_id_hint.as_ref() {
|
||||
tracing::warn!(
|
||||
"Matrix whoami failed while resolving listener user_id; using configured user_id hint: {error}"
|
||||
);
|
||||
hinted.parse()?
|
||||
} else {
|
||||
return Err(error);
|
||||
}
|
||||
}
|
||||
};
|
||||
let client = self.matrix_client().await?;
|
||||
|
||||
// Initial sync to get the since token
|
||||
let url = format!(
|
||||
"{}/_matrix/client/v3/sync?timeout=30000&filter={{\"room\":{{\"timeline\":{{\"limit\":1}}}}}}",
|
||||
self.homeserver
|
||||
let _ = client.sync_once(SyncSettings::new()).await;
|
||||
|
||||
tracing::info!(
|
||||
"Matrix channel listening on room {} (configured as {})...",
|
||||
target_room_id,
|
||||
self.room_id
|
||||
);
|
||||
|
||||
let resp = self
|
||||
.http_client()
|
||||
.get(&url)
|
||||
.header("Authorization", format!("Bearer {}", self.access_token))
|
||||
.send()
|
||||
let recent_event_cache = Arc::new(Mutex::new((
|
||||
std::collections::VecDeque::new(),
|
||||
std::collections::HashSet::new(),
|
||||
)));
|
||||
|
||||
let tx_handler = tx.clone();
|
||||
let target_room_for_handler = target_room.clone();
|
||||
let my_user_id_for_handler = my_user_id.clone();
|
||||
let allowed_users_for_handler = self.allowed_users.clone();
|
||||
let dedupe_for_handler = Arc::clone(&recent_event_cache);
|
||||
|
||||
client.add_event_handler(move |event: OriginalSyncRoomMessageEvent, room: Room| {
|
||||
let tx = tx_handler.clone();
|
||||
let target_room = target_room_for_handler.clone();
|
||||
let my_user_id = my_user_id_for_handler.clone();
|
||||
let allowed_users = allowed_users_for_handler.clone();
|
||||
let dedupe = Arc::clone(&dedupe_for_handler);
|
||||
|
||||
async move {
|
||||
if room.room_id().as_str() != target_room.as_str() {
|
||||
return;
|
||||
}
|
||||
|
||||
if event.sender == my_user_id {
|
||||
return;
|
||||
}
|
||||
|
||||
let sender = event.sender.to_string();
|
||||
if !MatrixChannel::is_sender_allowed(&allowed_users, &sender) {
|
||||
return;
|
||||
}
|
||||
|
||||
let body = match &event.content.msgtype {
|
||||
MessageType::Text(content) => content.body.clone(),
|
||||
MessageType::Notice(content) => content.body.clone(),
|
||||
_ => return,
|
||||
};
|
||||
|
||||
if !MatrixChannel::has_non_empty_body(&body) {
|
||||
return;
|
||||
}
|
||||
|
||||
let event_id = event.event_id.to_string();
|
||||
{
|
||||
let mut guard = dedupe.lock().await;
|
||||
let (recent_order, recent_lookup) = &mut *guard;
|
||||
if MatrixChannel::cache_event_id(&event_id, recent_order, recent_lookup) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
let msg = ChannelMessage {
|
||||
id: event_id,
|
||||
sender: sender.clone(),
|
||||
reply_target: sender,
|
||||
content: body,
|
||||
channel: "matrix".to_string(),
|
||||
timestamp: std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.unwrap_or_default()
|
||||
.as_secs(),
|
||||
};
|
||||
|
||||
let _ = tx.send(msg).await;
|
||||
}
|
||||
});
|
||||
|
||||
let sync_settings = SyncSettings::new().timeout(std::time::Duration::from_secs(30));
|
||||
client
|
||||
.sync_with_result_callback(sync_settings, |sync_result| {
|
||||
let tx = tx.clone();
|
||||
async move {
|
||||
if tx.is_closed() {
|
||||
return Ok::<LoopCtrl, matrix_sdk::Error>(LoopCtrl::Break);
|
||||
}
|
||||
|
||||
if let Err(error) = sync_result {
|
||||
tracing::warn!("Matrix sync error: {error}, retrying...");
|
||||
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
|
||||
}
|
||||
|
||||
Ok::<LoopCtrl, matrix_sdk::Error>(LoopCtrl::Continue)
|
||||
}
|
||||
})
|
||||
.await?;
|
||||
|
||||
if !resp.status().is_success() {
|
||||
let err = resp.text().await?;
|
||||
anyhow::bail!("Matrix initial sync failed: {err}");
|
||||
}
|
||||
|
||||
let sync: SyncResponse = resp.json().await?;
|
||||
let mut since = sync.next_batch;
|
||||
|
||||
// Long-poll loop
|
||||
loop {
|
||||
let url = format!(
|
||||
"{}/_matrix/client/v3/sync?since={}&timeout=30000",
|
||||
self.homeserver, since
|
||||
);
|
||||
|
||||
let resp = self
|
||||
.http_client()
|
||||
.get(&url)
|
||||
.header("Authorization", format!("Bearer {}", self.access_token))
|
||||
.send()
|
||||
.await;
|
||||
|
||||
let resp = match resp {
|
||||
Ok(r) => r,
|
||||
Err(e) => {
|
||||
tracing::warn!("Matrix sync error: {e}, retrying...");
|
||||
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
if !resp.status().is_success() {
|
||||
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
|
||||
continue;
|
||||
}
|
||||
|
||||
let sync: SyncResponse = resp.json().await?;
|
||||
since = sync.next_batch;
|
||||
|
||||
// Process events from our room
|
||||
if let Some(room) = sync.rooms.join.get(&self.room_id) {
|
||||
for event in &room.timeline.events {
|
||||
// Skip our own messages
|
||||
if event.sender == my_user_id {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Only process text messages
|
||||
if event.event_type != "m.room.message" {
|
||||
continue;
|
||||
}
|
||||
|
||||
if event.content.msgtype.as_deref() != Some("m.text") {
|
||||
continue;
|
||||
}
|
||||
|
||||
let Some(ref body) = event.content.body else {
|
||||
continue;
|
||||
};
|
||||
|
||||
if !self.is_user_allowed(&event.sender) {
|
||||
continue;
|
||||
}
|
||||
|
||||
let msg = ChannelMessage {
|
||||
id: format!("mx_{}", chrono::Utc::now().timestamp_millis()),
|
||||
sender: event.sender.clone(),
|
||||
reply_target: event.sender.clone(),
|
||||
content: body.clone(),
|
||||
channel: "matrix".to_string(),
|
||||
timestamp: std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.unwrap_or_default()
|
||||
.as_secs(),
|
||||
};
|
||||
|
||||
if tx.send(msg).await.is_err() {
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn health_check(&self) -> bool {
|
||||
let url = format!("{}/_matrix/client/v3/account/whoami", self.homeserver);
|
||||
let Ok(resp) = self
|
||||
.http_client()
|
||||
.get(&url)
|
||||
.header("Authorization", format!("Bearer {}", self.access_token))
|
||||
.send()
|
||||
.await
|
||||
else {
|
||||
let Ok(room_id) = self.target_room_id().await else {
|
||||
return false;
|
||||
};
|
||||
|
||||
resp.status().is_success()
|
||||
if self.ensure_room_supported(&room_id).await.is_err() {
|
||||
return false;
|
||||
}
|
||||
|
||||
self.matrix_client().await.is_ok()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -310,14 +646,139 @@ mod tests {
|
|||
}
|
||||
|
||||
#[test]
|
||||
fn multiple_trailing_slashes_strips_one() {
|
||||
fn multiple_trailing_slashes_strip_all() {
|
||||
let ch = MatrixChannel::new(
|
||||
"https://matrix.org//".to_string(),
|
||||
"tok".to_string(),
|
||||
"!r:m".to_string(),
|
||||
vec![],
|
||||
);
|
||||
assert_eq!(ch.homeserver, "https://matrix.org/");
|
||||
assert_eq!(ch.homeserver, "https://matrix.org");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn trims_access_token() {
|
||||
let ch = MatrixChannel::new(
|
||||
"https://matrix.org".to_string(),
|
||||
" syt_test_token ".to_string(),
|
||||
"!r:m".to_string(),
|
||||
vec![],
|
||||
);
|
||||
assert_eq!(ch.access_token, "syt_test_token");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn session_hints_are_normalized() {
|
||||
let ch = MatrixChannel::new_with_session_hint(
|
||||
"https://matrix.org".to_string(),
|
||||
"tok".to_string(),
|
||||
"!r:m".to_string(),
|
||||
vec![],
|
||||
Some(" @bot:matrix.org ".to_string()),
|
||||
Some(" DEVICE123 ".to_string()),
|
||||
);
|
||||
|
||||
assert_eq!(ch.session_user_id_hint.as_deref(), Some("@bot:matrix.org"));
|
||||
assert_eq!(ch.session_device_id_hint.as_deref(), Some("DEVICE123"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn empty_session_hints_are_ignored() {
|
||||
let ch = MatrixChannel::new_with_session_hint(
|
||||
"https://matrix.org".to_string(),
|
||||
"tok".to_string(),
|
||||
"!r:m".to_string(),
|
||||
vec![],
|
||||
Some(" ".to_string()),
|
||||
Some("".to_string()),
|
||||
);
|
||||
|
||||
assert!(ch.session_user_id_hint.is_none());
|
||||
assert!(ch.session_device_id_hint.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn encode_path_segment_encodes_room_refs() {
|
||||
assert_eq!(
|
||||
MatrixChannel::encode_path_segment("#ops:matrix.example.com"),
|
||||
"%23ops%3Amatrix.example.com"
|
||||
);
|
||||
assert_eq!(
|
||||
MatrixChannel::encode_path_segment("!room:matrix.example.com"),
|
||||
"%21room%3Amatrix.example.com"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn supported_message_type_detection() {
|
||||
assert!(MatrixChannel::is_supported_message_type("m.text"));
|
||||
assert!(MatrixChannel::is_supported_message_type("m.notice"));
|
||||
assert!(!MatrixChannel::is_supported_message_type("m.image"));
|
||||
assert!(!MatrixChannel::is_supported_message_type("m.file"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn body_presence_detection() {
|
||||
assert!(MatrixChannel::has_non_empty_body("hello"));
|
||||
assert!(MatrixChannel::has_non_empty_body(" hello "));
|
||||
assert!(!MatrixChannel::has_non_empty_body(""));
|
||||
assert!(!MatrixChannel::has_non_empty_body(" \n\t "));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn sync_filter_for_room_targets_requested_room() {
|
||||
let filter = MatrixChannel::sync_filter_for_room("!room:matrix.org", 0);
|
||||
let value: serde_json::Value = serde_json::from_str(&filter).unwrap();
|
||||
|
||||
assert_eq!(value["room"]["rooms"][0], "!room:matrix.org");
|
||||
assert_eq!(value["room"]["timeline"]["limit"], 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn event_id_cache_deduplicates_and_evicts_old_entries() {
|
||||
let mut recent_order = std::collections::VecDeque::new();
|
||||
let mut recent_lookup = std::collections::HashSet::new();
|
||||
|
||||
assert!(!MatrixChannel::cache_event_id(
|
||||
"$first:event",
|
||||
&mut recent_order,
|
||||
&mut recent_lookup
|
||||
));
|
||||
assert!(MatrixChannel::cache_event_id(
|
||||
"$first:event",
|
||||
&mut recent_order,
|
||||
&mut recent_lookup
|
||||
));
|
||||
|
||||
for i in 0..2050 {
|
||||
let event_id = format!("$event-{i}:matrix");
|
||||
MatrixChannel::cache_event_id(&event_id, &mut recent_order, &mut recent_lookup);
|
||||
}
|
||||
|
||||
assert!(!MatrixChannel::cache_event_id(
|
||||
"$first:event",
|
||||
&mut recent_order,
|
||||
&mut recent_lookup
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn trims_room_id_and_allowed_users() {
|
||||
let ch = MatrixChannel::new(
|
||||
"https://matrix.org".to_string(),
|
||||
"tok".to_string(),
|
||||
" !room:matrix.org ".to_string(),
|
||||
vec![
|
||||
" @user:matrix.org ".to_string(),
|
||||
" ".to_string(),
|
||||
"@other:matrix.org".to_string(),
|
||||
],
|
||||
);
|
||||
|
||||
assert_eq!(ch.room_id, "!room:matrix.org");
|
||||
assert_eq!(ch.allowed_users.len(), 2);
|
||||
assert!(ch.allowed_users.contains(&"@user:matrix.org".to_string()));
|
||||
assert!(ch.allowed_users.contains(&"@other:matrix.org".to_string()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
|
@ -393,6 +854,7 @@ mod tests {
|
|||
"events": [
|
||||
{
|
||||
"type": "m.room.message",
|
||||
"event_id": "$event:matrix.org",
|
||||
"sender": "@user:matrix.org",
|
||||
"content": {
|
||||
"msgtype": "m.text",
|
||||
|
|
@ -410,6 +872,10 @@ mod tests {
|
|||
let room = resp.rooms.join.get("!room:matrix.org").unwrap();
|
||||
assert_eq!(room.timeline.events.len(), 1);
|
||||
assert_eq!(room.timeline.events[0].sender, "@user:matrix.org");
|
||||
assert_eq!(
|
||||
room.timeline.events[0].event_id.as_deref(),
|
||||
Some("$event:matrix.org")
|
||||
);
|
||||
assert_eq!(
|
||||
room.timeline.events[0].content.body.as_deref(),
|
||||
Some("Hello!")
|
||||
|
|
@ -461,6 +927,62 @@ mod tests {
|
|||
assert!(event.content.msgtype.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn event_content_supports_notice_msgtype() {
|
||||
let json = r#"{
|
||||
"type":"m.room.message",
|
||||
"sender":"@u:m",
|
||||
"event_id":"$notice:m",
|
||||
"content":{"msgtype":"m.notice","body":"Heads up"}
|
||||
}"#;
|
||||
let event: TimelineEvent = serde_json::from_str(json).unwrap();
|
||||
assert_eq!(event.content.msgtype.as_deref(), Some("m.notice"));
|
||||
assert_eq!(event.content.body.as_deref(), Some("Heads up"));
|
||||
assert_eq!(event.event_id.as_deref(), Some("$notice:m"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn invalid_room_reference_fails_fast() {
|
||||
let ch = MatrixChannel::new(
|
||||
"https://matrix.org".to_string(),
|
||||
"tok".to_string(),
|
||||
"room_without_prefix".to_string(),
|
||||
vec![],
|
||||
);
|
||||
|
||||
let err = ch.resolve_room_id().await.unwrap_err();
|
||||
assert!(err
|
||||
.to_string()
|
||||
.contains("must start with '!' (room ID) or '#' (room alias)"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn target_room_id_keeps_canonical_room_id_without_lookup() {
|
||||
let ch = MatrixChannel::new(
|
||||
"https://matrix.org".to_string(),
|
||||
"tok".to_string(),
|
||||
"!canonical:matrix.org".to_string(),
|
||||
vec![],
|
||||
);
|
||||
|
||||
let room_id = ch.target_room_id().await.unwrap();
|
||||
assert_eq!(room_id, "!canonical:matrix.org");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn target_room_id_uses_cached_alias_resolution() {
|
||||
let ch = MatrixChannel::new(
|
||||
"https://matrix.org".to_string(),
|
||||
"tok".to_string(),
|
||||
"#ops:matrix.org".to_string(),
|
||||
vec![],
|
||||
);
|
||||
|
||||
*ch.resolved_room_id_cache.write().await = Some("!cached:matrix.org".to_string());
|
||||
let room_id = ch.target_room_id().await.unwrap();
|
||||
assert_eq!(room_id, "!cached:matrix.org");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn sync_response_missing_rooms_defaults() {
|
||||
let json = r#"{"next_batch":"s0"}"#;
|
||||
|
|
|
|||
|
|
@ -1351,11 +1351,13 @@ pub async fn doctor_channels(config: Config) -> Result<()> {
|
|||
if let Some(ref mx) = config.channels_config.matrix {
|
||||
channels.push((
|
||||
"Matrix",
|
||||
Arc::new(MatrixChannel::new(
|
||||
Arc::new(MatrixChannel::new_with_session_hint(
|
||||
mx.homeserver.clone(),
|
||||
mx.access_token.clone(),
|
||||
mx.room_id.clone(),
|
||||
mx.allowed_users.clone(),
|
||||
mx.user_id.clone(),
|
||||
mx.device_id.clone(),
|
||||
)),
|
||||
));
|
||||
}
|
||||
|
|
@ -1676,11 +1678,13 @@ pub async fn start_channels(config: Config) -> Result<()> {
|
|||
}
|
||||
|
||||
if let Some(ref mx) = config.channels_config.matrix {
|
||||
channels.push(Arc::new(MatrixChannel::new(
|
||||
channels.push(Arc::new(MatrixChannel::new_with_session_hint(
|
||||
mx.homeserver.clone(),
|
||||
mx.access_token.clone(),
|
||||
mx.room_id.clone(),
|
||||
mx.allowed_users.clone(),
|
||||
mx.user_id.clone(),
|
||||
mx.device_id.clone(),
|
||||
)));
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -2091,6 +2091,10 @@ pub struct IMessageConfig {
|
|||
pub struct MatrixConfig {
|
||||
pub homeserver: String,
|
||||
pub access_token: String,
|
||||
#[serde(default)]
|
||||
pub user_id: Option<String>,
|
||||
#[serde(default)]
|
||||
pub device_id: Option<String>,
|
||||
pub room_id: String,
|
||||
pub allowed_users: Vec<String>,
|
||||
}
|
||||
|
|
@ -3621,6 +3625,8 @@ tool_dispatcher = "xml"
|
|||
let mc = MatrixConfig {
|
||||
homeserver: "https://matrix.org".into(),
|
||||
access_token: "syt_token_abc".into(),
|
||||
user_id: Some("@bot:matrix.org".into()),
|
||||
device_id: Some("DEVICE123".into()),
|
||||
room_id: "!room123:matrix.org".into(),
|
||||
allowed_users: vec!["@user:matrix.org".into()],
|
||||
};
|
||||
|
|
@ -3628,6 +3634,8 @@ tool_dispatcher = "xml"
|
|||
let parsed: MatrixConfig = serde_json::from_str(&json).unwrap();
|
||||
assert_eq!(parsed.homeserver, "https://matrix.org");
|
||||
assert_eq!(parsed.access_token, "syt_token_abc");
|
||||
assert_eq!(parsed.user_id.as_deref(), Some("@bot:matrix.org"));
|
||||
assert_eq!(parsed.device_id.as_deref(), Some("DEVICE123"));
|
||||
assert_eq!(parsed.room_id, "!room123:matrix.org");
|
||||
assert_eq!(parsed.allowed_users.len(), 1);
|
||||
}
|
||||
|
|
@ -3637,6 +3645,8 @@ tool_dispatcher = "xml"
|
|||
let mc = MatrixConfig {
|
||||
homeserver: "https://synapse.local:8448".into(),
|
||||
access_token: "tok".into(),
|
||||
user_id: None,
|
||||
device_id: None,
|
||||
room_id: "!abc:synapse.local".into(),
|
||||
allowed_users: vec!["@admin:synapse.local".into(), "*".into()],
|
||||
};
|
||||
|
|
@ -3646,6 +3656,21 @@ tool_dispatcher = "xml"
|
|||
assert_eq!(parsed.allowed_users.len(), 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn matrix_config_backward_compatible_without_session_hints() {
|
||||
let toml = r#"
|
||||
homeserver = "https://matrix.org"
|
||||
access_token = "tok"
|
||||
room_id = "!ops:matrix.org"
|
||||
allowed_users = ["@ops:matrix.org"]
|
||||
"#;
|
||||
|
||||
let parsed: MatrixConfig = toml::from_str(toml).unwrap();
|
||||
assert_eq!(parsed.homeserver, "https://matrix.org");
|
||||
assert!(parsed.user_id.is_none());
|
||||
assert!(parsed.device_id.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn signal_config_serde() {
|
||||
let sc = SignalConfig {
|
||||
|
|
@ -3709,6 +3734,8 @@ tool_dispatcher = "xml"
|
|||
matrix: Some(MatrixConfig {
|
||||
homeserver: "https://m.org".into(),
|
||||
access_token: "tok".into(),
|
||||
user_id: None,
|
||||
device_id: None,
|
||||
room_id: "!r:m".into(),
|
||||
allowed_users: vec!["@u:m".into()],
|
||||
}),
|
||||
|
|
|
|||
|
|
@ -836,6 +836,8 @@ mod tests {
|
|||
config.channels_config.matrix = Some(MatrixConfig {
|
||||
homeserver: "https://m.org".into(),
|
||||
access_token: "tok".into(),
|
||||
user_id: None,
|
||||
device_id: None,
|
||||
room_id: "!r:m".into(),
|
||||
allowed_users: vec![],
|
||||
});
|
||||
|
|
|
|||
|
|
@ -3015,14 +3015,44 @@ fn setup_channels() -> Result<ChannelsConfig> {
|
|||
.header("Authorization", format!("Bearer {access_token_clone}"))
|
||||
.send()?;
|
||||
let ok = resp.status().is_success();
|
||||
Ok::<_, reqwest::Error>(ok)
|
||||
|
||||
if !ok {
|
||||
return Ok::<_, reqwest::Error>((false, None, None));
|
||||
}
|
||||
|
||||
let payload: Value = match resp.json() {
|
||||
Ok(payload) => payload,
|
||||
Err(_) => Value::Null,
|
||||
};
|
||||
let user_id = payload
|
||||
.get("user_id")
|
||||
.and_then(|value| value.as_str())
|
||||
.map(|value| value.to_string());
|
||||
let device_id = payload
|
||||
.get("device_id")
|
||||
.and_then(|value| value.as_str())
|
||||
.map(|value| value.to_string());
|
||||
|
||||
Ok::<_, reqwest::Error>((true, user_id, device_id))
|
||||
})
|
||||
.join();
|
||||
match thread_result {
|
||||
Ok(Ok(true)) => println!(
|
||||
"\r {} Connection verified ",
|
||||
style("✅").green().bold()
|
||||
),
|
||||
|
||||
let (detected_user_id, detected_device_id) = match thread_result {
|
||||
Ok(Ok((true, user_id, device_id))) => {
|
||||
println!(
|
||||
"\r {} Connection verified ",
|
||||
style("✅").green().bold()
|
||||
);
|
||||
|
||||
if device_id.is_none() {
|
||||
println!(
|
||||
" {} Homeserver did not return device_id from whoami. If E2EE decryption fails, set channels.matrix.device_id manually in config.toml.",
|
||||
style("⚠️").yellow().bold()
|
||||
);
|
||||
}
|
||||
|
||||
(user_id, device_id)
|
||||
}
|
||||
_ => {
|
||||
println!(
|
||||
"\r {} Connection failed — check homeserver URL and token",
|
||||
|
|
@ -3030,7 +3060,7 @@ fn setup_channels() -> Result<ChannelsConfig> {
|
|||
);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let room_id: String = Input::new()
|
||||
.with_prompt(" Room ID (e.g. !abc123:matrix.org)")
|
||||
|
|
@ -3050,6 +3080,8 @@ fn setup_channels() -> Result<ChannelsConfig> {
|
|||
config.matrix = Some(MatrixConfig {
|
||||
homeserver: homeserver.trim_end_matches('/').to_string(),
|
||||
access_token,
|
||||
user_id: detected_user_id,
|
||||
device_id: detected_device_id,
|
||||
room_id,
|
||||
allowed_users,
|
||||
});
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue