Use in-memory sync for flags
parent
a4102ad3f0
commit
b662c71319
2
db.sql
2
db.sql
|
@ -48,14 +48,12 @@ create table messages (
|
|||
time timestamptz not null,
|
||||
size unsigned not null,
|
||||
flags varchar(255)[] not null,
|
||||
vertag int not null default 0,
|
||||
foreign key (folder_id) references folders (id) on delete cascade on update cascade
|
||||
);
|
||||
create unique index messages_unique on messages (folder_id, uid);
|
||||
create index messages_flags on messages using gin (folder_id, flags);
|
||||
create index messages_messageid on messages (messageid);
|
||||
create index messages_refs on messages using gin (refs);
|
||||
create index messages_vertag on messages (folder_id, vertag);
|
||||
create index messages_time on messages (folder_id, time);
|
||||
create index messages_text on messages using gin (text_index);
|
||||
create or replace function fn_messages_text_index() returns trigger
|
||||
|
|
155
operetta.js
155
operetta.js
|
@ -276,7 +276,6 @@ Syncer.sync = function*(account)
|
|||
// IMAP sync: http://tools.ietf.org/html/rfc4549
|
||||
var [ row ] = yield pg.select('*').from('folders')
|
||||
.where({ account_id: accountId, name: box.name }).rows(gen.ef());
|
||||
self.versionTag = 0;
|
||||
if (row.length)
|
||||
{
|
||||
row = row[0];
|
||||
|
@ -287,12 +286,6 @@ Syncer.sync = function*(account)
|
|||
yield pg.delete('messages').where({ folder_id: row.id })
|
||||
.where(pg.sql('uid is not null')).run(gen.ef());
|
||||
}
|
||||
else
|
||||
{
|
||||
[ self.versionTag ] = yield pg.select('MAX(vertag)').from('messages')
|
||||
.where({ folder_id: row.id }).val(gen.ef());
|
||||
self.versionTag = self.versionTag || 0;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -311,42 +304,12 @@ Syncer.sync = function*(account)
|
|||
var [ maxUid ] = yield pg.select('MAX(uid)').from('messages').where({ folder_id: boxId }).val(gen.ef());
|
||||
if (changedSince)
|
||||
{
|
||||
process.stderr.write(account.email+'/'+box.name+': quick resync\n');
|
||||
self.vanished = [];
|
||||
yield* self.runFetch(srv, '1:'+maxUid, { modifiers: { changedsince: changedSince+' VANISHED' } }, boxId, 'updateFlags');
|
||||
if (self.vanished.length)
|
||||
{
|
||||
let lst = [];
|
||||
let dia = [];
|
||||
for (let i = 0; i < self.vanished.length; i++)
|
||||
{
|
||||
if (self.vanished[i][1])
|
||||
dia.push('uid >= '+self.vanished[i][0]+' AND uid <= '+self.vanished[i][1]);
|
||||
else
|
||||
lst.push(self.vanished[i][0]);
|
||||
}
|
||||
if (lst.length)
|
||||
dia.push('uid IN ('+lst.join(',')+')');
|
||||
yield* self.deleteMessages(pg.sql.and({ folder_id: boxId }, pg.sql('('+dia.join(' OR ')+')')));
|
||||
}
|
||||
yield* self.quickResync(srv, account, box, boxId, maxUid, changedSince);
|
||||
}
|
||||
else if (maxUid)
|
||||
{
|
||||
// list messages, update flags and version tag
|
||||
self.versionTag++;
|
||||
if (self.versionTag >= 0x7fffffff)
|
||||
{
|
||||
yield pg.update('messages', { vertag: 0 }).where({ folder_id: boxId }).run(gen.ef());
|
||||
self.versionTag = 1;
|
||||
}
|
||||
|
||||
process.stderr.write(account.email+'/'+box.name+': full resync\n');
|
||||
process.stderr.write('\rsynchronizing 0');
|
||||
yield* self.runFetch(srv, '1:'+maxUid, {}, boxId, 'updateFlags');
|
||||
process.stderr.write('\n');
|
||||
|
||||
// delete messages removed from IMAP server
|
||||
yield* self.deleteMessages(pg.sql.and({ folder_id: boxId }, pg.sql('uid is not null'), pg.sql.lt('vertag', self.versionTag)));
|
||||
yield* self.fullResync(srv, account, box, boxId, maxUid);
|
||||
}
|
||||
|
||||
// fetch new messages
|
||||
|
@ -365,6 +328,92 @@ Syncer.sync = function*(account)
|
|||
self.releaseConnection(accountId, 'S');
|
||||
}
|
||||
|
||||
Syncer.fullResync = function*(srv, account, box, boxId, maxUid)
|
||||
{
|
||||
var [ flags ] = yield pg.select('uid, flags').from('messages').where({ folder_id: boxId }).rows(gen.ef());
|
||||
flags = flags.reduce((o, row) => { o[row.uid] = row.flags; return o; }, {});
|
||||
|
||||
var updateFlags = [];
|
||||
|
||||
process.stderr.write(account.email+'/'+box.name+': full resync\n');
|
||||
process.stderr.write('\rsynchronizing 0');
|
||||
yield* this.runFetch(srv, '1:'+maxUid, {}, boxId, 'queueFlags', { flags: flags, updateFlags: updateFlags });
|
||||
process.stderr.write('\n');
|
||||
|
||||
this.updateFlags(boxId, updateFlags);
|
||||
|
||||
// delete messages removed from IMAP server
|
||||
flags = Object.keys(flags);
|
||||
if (flags.length)
|
||||
yield* this.deleteMessages(pg.sql.and({ folder_id: boxId }, pg.sql.in('uid', flags)));
|
||||
}
|
||||
|
||||
Syncer.queueFlags = function*(messages, boxId, fetchState)
|
||||
{
|
||||
for (var i = 0; i < messages.length; i++)
|
||||
{
|
||||
var m = messages[i][0];
|
||||
if (!fetchState.flags[m.uid])
|
||||
this.missing.push(m.uid);
|
||||
else
|
||||
{
|
||||
if (fetchState.flags[m.uid].join(',') != m.flags.join(','))
|
||||
fetchState.updateFlags.push({ uid: m.uid, flags: toPgArray(m.flags) });
|
||||
delete fetchState.flags[m.uid];
|
||||
}
|
||||
}
|
||||
fetchState.synced += messages.length;
|
||||
process.stderr.write('\rsynchronizing '+fetchState.synced);
|
||||
}
|
||||
|
||||
Syncer.updateFlags = function*(boxId, updateFlags)
|
||||
{
|
||||
if (updateFlags.length)
|
||||
{
|
||||
yield pg.update('messages m', { flags: pg.sql('t.flags::varchar(255)[]') })
|
||||
.from('('+pg.sql.values(updateFlags)+') AS t (uid, flags)')
|
||||
.where({ 'm.folder_id': boxId }).where(pg.sql('m.uid=t.uid')).run(gen.ef());
|
||||
}
|
||||
}
|
||||
|
||||
Syncer.quickResync = function*(srv, account, box, boxId, maxUid, changedSince)
|
||||
{
|
||||
var updateFlags = [];
|
||||
this.vanished = [];
|
||||
|
||||
process.stderr.write(account.email+'/'+box.name+': quick resync\n');
|
||||
yield* this.runFetch(
|
||||
srv, '1:'+maxUid, { modifiers: { changedsince: changedSince+' VANISHED' } },
|
||||
boxId, 'queueQuickFlags', { updateFlags: updateFlags }
|
||||
);
|
||||
this.updateFlags(boxId, updateFlags);
|
||||
|
||||
if (this.vanished.length)
|
||||
{
|
||||
let lst = [], dia = [];
|
||||
for (let i = 0; i < this.vanished.length; i++)
|
||||
{
|
||||
if (this.vanished[i][1])
|
||||
dia.push('uid >= '+this.vanished[i][0]+' AND uid <= '+this.vanished[i][1]);
|
||||
else
|
||||
lst.push(this.vanished[i][0]);
|
||||
}
|
||||
if (lst.length)
|
||||
dia.push('uid IN ('+lst.join(',')+')');
|
||||
yield* this.deleteMessages(pg.sql.and({ folder_id: boxId }, pg.sql('('+dia.join(' OR ')+')')));
|
||||
}
|
||||
delete this.vanished;
|
||||
}
|
||||
|
||||
Syncer.queueQuickFlags = function*(messages, boxId, fetchState)
|
||||
{
|
||||
for (var i = 0; i < messages.length; i++)
|
||||
{
|
||||
var m = messages[i][0];
|
||||
fetchState.updateFlags.push({ uid: m.uid, flags: toPgArray(m.flags) });
|
||||
}
|
||||
}
|
||||
|
||||
Syncer.deleteMessages = function*(where)
|
||||
{
|
||||
yield pg.update('threads', { first_msg: null })
|
||||
|
@ -378,12 +427,13 @@ Syncer.deleteMessages = function*(where)
|
|||
yield pg.delete('threads').where(pg.sql('first_msg IS NULL')).run(gen.ef());
|
||||
}
|
||||
|
||||
Syncer.runFetch = function*(srv, what, params, boxId, processor)
|
||||
Syncer.runFetch = function*(srv, what, params, boxId, processor, args)
|
||||
{
|
||||
var self = this;
|
||||
var f = srv.fetch(what, params);
|
||||
|
||||
var fetchState = {
|
||||
...(args||{}),
|
||||
parsed: 0,
|
||||
paused: false,
|
||||
synced: 0,
|
||||
|
@ -512,31 +562,11 @@ Syncer.parseMessage = function*(msg, seqnum, boxId)
|
|||
msgrow.folder_id = boxId;
|
||||
msgrow.flags = attrs.flags.map(f => f[0] == '\\' ? f.toLowerCase().replace(/^\\/, '') : f.replace(/^\$*/, '$'));
|
||||
var nf = msgrow.flags.filter(f => f != 'seen');
|
||||
msgrow.flags = nf.length == msgrow.flags.length ? nf.concat(['unread']) : nf;
|
||||
nf = nf.length == msgrow.flags.length ? nf.concat(['unread']) : nf;
|
||||
msgrow.flags = nf.sort();
|
||||
return [ msgrow, attrs ];
|
||||
}
|
||||
|
||||
Syncer.updateFlags = function*(messages, boxId, fetchState)
|
||||
{
|
||||
yield gen.throttle(3);
|
||||
var self = this;
|
||||
var rows = messages.map(m => ({
|
||||
uid: m[0].uid,
|
||||
flags: toPgArray(m[0].flags)
|
||||
}));
|
||||
var [ updated ] = yield pg.update('messages m', { flags: pg.sql('t.flags::varchar(255)[]'), vertag: self.versionTag })
|
||||
.from('('+pg.sql.values(rows)+') AS t (uid, flags)')
|
||||
.where({ 'm.folder_id': boxId }).where(pg.sql('m.uid=t.uid')).returning('m.uid').rows(gen.ef());
|
||||
var uh = {}, i;
|
||||
for (i = 0; i < updated.length; i++)
|
||||
uh[updated[i].uid] = true;
|
||||
for (i = 0; i < messages.length; i++)
|
||||
if (!uh[messages[i][0].uid])
|
||||
self.missing.push(messages[i][0].uid);
|
||||
fetchState.synced += messages.length;
|
||||
process.stderr.write('\rsynchronizing '+fetchState.synced);
|
||||
}
|
||||
|
||||
Syncer.saveMessages = function*(messages, boxId)
|
||||
{
|
||||
var self = this;
|
||||
|
@ -602,7 +632,6 @@ Syncer.addMessage = function*(msgrow, attrs)
|
|||
msgrow.time = header.date;
|
||||
msgrow.flags = toPgArray(msgrow.flags);
|
||||
msgrow.refs = toPgArray(header.references);
|
||||
msgrow.vertag = self.versionTag;
|
||||
|
||||
var thisIsFirst = false;
|
||||
if (header.references.length)
|
||||
|
|
Loading…
Reference in New Issue