fix(composio): migrate tool API calls to v3 with v2 fallback (#309) (#310)

This commit is contained in:
Chummy 2026-02-16 18:58:06 +08:00 committed by GitHub
parent 1530a8707d
commit 79a6f180a8
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -7,12 +7,14 @@
// The Composio API key is stored in the encrypted secret store.
use super::traits::{Tool, ToolResult};
use anyhow::Context;
use async_trait::async_trait;
use reqwest::Client;
use serde::{Deserialize, Serialize};
use serde_json::json;
const COMPOSIO_API_BASE: &str = "https://backend.composio.dev/api/v2";
const COMPOSIO_API_BASE_V2: &str = "https://backend.composio.dev/api/v2";
const COMPOSIO_API_BASE_V3: &str = "https://backend.composio.dev/api/v3";
/// A tool that proxies actions to the Composio managed tool platform.
pub struct ComposioTool {
@ -33,11 +35,50 @@ impl ComposioTool {
}
/// List available Composio apps/actions for the authenticated user.
///
/// Uses v3 endpoint first and falls back to v2 for compatibility.
pub async fn list_actions(
&self,
app_name: Option<&str>,
) -> anyhow::Result<Vec<ComposioAction>> {
let mut url = format!("{COMPOSIO_API_BASE}/actions");
match self.list_actions_v3(app_name).await {
Ok(items) => Ok(items),
Err(v3_err) => {
let v2 = self.list_actions_v2(app_name).await;
match v2 {
Ok(items) => Ok(items),
Err(v2_err) => anyhow::bail!(
"Composio action listing failed on v3 ({v3_err}) and v2 fallback ({v2_err})"
),
}
}
}
}
async fn list_actions_v3(&self, app_name: Option<&str>) -> anyhow::Result<Vec<ComposioAction>> {
let url = format!("{COMPOSIO_API_BASE_V3}/tools");
let mut req = self.client.get(&url).header("x-api-key", &self.api_key);
req = req.query(&[("limit", 200_u16)]);
if let Some(app) = app_name {
req = req.query(&[("toolkit_slug", app)]);
}
let resp = req.send().await?;
if !resp.status().is_success() {
let err = response_error(resp).await;
anyhow::bail!("Composio v3 API error: {err}");
}
let body: ComposioToolsResponse = resp
.json()
.await
.context("Failed to decode Composio v3 tools response")?;
Ok(map_v3_tools_to_actions(body.items))
}
async fn list_actions_v2(&self, app_name: Option<&str>) -> anyhow::Result<Vec<ComposioAction>> {
let mut url = format!("{COMPOSIO_API_BASE_V2}/actions");
if let Some(app) = app_name {
url = format!("{url}?appNames={app}");
}
@ -50,22 +91,85 @@ impl ComposioTool {
.await?;
if !resp.status().is_success() {
let err = resp.text().await.unwrap_or_default();
anyhow::bail!("Composio API error: {err}");
let err = response_error(resp).await;
anyhow::bail!("Composio v2 API error: {err}");
}
let body: ComposioActionsResponse = resp.json().await?;
let body: ComposioActionsResponse = resp
.json()
.await
.context("Failed to decode Composio v2 actions response")?;
Ok(body.items)
}
/// Execute a Composio action by name with given parameters.
/// Execute a Composio action/tool with given parameters.
///
/// Uses v3 endpoint first and falls back to v2 for compatibility.
pub async fn execute_action(
&self,
action_name: &str,
params: serde_json::Value,
entity_id: Option<&str>,
) -> anyhow::Result<serde_json::Value> {
let url = format!("{COMPOSIO_API_BASE}/actions/{action_name}/execute");
let tool_slug = normalize_tool_slug(action_name);
match self
.execute_action_v3(&tool_slug, params.clone(), entity_id)
.await
{
Ok(result) => Ok(result),
Err(v3_err) => match self.execute_action_v2(action_name, params, entity_id).await {
Ok(result) => Ok(result),
Err(v2_err) => anyhow::bail!(
"Composio execute failed on v3 ({v3_err}) and v2 fallback ({v2_err})"
),
},
}
}
async fn execute_action_v3(
&self,
tool_slug: &str,
params: serde_json::Value,
entity_id: Option<&str>,
) -> anyhow::Result<serde_json::Value> {
let url = format!("{COMPOSIO_API_BASE_V3}/tools/execute/{tool_slug}");
let mut body = json!({
"arguments": params,
});
if let Some(entity) = entity_id {
body["user_id"] = json!(entity);
}
let resp = self
.client
.post(&url)
.header("x-api-key", &self.api_key)
.json(&body)
.send()
.await?;
if !resp.status().is_success() {
let err = response_error(resp).await;
anyhow::bail!("Composio v3 action execution failed: {err}");
}
let result: serde_json::Value = resp
.json()
.await
.context("Failed to decode Composio v3 execute response")?;
Ok(result)
}
async fn execute_action_v2(
&self,
action_name: &str,
params: serde_json::Value,
entity_id: Option<&str>,
) -> anyhow::Result<serde_json::Value> {
let url = format!("{COMPOSIO_API_BASE_V2}/actions/{action_name}/execute");
let mut body = json!({
"input": params,
@ -84,21 +188,96 @@ impl ComposioTool {
.await?;
if !resp.status().is_success() {
let err = resp.text().await.unwrap_or_default();
anyhow::bail!("Composio action execution failed: {err}");
let err = response_error(resp).await;
anyhow::bail!("Composio v2 action execution failed: {err}");
}
let result: serde_json::Value = resp.json().await?;
let result: serde_json::Value = resp
.json()
.await
.context("Failed to decode Composio v2 execute response")?;
Ok(result)
}
/// Get the OAuth connection URL for a specific app.
/// Get the OAuth connection URL for a specific app/toolkit or auth config.
///
/// Uses v3 endpoint first and falls back to v2 for compatibility.
pub async fn get_connection_url(
&self,
app_name: Option<&str>,
auth_config_id: Option<&str>,
entity_id: &str,
) -> anyhow::Result<String> {
let v3 = self
.get_connection_url_v3(app_name, auth_config_id, entity_id)
.await;
match v3 {
Ok(url) => Ok(url),
Err(v3_err) => {
let app = app_name.ok_or_else(|| {
anyhow::anyhow!(
"Composio v3 connect failed ({v3_err}) and v2 fallback requires 'app'"
)
})?;
match self.get_connection_url_v2(app, entity_id).await {
Ok(url) => Ok(url),
Err(v2_err) => anyhow::bail!(
"Composio connect failed on v3 ({v3_err}) and v2 fallback ({v2_err})"
),
}
}
}
}
async fn get_connection_url_v3(
&self,
app_name: Option<&str>,
auth_config_id: Option<&str>,
entity_id: &str,
) -> anyhow::Result<String> {
let auth_config_id = match auth_config_id {
Some(id) => id.to_string(),
None => {
let app = app_name.ok_or_else(|| {
anyhow::anyhow!("Missing 'app' or 'auth_config_id' for v3 connect")
})?;
self.resolve_auth_config_id(app).await?
}
};
let url = format!("{COMPOSIO_API_BASE_V3}/connected_accounts/link");
let body = json!({
"auth_config_id": auth_config_id,
"user_id": entity_id,
});
let resp = self
.client
.post(&url)
.header("x-api-key", &self.api_key)
.json(&body)
.send()
.await?;
if !resp.status().is_success() {
let err = response_error(resp).await;
anyhow::bail!("Composio v3 connect failed: {err}");
}
let result: serde_json::Value = resp
.json()
.await
.context("Failed to decode Composio v3 connect response")?;
extract_redirect_url(&result)
.ok_or_else(|| anyhow::anyhow!("No redirect URL in Composio v3 response"))
}
async fn get_connection_url_v2(
&self,
app_name: &str,
entity_id: &str,
) -> anyhow::Result<String> {
let url = format!("{COMPOSIO_API_BASE}/connectedAccounts");
let url = format!("{COMPOSIO_API_BASE_V2}/connectedAccounts");
let body = json!({
"integrationId": app_name,
@ -114,16 +293,57 @@ impl ComposioTool {
.await?;
if !resp.status().is_success() {
let err = resp.text().await.unwrap_or_default();
anyhow::bail!("Failed to get connection URL: {err}");
let err = response_error(resp).await;
anyhow::bail!("Composio v2 connect failed: {err}");
}
let result: serde_json::Value = resp.json().await?;
result
.get("redirectUrl")
.and_then(|v| v.as_str())
.map(String::from)
.ok_or_else(|| anyhow::anyhow!("No redirect URL in response"))
let result: serde_json::Value = resp
.json()
.await
.context("Failed to decode Composio v2 connect response")?;
extract_redirect_url(&result)
.ok_or_else(|| anyhow::anyhow!("No redirect URL in Composio v2 response"))
}
async fn resolve_auth_config_id(&self, app_name: &str) -> anyhow::Result<String> {
let url = format!("{COMPOSIO_API_BASE_V3}/auth_configs");
let resp = self
.client
.get(&url)
.header("x-api-key", &self.api_key)
.query(&[
("toolkit_slug", app_name),
("show_disabled", "true"),
("limit", "25"),
])
.send()
.await?;
if !resp.status().is_success() {
let err = response_error(resp).await;
anyhow::bail!("Composio v3 auth config lookup failed: {err}");
}
let body: ComposioAuthConfigsResponse = resp
.json()
.await
.context("Failed to decode Composio v3 auth configs response")?;
if body.items.is_empty() {
anyhow::bail!(
"No auth config found for toolkit '{app_name}'. Create one in Composio first."
);
}
let preferred = body
.items
.iter()
.find(|cfg| cfg.is_enabled())
.or_else(|| body.items.first())
.context("No usable auth config returned by Composio")?;
Ok(preferred.id.clone())
}
}
@ -135,7 +355,8 @@ impl Tool for ComposioTool {
fn description(&self) -> &str {
"Execute actions on 1000+ apps via Composio (Gmail, Notion, GitHub, Slack, etc.). \
Use action='list' to see available actions, or action='execute' with action_name and params."
Use action='list' to see available actions, action='execute' with action_name/tool_slug and params, \
or action='connect' with app/auth_config_id to get OAuth URL."
}
fn parameters_schema(&self) -> serde_json::Value {
@ -149,11 +370,15 @@ impl Tool for ComposioTool {
},
"app": {
"type": "string",
"description": "App name filter for 'list', or app name for 'connect' (e.g. 'gmail', 'notion', 'github')"
"description": "Toolkit slug filter for 'list', or toolkit/app for 'connect' (e.g. 'gmail', 'notion', 'github')"
},
"action_name": {
"type": "string",
"description": "The Composio action name to execute (e.g. 'GMAIL_FETCH_EMAILS')"
"description": "Action/tool identifier to execute (legacy aliases supported)"
},
"tool_slug": {
"type": "string",
"description": "Preferred v3 tool slug to execute (alias of action_name)"
},
"params": {
"type": "object",
@ -161,7 +386,11 @@ impl Tool for ComposioTool {
},
"entity_id": {
"type": "string",
"description": "Entity ID for multi-user setups (defaults to 'default')"
"description": "Entity/user ID for multi-user setups (defaults to 'default')"
},
"auth_config_id": {
"type": "string",
"description": "Optional Composio v3 auth config id for connect flow"
}
},
"required": ["action"]
@ -222,9 +451,12 @@ impl Tool for ComposioTool {
"execute" => {
let action_name = args
.get("action_name")
.get("tool_slug")
.or_else(|| args.get("action_name"))
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("Missing 'action_name' for execute"))?;
.ok_or_else(|| {
anyhow::anyhow!("Missing 'action_name' (or 'tool_slug') for execute")
})?;
let params = args.get("params").cloned().unwrap_or(json!({}));
@ -250,17 +482,26 @@ impl Tool for ComposioTool {
}
"connect" => {
let app = args
.get("app")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("Missing 'app' for connect"))?;
let app = args.get("app").and_then(|v| v.as_str());
let auth_config_id = args.get("auth_config_id").and_then(|v| v.as_str());
match self.get_connection_url(app, entity_id).await {
Ok(url) => Ok(ToolResult {
success: true,
output: format!("Open this URL to connect {app}:\n{url}"),
error: None,
}),
if app.is_none() && auth_config_id.is_none() {
anyhow::bail!("Missing 'app' or 'auth_config_id' for connect");
}
match self
.get_connection_url(app, auth_config_id, entity_id)
.await
{
Ok(url) => {
let target =
app.unwrap_or(auth_config_id.unwrap_or("provided auth config"));
Ok(ToolResult {
success: true,
output: format!("Open this URL to connect {target}:\n{url}"),
error: None,
})
}
Err(e) => Ok(ToolResult {
success: false,
output: String::new(),
@ -280,6 +521,74 @@ impl Tool for ComposioTool {
}
}
fn normalize_tool_slug(action_name: &str) -> String {
action_name.trim().replace('_', "-").to_ascii_lowercase()
}
fn map_v3_tools_to_actions(items: Vec<ComposioV3Tool>) -> Vec<ComposioAction> {
items
.into_iter()
.filter_map(|item| {
let name = item.slug.or(item.name.clone())?;
let app_name = item
.toolkit
.as_ref()
.and_then(|toolkit| toolkit.slug.clone().or(toolkit.name.clone()))
.or(item.app_name);
let description = item.description.or(item.name);
Some(ComposioAction {
name,
app_name,
description,
enabled: true,
})
})
.collect()
}
fn extract_redirect_url(result: &serde_json::Value) -> Option<String> {
result
.get("redirect_url")
.and_then(|v| v.as_str())
.or_else(|| result.get("redirectUrl").and_then(|v| v.as_str()))
.or_else(|| {
result
.get("data")
.and_then(|v| v.get("redirect_url"))
.and_then(|v| v.as_str())
})
.map(ToString::to_string)
}
async fn response_error(resp: reqwest::Response) -> String {
let status = resp.status();
let body = resp.text().await.unwrap_or_default();
if body.trim().is_empty() {
return format!("HTTP {}", status.as_u16());
}
if let Some(api_error) = extract_api_error_message(&body) {
format!("HTTP {}: {api_error}", status.as_u16())
} else {
format!("HTTP {}: {body}", status.as_u16())
}
}
fn extract_api_error_message(body: &str) -> Option<String> {
let parsed: serde_json::Value = serde_json::from_str(body).ok()?;
parsed
.get("error")
.and_then(|v| v.get("message"))
.and_then(|v| v.as_str())
.map(ToString::to_string)
.or_else(|| {
parsed
.get("message")
.and_then(|v| v.as_str())
.map(ToString::to_string)
})
}
// ── API response types ──────────────────────────────────────────
#[derive(Debug, Deserialize)]
@ -288,6 +597,59 @@ struct ComposioActionsResponse {
items: Vec<ComposioAction>,
}
#[derive(Debug, Deserialize)]
struct ComposioToolsResponse {
#[serde(default)]
items: Vec<ComposioV3Tool>,
}
#[derive(Debug, Clone, Deserialize)]
struct ComposioV3Tool {
#[serde(default)]
slug: Option<String>,
#[serde(default)]
name: Option<String>,
#[serde(default)]
description: Option<String>,
#[serde(rename = "appName", default)]
app_name: Option<String>,
#[serde(default)]
toolkit: Option<ComposioToolkitRef>,
}
#[derive(Debug, Clone, Deserialize)]
struct ComposioToolkitRef {
#[serde(default)]
slug: Option<String>,
#[serde(default)]
name: Option<String>,
}
#[derive(Debug, Deserialize)]
struct ComposioAuthConfigsResponse {
#[serde(default)]
items: Vec<ComposioAuthConfig>,
}
#[derive(Debug, Clone, Deserialize)]
struct ComposioAuthConfig {
id: String,
#[serde(default)]
status: Option<String>,
#[serde(default)]
enabled: Option<bool>,
}
impl ComposioAuthConfig {
fn is_enabled(&self) -> bool {
self.enabled.unwrap_or(false)
|| self
.status
.as_deref()
.is_some_and(|v| v.eq_ignore_ascii_case("enabled"))
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ComposioAction {
pub name: String,
@ -323,8 +685,10 @@ mod tests {
let schema = tool.parameters_schema();
assert!(schema["properties"]["action"].is_object());
assert!(schema["properties"]["action_name"].is_object());
assert!(schema["properties"]["tool_slug"].is_object());
assert!(schema["properties"]["params"].is_object());
assert!(schema["properties"]["app"].is_object());
assert!(schema["properties"]["auth_config_id"].is_object());
let required = schema["required"].as_array().unwrap();
assert!(required.contains(&json!("action")));
}
@ -362,7 +726,7 @@ mod tests {
}
#[tokio::test]
async fn connect_without_app_returns_error() {
async fn connect_without_target_returns_error() {
let tool = ComposioTool::new("test-key");
let result = tool.execute(json!({"action": "connect"})).await;
assert!(result.is_err());
@ -400,4 +764,92 @@ mod tests {
let resp: ComposioActionsResponse = serde_json::from_str(json_str).unwrap();
assert!(resp.items.is_empty());
}
#[test]
fn composio_v3_tools_response_maps_to_actions() {
let json_str = r#"{
"items": [
{
"slug": "gmail-fetch-emails",
"name": "Gmail Fetch Emails",
"description": "Fetch inbox emails",
"toolkit": { "slug": "gmail", "name": "Gmail" }
}
]
}"#;
let resp: ComposioToolsResponse = serde_json::from_str(json_str).unwrap();
let actions = map_v3_tools_to_actions(resp.items);
assert_eq!(actions.len(), 1);
assert_eq!(actions[0].name, "gmail-fetch-emails");
assert_eq!(actions[0].app_name.as_deref(), Some("gmail"));
assert_eq!(
actions[0].description.as_deref(),
Some("Fetch inbox emails")
);
}
#[test]
fn normalize_tool_slug_supports_legacy_action_name() {
assert_eq!(
normalize_tool_slug("GMAIL_FETCH_EMAILS"),
"gmail-fetch-emails"
);
assert_eq!(
normalize_tool_slug(" github-list-repos "),
"github-list-repos"
);
}
#[test]
fn extract_redirect_url_supports_v2_and_v3_shapes() {
let v2 = json!({"redirectUrl": "https://app.composio.dev/connect-v2"});
let v3 = json!({"redirect_url": "https://app.composio.dev/connect-v3"});
let nested = json!({"data": {"redirect_url": "https://app.composio.dev/connect-nested"}});
assert_eq!(
extract_redirect_url(&v2).as_deref(),
Some("https://app.composio.dev/connect-v2")
);
assert_eq!(
extract_redirect_url(&v3).as_deref(),
Some("https://app.composio.dev/connect-v3")
);
assert_eq!(
extract_redirect_url(&nested).as_deref(),
Some("https://app.composio.dev/connect-nested")
);
}
#[test]
fn auth_config_prefers_enabled_status() {
let enabled = ComposioAuthConfig {
id: "cfg_1".into(),
status: Some("ENABLED".into()),
enabled: None,
};
let disabled = ComposioAuthConfig {
id: "cfg_2".into(),
status: Some("DISABLED".into()),
enabled: Some(false),
};
assert!(enabled.is_enabled());
assert!(!disabled.is_enabled());
}
#[test]
fn extract_api_error_message_from_common_shapes() {
let nested = r#"{"error":{"message":"tool not found"}}"#;
let flat = r#"{"message":"invalid api key"}"#;
assert_eq!(
extract_api_error_message(nested).as_deref(),
Some("tool not found")
);
assert_eq!(
extract_api_error_message(flat).as_deref(),
Some("invalid api key")
);
assert_eq!(extract_api_error_message("not-json"), None);
}
}