feat: initial release — ZeroClaw v0.1.0

- 22 AI providers (OpenRouter, Anthropic, OpenAI, Mistral, etc.)
- 7 channels (CLI, Telegram, Discord, Slack, iMessage, Matrix, Webhook)
- 5-step onboarding wizard with Project Context personalization
- OpenClaw-aligned system prompt (SOUL.md, IDENTITY.md, USER.md, AGENTS.md, etc.)
- SQLite memory backend with auto-save
- Skills system with on-demand loading
- Security: autonomy levels, command allowlists, cost limits
- 532 tests passing, 0 clippy warnings
This commit is contained in:
argenis de la rosa 2026-02-13 12:19:14 -05:00
commit 05cb353f7f
71 changed files with 15757 additions and 0 deletions

117
src/channels/cli.rs Normal file
View file

@ -0,0 +1,117 @@
use super::traits::{Channel, ChannelMessage};
use async_trait::async_trait;
use tokio::io::{self, AsyncBufReadExt, BufReader};
use uuid::Uuid;
/// CLI channel — stdin/stdout, always available, zero deps
pub struct CliChannel;
impl CliChannel {
pub fn new() -> Self {
Self
}
}
#[async_trait]
impl Channel for CliChannel {
fn name(&self) -> &str {
"cli"
}
async fn send(&self, message: &str, _recipient: &str) -> anyhow::Result<()> {
println!("{message}");
Ok(())
}
async fn listen(&self, tx: tokio::sync::mpsc::Sender<ChannelMessage>) -> anyhow::Result<()> {
let stdin = io::stdin();
let reader = BufReader::new(stdin);
let mut lines = reader.lines();
while let Ok(Some(line)) = lines.next_line().await {
let line = line.trim().to_string();
if line.is_empty() {
continue;
}
if line == "/quit" || line == "/exit" {
break;
}
let msg = ChannelMessage {
id: Uuid::new_v4().to_string(),
sender: "user".to_string(),
content: line,
channel: "cli".to_string(),
timestamp: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
};
if tx.send(msg).await.is_err() {
break;
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn cli_channel_name() {
assert_eq!(CliChannel::new().name(), "cli");
}
#[tokio::test]
async fn cli_channel_send_does_not_panic() {
let ch = CliChannel::new();
let result = ch.send("hello", "user").await;
assert!(result.is_ok());
}
#[tokio::test]
async fn cli_channel_send_empty_message() {
let ch = CliChannel::new();
let result = ch.send("", "").await;
assert!(result.is_ok());
}
#[tokio::test]
async fn cli_channel_health_check() {
let ch = CliChannel::new();
assert!(ch.health_check().await);
}
#[test]
fn channel_message_struct() {
let msg = ChannelMessage {
id: "test-id".into(),
sender: "user".into(),
content: "hello".into(),
channel: "cli".into(),
timestamp: 1234567890,
};
assert_eq!(msg.id, "test-id");
assert_eq!(msg.sender, "user");
assert_eq!(msg.content, "hello");
assert_eq!(msg.channel, "cli");
assert_eq!(msg.timestamp, 1234567890);
}
#[test]
fn channel_message_clone() {
let msg = ChannelMessage {
id: "id".into(),
sender: "s".into(),
content: "c".into(),
channel: "ch".into(),
timestamp: 0,
};
let cloned = msg.clone();
assert_eq!(cloned.id, msg.id);
assert_eq!(cloned.content, msg.content);
}
}

271
src/channels/discord.rs Normal file
View file

@ -0,0 +1,271 @@
use super::traits::{Channel, ChannelMessage};
use async_trait::async_trait;
use futures_util::{SinkExt, StreamExt};
use serde_json::json;
use tokio_tungstenite::tungstenite::Message;
use uuid::Uuid;
/// Discord channel — connects via Gateway WebSocket for real-time messages
pub struct DiscordChannel {
bot_token: String,
guild_id: Option<String>,
client: reqwest::Client,
}
impl DiscordChannel {
pub fn new(bot_token: String, guild_id: Option<String>) -> Self {
Self {
bot_token,
guild_id,
client: reqwest::Client::new(),
}
}
fn bot_user_id_from_token(token: &str) -> Option<String> {
// Discord bot tokens are base64(bot_user_id).timestamp.hmac
let part = token.split('.').next()?;
base64_decode(part)
}
}
const BASE64_ALPHABET: &[u8] =
b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
/// Minimal base64 decode (no extra dep) — only needs to decode the user ID portion
#[allow(clippy::cast_possible_truncation)]
fn base64_decode(input: &str) -> Option<String> {
let padded = match input.len() % 4 {
2 => format!("{input}=="),
3 => format!("{input}="),
_ => input.to_string(),
};
let mut bytes = Vec::new();
let chars: Vec<u8> = padded.bytes().collect();
for chunk in chars.chunks(4) {
if chunk.len() < 4 {
break;
}
let mut v = [0usize; 4];
for (i, &b) in chunk.iter().enumerate() {
if b == b'=' {
v[i] = 0;
} else {
v[i] = BASE64_ALPHABET.iter().position(|&a| a == b)?;
}
}
bytes.push(((v[0] << 2) | (v[1] >> 4)) as u8);
if chunk[2] != b'=' {
bytes.push((((v[1] & 0xF) << 4) | (v[2] >> 2)) as u8);
}
if chunk[3] != b'=' {
bytes.push((((v[2] & 0x3) << 6) | v[3]) as u8);
}
}
String::from_utf8(bytes).ok()
}
#[async_trait]
impl Channel for DiscordChannel {
fn name(&self) -> &str {
"discord"
}
async fn send(&self, message: &str, channel_id: &str) -> anyhow::Result<()> {
let url = format!("https://discord.com/api/v10/channels/{channel_id}/messages");
let body = json!({ "content": message });
self.client
.post(&url)
.header("Authorization", format!("Bot {}", self.bot_token))
.json(&body)
.send()
.await?;
Ok(())
}
#[allow(clippy::too_many_lines)]
async fn listen(&self, tx: tokio::sync::mpsc::Sender<ChannelMessage>) -> anyhow::Result<()> {
let bot_user_id = Self::bot_user_id_from_token(&self.bot_token).unwrap_or_default();
// Get Gateway URL
let gw_resp: serde_json::Value = self
.client
.get("https://discord.com/api/v10/gateway/bot")
.header("Authorization", format!("Bot {}", self.bot_token))
.send()
.await?
.json()
.await?;
let gw_url = gw_resp
.get("url")
.and_then(|u| u.as_str())
.unwrap_or("wss://gateway.discord.gg");
let ws_url = format!("{gw_url}/?v=10&encoding=json");
tracing::info!("Discord: connecting to gateway...");
let (ws_stream, _) = tokio_tungstenite::connect_async(&ws_url).await?;
let (mut write, mut read) = ws_stream.split();
// Read Hello (opcode 10)
let hello = read.next().await.ok_or(anyhow::anyhow!("No hello"))??;
let hello_data: serde_json::Value = serde_json::from_str(&hello.to_string())?;
let heartbeat_interval = hello_data
.get("d")
.and_then(|d| d.get("heartbeat_interval"))
.and_then(serde_json::Value::as_u64)
.unwrap_or(41250);
// Send Identify (opcode 2)
let identify = json!({
"op": 2,
"d": {
"token": self.bot_token,
"intents": 33281, // GUILDS | GUILD_MESSAGES | MESSAGE_CONTENT | DIRECT_MESSAGES
"properties": {
"os": "linux",
"browser": "zeroclaw",
"device": "zeroclaw"
}
}
});
write.send(Message::Text(identify.to_string())).await?;
tracing::info!("Discord: connected and identified");
// Spawn heartbeat task
let (hb_tx, mut hb_rx) = tokio::sync::mpsc::channel::<()>(1);
let hb_interval = heartbeat_interval;
tokio::spawn(async move {
let mut interval =
tokio::time::interval(std::time::Duration::from_millis(hb_interval));
loop {
interval.tick().await;
if hb_tx.send(()).await.is_err() {
break;
}
}
});
let guild_filter = self.guild_id.clone();
loop {
tokio::select! {
_ = hb_rx.recv() => {
let hb = json!({"op": 1, "d": null});
if write.send(Message::Text(hb.to_string())).await.is_err() {
break;
}
}
msg = read.next() => {
let msg = match msg {
Some(Ok(Message::Text(t))) => t,
Some(Ok(Message::Close(_))) | None => break,
_ => continue,
};
let event: serde_json::Value = match serde_json::from_str(&msg) {
Ok(e) => e,
Err(_) => continue,
};
// Only handle MESSAGE_CREATE (opcode 0, type "MESSAGE_CREATE")
let event_type = event.get("t").and_then(|t| t.as_str()).unwrap_or("");
if event_type != "MESSAGE_CREATE" {
continue;
}
let Some(d) = event.get("d") else {
continue;
};
// Skip messages from the bot itself
let author_id = d.get("author").and_then(|a| a.get("id")).and_then(|i| i.as_str()).unwrap_or("");
if author_id == bot_user_id {
continue;
}
// Skip bot messages
if d.get("author").and_then(|a| a.get("bot")).and_then(serde_json::Value::as_bool).unwrap_or(false) {
continue;
}
// Guild filter
if let Some(ref gid) = guild_filter {
let msg_guild = d.get("guild_id").and_then(serde_json::Value::as_str).unwrap_or("");
if msg_guild != gid {
continue;
}
}
let content = d.get("content").and_then(|c| c.as_str()).unwrap_or("");
if content.is_empty() {
continue;
}
let channel_id = d.get("channel_id").and_then(|c| c.as_str()).unwrap_or("").to_string();
let channel_msg = ChannelMessage {
id: Uuid::new_v4().to_string(),
sender: channel_id,
content: content.to_string(),
channel: "discord".to_string(),
timestamp: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
};
if tx.send(channel_msg).await.is_err() {
break;
}
}
}
}
Ok(())
}
async fn health_check(&self) -> bool {
self.client
.get("https://discord.com/api/v10/users/@me")
.header("Authorization", format!("Bot {}", self.bot_token))
.send()
.await
.map(|r| r.status().is_success())
.unwrap_or(false)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn discord_channel_name() {
let ch = DiscordChannel::new("fake".into(), None);
assert_eq!(ch.name(), "discord");
}
#[test]
fn base64_decode_bot_id() {
// "MTIzNDU2" decodes to "123456"
let decoded = base64_decode("MTIzNDU2");
assert_eq!(decoded, Some("123456".to_string()));
}
#[test]
fn bot_user_id_extraction() {
// Token format: base64(user_id).timestamp.hmac
let token = "MTIzNDU2.fake.hmac";
let id = DiscordChannel::bot_user_id_from_token(token);
assert_eq!(id, Some("123456".to_string()));
}
}

265
src/channels/imessage.rs Normal file
View file

@ -0,0 +1,265 @@
use crate::channels::traits::{Channel, ChannelMessage};
use async_trait::async_trait;
use directories::UserDirs;
use tokio::sync::mpsc;
/// iMessage channel using macOS `AppleScript` bridge.
/// Polls the Messages database for new messages and sends replies via `osascript`.
#[derive(Clone)]
pub struct IMessageChannel {
allowed_contacts: Vec<String>,
poll_interval_secs: u64,
}
impl IMessageChannel {
pub fn new(allowed_contacts: Vec<String>) -> Self {
Self {
allowed_contacts,
poll_interval_secs: 3,
}
}
fn is_contact_allowed(&self, sender: &str) -> bool {
if self.allowed_contacts.iter().any(|u| u == "*") {
return true;
}
self.allowed_contacts.iter().any(|u| {
u.eq_ignore_ascii_case(sender)
})
}
}
#[async_trait]
impl Channel for IMessageChannel {
fn name(&self) -> &str {
"imessage"
}
async fn send(&self, message: &str, target: &str) -> anyhow::Result<()> {
let escaped_msg = message.replace('\\', "\\\\").replace('"', "\\\"");
let script = format!(
r#"tell application "Messages"
set targetService to 1st account whose service type = iMessage
set targetBuddy to participant "{target}" of targetService
send "{escaped_msg}" to targetBuddy
end tell"#
);
let output = tokio::process::Command::new("osascript")
.arg("-e")
.arg(&script)
.output()
.await?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
anyhow::bail!("iMessage send failed: {stderr}");
}
Ok(())
}
async fn listen(&self, tx: mpsc::Sender<ChannelMessage>) -> anyhow::Result<()> {
tracing::info!("iMessage channel listening (AppleScript bridge)...");
// Query the Messages SQLite database for new messages
// The database is at ~/Library/Messages/chat.db
let db_path = UserDirs::new()
.map(|u| u.home_dir().join("Library/Messages/chat.db"))
.ok_or_else(|| anyhow::anyhow!("Cannot find home directory"))?;
if !db_path.exists() {
anyhow::bail!(
"Messages database not found at {}. Ensure Messages.app is set up and Full Disk Access is granted.",
db_path.display()
);
}
// Track the last ROWID we've seen
let mut last_rowid = get_max_rowid(&db_path).await.unwrap_or(0);
loop {
tokio::time::sleep(tokio::time::Duration::from_secs(self.poll_interval_secs)).await;
let new_messages = fetch_new_messages(&db_path, last_rowid).await;
match new_messages {
Ok(messages) => {
for (rowid, sender, text) in messages {
if rowid > last_rowid {
last_rowid = rowid;
}
if !self.is_contact_allowed(&sender) {
continue;
}
if text.trim().is_empty() {
continue;
}
let msg = ChannelMessage {
id: rowid.to_string(),
sender: sender.clone(),
content: text,
channel: "imessage".to_string(),
timestamp: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
};
if tx.send(msg).await.is_err() {
return Ok(());
}
}
}
Err(e) => {
tracing::warn!("iMessage poll error: {e}");
}
}
}
}
async fn health_check(&self) -> bool {
if !cfg!(target_os = "macos") {
return false;
}
let db_path = UserDirs::new()
.map(|u| u.home_dir().join("Library/Messages/chat.db"))
.unwrap_or_default();
db_path.exists()
}
}
/// Get the current max ROWID from the messages table
async fn get_max_rowid(db_path: &std::path::Path) -> anyhow::Result<i64> {
let output = tokio::process::Command::new("sqlite3")
.arg(db_path)
.arg("SELECT MAX(ROWID) FROM message WHERE is_from_me = 0;")
.output()
.await?;
let stdout = String::from_utf8_lossy(&output.stdout);
let rowid = stdout.trim().parse::<i64>().unwrap_or(0);
Ok(rowid)
}
/// Fetch messages newer than `since_rowid`
async fn fetch_new_messages(
db_path: &std::path::Path,
since_rowid: i64,
) -> anyhow::Result<Vec<(i64, String, String)>> {
let query = format!(
"SELECT m.ROWID, h.id, m.text \
FROM message m \
JOIN handle h ON m.handle_id = h.ROWID \
WHERE m.ROWID > {since_rowid} \
AND m.is_from_me = 0 \
AND m.text IS NOT NULL \
ORDER BY m.ROWID ASC \
LIMIT 20;"
);
let output = tokio::process::Command::new("sqlite3")
.arg("-separator")
.arg("|")
.arg(db_path)
.arg(&query)
.output()
.await?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
anyhow::bail!("sqlite3 query failed: {stderr}");
}
let stdout = String::from_utf8_lossy(&output.stdout);
let mut results = Vec::new();
for line in stdout.lines() {
let parts: Vec<&str> = line.splitn(3, '|').collect();
if parts.len() == 3 {
if let Ok(rowid) = parts[0].parse::<i64>() {
results.push((rowid, parts[1].to_string(), parts[2].to_string()));
}
}
}
Ok(results)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn creates_with_contacts() {
let ch = IMessageChannel::new(vec!["+1234567890".into()]);
assert_eq!(ch.allowed_contacts.len(), 1);
assert_eq!(ch.poll_interval_secs, 3);
}
#[test]
fn creates_with_empty_contacts() {
let ch = IMessageChannel::new(vec![]);
assert!(ch.allowed_contacts.is_empty());
}
#[test]
fn wildcard_allows_anyone() {
let ch = IMessageChannel::new(vec!["*".into()]);
assert!(ch.is_contact_allowed("+1234567890"));
assert!(ch.is_contact_allowed("random@icloud.com"));
assert!(ch.is_contact_allowed(""));
}
#[test]
fn specific_contact_allowed() {
let ch = IMessageChannel::new(vec!["+1234567890".into(), "user@icloud.com".into()]);
assert!(ch.is_contact_allowed("+1234567890"));
assert!(ch.is_contact_allowed("user@icloud.com"));
}
#[test]
fn unknown_contact_denied() {
let ch = IMessageChannel::new(vec!["+1234567890".into()]);
assert!(!ch.is_contact_allowed("+9999999999"));
assert!(!ch.is_contact_allowed("hacker@evil.com"));
}
#[test]
fn contact_case_insensitive() {
let ch = IMessageChannel::new(vec!["User@iCloud.com".into()]);
assert!(ch.is_contact_allowed("user@icloud.com"));
assert!(ch.is_contact_allowed("USER@ICLOUD.COM"));
}
#[test]
fn empty_allowlist_denies_all() {
let ch = IMessageChannel::new(vec![]);
assert!(!ch.is_contact_allowed("+1234567890"));
assert!(!ch.is_contact_allowed("anyone"));
}
#[test]
fn name_returns_imessage() {
let ch = IMessageChannel::new(vec![]);
assert_eq!(ch.name(), "imessage");
}
#[test]
fn wildcard_among_others_still_allows_all() {
let ch = IMessageChannel::new(vec!["+111".into(), "*".into(), "+222".into()]);
assert!(ch.is_contact_allowed("totally-unknown"));
}
#[test]
fn contact_with_spaces_exact_match() {
let ch = IMessageChannel::new(vec![" spaced ".into()]);
assert!(ch.is_contact_allowed(" spaced "));
assert!(!ch.is_contact_allowed("spaced"));
}
}

467
src/channels/matrix.rs Normal file
View file

@ -0,0 +1,467 @@
use crate::channels::traits::{Channel, ChannelMessage};
use async_trait::async_trait;
use reqwest::Client;
use serde::Deserialize;
use tokio::sync::mpsc;
/// Matrix channel using the Client-Server API (no SDK needed).
/// Connects to any Matrix homeserver (Element, Synapse, etc.).
#[derive(Clone)]
pub struct MatrixChannel {
homeserver: String,
access_token: String,
room_id: String,
allowed_users: Vec<String>,
client: Client,
}
#[derive(Debug, Deserialize)]
struct SyncResponse {
next_batch: String,
#[serde(default)]
rooms: Rooms,
}
#[derive(Debug, Deserialize, Default)]
struct Rooms {
#[serde(default)]
join: std::collections::HashMap<String, JoinedRoom>,
}
#[derive(Debug, Deserialize)]
struct JoinedRoom {
#[serde(default)]
timeline: Timeline,
}
#[derive(Debug, Deserialize, Default)]
struct Timeline {
#[serde(default)]
events: Vec<TimelineEvent>,
}
#[derive(Debug, Deserialize)]
struct TimelineEvent {
#[serde(rename = "type")]
event_type: String,
sender: String,
#[serde(default)]
content: EventContent,
}
#[derive(Debug, Deserialize, Default)]
struct EventContent {
#[serde(default)]
body: Option<String>,
#[serde(default)]
msgtype: Option<String>,
}
#[derive(Debug, Deserialize)]
struct WhoAmIResponse {
user_id: String,
}
impl MatrixChannel {
pub fn new(
homeserver: String,
access_token: String,
room_id: String,
allowed_users: Vec<String>,
) -> Self {
let homeserver = if homeserver.ends_with('/') {
homeserver[..homeserver.len() - 1].to_string()
} else {
homeserver
};
Self {
homeserver,
access_token,
room_id,
allowed_users,
client: Client::new(),
}
}
fn is_user_allowed(&self, sender: &str) -> bool {
if self.allowed_users.iter().any(|u| u == "*") {
return true;
}
self.allowed_users
.iter()
.any(|u| u.eq_ignore_ascii_case(sender))
}
async fn get_my_user_id(&self) -> anyhow::Result<String> {
let url = format!(
"{}/_matrix/client/v3/account/whoami",
self.homeserver
);
let resp = self
.client
.get(&url)
.header("Authorization", format!("Bearer {}", self.access_token))
.send()
.await?;
if !resp.status().is_success() {
let err = resp.text().await?;
anyhow::bail!("Matrix whoami failed: {err}");
}
let who: WhoAmIResponse = resp.json().await?;
Ok(who.user_id)
}
}
#[async_trait]
impl Channel for MatrixChannel {
fn name(&self) -> &str {
"matrix"
}
async fn send(&self, message: &str, _target: &str) -> anyhow::Result<()> {
let txn_id = format!("zc_{}", chrono::Utc::now().timestamp_millis());
let url = format!(
"{}/_matrix/client/v3/rooms/{}/send/m.room.message/{}",
self.homeserver, self.room_id, txn_id
);
let body = serde_json::json!({
"msgtype": "m.text",
"body": message
});
let resp = self
.client
.put(&url)
.header("Authorization", format!("Bearer {}", self.access_token))
.json(&body)
.send()
.await?;
if !resp.status().is_success() {
let err = resp.text().await?;
anyhow::bail!("Matrix send failed: {err}");
}
Ok(())
}
async fn listen(&self, tx: mpsc::Sender<ChannelMessage>) -> anyhow::Result<()> {
tracing::info!("Matrix channel listening on room {}...", self.room_id);
let my_user_id = self.get_my_user_id().await?;
// Initial sync to get the since token
let url = format!(
"{}/_matrix/client/v3/sync?timeout=30000&filter={{\"room\":{{\"timeline\":{{\"limit\":1}}}}}}",
self.homeserver
);
let resp = self
.client
.get(&url)
.header("Authorization", format!("Bearer {}", self.access_token))
.send()
.await?;
if !resp.status().is_success() {
let err = resp.text().await?;
anyhow::bail!("Matrix initial sync failed: {err}");
}
let sync: SyncResponse = resp.json().await?;
let mut since = sync.next_batch;
// Long-poll loop
loop {
let url = format!(
"{}/_matrix/client/v3/sync?since={}&timeout=30000",
self.homeserver, since
);
let resp = self
.client
.get(&url)
.header("Authorization", format!("Bearer {}", self.access_token))
.send()
.await;
let resp = match resp {
Ok(r) => r,
Err(e) => {
tracing::warn!("Matrix sync error: {e}, retrying...");
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
continue;
}
};
if !resp.status().is_success() {
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
continue;
}
let sync: SyncResponse = resp.json().await?;
since = sync.next_batch;
// Process events from our room
if let Some(room) = sync.rooms.join.get(&self.room_id) {
for event in &room.timeline.events {
// Skip our own messages
if event.sender == my_user_id {
continue;
}
// Only process text messages
if event.event_type != "m.room.message" {
continue;
}
if event.content.msgtype.as_deref() != Some("m.text") {
continue;
}
let Some(ref body) = event.content.body else {
continue;
};
if !self.is_user_allowed(&event.sender) {
continue;
}
let msg = ChannelMessage {
id: format!("mx_{}", chrono::Utc::now().timestamp_millis()),
sender: event.sender.clone(),
content: body.clone(),
channel: "matrix".to_string(),
timestamp: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
};
if tx.send(msg).await.is_err() {
return Ok(());
}
}
}
}
}
async fn health_check(&self) -> bool {
let url = format!(
"{}/_matrix/client/v3/account/whoami",
self.homeserver
);
let Ok(resp) = self
.client
.get(&url)
.header("Authorization", format!("Bearer {}", self.access_token))
.send()
.await
else {
return false;
};
resp.status().is_success()
}
}
#[cfg(test)]
mod tests {
use super::*;
fn make_channel() -> MatrixChannel {
MatrixChannel::new(
"https://matrix.org".to_string(),
"syt_test_token".to_string(),
"!room:matrix.org".to_string(),
vec!["@user:matrix.org".to_string()],
)
}
#[test]
fn creates_with_correct_fields() {
let ch = make_channel();
assert_eq!(ch.homeserver, "https://matrix.org");
assert_eq!(ch.access_token, "syt_test_token");
assert_eq!(ch.room_id, "!room:matrix.org");
assert_eq!(ch.allowed_users.len(), 1);
}
#[test]
fn strips_trailing_slash() {
let ch = MatrixChannel::new(
"https://matrix.org/".to_string(),
"tok".to_string(),
"!r:m".to_string(),
vec![],
);
assert_eq!(ch.homeserver, "https://matrix.org");
}
#[test]
fn no_trailing_slash_unchanged() {
let ch = MatrixChannel::new(
"https://matrix.org".to_string(),
"tok".to_string(),
"!r:m".to_string(),
vec![],
);
assert_eq!(ch.homeserver, "https://matrix.org");
}
#[test]
fn multiple_trailing_slashes_strips_one() {
let ch = MatrixChannel::new(
"https://matrix.org//".to_string(),
"tok".to_string(),
"!r:m".to_string(),
vec![],
);
assert_eq!(ch.homeserver, "https://matrix.org/");
}
#[test]
fn wildcard_allows_anyone() {
let ch = MatrixChannel::new(
"https://m.org".to_string(),
"tok".to_string(),
"!r:m".to_string(),
vec!["*".to_string()],
);
assert!(ch.is_user_allowed("@anyone:matrix.org"));
assert!(ch.is_user_allowed("@hacker:evil.org"));
}
#[test]
fn specific_user_allowed() {
let ch = make_channel();
assert!(ch.is_user_allowed("@user:matrix.org"));
}
#[test]
fn unknown_user_denied() {
let ch = make_channel();
assert!(!ch.is_user_allowed("@stranger:matrix.org"));
assert!(!ch.is_user_allowed("@evil:hacker.org"));
}
#[test]
fn user_case_insensitive() {
let ch = MatrixChannel::new(
"https://m.org".to_string(),
"tok".to_string(),
"!r:m".to_string(),
vec!["@User:Matrix.org".to_string()],
);
assert!(ch.is_user_allowed("@user:matrix.org"));
assert!(ch.is_user_allowed("@USER:MATRIX.ORG"));
}
#[test]
fn empty_allowlist_denies_all() {
let ch = MatrixChannel::new(
"https://m.org".to_string(),
"tok".to_string(),
"!r:m".to_string(),
vec![],
);
assert!(!ch.is_user_allowed("@anyone:matrix.org"));
}
#[test]
fn name_returns_matrix() {
let ch = make_channel();
assert_eq!(ch.name(), "matrix");
}
#[test]
fn sync_response_deserializes_empty() {
let json = r#"{"next_batch":"s123","rooms":{"join":{}}}"#;
let resp: SyncResponse = serde_json::from_str(json).unwrap();
assert_eq!(resp.next_batch, "s123");
assert!(resp.rooms.join.is_empty());
}
#[test]
fn sync_response_deserializes_with_events() {
let json = r#"{
"next_batch": "s456",
"rooms": {
"join": {
"!room:matrix.org": {
"timeline": {
"events": [
{
"type": "m.room.message",
"sender": "@user:matrix.org",
"content": {
"msgtype": "m.text",
"body": "Hello!"
}
}
]
}
}
}
}
}"#;
let resp: SyncResponse = serde_json::from_str(json).unwrap();
assert_eq!(resp.next_batch, "s456");
let room = resp.rooms.join.get("!room:matrix.org").unwrap();
assert_eq!(room.timeline.events.len(), 1);
assert_eq!(room.timeline.events[0].sender, "@user:matrix.org");
assert_eq!(room.timeline.events[0].content.body.as_deref(), Some("Hello!"));
assert_eq!(room.timeline.events[0].content.msgtype.as_deref(), Some("m.text"));
}
#[test]
fn sync_response_ignores_non_text_events() {
let json = r#"{
"next_batch": "s789",
"rooms": {
"join": {
"!room:m": {
"timeline": {
"events": [
{
"type": "m.room.member",
"sender": "@user:m",
"content": {}
}
]
}
}
}
}
}"#;
let resp: SyncResponse = serde_json::from_str(json).unwrap();
let room = resp.rooms.join.get("!room:m").unwrap();
assert_eq!(room.timeline.events[0].event_type, "m.room.member");
assert!(room.timeline.events[0].content.body.is_none());
}
#[test]
fn whoami_response_deserializes() {
let json = r#"{"user_id":"@bot:matrix.org"}"#;
let resp: WhoAmIResponse = serde_json::from_str(json).unwrap();
assert_eq!(resp.user_id, "@bot:matrix.org");
}
#[test]
fn event_content_defaults() {
let json = r#"{"type":"m.room.message","sender":"@u:m","content":{}}"#;
let event: TimelineEvent = serde_json::from_str(json).unwrap();
assert!(event.content.body.is_none());
assert!(event.content.msgtype.is_none());
}
#[test]
fn sync_response_missing_rooms_defaults() {
let json = r#"{"next_batch":"s0"}"#;
let resp: SyncResponse = serde_json::from_str(json).unwrap();
assert!(resp.rooms.join.is_empty());
}
}

550
src/channels/mod.rs Normal file
View file

@ -0,0 +1,550 @@
pub mod cli;
pub mod discord;
pub mod imessage;
pub mod matrix;
pub mod slack;
pub mod telegram;
pub mod traits;
pub use cli::CliChannel;
pub use discord::DiscordChannel;
pub use imessage::IMessageChannel;
pub use matrix::MatrixChannel;
pub use slack::SlackChannel;
pub use telegram::TelegramChannel;
pub use traits::Channel;
use crate::config::Config;
use crate::memory::{self, Memory};
use crate::providers::{self, Provider};
use anyhow::Result;
use std::sync::Arc;
/// Maximum characters per injected workspace file (matches `OpenClaw` default).
const BOOTSTRAP_MAX_CHARS: usize = 20_000;
/// Load workspace identity files and build a system prompt.
///
/// Follows the `OpenClaw` framework structure:
/// 1. Tooling — tool list + descriptions
/// 2. Safety — guardrail reminder
/// 3. Skills — compact list with paths (loaded on-demand)
/// 4. Workspace — working directory
/// 5. Bootstrap files — AGENTS, SOUL, TOOLS, IDENTITY, USER, HEARTBEAT, BOOTSTRAP, MEMORY
/// 6. Date & Time — timezone for cache stability
/// 7. Runtime — host, OS, model
///
/// Daily memory files (`memory/*.md`) are NOT injected — they are accessed
/// on-demand via `memory_recall` / `memory_search` tools.
pub fn build_system_prompt(
workspace_dir: &std::path::Path,
model_name: &str,
tools: &[(&str, &str)],
skills: &[crate::skills::Skill],
) -> String {
use std::fmt::Write;
let mut prompt = String::with_capacity(8192);
// ── 1. Tooling ──────────────────────────────────────────────
if !tools.is_empty() {
prompt.push_str("## Tools\n\n");
prompt.push_str("You have access to the following tools:\n\n");
for (name, desc) in tools {
let _ = writeln!(prompt, "- **{name}**: {desc}");
}
prompt.push('\n');
}
// ── 2. Safety ───────────────────────────────────────────────
prompt.push_str("## Safety\n\n");
prompt.push_str(
"- Do not exfiltrate private data.\n\
- Do not run destructive commands without asking.\n\
- Do not bypass oversight or approval mechanisms.\n\
- Prefer `trash` over `rm` (recoverable beats gone forever).\n\
- When in doubt, ask before acting externally.\n\n",
);
// ── 3. Skills (compact list — load on-demand) ───────────────
if !skills.is_empty() {
prompt.push_str("## Available Skills\n\n");
prompt.push_str(
"Skills are loaded on demand. Use `read` on the skill path to get full instructions.\n\n",
);
prompt.push_str("<available_skills>\n");
for skill in skills {
let _ = writeln!(prompt, " <skill>");
let _ = writeln!(prompt, " <name>{}</name>", skill.name);
let _ = writeln!(prompt, " <description>{}</description>", skill.description);
let location = workspace_dir.join("skills").join(&skill.name).join("SKILL.md");
let _ = writeln!(prompt, " <location>{}</location>", location.display());
let _ = writeln!(prompt, " </skill>");
}
prompt.push_str("</available_skills>\n\n");
}
// ── 4. Workspace ────────────────────────────────────────────
let _ = writeln!(prompt, "## Workspace\n\nWorking directory: `{}`\n", workspace_dir.display());
// ── 5. Bootstrap files (injected into context) ──────────────
prompt.push_str("## Project Context\n\n");
prompt.push_str("The following workspace files define your identity, behavior, and context.\n\n");
let bootstrap_files = [
"AGENTS.md",
"SOUL.md",
"TOOLS.md",
"IDENTITY.md",
"USER.md",
"HEARTBEAT.md",
];
for filename in &bootstrap_files {
inject_workspace_file(&mut prompt, workspace_dir, filename);
}
// BOOTSTRAP.md — only if it exists (first-run ritual)
let bootstrap_path = workspace_dir.join("BOOTSTRAP.md");
if bootstrap_path.exists() {
inject_workspace_file(&mut prompt, workspace_dir, "BOOTSTRAP.md");
}
// MEMORY.md — curated long-term memory (main session only)
inject_workspace_file(&mut prompt, workspace_dir, "MEMORY.md");
// ── 6. Date & Time ──────────────────────────────────────────
let now = chrono::Local::now();
let tz = now.format("%Z").to_string();
let _ = writeln!(prompt, "## Current Date & Time\n\nTimezone: {tz}\n");
// ── 7. Runtime ──────────────────────────────────────────────
let host = hostname::get()
.map_or_else(|_| "unknown".into(), |h| h.to_string_lossy().to_string());
let _ = writeln!(
prompt,
"## Runtime\n\nHost: {host} | OS: {} | Model: {model_name}\n",
std::env::consts::OS,
);
if prompt.is_empty() {
"You are ZeroClaw, a fast and efficient AI assistant built in Rust. Be helpful, concise, and direct.".to_string()
} else {
prompt
}
}
/// Inject a single workspace file into the prompt with truncation and missing-file markers.
fn inject_workspace_file(prompt: &mut String, workspace_dir: &std::path::Path, filename: &str) {
use std::fmt::Write;
let path = workspace_dir.join(filename);
match std::fs::read_to_string(&path) {
Ok(content) => {
let trimmed = content.trim();
if trimmed.is_empty() {
return;
}
let _ = writeln!(prompt, "### {filename}\n");
if trimmed.len() > BOOTSTRAP_MAX_CHARS {
prompt.push_str(&trimmed[..BOOTSTRAP_MAX_CHARS]);
let _ = writeln!(
prompt,
"\n\n[... truncated at {BOOTSTRAP_MAX_CHARS} chars — use `read` for full file]\n"
);
} else {
prompt.push_str(trimmed);
prompt.push_str("\n\n");
}
}
Err(_) => {
// Missing-file marker (matches OpenClaw behavior)
let _ = writeln!(prompt, "### {filename}\n\n[File not found: {filename}]\n");
}
}
}
pub fn handle_command(command: super::ChannelCommands, config: &Config) -> Result<()> {
match command {
super::ChannelCommands::Start => {
// Handled in main.rs (needs async), this is unreachable
unreachable!("Start is handled in main.rs")
}
super::ChannelCommands::List => {
println!("Channels:");
println!(" ✅ CLI (always available)");
for (name, configured) in [
("Telegram", config.channels_config.telegram.is_some()),
("Discord", config.channels_config.discord.is_some()),
("Slack", config.channels_config.slack.is_some()),
("Webhook", config.channels_config.webhook.is_some()),
("iMessage", config.channels_config.imessage.is_some()),
("Matrix", config.channels_config.matrix.is_some()),
] {
println!(
" {} {name}",
if configured { "" } else { "" }
);
}
println!("\nTo start channels: zeroclaw channel start");
println!("To configure: zeroclaw onboard");
Ok(())
}
super::ChannelCommands::Add {
channel_type,
config: _,
} => {
anyhow::bail!("Channel type '{channel_type}' — use `zeroclaw onboard` to configure channels");
}
super::ChannelCommands::Remove { name } => {
anyhow::bail!("Remove channel '{name}' — edit ~/.zeroclaw/config.toml directly");
}
}
}
/// Start all configured channels and route messages to the agent
#[allow(clippy::too_many_lines)]
pub async fn start_channels(config: Config) -> Result<()> {
let provider: Arc<dyn Provider> = Arc::from(providers::create_provider(
config.default_provider.as_deref().unwrap_or("openrouter"),
config.api_key.as_deref(),
)?);
let model = config
.default_model
.clone()
.unwrap_or_else(|| "anthropic/claude-sonnet-4-20250514".into());
let temperature = config.default_temperature;
let mem: Arc<dyn Memory> =
Arc::from(memory::create_memory(&config.memory, &config.workspace_dir)?);
// Build system prompt from workspace identity files + skills
let workspace = config.workspace_dir.clone();
let skills = crate::skills::load_skills(&workspace);
// Collect tool descriptions for the prompt
let tool_descs: Vec<(&str, &str)> = vec![
("shell", "Execute terminal commands"),
("file_read", "Read file contents"),
("file_write", "Write file contents"),
("memory_store", "Save to memory"),
("memory_recall", "Search memory"),
("memory_forget", "Delete a memory entry"),
];
let system_prompt = build_system_prompt(&workspace, &model, &tool_descs, &skills);
if !skills.is_empty() {
println!(" 🧩 Skills: {}", skills.iter().map(|s| s.name.as_str()).collect::<Vec<_>>().join(", "));
}
// Collect active channels
let mut channels: Vec<Arc<dyn Channel>> = Vec::new();
if let Some(ref tg) = config.channels_config.telegram {
channels.push(Arc::new(TelegramChannel::new(
tg.bot_token.clone(),
tg.allowed_users.clone(),
)));
}
if let Some(ref dc) = config.channels_config.discord {
channels.push(Arc::new(DiscordChannel::new(
dc.bot_token.clone(),
dc.guild_id.clone(),
)));
}
if let Some(ref sl) = config.channels_config.slack {
channels.push(Arc::new(SlackChannel::new(
sl.bot_token.clone(),
sl.channel_id.clone(),
)));
}
if let Some(ref im) = config.channels_config.imessage {
channels.push(Arc::new(IMessageChannel::new(
im.allowed_contacts.clone(),
)));
}
if let Some(ref mx) = config.channels_config.matrix {
channels.push(Arc::new(MatrixChannel::new(
mx.homeserver.clone(),
mx.access_token.clone(),
mx.room_id.clone(),
mx.allowed_users.clone(),
)));
}
if channels.is_empty() {
println!("No channels configured. Run `zeroclaw onboard` to set up channels.");
return Ok(());
}
println!("🦀 ZeroClaw Channel Server");
println!(" 🤖 Model: {model}");
println!(" 🧠 Memory: {} (auto-save: {})", config.memory.backend, if config.memory.auto_save { "on" } else { "off" });
println!(" 📡 Channels: {}", channels.iter().map(|c| c.name()).collect::<Vec<_>>().join(", "));
println!();
println!(" Listening for messages... (Ctrl+C to stop)");
println!();
// Single message bus — all channels send messages here
let (tx, mut rx) = tokio::sync::mpsc::channel::<traits::ChannelMessage>(100);
// Spawn a listener for each channel
let mut handles = Vec::new();
for ch in &channels {
let ch = ch.clone();
let tx = tx.clone();
handles.push(tokio::spawn(async move {
if let Err(e) = ch.listen(tx).await {
tracing::error!("Channel {} error: {e}", ch.name());
}
}));
}
drop(tx); // Drop our copy so rx closes when all channels stop
// Process incoming messages — call the LLM and reply
while let Some(msg) = rx.recv().await {
println!(
" 💬 [{}] from {}: {}",
msg.channel,
msg.sender,
if msg.content.len() > 80 {
format!("{}...", &msg.content[..80])
} else {
msg.content.clone()
}
);
// Auto-save to memory
if config.memory.auto_save {
let _ = mem
.store(
&format!("{}_{}", msg.channel, msg.sender),
&msg.content,
crate::memory::MemoryCategory::Conversation,
)
.await;
}
// Call the LLM with system prompt (identity + soul + tools)
match provider.chat_with_system(Some(&system_prompt), &msg.content, &model, temperature).await {
Ok(response) => {
println!(
" 🤖 Reply: {}",
if response.len() > 80 {
format!("{}...", &response[..80])
} else {
response.clone()
}
);
// Find the channel that sent this message and reply
for ch in &channels {
if ch.name() == msg.channel {
if let Err(e) = ch.send(&response, &msg.sender).await {
eprintln!(" ❌ Failed to reply on {}: {e}", ch.name());
}
break;
}
}
}
Err(e) => {
eprintln!(" ❌ LLM error: {e}");
for ch in &channels {
if ch.name() == msg.channel {
let _ = ch
.send(&format!("⚠️ Error: {e}"), &msg.sender)
.await;
break;
}
}
}
}
}
// Wait for all channel tasks
for h in handles {
let _ = h.await;
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
fn make_workspace() -> TempDir {
let tmp = TempDir::new().unwrap();
// Create minimal workspace files
std::fs::write(tmp.path().join("SOUL.md"), "# Soul\nBe helpful.").unwrap();
std::fs::write(tmp.path().join("IDENTITY.md"), "# Identity\nName: ZeroClaw").unwrap();
std::fs::write(tmp.path().join("USER.md"), "# User\nName: Test User").unwrap();
std::fs::write(tmp.path().join("AGENTS.md"), "# Agents\nFollow instructions.").unwrap();
std::fs::write(tmp.path().join("TOOLS.md"), "# Tools\nUse shell carefully.").unwrap();
std::fs::write(tmp.path().join("HEARTBEAT.md"), "# Heartbeat\nCheck status.").unwrap();
std::fs::write(tmp.path().join("MEMORY.md"), "# Memory\nUser likes Rust.").unwrap();
tmp
}
#[test]
fn prompt_contains_all_sections() {
let ws = make_workspace();
let tools = vec![("shell", "Run commands"), ("file_read", "Read files")];
let prompt = build_system_prompt(ws.path(), "test-model", &tools, &[]);
// Section headers
assert!(prompt.contains("## Tools"), "missing Tools section");
assert!(prompt.contains("## Safety"), "missing Safety section");
assert!(prompt.contains("## Workspace"), "missing Workspace section");
assert!(prompt.contains("## Project Context"), "missing Project Context");
assert!(prompt.contains("## Current Date & Time"), "missing Date/Time");
assert!(prompt.contains("## Runtime"), "missing Runtime section");
}
#[test]
fn prompt_injects_tools() {
let ws = make_workspace();
let tools = vec![("shell", "Run commands"), ("memory_recall", "Search memory")];
let prompt = build_system_prompt(ws.path(), "gpt-4o", &tools, &[]);
assert!(prompt.contains("**shell**"));
assert!(prompt.contains("Run commands"));
assert!(prompt.contains("**memory_recall**"));
}
#[test]
fn prompt_injects_safety() {
let ws = make_workspace();
let prompt = build_system_prompt(ws.path(), "model", &[], &[]);
assert!(prompt.contains("Do not exfiltrate private data"));
assert!(prompt.contains("Do not run destructive commands"));
assert!(prompt.contains("Prefer `trash` over `rm`"));
}
#[test]
fn prompt_injects_workspace_files() {
let ws = make_workspace();
let prompt = build_system_prompt(ws.path(), "model", &[], &[]);
assert!(prompt.contains("### SOUL.md"), "missing SOUL.md header");
assert!(prompt.contains("Be helpful"), "missing SOUL content");
assert!(prompt.contains("### IDENTITY.md"), "missing IDENTITY.md");
assert!(prompt.contains("Name: ZeroClaw"), "missing IDENTITY content");
assert!(prompt.contains("### USER.md"), "missing USER.md");
assert!(prompt.contains("### AGENTS.md"), "missing AGENTS.md");
assert!(prompt.contains("### TOOLS.md"), "missing TOOLS.md");
assert!(prompt.contains("### HEARTBEAT.md"), "missing HEARTBEAT.md");
assert!(prompt.contains("### MEMORY.md"), "missing MEMORY.md");
assert!(prompt.contains("User likes Rust"), "missing MEMORY content");
}
#[test]
fn prompt_missing_file_markers() {
let tmp = TempDir::new().unwrap();
// Empty workspace — no files at all
let prompt = build_system_prompt(tmp.path(), "model", &[], &[]);
assert!(prompt.contains("[File not found: SOUL.md]"));
assert!(prompt.contains("[File not found: AGENTS.md]"));
assert!(prompt.contains("[File not found: IDENTITY.md]"));
}
#[test]
fn prompt_bootstrap_only_if_exists() {
let ws = make_workspace();
// No BOOTSTRAP.md — should not appear
let prompt = build_system_prompt(ws.path(), "model", &[], &[]);
assert!(!prompt.contains("### BOOTSTRAP.md"), "BOOTSTRAP.md should not appear when missing");
// Create BOOTSTRAP.md — should appear
std::fs::write(ws.path().join("BOOTSTRAP.md"), "# Bootstrap\nFirst run.").unwrap();
let prompt2 = build_system_prompt(ws.path(), "model", &[], &[]);
assert!(prompt2.contains("### BOOTSTRAP.md"), "BOOTSTRAP.md should appear when present");
assert!(prompt2.contains("First run"));
}
#[test]
fn prompt_no_daily_memory_injection() {
let ws = make_workspace();
let memory_dir = ws.path().join("memory");
std::fs::create_dir_all(&memory_dir).unwrap();
let today = chrono::Local::now().format("%Y-%m-%d").to_string();
std::fs::write(memory_dir.join(format!("{today}.md")), "# Daily\nSome note.").unwrap();
let prompt = build_system_prompt(ws.path(), "model", &[], &[]);
// Daily notes should NOT be in the system prompt (on-demand via tools)
assert!(!prompt.contains("Daily Notes"), "daily notes should not be auto-injected");
assert!(!prompt.contains("Some note"), "daily content should not be in prompt");
}
#[test]
fn prompt_runtime_metadata() {
let ws = make_workspace();
let prompt = build_system_prompt(ws.path(), "claude-sonnet-4", &[], &[]);
assert!(prompt.contains("Model: claude-sonnet-4"));
assert!(prompt.contains(&format!("OS: {}", std::env::consts::OS)));
assert!(prompt.contains("Host:"));
}
#[test]
fn prompt_skills_compact_list() {
let ws = make_workspace();
let skills = vec![crate::skills::Skill {
name: "code-review".into(),
description: "Review code for bugs".into(),
version: "1.0.0".into(),
author: None,
tags: vec![],
tools: vec![],
prompts: vec!["Long prompt content that should NOT appear in system prompt".into()],
}];
let prompt = build_system_prompt(ws.path(), "model", &[], &skills);
assert!(prompt.contains("<available_skills>"), "missing skills XML");
assert!(prompt.contains("<name>code-review</name>"));
assert!(prompt.contains("<description>Review code for bugs</description>"));
assert!(prompt.contains("SKILL.md</location>"));
assert!(prompt.contains("loaded on demand"), "should mention on-demand loading");
// Full prompt content should NOT be dumped
assert!(!prompt.contains("Long prompt content that should NOT appear"));
}
#[test]
fn prompt_truncation() {
let ws = make_workspace();
// Write a file larger than BOOTSTRAP_MAX_CHARS
let big_content = "x".repeat(BOOTSTRAP_MAX_CHARS + 1000);
std::fs::write(ws.path().join("AGENTS.md"), &big_content).unwrap();
let prompt = build_system_prompt(ws.path(), "model", &[], &[]);
assert!(prompt.contains("truncated at"), "large files should be truncated");
assert!(!prompt.contains(&big_content), "full content should not appear");
}
#[test]
fn prompt_empty_files_skipped() {
let ws = make_workspace();
std::fs::write(ws.path().join("TOOLS.md"), "").unwrap();
let prompt = build_system_prompt(ws.path(), "model", &[], &[]);
// Empty file should not produce a header
assert!(!prompt.contains("### TOOLS.md"), "empty files should be skipped");
}
#[test]
fn prompt_workspace_path() {
let ws = make_workspace();
let prompt = build_system_prompt(ws.path(), "model", &[], &[]);
assert!(prompt.contains(&format!("Working directory: `{}`", ws.path().display())));
}
}

174
src/channels/slack.rs Normal file
View file

@ -0,0 +1,174 @@
use super::traits::{Channel, ChannelMessage};
use async_trait::async_trait;
use uuid::Uuid;
/// Slack channel — polls conversations.history via Web API
pub struct SlackChannel {
bot_token: String,
channel_id: Option<String>,
client: reqwest::Client,
}
impl SlackChannel {
pub fn new(bot_token: String, channel_id: Option<String>) -> Self {
Self {
bot_token,
channel_id,
client: reqwest::Client::new(),
}
}
/// Get the bot's own user ID so we can ignore our own messages
async fn get_bot_user_id(&self) -> Option<String> {
let resp: serde_json::Value = self
.client
.get("https://slack.com/api/auth.test")
.bearer_auth(&self.bot_token)
.send()
.await
.ok()?
.json()
.await
.ok()?;
resp.get("user_id")
.and_then(|u| u.as_str())
.map(String::from)
}
}
#[async_trait]
impl Channel for SlackChannel {
fn name(&self) -> &str {
"slack"
}
async fn send(&self, message: &str, channel: &str) -> anyhow::Result<()> {
let body = serde_json::json!({
"channel": channel,
"text": message
});
self.client
.post("https://slack.com/api/chat.postMessage")
.bearer_auth(&self.bot_token)
.json(&body)
.send()
.await?;
Ok(())
}
async fn listen(&self, tx: tokio::sync::mpsc::Sender<ChannelMessage>) -> anyhow::Result<()> {
let channel_id = self
.channel_id
.clone()
.ok_or_else(|| anyhow::anyhow!("Slack channel_id required for listening"))?;
let bot_user_id = self.get_bot_user_id().await.unwrap_or_default();
let mut last_ts = String::new();
tracing::info!("Slack channel listening on #{channel_id}...");
loop {
tokio::time::sleep(std::time::Duration::from_secs(3)).await;
let mut params = vec![
("channel", channel_id.clone()),
("limit", "10".to_string()),
];
if !last_ts.is_empty() {
params.push(("oldest", last_ts.clone()));
}
let resp = match self
.client
.get("https://slack.com/api/conversations.history")
.bearer_auth(&self.bot_token)
.query(&params)
.send()
.await
{
Ok(r) => r,
Err(e) => {
tracing::warn!("Slack poll error: {e}");
continue;
}
};
let data: serde_json::Value = match resp.json().await {
Ok(d) => d,
Err(e) => {
tracing::warn!("Slack parse error: {e}");
continue;
}
};
if let Some(messages) = data.get("messages").and_then(|m| m.as_array()) {
// Messages come newest-first, reverse to process oldest first
for msg in messages.iter().rev() {
let ts = msg.get("ts").and_then(|t| t.as_str()).unwrap_or("");
let user = msg
.get("user")
.and_then(|u| u.as_str())
.unwrap_or("unknown");
let text = msg.get("text").and_then(|t| t.as_str()).unwrap_or("");
// Skip bot's own messages
if user == bot_user_id {
continue;
}
// Skip empty or already-seen
if text.is_empty() || ts <= last_ts.as_str() {
continue;
}
last_ts = ts.to_string();
let channel_msg = ChannelMessage {
id: Uuid::new_v4().to_string(),
sender: channel_id.clone(),
content: text.to_string(),
channel: "slack".to_string(),
timestamp: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
};
if tx.send(channel_msg).await.is_err() {
return Ok(());
}
}
}
}
}
async fn health_check(&self) -> bool {
self.client
.get("https://slack.com/api/auth.test")
.bearer_auth(&self.bot_token)
.send()
.await
.map(|r| r.status().is_success())
.unwrap_or(false)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn slack_channel_name() {
let ch = SlackChannel::new("xoxb-fake".into(), None);
assert_eq!(ch.name(), "slack");
}
#[test]
fn slack_channel_with_channel_id() {
let ch = SlackChannel::new("xoxb-fake".into(), Some("C12345".into()));
assert_eq!(ch.channel_id, Some("C12345".to_string()));
}
}

182
src/channels/telegram.rs Normal file
View file

@ -0,0 +1,182 @@
use super::traits::{Channel, ChannelMessage};
use async_trait::async_trait;
use uuid::Uuid;
/// Telegram channel — long-polls the Bot API for updates
pub struct TelegramChannel {
bot_token: String,
allowed_users: Vec<String>,
client: reqwest::Client,
}
impl TelegramChannel {
pub fn new(bot_token: String, allowed_users: Vec<String>) -> Self {
Self {
bot_token,
allowed_users,
client: reqwest::Client::new(),
}
}
fn api_url(&self, method: &str) -> String {
format!("https://api.telegram.org/bot{}/{method}", self.bot_token)
}
fn is_user_allowed(&self, username: &str) -> bool {
self.allowed_users.iter().any(|u| u == "*" || u == username)
}
}
#[async_trait]
impl Channel for TelegramChannel {
fn name(&self) -> &str {
"telegram"
}
async fn send(&self, message: &str, chat_id: &str) -> anyhow::Result<()> {
let body = serde_json::json!({
"chat_id": chat_id,
"text": message,
"parse_mode": "Markdown"
});
self.client
.post(self.api_url("sendMessage"))
.json(&body)
.send()
.await?;
Ok(())
}
async fn listen(&self, tx: tokio::sync::mpsc::Sender<ChannelMessage>) -> anyhow::Result<()> {
let mut offset: i64 = 0;
tracing::info!("Telegram channel listening for messages...");
loop {
let url = self.api_url("getUpdates");
let body = serde_json::json!({
"offset": offset,
"timeout": 30,
"allowed_updates": ["message"]
});
let resp = match self.client.post(&url).json(&body).send().await {
Ok(r) => r,
Err(e) => {
tracing::warn!("Telegram poll error: {e}");
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
continue;
}
};
let data: serde_json::Value = match resp.json().await {
Ok(d) => d,
Err(e) => {
tracing::warn!("Telegram parse error: {e}");
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
continue;
}
};
if let Some(results) = data.get("result").and_then(serde_json::Value::as_array) {
for update in results {
// Advance offset past this update
if let Some(uid) = update.get("update_id").and_then(serde_json::Value::as_i64) {
offset = uid + 1;
}
let Some(message) = update.get("message") else {
continue;
};
let Some(text) = message.get("text").and_then(serde_json::Value::as_str) else {
continue;
};
let username = message
.get("from")
.and_then(|f| f.get("username"))
.and_then(|u| u.as_str())
.unwrap_or("unknown");
if !self.is_user_allowed(username) {
tracing::warn!("Telegram: ignoring message from unauthorized user: {username}");
continue;
}
let chat_id = message
.get("chat")
.and_then(|c| c.get("id"))
.and_then(serde_json::Value::as_i64)
.map(|id| id.to_string())
.unwrap_or_default();
let msg = ChannelMessage {
id: Uuid::new_v4().to_string(),
sender: chat_id,
content: text.to_string(),
channel: "telegram".to_string(),
timestamp: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
};
if tx.send(msg).await.is_err() {
return Ok(());
}
}
}
}
}
async fn health_check(&self) -> bool {
self.client
.get(self.api_url("getMe"))
.send()
.await
.map(|r| r.status().is_success())
.unwrap_or(false)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn telegram_channel_name() {
let ch = TelegramChannel::new("fake-token".into(), vec!["*".into()]);
assert_eq!(ch.name(), "telegram");
}
#[test]
fn telegram_api_url() {
let ch = TelegramChannel::new("123:ABC".into(), vec![]);
assert_eq!(
ch.api_url("getMe"),
"https://api.telegram.org/bot123:ABC/getMe"
);
}
#[test]
fn telegram_user_allowed_wildcard() {
let ch = TelegramChannel::new("t".into(), vec!["*".into()]);
assert!(ch.is_user_allowed("anyone"));
}
#[test]
fn telegram_user_allowed_specific() {
let ch = TelegramChannel::new("t".into(), vec!["alice".into(), "bob".into()]);
assert!(ch.is_user_allowed("alice"));
assert!(!ch.is_user_allowed("eve"));
}
#[test]
fn telegram_user_denied_empty() {
let ch = TelegramChannel::new("t".into(), vec![]);
assert!(!ch.is_user_allowed("anyone"));
}
}

29
src/channels/traits.rs Normal file
View file

@ -0,0 +1,29 @@
use async_trait::async_trait;
/// A message received from or sent to a channel
#[derive(Debug, Clone)]
pub struct ChannelMessage {
pub id: String,
pub sender: String,
pub content: String,
pub channel: String,
pub timestamp: u64,
}
/// Core channel trait — implement for any messaging platform
#[async_trait]
pub trait Channel: Send + Sync {
/// Human-readable channel name
fn name(&self) -> &str;
/// Send a message through this channel
async fn send(&self, message: &str, recipient: &str) -> anyhow::Result<()>;
/// Start listening for incoming messages (long-running)
async fn listen(&self, tx: tokio::sync::mpsc::Sender<ChannelMessage>) -> anyhow::Result<()>;
/// Check if channel is healthy
async fn health_check(&self) -> bool {
true
}
}