From fda15026d6521b57bbcbf3c567a0cbfac6a13e16 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Mon, 25 Jul 2016 14:53:23 +0300 Subject: [PATCH] Faster deleted sync with version tags --- db.sql | 2 ++ operetta.js | 64 +++++++++++++++++++++++++++++++++-------------------- 2 files changed, 42 insertions(+), 24 deletions(-) diff --git a/db.sql b/db.sql index 80605c2..81d1bd6 100644 --- a/db.sql +++ b/db.sql @@ -43,12 +43,14 @@ create table messages ( body text not null, time timestamptz not null, flags varchar(255)[] not null, + vertag int not null default 0, foreign key (folder_id) references folders (id) on delete cascade on update cascade ); create unique index messages_unique on messages (folder_id, uid); create index messages_flags on messages using gin (folder_id, flags); create index messages_messageid on messages (messageid); create index messages_refs on messages using gin (refs); +create index messages_vertag on messages (folder_id, vertag); create table threads ( id serial not null primary key, diff --git a/operetta.js b/operetta.js index c3776dc..37605f5 100644 --- a/operetta.js +++ b/operetta.js @@ -58,7 +58,9 @@ Syncer.sync = function*(account) var [ box ] = yield srv.openBox(k, true, gen.ef()); var boxId; // IMAP sync: http://tools.ietf.org/html/rfc4549 - var [ row ] = yield pg.select('*').from('folders').where({ account_id: accountId, name: box.name }).row(gen.ef()); + var [ row ] = yield pg.select('*').from('folders') + .where({ account_id: accountId, name: box.name }).row(gen.ef()); + self.versionTag = 0; if (row) { boxId = row.id; @@ -69,8 +71,8 @@ Syncer.sync = function*(account) } else { - yield pg.update('messages', { flags: pg.sql('(array_remove(flags, \'deleted\') || array[\'deleted\']::varchar(255)[])') }).where({ folder_id: row.id }) - .where(pg.sql('uid is not null')).run(gen.ef()); + [ self.versionTag ] = yield pg.select('MAX(vertag)').from('messages') + .where({ folder_id: row.id }).val(gen.ef()); } yield pg.update('folders', { uidvalidity: box.uidvalidity, unread_count: box.messages.new }) .where({ id: row.id }).run(gen.ef()); @@ -87,29 +89,42 @@ Syncer.sync = function*(account) boxId = row.id; } - // list messages, update flags - process.stderr.write('\rsynchronizing 0'); - yield* self.runFetch('1:*', {}, boxId, 'updateFlags'); - process.stderr.write('\n'); + var [ maxUid ] = yield pg.select('MAX(uid)').from('messages').where({ folder_id: boxId }).val(gen.ef()); + if (maxUid) + { + // list messages, update flags and version tag + self.versionTag++; + if (self.versionTag >= 0x7fffffff) + { + yield pg.update('messages', { vertag: 0 }).where({ folder_id: boxId }).run(gen.ef()); + self.versionTag = 1; + } - // delete messages removed from IMAP server - yield pg.update('threads', { first_msg: null }) - .where(pg.sql('first_msg IN ('+ - pg.select('id').from('messages').where({ folder_id: boxId }) - .where(pg.sql('(flags @> array[\'deleted\']::varchar(255)[])')) - +')')).run(gen.ef()); - yield pg.delete('messages').where({ folder_id: boxId }) - .where(pg.sql('(flags @> array[\'deleted\']::varchar(255)[])')).run(gen.ef()); - yield pg.update('threads', - { first_msg: pg.sql('('+ - pg.select('MIN(id)').from('messages').where({ thread_id: pg.sql('threads.id') })+')' - ) }).where(pg.sql('first_msg IS NULL')).run(gen.ef()); - yield pg.delete('threads').where(pg.sql('first_msg IS NULL')).run(gen.ef()); + process.stderr.write('\rsynchronizing 0'); + yield* self.runFetch('1:'+maxUid, {}, boxId, 'updateFlags'); + process.stderr.write('\n'); + + // delete messages removed from IMAP server + yield pg.update('threads', { first_msg: null }) + .where(pg.sql('first_msg IN ('+ + pg.select('id').from('messages') + .where({ folder_id: boxId }) + .where(pg.sql('uid is not null')) + .where(pg.sql.lt('vertag', self.versionTag)) + +')')).run(gen.ef()); + yield pg.delete('messages') + .where({ folder_id: boxId }) + .where(pg.sql('uid is not null')) + .where(pg.sql.lt('vertag', self.versionTag)).run(gen.ef()); + yield pg.update('threads', + { first_msg: pg.sql('('+ + pg.select('MIN(id)').from('messages').where({ thread_id: pg.sql('threads.id') })+')' + ) }).where(pg.sql('first_msg IS NULL')).run(gen.ef()); + yield pg.delete('threads').where(pg.sql('first_msg IS NULL')).run(gen.ef()); + } // fetch new messages - var [ maxUid ] = yield pg.select('MAX(uid)').from('messages').where({ folder_id: boxId }).val(gen.ef()); - maxUid = maxUid ? maxUid+1 : 1; - yield* self.runFetch(maxUid+':*', { + yield* self.runFetch((maxUid ? maxUid+1 : 1)+':*', { size: true, bodies: 'HEADER' }, boxId, 'saveMessages'); @@ -241,7 +256,7 @@ Syncer.updateFlags = function*(messages, boxId) flags: toPgArray(m[0].flags) })); // TODO check if something is missing - yield pg.update('messages m', { flags: pg.sql('t.flags::varchar(255)[]') }) + yield pg.update('messages m', { flags: pg.sql('t.flags::varchar(255)[]'), vertag: self.versionTag }) .from('('+pg.sql.values(rows)+') AS t (uid, flags)') .where({ 'm.folder_id': boxId }).where(pg.sql('m.uid=t.uid')).run(gen.ef()); self.synced += messages.length; @@ -310,6 +325,7 @@ Syncer.addMessage = function*(msgrow, attrs) msgrow.time = header.date; msgrow.flags = toPgArray(msgrow.flags); msgrow.refs = toPgArray(header.references); + msgrow.vertag = self.versionTag; if (header.references.length) { var [ threadId ] = yield pgtx.select('MAX(thread_id)').from('messages')