From b662c713193a6b6391af211ff57ca77145c4efd3 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Sun, 11 Sep 2016 18:42:08 +0300 Subject: [PATCH] Use in-memory sync for flags --- db.sql | 2 - operetta.js | 155 +++++++++++++++++++++++++++++++--------------------- 2 files changed, 92 insertions(+), 65 deletions(-) diff --git a/db.sql b/db.sql index b86f28d..769790f 100644 --- a/db.sql +++ b/db.sql @@ -48,14 +48,12 @@ create table messages ( time timestamptz not null, size unsigned not null, flags varchar(255)[] not null, - vertag int not null default 0, foreign key (folder_id) references folders (id) on delete cascade on update cascade ); create unique index messages_unique on messages (folder_id, uid); create index messages_flags on messages using gin (folder_id, flags); create index messages_messageid on messages (messageid); create index messages_refs on messages using gin (refs); -create index messages_vertag on messages (folder_id, vertag); create index messages_time on messages (folder_id, time); create index messages_text on messages using gin (text_index); create or replace function fn_messages_text_index() returns trigger diff --git a/operetta.js b/operetta.js index 25fb694..9c193b6 100644 --- a/operetta.js +++ b/operetta.js @@ -276,7 +276,6 @@ Syncer.sync = function*(account) // IMAP sync: http://tools.ietf.org/html/rfc4549 var [ row ] = yield pg.select('*').from('folders') .where({ account_id: accountId, name: box.name }).rows(gen.ef()); - self.versionTag = 0; if (row.length) { row = row[0]; @@ -287,12 +286,6 @@ Syncer.sync = function*(account) yield pg.delete('messages').where({ folder_id: row.id }) .where(pg.sql('uid is not null')).run(gen.ef()); } - else - { - [ self.versionTag ] = yield pg.select('MAX(vertag)').from('messages') - .where({ folder_id: row.id }).val(gen.ef()); - self.versionTag = self.versionTag || 0; - } } else { @@ -311,42 +304,12 @@ Syncer.sync = function*(account) var [ maxUid ] = yield pg.select('MAX(uid)').from('messages').where({ folder_id: boxId }).val(gen.ef()); if (changedSince) { - process.stderr.write(account.email+'/'+box.name+': quick resync\n'); - self.vanished = []; - yield* self.runFetch(srv, '1:'+maxUid, { modifiers: { changedsince: changedSince+' VANISHED' } }, boxId, 'updateFlags'); - if (self.vanished.length) - { - let lst = []; - let dia = []; - for (let i = 0; i < self.vanished.length; i++) - { - if (self.vanished[i][1]) - dia.push('uid >= '+self.vanished[i][0]+' AND uid <= '+self.vanished[i][1]); - else - lst.push(self.vanished[i][0]); - } - if (lst.length) - dia.push('uid IN ('+lst.join(',')+')'); - yield* self.deleteMessages(pg.sql.and({ folder_id: boxId }, pg.sql('('+dia.join(' OR ')+')'))); - } + yield* self.quickResync(srv, account, box, boxId, maxUid, changedSince); } else if (maxUid) { // list messages, update flags and version tag - self.versionTag++; - if (self.versionTag >= 0x7fffffff) - { - yield pg.update('messages', { vertag: 0 }).where({ folder_id: boxId }).run(gen.ef()); - self.versionTag = 1; - } - - process.stderr.write(account.email+'/'+box.name+': full resync\n'); - process.stderr.write('\rsynchronizing 0'); - yield* self.runFetch(srv, '1:'+maxUid, {}, boxId, 'updateFlags'); - process.stderr.write('\n'); - - // delete messages removed from IMAP server - yield* self.deleteMessages(pg.sql.and({ folder_id: boxId }, pg.sql('uid is not null'), pg.sql.lt('vertag', self.versionTag))); + yield* self.fullResync(srv, account, box, boxId, maxUid); } // fetch new messages @@ -365,6 +328,92 @@ Syncer.sync = function*(account) self.releaseConnection(accountId, 'S'); } +Syncer.fullResync = function*(srv, account, box, boxId, maxUid) +{ + var [ flags ] = yield pg.select('uid, flags').from('messages').where({ folder_id: boxId }).rows(gen.ef()); + flags = flags.reduce((o, row) => { o[row.uid] = row.flags; return o; }, {}); + + var updateFlags = []; + + process.stderr.write(account.email+'/'+box.name+': full resync\n'); + process.stderr.write('\rsynchronizing 0'); + yield* this.runFetch(srv, '1:'+maxUid, {}, boxId, 'queueFlags', { flags: flags, updateFlags: updateFlags }); + process.stderr.write('\n'); + + this.updateFlags(boxId, updateFlags); + + // delete messages removed from IMAP server + flags = Object.keys(flags); + if (flags.length) + yield* this.deleteMessages(pg.sql.and({ folder_id: boxId }, pg.sql.in('uid', flags))); +} + +Syncer.queueFlags = function*(messages, boxId, fetchState) +{ + for (var i = 0; i < messages.length; i++) + { + var m = messages[i][0]; + if (!fetchState.flags[m.uid]) + this.missing.push(m.uid); + else + { + if (fetchState.flags[m.uid].join(',') != m.flags.join(',')) + fetchState.updateFlags.push({ uid: m.uid, flags: toPgArray(m.flags) }); + delete fetchState.flags[m.uid]; + } + } + fetchState.synced += messages.length; + process.stderr.write('\rsynchronizing '+fetchState.synced); +} + +Syncer.updateFlags = function*(boxId, updateFlags) +{ + if (updateFlags.length) + { + yield pg.update('messages m', { flags: pg.sql('t.flags::varchar(255)[]') }) + .from('('+pg.sql.values(updateFlags)+') AS t (uid, flags)') + .where({ 'm.folder_id': boxId }).where(pg.sql('m.uid=t.uid')).run(gen.ef()); + } +} + +Syncer.quickResync = function*(srv, account, box, boxId, maxUid, changedSince) +{ + var updateFlags = []; + this.vanished = []; + + process.stderr.write(account.email+'/'+box.name+': quick resync\n'); + yield* this.runFetch( + srv, '1:'+maxUid, { modifiers: { changedsince: changedSince+' VANISHED' } }, + boxId, 'queueQuickFlags', { updateFlags: updateFlags } + ); + this.updateFlags(boxId, updateFlags); + + if (this.vanished.length) + { + let lst = [], dia = []; + for (let i = 0; i < this.vanished.length; i++) + { + if (this.vanished[i][1]) + dia.push('uid >= '+this.vanished[i][0]+' AND uid <= '+this.vanished[i][1]); + else + lst.push(this.vanished[i][0]); + } + if (lst.length) + dia.push('uid IN ('+lst.join(',')+')'); + yield* this.deleteMessages(pg.sql.and({ folder_id: boxId }, pg.sql('('+dia.join(' OR ')+')'))); + } + delete this.vanished; +} + +Syncer.queueQuickFlags = function*(messages, boxId, fetchState) +{ + for (var i = 0; i < messages.length; i++) + { + var m = messages[i][0]; + fetchState.updateFlags.push({ uid: m.uid, flags: toPgArray(m.flags) }); + } +} + Syncer.deleteMessages = function*(where) { yield pg.update('threads', { first_msg: null }) @@ -378,12 +427,13 @@ Syncer.deleteMessages = function*(where) yield pg.delete('threads').where(pg.sql('first_msg IS NULL')).run(gen.ef()); } -Syncer.runFetch = function*(srv, what, params, boxId, processor) +Syncer.runFetch = function*(srv, what, params, boxId, processor, args) { var self = this; var f = srv.fetch(what, params); var fetchState = { + ...(args||{}), parsed: 0, paused: false, synced: 0, @@ -512,31 +562,11 @@ Syncer.parseMessage = function*(msg, seqnum, boxId) msgrow.folder_id = boxId; msgrow.flags = attrs.flags.map(f => f[0] == '\\' ? f.toLowerCase().replace(/^\\/, '') : f.replace(/^\$*/, '$')); var nf = msgrow.flags.filter(f => f != 'seen'); - msgrow.flags = nf.length == msgrow.flags.length ? nf.concat(['unread']) : nf; + nf = nf.length == msgrow.flags.length ? nf.concat(['unread']) : nf; + msgrow.flags = nf.sort(); return [ msgrow, attrs ]; } -Syncer.updateFlags = function*(messages, boxId, fetchState) -{ - yield gen.throttle(3); - var self = this; - var rows = messages.map(m => ({ - uid: m[0].uid, - flags: toPgArray(m[0].flags) - })); - var [ updated ] = yield pg.update('messages m', { flags: pg.sql('t.flags::varchar(255)[]'), vertag: self.versionTag }) - .from('('+pg.sql.values(rows)+') AS t (uid, flags)') - .where({ 'm.folder_id': boxId }).where(pg.sql('m.uid=t.uid')).returning('m.uid').rows(gen.ef()); - var uh = {}, i; - for (i = 0; i < updated.length; i++) - uh[updated[i].uid] = true; - for (i = 0; i < messages.length; i++) - if (!uh[messages[i][0].uid]) - self.missing.push(messages[i][0].uid); - fetchState.synced += messages.length; - process.stderr.write('\rsynchronizing '+fetchState.synced); -} - Syncer.saveMessages = function*(messages, boxId) { var self = this; @@ -602,7 +632,6 @@ Syncer.addMessage = function*(msgrow, attrs) msgrow.time = header.date; msgrow.flags = toPgArray(msgrow.flags); msgrow.refs = toPgArray(header.references); - msgrow.vertag = self.versionTag; var thisIsFirst = false; if (header.references.length)