diff --git a/src/channels/imessage.rs b/src/channels/imessage.rs index e81d6a1..9675d15 100644 --- a/src/channels/imessage.rs +++ b/src/channels/imessage.rs @@ -149,7 +149,7 @@ end tell"# // Open a persistent read-only connection instead of creating // a new one on every 3-second poll cycle. let path = db_path.to_path_buf(); - let mut conn = tokio::task::spawn_blocking(move || -> anyhow::Result { + let conn = tokio::task::spawn_blocking(move || -> anyhow::Result { Ok(Connection::open_with_flags( &path, OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX, @@ -158,25 +158,28 @@ end tell"# .await??; // Track the last ROWID we've seen (shuttle conn in and out) - let (c, initial_rowid) = + let (mut conn, initial_rowid) = tokio::task::spawn_blocking(move || -> anyhow::Result<(Connection, i64)> { - let mut stmt = - conn.prepare("SELECT MAX(ROWID) FROM message WHERE is_from_me = 0")?; - let rowid: Option = stmt.query_row([], |row| row.get(0))?; - Ok((conn, rowid.unwrap_or(0))) + let rowid = { + let mut stmt = + conn.prepare("SELECT MAX(ROWID) FROM message WHERE is_from_me = 0")?; + let rowid: Option = stmt.query_row([], |row| row.get(0))?; + rowid.unwrap_or(0) + }; + Ok((conn, rowid)) }) .await??; - conn = c; let mut last_rowid = initial_rowid; loop { tokio::time::sleep(tokio::time::Duration::from_secs(self.poll_interval_secs)).await; let since = last_rowid; - let poll_result = tokio::task::spawn_blocking( - move || -> anyhow::Result<(Connection, Vec<(i64, String, String)>)> { - let mut stmt = conn.prepare( - "SELECT m.ROWID, h.id, m.text \ + let (returned_conn, poll_result) = tokio::task::spawn_blocking( + move || -> (Connection, anyhow::Result>) { + let result = (|| -> anyhow::Result> { + let mut stmt = conn.prepare( + "SELECT m.ROWID, h.id, m.text \ FROM message m \ JOIN handle h ON m.handle_id = h.ROWID \ WHERE m.ROWID > ?1 \ @@ -184,23 +187,27 @@ end tell"# AND m.text IS NOT NULL \ ORDER BY m.ROWID ASC \ LIMIT 20", - )?; - let rows = stmt.query_map([since], |row| { - Ok(( - row.get::<_, i64>(0)?, - row.get::<_, String>(1)?, - row.get::<_, String>(2)?, - )) - })?; - let results = rows.collect::, _>>()?; - Ok((conn, results)) + )?; + let rows = stmt.query_map([since], |row| { + Ok(( + row.get::<_, i64>(0)?, + row.get::<_, String>(1)?, + row.get::<_, String>(2)?, + )) + })?; + let results = rows.collect::, _>>()?; + Ok(results) + })(); + + (conn, result) }, ) - .await?; + .await + .map_err(|e| anyhow::anyhow!("iMessage poll worker join error: {e}"))?; + conn = returned_conn; match poll_result { - Ok((c, messages)) => { - conn = c; + Ok(messages) => { for (rowid, sender, text) in messages { if rowid > last_rowid { last_rowid = rowid;