perf(imessage): reuse persistent SQLite connection across poll cycles

Problem:
The iMessage listener opened a new SQLite connection to the Messages
database on every ~3-second poll cycle via get_max_rowid() and
fetch_new_messages(), creating ~40 connection open/close cycles per
minute. Each cycle incurs filesystem syscalls, WAL header reads,
and potential page cache cold starts.

Fix:
Open a single read-only connection before the poll loop and reuse it
across iterations using the 'shuttle' pattern: the connection is moved
into each spawn_blocking closure and returned alongside the results,
then reassigned for the next iteration. This eliminates per-poll
connection overhead while preserving the spawn_blocking pattern that
keeps SQLite I/O off the Tokio runtime thread.

The standalone get_max_rowid() and fetch_new_messages() helper
functions are retained for use by tests and other callers.

Ref: zeroclaw-labs/zeroclaw#710 (Item 9)

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
Alex Gorevski 2026-02-17 19:53:18 -08:00 committed by Chummy
parent 14066d094f
commit 1ddcb0a573

View file

@ -146,16 +146,61 @@ end tell"#
);
}
// Track the last ROWID we've seen
let mut last_rowid = get_max_rowid(&db_path).await.unwrap_or(0);
// Open a persistent read-only connection instead of creating
// a new one on every 3-second poll cycle.
let path = db_path.to_path_buf();
let mut conn = tokio::task::spawn_blocking(move || -> anyhow::Result<Connection> {
Ok(Connection::open_with_flags(
&path,
OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
)?)
})
.await??;
// Track the last ROWID we've seen (shuttle conn in and out)
let (c, initial_rowid) =
tokio::task::spawn_blocking(move || -> anyhow::Result<(Connection, i64)> {
let mut stmt =
conn.prepare("SELECT MAX(ROWID) FROM message WHERE is_from_me = 0")?;
let rowid: Option<i64> = stmt.query_row([], |row| row.get(0))?;
Ok((conn, rowid.unwrap_or(0)))
})
.await??;
conn = c;
let mut last_rowid = initial_rowid;
loop {
tokio::time::sleep(tokio::time::Duration::from_secs(self.poll_interval_secs)).await;
let new_messages = fetch_new_messages(&db_path, last_rowid).await;
let since = last_rowid;
let poll_result = tokio::task::spawn_blocking(
move || -> anyhow::Result<(Connection, Vec<(i64, String, String)>)> {
let mut stmt = conn.prepare(
"SELECT m.ROWID, h.id, m.text \
FROM message m \
JOIN handle h ON m.handle_id = h.ROWID \
WHERE m.ROWID > ?1 \
AND m.is_from_me = 0 \
AND m.text IS NOT NULL \
ORDER BY m.ROWID ASC \
LIMIT 20",
)?;
let rows = stmt.query_map([since], |row| {
Ok((
row.get::<_, i64>(0)?,
row.get::<_, String>(1)?,
row.get::<_, String>(2)?,
))
})?;
let results = rows.collect::<Result<Vec<_>, _>>()?;
Ok((conn, results))
},
)
.await?;
match new_messages {
Ok(messages) => {
match poll_result {
Ok((c, messages)) => {
conn = c;
for (rowid, sender, text) in messages {
if rowid > last_rowid {
last_rowid = rowid;