fix(dingtalk,daemon): process stream callbacks and supervise DingTalk channel
Include DingTalk in daemon supervised channel detection so the listener starts in daemon mode. Handle CALLBACK stream frames, subscribe to bot message topic, and improve session webhook routing for private/group replies. Add regression tests for supervised-channel detection and DingTalk payload/chat-id parsing.
This commit is contained in:
parent
32bfe1d186
commit
4b89e91a5a
2 changed files with 88 additions and 40 deletions
|
|
@ -7,7 +7,9 @@ use tokio::sync::RwLock;
|
|||
use tokio_tungstenite::tungstenite::Message;
|
||||
use uuid::Uuid;
|
||||
|
||||
/// DingTalk channel — connects via Stream Mode WebSocket for real-time messages.
|
||||
const DINGTALK_BOT_CALLBACK_TOPIC: &str = "/v1.0/im/bot/messages/get";
|
||||
|
||||
/// DingTalk (钉钉) channel — connects via Stream Mode WebSocket for real-time messages.
|
||||
/// Replies are sent through per-message session webhook URLs.
|
||||
pub struct DingTalkChannel {
|
||||
client_id: String,
|
||||
|
|
@ -41,11 +43,46 @@ impl DingTalkChannel {
|
|||
self.allowed_users.iter().any(|u| u == "*" || u == user_id)
|
||||
}
|
||||
|
||||
fn parse_stream_data(frame: &serde_json::Value) -> Option<serde_json::Value> {
|
||||
match frame.get("data") {
|
||||
Some(serde_json::Value::String(raw)) => serde_json::from_str(raw).ok(),
|
||||
Some(serde_json::Value::Object(_)) => frame.get("data").cloned(),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn resolve_chat_id(data: &serde_json::Value, sender_id: &str) -> String {
|
||||
let is_private_chat = data
|
||||
.get("conversationType")
|
||||
.and_then(|value| {
|
||||
value
|
||||
.as_str()
|
||||
.map(|v| v == "1")
|
||||
.or_else(|| value.as_i64().map(|v| v == 1))
|
||||
})
|
||||
.unwrap_or(true);
|
||||
|
||||
if is_private_chat {
|
||||
sender_id.to_string()
|
||||
} else {
|
||||
data.get("conversationId")
|
||||
.and_then(|c| c.as_str())
|
||||
.unwrap_or(sender_id)
|
||||
.to_string()
|
||||
}
|
||||
}
|
||||
|
||||
/// Register a connection with DingTalk's gateway to get a WebSocket endpoint.
|
||||
async fn register_connection(&self) -> anyhow::Result<GatewayResponse> {
|
||||
let body = serde_json::json!({
|
||||
"clientId": self.client_id,
|
||||
"clientSecret": self.client_secret,
|
||||
"subscriptions": [
|
||||
{
|
||||
"type": "CALLBACK",
|
||||
"topic": DINGTALK_BOT_CALLBACK_TOPIC,
|
||||
}
|
||||
],
|
||||
});
|
||||
|
||||
let resp = self
|
||||
|
|
@ -65,17 +102,6 @@ impl DingTalkChannel {
|
|||
Ok(gw)
|
||||
}
|
||||
|
||||
fn resolve_reply_target(
|
||||
sender_id: &str,
|
||||
conversation_type: &str,
|
||||
conversation_id: Option<&str>,
|
||||
) -> String {
|
||||
if conversation_type == "1" {
|
||||
sender_id.to_string()
|
||||
} else {
|
||||
conversation_id.unwrap_or(sender_id).to_string()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
|
|
@ -168,13 +194,14 @@ impl Channel for DingTalkChannel {
|
|||
break;
|
||||
}
|
||||
}
|
||||
"EVENT" => {
|
||||
// Parse the chatbot callback data from the event
|
||||
let data_str = frame.get("data").and_then(|d| d.as_str()).unwrap_or("{}");
|
||||
|
||||
let data: serde_json::Value = match serde_json::from_str(data_str) {
|
||||
Ok(v) => v,
|
||||
Err(_) => continue,
|
||||
"EVENT" | "CALLBACK" => {
|
||||
// Parse the chatbot callback data from the frame.
|
||||
let data = match Self::parse_stream_data(&frame) {
|
||||
Some(v) => v,
|
||||
None => {
|
||||
tracing::debug!("DingTalk: frame has no parseable data payload");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
// Extract message content
|
||||
|
|
@ -201,22 +228,16 @@ impl Channel for DingTalkChannel {
|
|||
continue;
|
||||
}
|
||||
|
||||
let conversation_type = data
|
||||
.get("conversationType")
|
||||
.and_then(|c| c.as_str())
|
||||
.unwrap_or("1");
|
||||
|
||||
// Private chat uses sender ID, group chat uses conversation ID
|
||||
let chat_id = Self::resolve_reply_target(
|
||||
sender_id,
|
||||
conversation_type,
|
||||
data.get("conversationId").and_then(|c| c.as_str()),
|
||||
);
|
||||
// Private chat uses sender ID, group chat uses conversation ID.
|
||||
let chat_id = Self::resolve_chat_id(&data, sender_id);
|
||||
|
||||
// Store session webhook for later replies
|
||||
if let Some(webhook) = data.get("sessionWebhook").and_then(|w| w.as_str()) {
|
||||
let webhook = webhook.to_string();
|
||||
let mut webhooks = self.session_webhooks.write().await;
|
||||
webhooks.insert(chat_id.clone(), webhook.to_string());
|
||||
// Use both keys so reply routing works for both group and private flows.
|
||||
webhooks.insert(chat_id.clone(), webhook.clone());
|
||||
webhooks.insert(sender_id.to_string(), webhook);
|
||||
}
|
||||
|
||||
// Acknowledge the event
|
||||
|
|
@ -319,20 +340,36 @@ client_secret = "secret"
|
|||
}
|
||||
|
||||
#[test]
|
||||
fn test_resolve_reply_target_private_chat_uses_sender_id() {
|
||||
let target = DingTalkChannel::resolve_reply_target("staff_1", "1", Some("conv_1"));
|
||||
assert_eq!(target, "staff_1");
|
||||
fn parse_stream_data_supports_string_payload() {
|
||||
let frame = serde_json::json!({
|
||||
"data": "{\"text\":{\"content\":\"hello\"}}"
|
||||
});
|
||||
let parsed = DingTalkChannel::parse_stream_data(&frame).unwrap();
|
||||
assert_eq!(
|
||||
parsed.get("text").and_then(|v| v.get("content")),
|
||||
Some(&serde_json::json!("hello"))
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_resolve_reply_target_group_chat_uses_conversation_id() {
|
||||
let target = DingTalkChannel::resolve_reply_target("staff_1", "2", Some("conv_1"));
|
||||
assert_eq!(target, "conv_1");
|
||||
fn parse_stream_data_supports_object_payload() {
|
||||
let frame = serde_json::json!({
|
||||
"data": {"text": {"content": "hello"}}
|
||||
});
|
||||
let parsed = DingTalkChannel::parse_stream_data(&frame).unwrap();
|
||||
assert_eq!(
|
||||
parsed.get("text").and_then(|v| v.get("content")),
|
||||
Some(&serde_json::json!("hello"))
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_resolve_reply_target_group_chat_falls_back_to_sender_id() {
|
||||
let target = DingTalkChannel::resolve_reply_target("staff_1", "2", None);
|
||||
assert_eq!(target, "staff_1");
|
||||
fn resolve_chat_id_handles_numeric_group_conversation_type() {
|
||||
let data = serde_json::json!({
|
||||
"conversationType": 2,
|
||||
"conversationId": "cid-group",
|
||||
});
|
||||
let chat_id = DingTalkChannel::resolve_chat_id(&data, "staff-1");
|
||||
assert_eq!(chat_id, "cid-group");
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -299,4 +299,15 @@ mod tests {
|
|||
});
|
||||
assert!(has_supervised_channels(&config));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn detects_dingtalk_as_supervised_channel() {
|
||||
let mut config = Config::default();
|
||||
config.channels_config.dingtalk = Some(crate::config::schema::DingTalkConfig {
|
||||
client_id: "client_id".into(),
|
||||
client_secret: "client_secret".into(),
|
||||
allowed_users: vec!["*".into()],
|
||||
});
|
||||
assert!(has_supervised_channels(&config));
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue