diff --git a/README.md b/README.md index f7095ca..ce0a980 100644 --- a/README.md +++ b/README.md @@ -601,6 +601,8 @@ window_allowlist = [] # optional window title/process allowlist hints enabled = false # opt-in: 1000+ OAuth apps via composio.dev # api_key = "cmp_..." # optional: stored encrypted when [secrets].encrypt = true entity_id = "default" # default user_id for Composio tool calls +# Runtime tip: if execute asks for connected_account_id, run composio with +# action='list_accounts' and app='gmail' (or your toolkit) to retrieve account IDs. [identity] format = "openclaw" # "openclaw" (default, markdown files) or "aieos" (JSON) diff --git a/docs/config-reference.md b/docs/config-reference.md index cb8f3a0..2e8278e 100644 --- a/docs/config-reference.md +++ b/docs/config-reference.md @@ -75,6 +75,7 @@ Notes: - Backward compatibility: legacy `enable = true` is accepted as an alias for `enabled = true`. - If `enabled = false` or `api_key` is missing, the `composio` tool is not registered. - Typical flow: call `connect`, complete browser OAuth, then run `execute` for the desired tool action. +- If Composio returns a missing connected-account reference error, call `list_accounts` (optionally with `app`) and pass the returned `connected_account_id` to `execute`. ## `[multimodal]` diff --git a/src/channels/mod.rs b/src/channels/mod.rs index 70919ff..0464924 100644 --- a/src/channels/mod.rs +++ b/src/channels/mod.rs @@ -1795,7 +1795,7 @@ pub async fn start_channels(config: Config) -> Result<()> { if config.composio.enabled { tool_descs.push(( "composio", - "Execute actions on 1000+ apps via Composio (Gmail, Notion, GitHub, Slack, etc.). Use action='list' to discover, 'execute' to run (optionally with connected_account_id), 'connect' to OAuth.", + "Execute actions on 1000+ apps via Composio (Gmail, Notion, GitHub, Slack, etc.). Use action='list' to discover actions, 'list_accounts' to retrieve connected account IDs, 'execute' to run (optionally with connected_account_id), and 'connect' for OAuth.", )); } tool_descs.push(( diff --git a/src/tools/composio.rs b/src/tools/composio.rs index bf26748..ef403dc 100644 --- a/src/tools/composio.rs +++ b/src/tools/composio.rs @@ -11,9 +11,11 @@ use crate::security::policy::ToolOperation; use crate::security::SecurityPolicy; use anyhow::Context; use async_trait::async_trait; +use parking_lot::RwLock; use reqwest::Client; use serde::{Deserialize, Serialize}; use serde_json::json; +use std::collections::HashMap; use std::sync::Arc; const COMPOSIO_API_BASE_V2: &str = "https://backend.composio.dev/api/v2"; @@ -33,6 +35,7 @@ pub struct ComposioTool { api_key: String, default_entity_id: String, security: Arc, + recent_connected_accounts: RwLock>, } impl ComposioTool { @@ -45,6 +48,7 @@ impl ComposioTool { api_key: api_key.to_string(), default_entity_id: normalize_entity_id(default_entity_id.unwrap_or("default")), security, + recent_connected_accounts: RwLock::new(HashMap::new()), } } @@ -120,20 +124,127 @@ impl ComposioTool { Ok(body.items) } + /// List connected accounts for a user and optional toolkit/app. + async fn list_connected_accounts( + &self, + app_name: Option<&str>, + entity_id: Option<&str>, + ) -> anyhow::Result> { + let url = format!("{COMPOSIO_API_BASE_V3}/connected_accounts"); + let mut req = self.client().get(&url).header("x-api-key", &self.api_key); + + req = req.query(&[ + ("limit", "50"), + ("order_by", "updated_at"), + ("order_direction", "desc"), + ("statuses", "INITIALIZING"), + ("statuses", "ACTIVE"), + ("statuses", "INITIATED"), + ]); + + if let Some(app) = app_name + .map(normalize_app_slug) + .filter(|app| !app.is_empty()) + { + req = req.query(&[("toolkit_slugs", app.as_str())]); + } + + if let Some(entity) = entity_id { + req = req.query(&[("user_ids", entity)]); + } + + let resp = req.send().await?; + if !resp.status().is_success() { + let err = response_error(resp).await; + anyhow::bail!("Composio v3 connected accounts lookup failed: {err}"); + } + + let body: ComposioConnectedAccountsResponse = resp + .json() + .await + .context("Failed to decode Composio v3 connected accounts response")?; + Ok(body.items) + } + + fn cache_connected_account(&self, app_name: &str, entity_id: &str, connected_account_id: &str) { + let key = connected_account_cache_key(app_name, entity_id); + self.recent_connected_accounts + .write() + .insert(key, connected_account_id.to_string()); + } + + fn get_cached_connected_account(&self, app_name: &str, entity_id: &str) -> Option { + let key = connected_account_cache_key(app_name, entity_id); + self.recent_connected_accounts.read().get(&key).cloned() + } + + async fn resolve_connected_account_ref( + &self, + app_name: Option<&str>, + entity_id: Option<&str>, + ) -> anyhow::Result> { + let app = app_name + .map(normalize_app_slug) + .filter(|app| !app.is_empty()); + let entity = entity_id.map(normalize_entity_id); + let (Some(app), Some(entity)) = (app, entity) else { + return Ok(None); + }; + + if let Some(cached) = self.get_cached_connected_account(&app, &entity) { + return Ok(Some(cached)); + } + + let accounts = self + .list_connected_accounts(Some(&app), Some(&entity)) + .await?; + let mut usable = accounts.into_iter().filter(|acct| acct.is_usable()); + let Some(first) = usable.next() else { + return Ok(None); + }; + if usable.next().is_some() { + return Ok(None); + } + + self.cache_connected_account(&app, &entity, &first.id); + Ok(Some(first.id)) + } + /// Execute a Composio action/tool with given parameters. /// /// Uses v3 endpoint first and falls back to v2 for compatibility. pub async fn execute_action( &self, action_name: &str, + app_name_hint: Option<&str>, params: serde_json::Value, entity_id: Option<&str>, connected_account_ref: Option<&str>, ) -> anyhow::Result { let tool_slug = normalize_tool_slug(action_name); + let app_hint = app_name_hint + .map(normalize_app_slug) + .filter(|app| !app.is_empty()) + .or_else(|| infer_app_slug_from_action_name(action_name)); + let normalized_entity_id = entity_id.map(normalize_entity_id); + let explicit_account_ref = connected_account_ref.and_then(|candidate| { + let trimmed = candidate.trim(); + (!trimmed.is_empty()).then_some(trimmed.to_string()) + }); + let resolved_account_ref = if explicit_account_ref.is_some() { + explicit_account_ref + } else { + self.resolve_connected_account_ref(app_hint.as_deref(), entity_id) + .await? + }; match self - .execute_action_v3(&tool_slug, params.clone(), entity_id, connected_account_ref) + .execute_action_v3( + &tool_slug, + params.clone(), + entity_id, + resolved_account_ref.as_deref(), + ) .await { Ok(result) => Ok(result), @@ -156,8 +267,13 @@ impl ComposioTool { } anyhow::bail!( - "Composio execute failed on v3 ({v3_err}) and v2 fallback attempts ({})", - v2_errors.join(" | ") + "Composio execute failed on v3 ({v3_err}) and v2 fallback attempts ({}){}", + v2_errors.join(" | "), + build_connected_account_hint( + app_hint.as_deref(), + normalized_entity_id.as_deref(), + resolved_account_ref.as_deref(), + ) ); } } @@ -269,7 +385,7 @@ impl ComposioTool { app_name: Option<&str>, auth_config_id: Option<&str>, entity_id: &str, - ) -> anyhow::Result { + ) -> anyhow::Result { let v3 = self .get_connection_url_v3(app_name, auth_config_id, entity_id) .await; @@ -296,7 +412,7 @@ impl ComposioTool { app_name: Option<&str>, auth_config_id: Option<&str>, entity_id: &str, - ) -> anyhow::Result { + ) -> anyhow::Result { let auth_config_id = match auth_config_id { Some(id) => id.to_string(), None => { @@ -330,15 +446,19 @@ impl ComposioTool { .json() .await .context("Failed to decode Composio v3 connect response")?; - extract_redirect_url(&result) - .ok_or_else(|| anyhow::anyhow!("No redirect URL in Composio v3 response")) + let redirect_url = extract_redirect_url(&result) + .ok_or_else(|| anyhow::anyhow!("No redirect URL in Composio v3 response"))?; + Ok(ComposioConnectionLink { + redirect_url, + connected_account_id: extract_connected_account_id(&result), + }) } async fn get_connection_url_v2( &self, app_name: &str, entity_id: &str, - ) -> anyhow::Result { + ) -> anyhow::Result { let url = format!("{COMPOSIO_API_BASE_V2}/connectedAccounts"); let body = json!({ @@ -363,8 +483,12 @@ impl ComposioTool { .json() .await .context("Failed to decode Composio v2 connect response")?; - extract_redirect_url(&result) - .ok_or_else(|| anyhow::anyhow!("No redirect URL in Composio v2 response")) + let redirect_url = extract_redirect_url(&result) + .ok_or_else(|| anyhow::anyhow!("No redirect URL in Composio v2 response"))?; + Ok(ComposioConnectionLink { + redirect_url, + connected_account_id: extract_connected_account_id(&result), + }) } async fn resolve_auth_config_id(&self, app_name: &str) -> anyhow::Result { @@ -418,6 +542,7 @@ impl Tool for ComposioTool { fn description(&self) -> &str { "Execute actions on 1000+ apps via Composio (Gmail, Notion, GitHub, Slack, etc.). \ Use action='list' to see available actions, action='execute' with action_name/tool_slug, params, and optional connected_account_id, \ + action='list_accounts' to list connected accounts and IDs, \ or action='connect' with app/auth_config_id to get OAuth URL." } @@ -427,12 +552,12 @@ impl Tool for ComposioTool { "properties": { "action": { "type": "string", - "description": "The operation: 'list' (list available actions), 'execute' (run an action), or 'connect' (get OAuth URL)", - "enum": ["list", "execute", "connect"] + "description": "The operation: 'list' (list available actions), 'list_accounts' (list connected accounts), 'execute' (run an action), or 'connect' (get OAuth URL)", + "enum": ["list", "list_accounts", "execute", "connect"] }, "app": { "type": "string", - "description": "Toolkit slug filter for 'list', or toolkit/app for 'connect' (e.g. 'gmail', 'notion', 'github')" + "description": "Toolkit slug filter for 'list' or 'list_accounts', optional app hint for 'execute', or toolkit/app for 'connect' (e.g. 'gmail', 'notion', 'github')" }, "action_name": { "type": "string", @@ -515,6 +640,55 @@ impl Tool for ComposioTool { } } + "list_accounts" => { + let app = args.get("app").and_then(|v| v.as_str()); + match self.list_connected_accounts(app, Some(entity_id)).await { + Ok(accounts) => { + if accounts.is_empty() { + let app_hint = app + .map(|value| format!(" for app '{value}'")) + .unwrap_or_default(); + return Ok(ToolResult { + success: true, + output: format!( + "No connected accounts found{app_hint} for entity '{entity_id}'. Run action='connect' first." + ), + error: None, + }); + } + + let summary: Vec = accounts + .iter() + .take(20) + .map(|account| { + let toolkit = account.toolkit_slug().unwrap_or("?"); + format!("- {} [{}] toolkit={toolkit}", account.id, account.status) + }) + .collect(); + let total = accounts.len(); + let output = format!( + "Found {total} connected accounts (entity '{entity_id}'):\n{}{}\nUse connected_account_id in action='execute' when needed.", + summary.join("\n"), + if total > 20 { + format!("\n... and {} more", total - 20) + } else { + String::new() + } + ); + Ok(ToolResult { + success: true, + output, + error: None, + }) + } + Err(e) => Ok(ToolResult { + success: false, + output: String::new(), + error: Some(format!("Failed to list connected accounts: {e}")), + }), + } + } + "execute" => { if let Err(error) = self .security @@ -535,11 +709,12 @@ impl Tool for ComposioTool { anyhow::anyhow!("Missing 'action_name' (or 'tool_slug') for execute") })?; + let app = args.get("app").and_then(|v| v.as_str()); let params = args.get("params").cloned().unwrap_or(json!({})); let acct_ref = args.get("connected_account_id").and_then(|v| v.as_str()); match self - .execute_action(action_name, params, Some(entity_id), acct_ref) + .execute_action(action_name, app, params, Some(entity_id), acct_ref) .await { Ok(result) => { @@ -582,12 +757,24 @@ impl Tool for ComposioTool { .get_connection_url(app, auth_config_id, entity_id) .await { - Ok(url) => { + Ok(link) => { let target = app.unwrap_or(auth_config_id.unwrap_or("provided auth config")); + let mut output = format!( + "Open this URL to connect {target}:\n{}", + link.redirect_url + ); + if let Some(connected_account_id) = link.connected_account_id.as_deref() { + if let Some(app_name) = app { + self.cache_connected_account(app_name, entity_id, connected_account_id); + } + output.push_str(&format!( + "\nConnected account ID: {connected_account_id}" + )); + } Ok(ToolResult { success: true, - output: format!("Open this URL to connect {target}:\n{url}"), + output, error: None, }) } @@ -603,7 +790,7 @@ impl Tool for ComposioTool { success: false, output: String::new(), error: Some(format!( - "Unknown action '{action}'. Use 'list', 'execute', or 'connect'." + "Unknown action '{action}'. Use 'list', 'list_accounts', 'execute', or 'connect'." )), }), } @@ -627,6 +814,67 @@ fn normalize_legacy_action_name(action_name: &str) -> String { action_name.trim().replace('-', "_").to_ascii_uppercase() } +fn normalize_app_slug(app_name: &str) -> String { + app_name + .trim() + .replace('_', "-") + .to_ascii_lowercase() + .split('-') + .filter(|part| !part.is_empty()) + .collect::>() + .join("-") +} + +fn infer_app_slug_from_action_name(action_name: &str) -> Option { + let trimmed = action_name.trim(); + if trimmed.is_empty() { + return None; + } + + let raw = if trimmed.contains('-') { + trimmed.split('-').next() + } else if trimmed.contains('_') { + trimmed.split('_').next() + } else { + None + }?; + + let app = normalize_app_slug(raw); + (!app.is_empty()).then_some(app) +} + +fn connected_account_cache_key(app_name: &str, entity_id: &str) -> String { + format!( + "{}:{}", + normalize_entity_id(entity_id), + normalize_app_slug(app_name) + ) +} + +fn build_connected_account_hint( + app_hint: Option<&str>, + entity_id: Option<&str>, + connected_account_ref: Option<&str>, +) -> String { + if connected_account_ref.is_some() { + return String::new(); + } + + let Some(entity) = entity_id else { + return String::new(); + }; + + if let Some(app) = app_hint { + format!( + " Hint: use action='list_accounts' with app='{app}' and entity_id='{entity}' to retrieve connected_account_id." + ) + } else { + format!( + " Hint: use action='list_accounts' with entity_id='{entity}' to retrieve connected_account_id." + ) + } +} + fn map_v3_tools_to_actions(items: Vec) -> Vec { items .into_iter() @@ -662,6 +910,26 @@ fn extract_redirect_url(result: &serde_json::Value) -> Option { .map(ToString::to_string) } +fn extract_connected_account_id(result: &serde_json::Value) -> Option { + result + .get("connected_account_id") + .and_then(|v| v.as_str()) + .or_else(|| result.get("connectedAccountId").and_then(|v| v.as_str())) + .or_else(|| { + result + .get("data") + .and_then(|v| v.get("connected_account_id")) + .and_then(|v| v.as_str()) + }) + .or_else(|| { + result + .get("data") + .and_then(|v| v.get("connectedAccountId")) + .and_then(|v| v.as_str()) + }) + .map(ToString::to_string) +} + async fn response_error(resp: reqwest::Response) -> String { let status = resp.status(); let body = resp.text().await.unwrap_or_default(); @@ -734,6 +1002,35 @@ struct ComposioToolsResponse { items: Vec, } +#[derive(Debug, Deserialize)] +struct ComposioConnectedAccountsResponse { + #[serde(default)] + items: Vec, +} + +#[derive(Debug, Clone, Deserialize)] +struct ComposioConnectedAccount { + id: String, + #[serde(default)] + status: String, + #[serde(default)] + toolkit: Option, +} + +impl ComposioConnectedAccount { + fn is_usable(&self) -> bool { + self.status.eq_ignore_ascii_case("INITIALIZING") + || self.status.eq_ignore_ascii_case("ACTIVE") + || self.status.eq_ignore_ascii_case("INITIATED") + } + + fn toolkit_slug(&self) -> Option<&str> { + self.toolkit + .as_ref() + .and_then(|toolkit| toolkit.slug.as_deref()) + } +} + #[derive(Debug, Clone, Deserialize)] struct ComposioV3Tool { #[serde(default)] @@ -762,6 +1059,12 @@ struct ComposioAuthConfigsResponse { items: Vec, } +#[derive(Debug, Clone)] +pub struct ComposioConnectionLink { + pub redirect_url: String, + pub connected_account_id: Option, +} + #[derive(Debug, Clone, Deserialize)] struct ComposioAuthConfig { id: String, @@ -828,6 +1131,13 @@ mod tests { assert!(schema["properties"]["connected_account_id"].is_object()); let required = schema["required"].as_array().unwrap(); assert!(required.contains(&json!("action"))); + let enum_values = schema["properties"]["action"]["enum"] + .as_array() + .unwrap() + .iter() + .filter_map(|v| v.as_str()) + .collect::>(); + assert!(enum_values.contains(&"list_accounts")); } #[test] @@ -999,6 +1309,81 @@ mod tests { ); } + #[test] + fn normalize_app_slug_removes_spaces_and_normalizes_case() { + assert_eq!(normalize_app_slug(" Gmail "), "gmail"); + assert_eq!(normalize_app_slug("GITHUB_APP"), "github-app"); + } + + #[test] + fn infer_app_slug_from_action_name_handles_v2_and_v3_formats() { + assert_eq!( + infer_app_slug_from_action_name("gmail-fetch-emails").as_deref(), + Some("gmail") + ); + assert_eq!( + infer_app_slug_from_action_name("GMAIL_FETCH_EMAILS").as_deref(), + Some("gmail") + ); + assert!(infer_app_slug_from_action_name("execute").is_none()); + } + + #[test] + fn connected_account_cache_key_is_stable() { + assert_eq!( + connected_account_cache_key("GMAIL", " default "), + "default:gmail" + ); + } + + #[test] + fn build_connected_account_hint_returns_guidance_when_missing_ref() { + let hint = build_connected_account_hint(Some("gmail"), Some("default"), None); + assert!(hint.contains("list_accounts")); + assert!(hint.contains("gmail")); + assert!(hint.contains("default")); + } + + #[test] + fn build_connected_account_hint_without_app_is_still_actionable() { + let hint = build_connected_account_hint(None, Some("default"), None); + assert!(hint.contains("list_accounts")); + assert!(hint.contains("entity_id='default'")); + assert!(!hint.contains("app='")); + } + + #[test] + fn connected_account_is_usable_for_initializing_active_and_initiated() { + for status in ["INITIALIZING", "ACTIVE", "INITIATED"] { + let account = ComposioConnectedAccount { + id: "ca_1".to_string(), + status: status.to_string(), + toolkit: None, + }; + assert!(account.is_usable(), "status {status} should be usable"); + } + } + + #[test] + fn extract_connected_account_id_supports_common_shapes() { + let root = json!({"connected_account_id": "ca_root"}); + let camel = json!({"connectedAccountId": "ca_camel"}); + let nested = json!({"data": {"connected_account_id": "ca_nested"}}); + + assert_eq!( + extract_connected_account_id(&root).as_deref(), + Some("ca_root") + ); + assert_eq!( + extract_connected_account_id(&camel).as_deref(), + Some("ca_camel") + ); + assert_eq!( + extract_connected_account_id(&nested).as_deref(), + Some("ca_nested") + ); + } + #[test] fn extract_redirect_url_supports_v2_and_v3_shapes() { let v2 = json!({"redirectUrl": "https://app.composio.dev/connect-v2"});