fix: add channel message timeouts, Telegram fallback, and fix identity/observer tests
Closes #184
This commit is contained in:
parent
be6474b815
commit
dca95cac7a
5 changed files with 89 additions and 30 deletions
|
|
@ -24,15 +24,17 @@ use crate::config::Config;
|
||||||
use crate::memory::{self, Memory};
|
use crate::memory::{self, Memory};
|
||||||
use crate::providers::{self, Provider};
|
use crate::providers::{self, Provider};
|
||||||
use crate::util::truncate_with_ellipsis;
|
use crate::util::truncate_with_ellipsis;
|
||||||
|
use crate::identity;
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
/// Maximum characters per injected workspace file (matches `OpenClaw` default).
|
/// Maximum characters per injected workspace file (matches `OpenClaw` default).
|
||||||
const BOOTSTRAP_MAX_CHARS: usize = 20_000;
|
const BOOTSTRAP_MAX_CHARS: usize = 20_000;
|
||||||
|
|
||||||
const DEFAULT_CHANNEL_INITIAL_BACKOFF_SECS: u64 = 2;
|
const DEFAULT_CHANNEL_INITIAL_BACKOFF_SECS: u64 = 2;
|
||||||
const DEFAULT_CHANNEL_MAX_BACKOFF_SECS: u64 = 60;
|
const DEFAULT_CHANNEL_MAX_BACKOFF_SECS: u64 = 60;
|
||||||
|
const CHANNEL_MESSAGE_TIMEOUT_SECS: u64 = 90;
|
||||||
|
|
||||||
fn spawn_supervised_listener(
|
fn spawn_supervised_listener(
|
||||||
ch: Arc<dyn Channel>,
|
ch: Arc<dyn Channel>,
|
||||||
|
|
@ -187,11 +189,11 @@ pub fn build_system_prompt(
|
||||||
|
|
||||||
// Check if AIEOS identity is configured
|
// Check if AIEOS identity is configured
|
||||||
if let Some(config) = identity_config {
|
if let Some(config) = identity_config {
|
||||||
if crate::identity::is_aieos_configured(config) {
|
if identity::is_aieos_configured(config) {
|
||||||
// Load AIEOS identity
|
// Load AIEOS identity
|
||||||
match crate::identity::load_aieos_identity(config, workspace_dir) {
|
match identity::load_aieos_identity(config, workspace_dir) {
|
||||||
Ok(Some(aieos_identity)) => {
|
Ok(Some(aieos_identity)) => {
|
||||||
let aieos_prompt = crate::identity::aieos_to_system_prompt(&aieos_identity);
|
let aieos_prompt = identity::aieos_to_system_prompt(&aieos_identity);
|
||||||
if !aieos_prompt.is_empty() {
|
if !aieos_prompt.is_empty() {
|
||||||
prompt.push_str(&aieos_prompt);
|
prompt.push_str(&aieos_prompt);
|
||||||
prompt.push_str("\n\n");
|
prompt.push_str("\n\n");
|
||||||
|
|
@ -684,13 +686,20 @@ pub async fn start_channels(config: Config) -> Result<()> {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Call the LLM with system prompt (identity + soul + tools)
|
// Call the LLM with system prompt (identity + soul + tools)
|
||||||
match provider
|
println!(" ⏳ Processing message...");
|
||||||
.chat_with_system(Some(&system_prompt), &msg.content, &model, temperature)
|
let started_at = Instant::now();
|
||||||
.await
|
|
||||||
{
|
let llm_result = tokio::time::timeout(
|
||||||
Ok(response) => {
|
Duration::from_secs(CHANNEL_MESSAGE_TIMEOUT_SECS),
|
||||||
|
provider.chat_with_system(Some(&system_prompt), &msg.content, &model, temperature),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
match llm_result {
|
||||||
|
Ok(Ok(response)) => {
|
||||||
println!(
|
println!(
|
||||||
" 🤖 Reply: {}",
|
" 🤖 Reply ({}ms): {}",
|
||||||
|
started_at.elapsed().as_millis(),
|
||||||
truncate_with_ellipsis(&response, 80)
|
truncate_with_ellipsis(&response, 80)
|
||||||
);
|
);
|
||||||
// Find the channel that sent this message and reply
|
// Find the channel that sent this message and reply
|
||||||
|
|
@ -703,8 +712,11 @@ pub async fn start_channels(config: Config) -> Result<()> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Ok(Err(e)) => {
|
||||||
eprintln!(" ❌ LLM error: {e}");
|
eprintln!(
|
||||||
|
" ❌ LLM error after {}ms: {e}",
|
||||||
|
started_at.elapsed().as_millis()
|
||||||
|
);
|
||||||
for ch in &channels {
|
for ch in &channels {
|
||||||
if ch.name() == msg.channel {
|
if ch.name() == msg.channel {
|
||||||
let _ = ch.send(&format!("⚠️ Error: {e}"), &msg.sender).await;
|
let _ = ch.send(&format!("⚠️ Error: {e}"), &msg.sender).await;
|
||||||
|
|
@ -712,6 +724,28 @@ pub async fn start_channels(config: Config) -> Result<()> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Err(_) => {
|
||||||
|
let timeout_msg = format!(
|
||||||
|
"LLM response timed out after {}s",
|
||||||
|
CHANNEL_MESSAGE_TIMEOUT_SECS
|
||||||
|
);
|
||||||
|
eprintln!(
|
||||||
|
" ❌ {} (elapsed: {}ms)",
|
||||||
|
timeout_msg,
|
||||||
|
started_at.elapsed().as_millis()
|
||||||
|
);
|
||||||
|
for ch in &channels {
|
||||||
|
if ch.name() == msg.channel {
|
||||||
|
let _ = ch
|
||||||
|
.send(
|
||||||
|
"⚠️ Request timed out while waiting for the model. Please try again.",
|
||||||
|
&msg.sender,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -1045,9 +1079,9 @@ mod tests {
|
||||||
let ws = make_workspace();
|
let ws = make_workspace();
|
||||||
let prompt = build_system_prompt(ws.path(), "model", &[], &[], Some(&config));
|
let prompt = build_system_prompt(ws.path(), "model", &[], &[], Some(&config));
|
||||||
|
|
||||||
// Should fall back to OpenClaw format
|
// Should fall back to OpenClaw format when AIEOS file is not found
|
||||||
|
// (Error is logged to stderr with filename, not included in prompt)
|
||||||
assert!(prompt.contains("### SOUL.md"));
|
assert!(prompt.contains("### SOUL.md"));
|
||||||
assert!(prompt.contains("[File not found: nonexistent.json]"));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
|
|
||||||
|
|
@ -370,26 +370,52 @@ impl Channel for TelegramChannel {
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn send(&self, message: &str, chat_id: &str) -> anyhow::Result<()> {
|
async fn send(&self, message: &str, chat_id: &str) -> anyhow::Result<()> {
|
||||||
let body = serde_json::json!({
|
let markdown_body = serde_json::json!({
|
||||||
"chat_id": chat_id,
|
"chat_id": chat_id,
|
||||||
"text": message,
|
"text": message,
|
||||||
"parse_mode": "Markdown"
|
"parse_mode": "Markdown"
|
||||||
});
|
});
|
||||||
|
|
||||||
let resp = self
|
let markdown_resp = self
|
||||||
.client
|
.client
|
||||||
.post(self.api_url("sendMessage"))
|
.post(self.api_url("sendMessage"))
|
||||||
.json(&body)
|
.json(&markdown_body)
|
||||||
.send()
|
.send()
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
if !resp.status().is_success() {
|
if markdown_resp.status().is_success() {
|
||||||
let status = resp.status();
|
return Ok(());
|
||||||
let err = resp
|
}
|
||||||
.text()
|
|
||||||
.await
|
let markdown_status = markdown_resp.status();
|
||||||
.unwrap_or_else(|e| format!("<failed to read response body: {e}>"));
|
let markdown_err = markdown_resp.text().await.unwrap_or_default();
|
||||||
anyhow::bail!("Telegram sendMessage failed ({status}): {err}");
|
tracing::warn!(
|
||||||
|
status = ?markdown_status,
|
||||||
|
"Telegram sendMessage with Markdown failed; retrying without parse_mode"
|
||||||
|
);
|
||||||
|
|
||||||
|
// Retry without parse_mode as a compatibility fallback.
|
||||||
|
let plain_body = serde_json::json!({
|
||||||
|
"chat_id": chat_id,
|
||||||
|
"text": message,
|
||||||
|
});
|
||||||
|
let plain_resp = self
|
||||||
|
.client
|
||||||
|
.post(self.api_url("sendMessage"))
|
||||||
|
.json(&plain_body)
|
||||||
|
.send()
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
if !plain_resp.status().is_success() {
|
||||||
|
let plain_status = plain_resp.status();
|
||||||
|
let plain_err = plain_resp.text().await.unwrap_or_default();
|
||||||
|
anyhow::bail!(
|
||||||
|
"Telegram sendMessage failed (markdown {}: {}; plain {}: {})",
|
||||||
|
markdown_status,
|
||||||
|
markdown_err,
|
||||||
|
plain_status,
|
||||||
|
plain_err
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|
|
||||||
|
|
@ -13,7 +13,7 @@ use std::path::Path;
|
||||||
///
|
///
|
||||||
/// This follows the AIEOS schema for defining AI agent identity, personality,
|
/// This follows the AIEOS schema for defining AI agent identity, personality,
|
||||||
/// and behavior. See https://aieos.org for the full specification.
|
/// and behavior. See https://aieos.org for the full specification.
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
|
||||||
pub struct AieosIdentity {
|
pub struct AieosIdentity {
|
||||||
/// Core identity: names, bio, origin, residence
|
/// Core identity: names, bio, origin, residence
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
|
|
@ -580,6 +580,7 @@ mod tests {
|
||||||
first: Some("Nova".into()),
|
first: Some("Nova".into()),
|
||||||
last: Some("AI".into()),
|
last: Some("AI".into()),
|
||||||
nickname: Some("Nov".into()),
|
nickname: Some("Nov".into()),
|
||||||
|
full: Some("Nova AI".into()),
|
||||||
}),
|
}),
|
||||||
bio: Some("A helpful assistant.".into()),
|
bio: Some("A helpful assistant.".into()),
|
||||||
origin: Some("Silicon Valley".into()),
|
origin: Some("Silicon Valley".into()),
|
||||||
|
|
|
||||||
|
|
@ -22,6 +22,7 @@ mod doctor;
|
||||||
mod gateway;
|
mod gateway;
|
||||||
mod health;
|
mod health;
|
||||||
mod heartbeat;
|
mod heartbeat;
|
||||||
|
mod identity;
|
||||||
mod integrations;
|
mod integrations;
|
||||||
mod memory;
|
mod memory;
|
||||||
mod migration;
|
mod migration;
|
||||||
|
|
|
||||||
|
|
@ -37,7 +37,7 @@ pub enum ObserverMetric {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Core observability trait — implement for any backend
|
/// Core observability trait — implement for any backend
|
||||||
pub trait Observer: Send + Sync {
|
pub trait Observer: Send + Sync + 'static {
|
||||||
/// Record a discrete event
|
/// Record a discrete event
|
||||||
fn record_event(&self, event: &ObserverEvent);
|
fn record_event(&self, event: &ObserverEvent);
|
||||||
|
|
||||||
|
|
@ -52,9 +52,6 @@ pub trait Observer: Send + Sync {
|
||||||
|
|
||||||
/// Downcast to `Any` for backend-specific operations
|
/// Downcast to `Any` for backend-specific operations
|
||||||
fn as_any(&self) -> &dyn std::any::Any where Self: Sized {
|
fn as_any(&self) -> &dyn std::any::Any where Self: Sized {
|
||||||
// Default implementation returns a placeholder that will fail on downcast.
|
self
|
||||||
// Implementors should override this to return `self`.
|
|
||||||
struct Placeholder;
|
|
||||||
std::any::TypeId::of::<Placeholder>()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue