Faster deleted sync with version tags
parent
18a086793f
commit
fda15026d6
2
db.sql
2
db.sql
|
@ -43,12 +43,14 @@ create table messages (
|
|||
body text not null,
|
||||
time timestamptz not null,
|
||||
flags varchar(255)[] not null,
|
||||
vertag int not null default 0,
|
||||
foreign key (folder_id) references folders (id) on delete cascade on update cascade
|
||||
);
|
||||
create unique index messages_unique on messages (folder_id, uid);
|
||||
create index messages_flags on messages using gin (folder_id, flags);
|
||||
create index messages_messageid on messages (messageid);
|
||||
create index messages_refs on messages using gin (refs);
|
||||
create index messages_vertag on messages (folder_id, vertag);
|
||||
|
||||
create table threads (
|
||||
id serial not null primary key,
|
||||
|
|
64
operetta.js
64
operetta.js
|
@ -58,7 +58,9 @@ Syncer.sync = function*(account)
|
|||
var [ box ] = yield srv.openBox(k, true, gen.ef());
|
||||
var boxId;
|
||||
// IMAP sync: http://tools.ietf.org/html/rfc4549
|
||||
var [ row ] = yield pg.select('*').from('folders').where({ account_id: accountId, name: box.name }).row(gen.ef());
|
||||
var [ row ] = yield pg.select('*').from('folders')
|
||||
.where({ account_id: accountId, name: box.name }).row(gen.ef());
|
||||
self.versionTag = 0;
|
||||
if (row)
|
||||
{
|
||||
boxId = row.id;
|
||||
|
@ -69,8 +71,8 @@ Syncer.sync = function*(account)
|
|||
}
|
||||
else
|
||||
{
|
||||
yield pg.update('messages', { flags: pg.sql('(array_remove(flags, \'deleted\') || array[\'deleted\']::varchar(255)[])') }).where({ folder_id: row.id })
|
||||
.where(pg.sql('uid is not null')).run(gen.ef());
|
||||
[ self.versionTag ] = yield pg.select('MAX(vertag)').from('messages')
|
||||
.where({ folder_id: row.id }).val(gen.ef());
|
||||
}
|
||||
yield pg.update('folders', { uidvalidity: box.uidvalidity, unread_count: box.messages.new })
|
||||
.where({ id: row.id }).run(gen.ef());
|
||||
|
@ -87,29 +89,42 @@ Syncer.sync = function*(account)
|
|||
boxId = row.id;
|
||||
}
|
||||
|
||||
// list messages, update flags
|
||||
process.stderr.write('\rsynchronizing 0');
|
||||
yield* self.runFetch('1:*', {}, boxId, 'updateFlags');
|
||||
process.stderr.write('\n');
|
||||
var [ maxUid ] = yield pg.select('MAX(uid)').from('messages').where({ folder_id: boxId }).val(gen.ef());
|
||||
if (maxUid)
|
||||
{
|
||||
// list messages, update flags and version tag
|
||||
self.versionTag++;
|
||||
if (self.versionTag >= 0x7fffffff)
|
||||
{
|
||||
yield pg.update('messages', { vertag: 0 }).where({ folder_id: boxId }).run(gen.ef());
|
||||
self.versionTag = 1;
|
||||
}
|
||||
|
||||
// delete messages removed from IMAP server
|
||||
yield pg.update('threads', { first_msg: null })
|
||||
.where(pg.sql('first_msg IN ('+
|
||||
pg.select('id').from('messages').where({ folder_id: boxId })
|
||||
.where(pg.sql('(flags @> array[\'deleted\']::varchar(255)[])'))
|
||||
+')')).run(gen.ef());
|
||||
yield pg.delete('messages').where({ folder_id: boxId })
|
||||
.where(pg.sql('(flags @> array[\'deleted\']::varchar(255)[])')).run(gen.ef());
|
||||
yield pg.update('threads',
|
||||
{ first_msg: pg.sql('('+
|
||||
pg.select('MIN(id)').from('messages').where({ thread_id: pg.sql('threads.id') })+')'
|
||||
) }).where(pg.sql('first_msg IS NULL')).run(gen.ef());
|
||||
yield pg.delete('threads').where(pg.sql('first_msg IS NULL')).run(gen.ef());
|
||||
process.stderr.write('\rsynchronizing 0');
|
||||
yield* self.runFetch('1:'+maxUid, {}, boxId, 'updateFlags');
|
||||
process.stderr.write('\n');
|
||||
|
||||
// delete messages removed from IMAP server
|
||||
yield pg.update('threads', { first_msg: null })
|
||||
.where(pg.sql('first_msg IN ('+
|
||||
pg.select('id').from('messages')
|
||||
.where({ folder_id: boxId })
|
||||
.where(pg.sql('uid is not null'))
|
||||
.where(pg.sql.lt('vertag', self.versionTag))
|
||||
+')')).run(gen.ef());
|
||||
yield pg.delete('messages')
|
||||
.where({ folder_id: boxId })
|
||||
.where(pg.sql('uid is not null'))
|
||||
.where(pg.sql.lt('vertag', self.versionTag)).run(gen.ef());
|
||||
yield pg.update('threads',
|
||||
{ first_msg: pg.sql('('+
|
||||
pg.select('MIN(id)').from('messages').where({ thread_id: pg.sql('threads.id') })+')'
|
||||
) }).where(pg.sql('first_msg IS NULL')).run(gen.ef());
|
||||
yield pg.delete('threads').where(pg.sql('first_msg IS NULL')).run(gen.ef());
|
||||
}
|
||||
|
||||
// fetch new messages
|
||||
var [ maxUid ] = yield pg.select('MAX(uid)').from('messages').where({ folder_id: boxId }).val(gen.ef());
|
||||
maxUid = maxUid ? maxUid+1 : 1;
|
||||
yield* self.runFetch(maxUid+':*', {
|
||||
yield* self.runFetch((maxUid ? maxUid+1 : 1)+':*', {
|
||||
size: true,
|
||||
bodies: 'HEADER'
|
||||
}, boxId, 'saveMessages');
|
||||
|
@ -241,7 +256,7 @@ Syncer.updateFlags = function*(messages, boxId)
|
|||
flags: toPgArray(m[0].flags)
|
||||
}));
|
||||
// TODO check if something is missing
|
||||
yield pg.update('messages m', { flags: pg.sql('t.flags::varchar(255)[]') })
|
||||
yield pg.update('messages m', { flags: pg.sql('t.flags::varchar(255)[]'), vertag: self.versionTag })
|
||||
.from('('+pg.sql.values(rows)+') AS t (uid, flags)')
|
||||
.where({ 'm.folder_id': boxId }).where(pg.sql('m.uid=t.uid')).run(gen.ef());
|
||||
self.synced += messages.length;
|
||||
|
@ -310,6 +325,7 @@ Syncer.addMessage = function*(msgrow, attrs)
|
|||
msgrow.time = header.date;
|
||||
msgrow.flags = toPgArray(msgrow.flags);
|
||||
msgrow.refs = toPgArray(header.references);
|
||||
msgrow.vertag = self.versionTag;
|
||||
if (header.references.length)
|
||||
{
|
||||
var [ threadId ] = yield pgtx.select('MAX(thread_id)').from('messages')
|
||||
|
|
Loading…
Reference in New Issue