diff --git a/src/tools/composio.rs b/src/tools/composio.rs index 4602d5d..3096549 100644 --- a/src/tools/composio.rs +++ b/src/tools/composio.rs @@ -7,12 +7,14 @@ // The Composio API key is stored in the encrypted secret store. use super::traits::{Tool, ToolResult}; +use anyhow::Context; use async_trait::async_trait; use reqwest::Client; use serde::{Deserialize, Serialize}; use serde_json::json; -const COMPOSIO_API_BASE: &str = "https://backend.composio.dev/api/v2"; +const COMPOSIO_API_BASE_V2: &str = "https://backend.composio.dev/api/v2"; +const COMPOSIO_API_BASE_V3: &str = "https://backend.composio.dev/api/v3"; /// A tool that proxies actions to the Composio managed tool platform. pub struct ComposioTool { @@ -33,11 +35,50 @@ impl ComposioTool { } /// List available Composio apps/actions for the authenticated user. + /// + /// Uses v3 endpoint first and falls back to v2 for compatibility. pub async fn list_actions( &self, app_name: Option<&str>, ) -> anyhow::Result> { - let mut url = format!("{COMPOSIO_API_BASE}/actions"); + match self.list_actions_v3(app_name).await { + Ok(items) => Ok(items), + Err(v3_err) => { + let v2 = self.list_actions_v2(app_name).await; + match v2 { + Ok(items) => Ok(items), + Err(v2_err) => anyhow::bail!( + "Composio action listing failed on v3 ({v3_err}) and v2 fallback ({v2_err})" + ), + } + } + } + } + + async fn list_actions_v3(&self, app_name: Option<&str>) -> anyhow::Result> { + let url = format!("{COMPOSIO_API_BASE_V3}/tools"); + let mut req = self.client.get(&url).header("x-api-key", &self.api_key); + + req = req.query(&[("limit", 200_u16)]); + if let Some(app) = app_name { + req = req.query(&[("toolkit_slug", app)]); + } + + let resp = req.send().await?; + if !resp.status().is_success() { + let err = response_error(resp).await; + anyhow::bail!("Composio v3 API error: {err}"); + } + + let body: ComposioToolsResponse = resp + .json() + .await + .context("Failed to decode Composio v3 tools response")?; + Ok(map_v3_tools_to_actions(body.items)) + } + + async fn list_actions_v2(&self, app_name: Option<&str>) -> anyhow::Result> { + let mut url = format!("{COMPOSIO_API_BASE_V2}/actions"); if let Some(app) = app_name { url = format!("{url}?appNames={app}"); } @@ -50,22 +91,85 @@ impl ComposioTool { .await?; if !resp.status().is_success() { - let err = resp.text().await.unwrap_or_default(); - anyhow::bail!("Composio API error: {err}"); + let err = response_error(resp).await; + anyhow::bail!("Composio v2 API error: {err}"); } - let body: ComposioActionsResponse = resp.json().await?; + let body: ComposioActionsResponse = resp + .json() + .await + .context("Failed to decode Composio v2 actions response")?; Ok(body.items) } - /// Execute a Composio action by name with given parameters. + /// Execute a Composio action/tool with given parameters. + /// + /// Uses v3 endpoint first and falls back to v2 for compatibility. pub async fn execute_action( &self, action_name: &str, params: serde_json::Value, entity_id: Option<&str>, ) -> anyhow::Result { - let url = format!("{COMPOSIO_API_BASE}/actions/{action_name}/execute"); + let tool_slug = normalize_tool_slug(action_name); + + match self + .execute_action_v3(&tool_slug, params.clone(), entity_id) + .await + { + Ok(result) => Ok(result), + Err(v3_err) => match self.execute_action_v2(action_name, params, entity_id).await { + Ok(result) => Ok(result), + Err(v2_err) => anyhow::bail!( + "Composio execute failed on v3 ({v3_err}) and v2 fallback ({v2_err})" + ), + }, + } + } + + async fn execute_action_v3( + &self, + tool_slug: &str, + params: serde_json::Value, + entity_id: Option<&str>, + ) -> anyhow::Result { + let url = format!("{COMPOSIO_API_BASE_V3}/tools/execute/{tool_slug}"); + + let mut body = json!({ + "arguments": params, + }); + + if let Some(entity) = entity_id { + body["user_id"] = json!(entity); + } + + let resp = self + .client + .post(&url) + .header("x-api-key", &self.api_key) + .json(&body) + .send() + .await?; + + if !resp.status().is_success() { + let err = response_error(resp).await; + anyhow::bail!("Composio v3 action execution failed: {err}"); + } + + let result: serde_json::Value = resp + .json() + .await + .context("Failed to decode Composio v3 execute response")?; + Ok(result) + } + + async fn execute_action_v2( + &self, + action_name: &str, + params: serde_json::Value, + entity_id: Option<&str>, + ) -> anyhow::Result { + let url = format!("{COMPOSIO_API_BASE_V2}/actions/{action_name}/execute"); let mut body = json!({ "input": params, @@ -84,21 +188,96 @@ impl ComposioTool { .await?; if !resp.status().is_success() { - let err = resp.text().await.unwrap_or_default(); - anyhow::bail!("Composio action execution failed: {err}"); + let err = response_error(resp).await; + anyhow::bail!("Composio v2 action execution failed: {err}"); } - let result: serde_json::Value = resp.json().await?; + let result: serde_json::Value = resp + .json() + .await + .context("Failed to decode Composio v2 execute response")?; Ok(result) } - /// Get the OAuth connection URL for a specific app. + /// Get the OAuth connection URL for a specific app/toolkit or auth config. + /// + /// Uses v3 endpoint first and falls back to v2 for compatibility. pub async fn get_connection_url( + &self, + app_name: Option<&str>, + auth_config_id: Option<&str>, + entity_id: &str, + ) -> anyhow::Result { + let v3 = self + .get_connection_url_v3(app_name, auth_config_id, entity_id) + .await; + match v3 { + Ok(url) => Ok(url), + Err(v3_err) => { + let app = app_name.ok_or_else(|| { + anyhow::anyhow!( + "Composio v3 connect failed ({v3_err}) and v2 fallback requires 'app'" + ) + })?; + match self.get_connection_url_v2(app, entity_id).await { + Ok(url) => Ok(url), + Err(v2_err) => anyhow::bail!( + "Composio connect failed on v3 ({v3_err}) and v2 fallback ({v2_err})" + ), + } + } + } + } + + async fn get_connection_url_v3( + &self, + app_name: Option<&str>, + auth_config_id: Option<&str>, + entity_id: &str, + ) -> anyhow::Result { + let auth_config_id = match auth_config_id { + Some(id) => id.to_string(), + None => { + let app = app_name.ok_or_else(|| { + anyhow::anyhow!("Missing 'app' or 'auth_config_id' for v3 connect") + })?; + self.resolve_auth_config_id(app).await? + } + }; + + let url = format!("{COMPOSIO_API_BASE_V3}/connected_accounts/link"); + let body = json!({ + "auth_config_id": auth_config_id, + "user_id": entity_id, + }); + + let resp = self + .client + .post(&url) + .header("x-api-key", &self.api_key) + .json(&body) + .send() + .await?; + + if !resp.status().is_success() { + let err = response_error(resp).await; + anyhow::bail!("Composio v3 connect failed: {err}"); + } + + let result: serde_json::Value = resp + .json() + .await + .context("Failed to decode Composio v3 connect response")?; + extract_redirect_url(&result) + .ok_or_else(|| anyhow::anyhow!("No redirect URL in Composio v3 response")) + } + + async fn get_connection_url_v2( &self, app_name: &str, entity_id: &str, ) -> anyhow::Result { - let url = format!("{COMPOSIO_API_BASE}/connectedAccounts"); + let url = format!("{COMPOSIO_API_BASE_V2}/connectedAccounts"); let body = json!({ "integrationId": app_name, @@ -114,16 +293,57 @@ impl ComposioTool { .await?; if !resp.status().is_success() { - let err = resp.text().await.unwrap_or_default(); - anyhow::bail!("Failed to get connection URL: {err}"); + let err = response_error(resp).await; + anyhow::bail!("Composio v2 connect failed: {err}"); } - let result: serde_json::Value = resp.json().await?; - result - .get("redirectUrl") - .and_then(|v| v.as_str()) - .map(String::from) - .ok_or_else(|| anyhow::anyhow!("No redirect URL in response")) + let result: serde_json::Value = resp + .json() + .await + .context("Failed to decode Composio v2 connect response")?; + extract_redirect_url(&result) + .ok_or_else(|| anyhow::anyhow!("No redirect URL in Composio v2 response")) + } + + async fn resolve_auth_config_id(&self, app_name: &str) -> anyhow::Result { + let url = format!("{COMPOSIO_API_BASE_V3}/auth_configs"); + + let resp = self + .client + .get(&url) + .header("x-api-key", &self.api_key) + .query(&[ + ("toolkit_slug", app_name), + ("show_disabled", "true"), + ("limit", "25"), + ]) + .send() + .await?; + + if !resp.status().is_success() { + let err = response_error(resp).await; + anyhow::bail!("Composio v3 auth config lookup failed: {err}"); + } + + let body: ComposioAuthConfigsResponse = resp + .json() + .await + .context("Failed to decode Composio v3 auth configs response")?; + + if body.items.is_empty() { + anyhow::bail!( + "No auth config found for toolkit '{app_name}'. Create one in Composio first." + ); + } + + let preferred = body + .items + .iter() + .find(|cfg| cfg.is_enabled()) + .or_else(|| body.items.first()) + .context("No usable auth config returned by Composio")?; + + Ok(preferred.id.clone()) } } @@ -135,7 +355,8 @@ impl Tool for ComposioTool { fn description(&self) -> &str { "Execute actions on 1000+ apps via Composio (Gmail, Notion, GitHub, Slack, etc.). \ - Use action='list' to see available actions, or action='execute' with action_name and params." + Use action='list' to see available actions, action='execute' with action_name/tool_slug and params, \ + or action='connect' with app/auth_config_id to get OAuth URL." } fn parameters_schema(&self) -> serde_json::Value { @@ -149,11 +370,15 @@ impl Tool for ComposioTool { }, "app": { "type": "string", - "description": "App name filter for 'list', or app name for 'connect' (e.g. 'gmail', 'notion', 'github')" + "description": "Toolkit slug filter for 'list', or toolkit/app for 'connect' (e.g. 'gmail', 'notion', 'github')" }, "action_name": { "type": "string", - "description": "The Composio action name to execute (e.g. 'GMAIL_FETCH_EMAILS')" + "description": "Action/tool identifier to execute (legacy aliases supported)" + }, + "tool_slug": { + "type": "string", + "description": "Preferred v3 tool slug to execute (alias of action_name)" }, "params": { "type": "object", @@ -161,7 +386,11 @@ impl Tool for ComposioTool { }, "entity_id": { "type": "string", - "description": "Entity ID for multi-user setups (defaults to 'default')" + "description": "Entity/user ID for multi-user setups (defaults to 'default')" + }, + "auth_config_id": { + "type": "string", + "description": "Optional Composio v3 auth config id for connect flow" } }, "required": ["action"] @@ -222,9 +451,12 @@ impl Tool for ComposioTool { "execute" => { let action_name = args - .get("action_name") + .get("tool_slug") + .or_else(|| args.get("action_name")) .and_then(|v| v.as_str()) - .ok_or_else(|| anyhow::anyhow!("Missing 'action_name' for execute"))?; + .ok_or_else(|| { + anyhow::anyhow!("Missing 'action_name' (or 'tool_slug') for execute") + })?; let params = args.get("params").cloned().unwrap_or(json!({})); @@ -250,17 +482,26 @@ impl Tool for ComposioTool { } "connect" => { - let app = args - .get("app") - .and_then(|v| v.as_str()) - .ok_or_else(|| anyhow::anyhow!("Missing 'app' for connect"))?; + let app = args.get("app").and_then(|v| v.as_str()); + let auth_config_id = args.get("auth_config_id").and_then(|v| v.as_str()); - match self.get_connection_url(app, entity_id).await { - Ok(url) => Ok(ToolResult { - success: true, - output: format!("Open this URL to connect {app}:\n{url}"), - error: None, - }), + if app.is_none() && auth_config_id.is_none() { + anyhow::bail!("Missing 'app' or 'auth_config_id' for connect"); + } + + match self + .get_connection_url(app, auth_config_id, entity_id) + .await + { + Ok(url) => { + let target = + app.unwrap_or(auth_config_id.unwrap_or("provided auth config")); + Ok(ToolResult { + success: true, + output: format!("Open this URL to connect {target}:\n{url}"), + error: None, + }) + } Err(e) => Ok(ToolResult { success: false, output: String::new(), @@ -280,6 +521,74 @@ impl Tool for ComposioTool { } } +fn normalize_tool_slug(action_name: &str) -> String { + action_name.trim().replace('_', "-").to_ascii_lowercase() +} + +fn map_v3_tools_to_actions(items: Vec) -> Vec { + items + .into_iter() + .filter_map(|item| { + let name = item.slug.or(item.name.clone())?; + let app_name = item + .toolkit + .as_ref() + .and_then(|toolkit| toolkit.slug.clone().or(toolkit.name.clone())) + .or(item.app_name); + let description = item.description.or(item.name); + Some(ComposioAction { + name, + app_name, + description, + enabled: true, + }) + }) + .collect() +} + +fn extract_redirect_url(result: &serde_json::Value) -> Option { + result + .get("redirect_url") + .and_then(|v| v.as_str()) + .or_else(|| result.get("redirectUrl").and_then(|v| v.as_str())) + .or_else(|| { + result + .get("data") + .and_then(|v| v.get("redirect_url")) + .and_then(|v| v.as_str()) + }) + .map(ToString::to_string) +} + +async fn response_error(resp: reqwest::Response) -> String { + let status = resp.status(); + let body = resp.text().await.unwrap_or_default(); + if body.trim().is_empty() { + return format!("HTTP {}", status.as_u16()); + } + + if let Some(api_error) = extract_api_error_message(&body) { + format!("HTTP {}: {api_error}", status.as_u16()) + } else { + format!("HTTP {}: {body}", status.as_u16()) + } +} + +fn extract_api_error_message(body: &str) -> Option { + let parsed: serde_json::Value = serde_json::from_str(body).ok()?; + parsed + .get("error") + .and_then(|v| v.get("message")) + .and_then(|v| v.as_str()) + .map(ToString::to_string) + .or_else(|| { + parsed + .get("message") + .and_then(|v| v.as_str()) + .map(ToString::to_string) + }) +} + // ── API response types ────────────────────────────────────────── #[derive(Debug, Deserialize)] @@ -288,6 +597,59 @@ struct ComposioActionsResponse { items: Vec, } +#[derive(Debug, Deserialize)] +struct ComposioToolsResponse { + #[serde(default)] + items: Vec, +} + +#[derive(Debug, Clone, Deserialize)] +struct ComposioV3Tool { + #[serde(default)] + slug: Option, + #[serde(default)] + name: Option, + #[serde(default)] + description: Option, + #[serde(rename = "appName", default)] + app_name: Option, + #[serde(default)] + toolkit: Option, +} + +#[derive(Debug, Clone, Deserialize)] +struct ComposioToolkitRef { + #[serde(default)] + slug: Option, + #[serde(default)] + name: Option, +} + +#[derive(Debug, Deserialize)] +struct ComposioAuthConfigsResponse { + #[serde(default)] + items: Vec, +} + +#[derive(Debug, Clone, Deserialize)] +struct ComposioAuthConfig { + id: String, + #[serde(default)] + status: Option, + #[serde(default)] + enabled: Option, +} + +impl ComposioAuthConfig { + fn is_enabled(&self) -> bool { + self.enabled.unwrap_or(false) + || self + .status + .as_deref() + .is_some_and(|v| v.eq_ignore_ascii_case("enabled")) + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ComposioAction { pub name: String, @@ -323,8 +685,10 @@ mod tests { let schema = tool.parameters_schema(); assert!(schema["properties"]["action"].is_object()); assert!(schema["properties"]["action_name"].is_object()); + assert!(schema["properties"]["tool_slug"].is_object()); assert!(schema["properties"]["params"].is_object()); assert!(schema["properties"]["app"].is_object()); + assert!(schema["properties"]["auth_config_id"].is_object()); let required = schema["required"].as_array().unwrap(); assert!(required.contains(&json!("action"))); } @@ -362,7 +726,7 @@ mod tests { } #[tokio::test] - async fn connect_without_app_returns_error() { + async fn connect_without_target_returns_error() { let tool = ComposioTool::new("test-key"); let result = tool.execute(json!({"action": "connect"})).await; assert!(result.is_err()); @@ -400,4 +764,92 @@ mod tests { let resp: ComposioActionsResponse = serde_json::from_str(json_str).unwrap(); assert!(resp.items.is_empty()); } + + #[test] + fn composio_v3_tools_response_maps_to_actions() { + let json_str = r#"{ + "items": [ + { + "slug": "gmail-fetch-emails", + "name": "Gmail Fetch Emails", + "description": "Fetch inbox emails", + "toolkit": { "slug": "gmail", "name": "Gmail" } + } + ] + }"#; + let resp: ComposioToolsResponse = serde_json::from_str(json_str).unwrap(); + let actions = map_v3_tools_to_actions(resp.items); + assert_eq!(actions.len(), 1); + assert_eq!(actions[0].name, "gmail-fetch-emails"); + assert_eq!(actions[0].app_name.as_deref(), Some("gmail")); + assert_eq!( + actions[0].description.as_deref(), + Some("Fetch inbox emails") + ); + } + + #[test] + fn normalize_tool_slug_supports_legacy_action_name() { + assert_eq!( + normalize_tool_slug("GMAIL_FETCH_EMAILS"), + "gmail-fetch-emails" + ); + assert_eq!( + normalize_tool_slug(" github-list-repos "), + "github-list-repos" + ); + } + + #[test] + fn extract_redirect_url_supports_v2_and_v3_shapes() { + let v2 = json!({"redirectUrl": "https://app.composio.dev/connect-v2"}); + let v3 = json!({"redirect_url": "https://app.composio.dev/connect-v3"}); + let nested = json!({"data": {"redirect_url": "https://app.composio.dev/connect-nested"}}); + + assert_eq!( + extract_redirect_url(&v2).as_deref(), + Some("https://app.composio.dev/connect-v2") + ); + assert_eq!( + extract_redirect_url(&v3).as_deref(), + Some("https://app.composio.dev/connect-v3") + ); + assert_eq!( + extract_redirect_url(&nested).as_deref(), + Some("https://app.composio.dev/connect-nested") + ); + } + + #[test] + fn auth_config_prefers_enabled_status() { + let enabled = ComposioAuthConfig { + id: "cfg_1".into(), + status: Some("ENABLED".into()), + enabled: None, + }; + let disabled = ComposioAuthConfig { + id: "cfg_2".into(), + status: Some("DISABLED".into()), + enabled: Some(false), + }; + + assert!(enabled.is_enabled()); + assert!(!disabled.is_enabled()); + } + + #[test] + fn extract_api_error_message_from_common_shapes() { + let nested = r#"{"error":{"message":"tool not found"}}"#; + let flat = r#"{"message":"invalid api key"}"#; + + assert_eq!( + extract_api_error_message(nested).as_deref(), + Some("tool not found") + ); + assert_eq!( + extract_api_error_message(flat).as_deref(), + Some("invalid api key") + ); + assert_eq!(extract_api_error_message("not-json"), None); + } }