From 7c3f2f565fb021cacd8672dbbbdbb2208bd7db6d Mon Sep 17 00:00:00 2001 From: argenis de la rosa Date: Sat, 14 Feb 2026 18:10:39 -0500 Subject: [PATCH] fix(imessage): replace sqlite CLI path with rusqlite parameterized reads - use rusqlite with SQLITE_OPEN_READ_ONLY | SQLITE_OPEN_NO_MUTEX - run sync sqlite reads via spawn_blocking - bind since_rowid with ?1 parameter to avoid SQL interpolation - add comprehensive edge-case tests for message fetch and rowid helpers Fixes #50 --- src/channels/imessage.rs | 204 +++++++++++++++++++++++++-------------- 1 file changed, 134 insertions(+), 70 deletions(-) diff --git a/src/channels/imessage.rs b/src/channels/imessage.rs index 7a0e76c..e6e78b4 100644 --- a/src/channels/imessage.rs +++ b/src/channels/imessage.rs @@ -210,9 +210,7 @@ async fn get_max_rowid(db_path: &Path) -> anyhow::Result { &path, OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX, )?; - let mut stmt = conn.prepare( - "SELECT MAX(ROWID) FROM message WHERE is_from_me = 0" - )?; + let mut stmt = conn.prepare("SELECT MAX(ROWID) FROM message WHERE is_from_me = 0")?; let rowid: Option = stmt.query_row([], |row| row.get(0))?; Ok(rowid.unwrap_or(0)) }) @@ -228,31 +226,32 @@ async fn fetch_new_messages( since_rowid: i64, ) -> anyhow::Result> { let path = db_path.to_path_buf(); - let results = tokio::task::spawn_blocking(move || -> anyhow::Result> { - let conn = Connection::open_with_flags( - &path, - OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX, - )?; - let mut stmt = conn.prepare( - "SELECT m.ROWID, h.id, m.text \ + let results = + tokio::task::spawn_blocking(move || -> anyhow::Result> { + let conn = Connection::open_with_flags( + &path, + OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX, + )?; + let mut stmt = conn.prepare( + "SELECT m.ROWID, h.id, m.text \ FROM message m \ JOIN handle h ON m.handle_id = h.ROWID \ WHERE m.ROWID > ?1 \ AND m.is_from_me = 0 \ AND m.text IS NOT NULL \ ORDER BY m.ROWID ASC \ - LIMIT 20" - )?; - let rows = stmt.query_map([since_rowid], |row| { - Ok(( - row.get::<_, i64>(0)?, - row.get::<_, String>(1)?, - row.get::<_, String>(2)?, - )) - })?; - rows.collect::, _>>().map_err(Into::into) - }) - .await??; + LIMIT 20", + )?; + let rows = stmt.query_map([since_rowid], |row| { + Ok(( + row.get::<_, i64>(0)?, + row.get::<_, String>(1)?, + row.get::<_, String>(2)?, + )) + })?; + rows.collect::, _>>().map_err(Into::into) + }) + .await??; Ok(results) } @@ -536,9 +535,9 @@ mod tests { fn create_test_db() -> (tempfile::TempDir, std::path::PathBuf) { let dir = tempfile::tempdir().unwrap(); let db_path = dir.path().join("chat.db"); - + let conn = Connection::open(&db_path).unwrap(); - + // Create minimal schema matching macOS Messages.app conn.execute_batch( "CREATE TABLE handle ( @@ -551,9 +550,10 @@ mod tests { text TEXT, is_from_me INTEGER DEFAULT 0, FOREIGN KEY (handle_id) REFERENCES handle(ROWID) - );" - ).unwrap(); - + );", + ) + .unwrap(); + (dir, db_path) } @@ -569,11 +569,15 @@ mod tests { #[tokio::test] async fn get_max_rowid_with_messages() { let (_dir, db_path) = create_test_db(); - + // Insert test data { let conn = Connection::open(&db_path).unwrap(); - conn.execute("INSERT INTO handle (ROWID, id) VALUES (1, '+1234567890')", []).unwrap(); + conn.execute( + "INSERT INTO handle (ROWID, id) VALUES (1, '+1234567890')", + [], + ) + .unwrap(); conn.execute( "INSERT INTO message (ROWID, handle_id, text, is_from_me) VALUES (100, 1, 'Hello', 0)", [] @@ -588,7 +592,7 @@ mod tests { [] ).unwrap(); } - + let result = get_max_rowid(&db_path).await.unwrap(); // Should return 200, not 300 (ignores is_from_me=1) assert_eq!(result, 200); @@ -612,12 +616,20 @@ mod tests { #[tokio::test] async fn fetch_new_messages_returns_correct_data() { let (_dir, db_path) = create_test_db(); - + // Insert test data { let conn = Connection::open(&db_path).unwrap(); - conn.execute("INSERT INTO handle (ROWID, id) VALUES (1, '+1234567890')", []).unwrap(); - conn.execute("INSERT INTO handle (ROWID, id) VALUES (2, 'user@example.com')", []).unwrap(); + conn.execute( + "INSERT INTO handle (ROWID, id) VALUES (1, '+1234567890')", + [], + ) + .unwrap(); + conn.execute( + "INSERT INTO handle (ROWID, id) VALUES (2, 'user@example.com')", + [], + ) + .unwrap(); conn.execute( "INSERT INTO message (ROWID, handle_id, text, is_from_me) VALUES (10, 1, 'First message', 0)", [] @@ -627,21 +639,35 @@ mod tests { [] ).unwrap(); } - + let result = fetch_new_messages(&db_path, 0).await.unwrap(); assert_eq!(result.len(), 2); - assert_eq!(result[0], (10, "+1234567890".to_string(), "First message".to_string())); - assert_eq!(result[1], (20, "user@example.com".to_string(), "Second message".to_string())); + assert_eq!( + result[0], + (10, "+1234567890".to_string(), "First message".to_string()) + ); + assert_eq!( + result[1], + ( + 20, + "user@example.com".to_string(), + "Second message".to_string() + ) + ); } #[tokio::test] async fn fetch_new_messages_filters_by_rowid() { let (_dir, db_path) = create_test_db(); - + // Insert test data { let conn = Connection::open(&db_path).unwrap(); - conn.execute("INSERT INTO handle (ROWID, id) VALUES (1, '+1234567890')", []).unwrap(); + conn.execute( + "INSERT INTO handle (ROWID, id) VALUES (1, '+1234567890')", + [], + ) + .unwrap(); conn.execute( "INSERT INTO message (ROWID, handle_id, text, is_from_me) VALUES (10, 1, 'Old message', 0)", [] @@ -651,7 +677,7 @@ mod tests { [] ).unwrap(); } - + // Fetch only messages after ROWID 15 let result = fetch_new_messages(&db_path, 15).await.unwrap(); assert_eq!(result.len(), 1); @@ -662,11 +688,15 @@ mod tests { #[tokio::test] async fn fetch_new_messages_excludes_sent_messages() { let (_dir, db_path) = create_test_db(); - + // Insert test data { let conn = Connection::open(&db_path).unwrap(); - conn.execute("INSERT INTO handle (ROWID, id) VALUES (1, '+1234567890')", []).unwrap(); + conn.execute( + "INSERT INTO handle (ROWID, id) VALUES (1, '+1234567890')", + [], + ) + .unwrap(); conn.execute( "INSERT INTO message (ROWID, handle_id, text, is_from_me) VALUES (10, 1, 'Received', 0)", [] @@ -676,7 +706,7 @@ mod tests { [] ).unwrap(); } - + let result = fetch_new_messages(&db_path, 0).await.unwrap(); assert_eq!(result.len(), 1); assert_eq!(result[0].2, "Received"); @@ -685,21 +715,26 @@ mod tests { #[tokio::test] async fn fetch_new_messages_excludes_null_text() { let (_dir, db_path) = create_test_db(); - + // Insert test data { let conn = Connection::open(&db_path).unwrap(); - conn.execute("INSERT INTO handle (ROWID, id) VALUES (1, '+1234567890')", []).unwrap(); + conn.execute( + "INSERT INTO handle (ROWID, id) VALUES (1, '+1234567890')", + [], + ) + .unwrap(); conn.execute( "INSERT INTO message (ROWID, handle_id, text, is_from_me) VALUES (10, 1, 'Has text', 0)", [] ).unwrap(); conn.execute( "INSERT INTO message (ROWID, handle_id, text, is_from_me) VALUES (20, 1, NULL, 0)", - [] - ).unwrap(); + [], + ) + .unwrap(); } - + let result = fetch_new_messages(&db_path, 0).await.unwrap(); assert_eq!(result.len(), 1); assert_eq!(result[0].2, "Has text"); @@ -708,11 +743,15 @@ mod tests { #[tokio::test] async fn fetch_new_messages_respects_limit() { let (_dir, db_path) = create_test_db(); - + // Insert 25 messages (limit is 20) { let conn = Connection::open(&db_path).unwrap(); - conn.execute("INSERT INTO handle (ROWID, id) VALUES (1, '+1234567890')", []).unwrap(); + conn.execute( + "INSERT INTO handle (ROWID, id) VALUES (1, '+1234567890')", + [], + ) + .unwrap(); for i in 1..=25 { conn.execute( &format!("INSERT INTO message (ROWID, handle_id, text, is_from_me) VALUES ({i}, 1, 'Message {i}', 0)"), @@ -720,7 +759,7 @@ mod tests { ).unwrap(); } } - + let result = fetch_new_messages(&db_path, 0).await.unwrap(); assert_eq!(result.len(), 20); // Limited to 20 assert_eq!(result[0].0, 1); // First message @@ -730,11 +769,15 @@ mod tests { #[tokio::test] async fn fetch_new_messages_ordered_by_rowid_asc() { let (_dir, db_path) = create_test_db(); - + // Insert messages out of order { let conn = Connection::open(&db_path).unwrap(); - conn.execute("INSERT INTO handle (ROWID, id) VALUES (1, '+1234567890')", []).unwrap(); + conn.execute( + "INSERT INTO handle (ROWID, id) VALUES (1, '+1234567890')", + [], + ) + .unwrap(); conn.execute( "INSERT INTO message (ROWID, handle_id, text, is_from_me) VALUES (30, 1, 'Third', 0)", [] @@ -748,7 +791,7 @@ mod tests { [] ).unwrap(); } - + let result = fetch_new_messages(&db_path, 0).await.unwrap(); assert_eq!(result.len(), 3); assert_eq!(result[0].0, 10); @@ -766,17 +809,21 @@ mod tests { #[tokio::test] async fn fetch_new_messages_handles_special_characters() { let (_dir, db_path) = create_test_db(); - + // Insert message with special characters (potential SQL injection patterns) { let conn = Connection::open(&db_path).unwrap(); - conn.execute("INSERT INTO handle (ROWID, id) VALUES (1, '+1234567890')", []).unwrap(); + conn.execute( + "INSERT INTO handle (ROWID, id) VALUES (1, '+1234567890')", + [], + ) + .unwrap(); conn.execute( "INSERT INTO message (ROWID, handle_id, text, is_from_me) VALUES (10, 1, 'Hello \"world'' OR 1=1; DROP TABLE message;--', 0)", [] ).unwrap(); } - + let result = fetch_new_messages(&db_path, 0).await.unwrap(); assert_eq!(result.len(), 1); // The special characters should be preserved, not interpreted as SQL @@ -786,16 +833,20 @@ mod tests { #[tokio::test] async fn fetch_new_messages_handles_unicode() { let (_dir, db_path) = create_test_db(); - + { let conn = Connection::open(&db_path).unwrap(); - conn.execute("INSERT INTO handle (ROWID, id) VALUES (1, '+1234567890')", []).unwrap(); + conn.execute( + "INSERT INTO handle (ROWID, id) VALUES (1, '+1234567890')", + [], + ) + .unwrap(); conn.execute( "INSERT INTO message (ROWID, handle_id, text, is_from_me) VALUES (10, 1, 'Hello 🦀 世界 مرحبا', 0)", [] ).unwrap(); } - + let result = fetch_new_messages(&db_path, 0).await.unwrap(); assert_eq!(result.len(), 1); assert_eq!(result[0].2, "Hello 🦀 世界 مرحبا"); @@ -804,16 +855,21 @@ mod tests { #[tokio::test] async fn fetch_new_messages_handles_empty_text() { let (_dir, db_path) = create_test_db(); - + { let conn = Connection::open(&db_path).unwrap(); - conn.execute("INSERT INTO handle (ROWID, id) VALUES (1, '+1234567890')", []).unwrap(); + conn.execute( + "INSERT INTO handle (ROWID, id) VALUES (1, '+1234567890')", + [], + ) + .unwrap(); conn.execute( "INSERT INTO message (ROWID, handle_id, text, is_from_me) VALUES (10, 1, '', 0)", - [] - ).unwrap(); + [], + ) + .unwrap(); } - + let result = fetch_new_messages(&db_path, 0).await.unwrap(); // Empty string is NOT NULL, so it's included assert_eq!(result.len(), 1); @@ -823,16 +879,20 @@ mod tests { #[tokio::test] async fn fetch_new_messages_negative_rowid_edge_case() { let (_dir, db_path) = create_test_db(); - + { let conn = Connection::open(&db_path).unwrap(); - conn.execute("INSERT INTO handle (ROWID, id) VALUES (1, '+1234567890')", []).unwrap(); + conn.execute( + "INSERT INTO handle (ROWID, id) VALUES (1, '+1234567890')", + [], + ) + .unwrap(); conn.execute( "INSERT INTO message (ROWID, handle_id, text, is_from_me) VALUES (10, 1, 'Test', 0)", [] ).unwrap(); } - + // Negative rowid should still work (fetch all messages with ROWID > -1) let result = fetch_new_messages(&db_path, -1).await.unwrap(); assert_eq!(result.len(), 1); @@ -841,16 +901,20 @@ mod tests { #[tokio::test] async fn fetch_new_messages_large_rowid_edge_case() { let (_dir, db_path) = create_test_db(); - + { let conn = Connection::open(&db_path).unwrap(); - conn.execute("INSERT INTO handle (ROWID, id) VALUES (1, '+1234567890')", []).unwrap(); + conn.execute( + "INSERT INTO handle (ROWID, id) VALUES (1, '+1234567890')", + [], + ) + .unwrap(); conn.execute( "INSERT INTO message (ROWID, handle_id, text, is_from_me) VALUES (10, 1, 'Test', 0)", [] ).unwrap(); } - + // Very large rowid should return empty (no messages after this) let result = fetch_new_messages(&db_path, i64::MAX - 1).await.unwrap(); assert!(result.is_empty());