fix(imessage): preserve sqlite conn across polling safely

This commit is contained in:
Chummy 2026-02-18 14:29:26 +08:00
parent 1ddcb0a573
commit 83b098d7ac

View file

@ -149,7 +149,7 @@ end tell"#
// Open a persistent read-only connection instead of creating // Open a persistent read-only connection instead of creating
// a new one on every 3-second poll cycle. // a new one on every 3-second poll cycle.
let path = db_path.to_path_buf(); let path = db_path.to_path_buf();
let mut conn = tokio::task::spawn_blocking(move || -> anyhow::Result<Connection> { let conn = tokio::task::spawn_blocking(move || -> anyhow::Result<Connection> {
Ok(Connection::open_with_flags( Ok(Connection::open_with_flags(
&path, &path,
OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX, OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
@ -158,25 +158,28 @@ end tell"#
.await??; .await??;
// Track the last ROWID we've seen (shuttle conn in and out) // Track the last ROWID we've seen (shuttle conn in and out)
let (c, initial_rowid) = let (mut conn, initial_rowid) =
tokio::task::spawn_blocking(move || -> anyhow::Result<(Connection, i64)> { tokio::task::spawn_blocking(move || -> anyhow::Result<(Connection, i64)> {
let mut stmt = let rowid = {
conn.prepare("SELECT MAX(ROWID) FROM message WHERE is_from_me = 0")?; let mut stmt =
let rowid: Option<i64> = stmt.query_row([], |row| row.get(0))?; conn.prepare("SELECT MAX(ROWID) FROM message WHERE is_from_me = 0")?;
Ok((conn, rowid.unwrap_or(0))) let rowid: Option<i64> = stmt.query_row([], |row| row.get(0))?;
rowid.unwrap_or(0)
};
Ok((conn, rowid))
}) })
.await??; .await??;
conn = c;
let mut last_rowid = initial_rowid; let mut last_rowid = initial_rowid;
loop { loop {
tokio::time::sleep(tokio::time::Duration::from_secs(self.poll_interval_secs)).await; tokio::time::sleep(tokio::time::Duration::from_secs(self.poll_interval_secs)).await;
let since = last_rowid; let since = last_rowid;
let poll_result = tokio::task::spawn_blocking( let (returned_conn, poll_result) = tokio::task::spawn_blocking(
move || -> anyhow::Result<(Connection, Vec<(i64, String, String)>)> { move || -> (Connection, anyhow::Result<Vec<(i64, String, String)>>) {
let mut stmt = conn.prepare( let result = (|| -> anyhow::Result<Vec<(i64, String, String)>> {
"SELECT m.ROWID, h.id, m.text \ let mut stmt = conn.prepare(
"SELECT m.ROWID, h.id, m.text \
FROM message m \ FROM message m \
JOIN handle h ON m.handle_id = h.ROWID \ JOIN handle h ON m.handle_id = h.ROWID \
WHERE m.ROWID > ?1 \ WHERE m.ROWID > ?1 \
@ -184,23 +187,27 @@ end tell"#
AND m.text IS NOT NULL \ AND m.text IS NOT NULL \
ORDER BY m.ROWID ASC \ ORDER BY m.ROWID ASC \
LIMIT 20", LIMIT 20",
)?; )?;
let rows = stmt.query_map([since], |row| { let rows = stmt.query_map([since], |row| {
Ok(( Ok((
row.get::<_, i64>(0)?, row.get::<_, i64>(0)?,
row.get::<_, String>(1)?, row.get::<_, String>(1)?,
row.get::<_, String>(2)?, row.get::<_, String>(2)?,
)) ))
})?; })?;
let results = rows.collect::<Result<Vec<_>, _>>()?; let results = rows.collect::<Result<Vec<_>, _>>()?;
Ok((conn, results)) Ok(results)
})();
(conn, result)
}, },
) )
.await?; .await
.map_err(|e| anyhow::anyhow!("iMessage poll worker join error: {e}"))?;
conn = returned_conn;
match poll_result { match poll_result {
Ok((c, messages)) => { Ok(messages) => {
conn = c;
for (rowid, sender, text) in messages { for (rowid, sender, text) in messages {
if rowid > last_rowid { if rowid > last_rowid {
last_rowid = rowid; last_rowid = rowid;