Support quick resync

master
Vitaliy Filippov 2016-08-01 14:05:08 +03:00
parent f4a4fcc1b2
commit 3c491d0ba8
2 changed files with 45 additions and 7 deletions

1
db.sql
View File

@ -19,6 +19,7 @@ create table folders (
account_id int not null,
name varchar(255) not null,
unread_count int not null,
highestmodseq int not null default 0,
foreign key (account_id) references accounts (id) on delete cascade on update cascade
);
create unique index folders_name on folders (account_id, name);

View File

@ -55,13 +55,26 @@ Syncer.sync = function*(account)
}
var srv = new Imap(account.imap);
self.srv = srv;
srv.once('ready', gen.cb());
yield srv.connect();
yield srv._enqueue('ENABLE QRESYNC', gen.cb());
// Monkey-patch node-imap to support VANISHED responses
var oldUT = srv._parser._resUntagged;
srv._parser._resUntagged = function()
{
var m;
if (m = /^\* VANISHED( \(EARLIER\))? ([\d:,]+)/.exec(this._buffer))
self.vanished.push(m[2].split(/,/).map(s => s.split(':')));
oldUT.apply(this);
};
var [ boxes ] = yield srv.getBoxes(gen.ef());
for (var k in boxes)
{
var [ box ] = yield srv.openBox(k, true, gen.ef());
var boxId;
var boxId, changedSince = 0;
// IMAP sync: http://tools.ietf.org/html/rfc4549
var [ row ] = yield pg.select('*').from('folders')
.where({ account_id: accountId, name: box.name }).rows(gen.ef());
@ -69,6 +82,7 @@ Syncer.sync = function*(account)
if (row.length)
{
row = row[0];
changedSince = row.highestmodseq;
boxId = row.id;
if (row.uidvalidity != box.uidvalidity)
{
@ -81,7 +95,7 @@ Syncer.sync = function*(account)
.where({ folder_id: row.id }).val(gen.ef());
self.versionTag = self.versionTag || 0;
}
yield pg.update('folders', { uidvalidity: box.uidvalidity, unread_count: box.messages.new })
yield pg.update('folders', { uidvalidity: box.uidvalidity, unread_count: box.messages.new, highestmodseq: box.highestmodseq||0 })
.where({ id: row.id }).run(gen.ef());
}
else
@ -91,14 +105,36 @@ Syncer.sync = function*(account)
uidvalidity: box.uidvalidity,
account_id: accountId,
unread_count: box.messages.new,
// total_count: box.messages.count
highestmodseq: box.highestmodseq||0,
//total_count: box.messages.count
}).returning('id').row(gen.ef());
boxId = row.id;
}
self.missing = [];
var [ maxUid ] = yield pg.select('MAX(uid)').from('messages').where({ folder_id: boxId }).val(gen.ef());
if (maxUid)
if (changedSince)
{
process.stderr.write(account.email+'/'+box.name+': quick resync\n');
self.vanished = [];
yield* self.runFetch('1:'+maxUid, { modifiers: { changedsince: changedSince+' VANISHED' } }, boxId, 'updateFlags');
if (self.vanished.length)
{
let lst = [];
let dia = [];
for (let i = 0; i < self.vanished.length; i++)
{
if (self.vanished[i][1])
dia.push('uid >= '+self.vanished[i][0]+' AND uid <= '+self.vanished[i][1]);
else
lst.push(self.vanished[i][0]);
}
if (lst.length)
dia.push('uid IN ('+lst.join(',')+')');
yield pg.delete('messages').where({ folder_id: boxId }).where(pg.sql('('+dia.join(' OR ')+')')).run(gen.ef());
}
}
else if (maxUid)
{
// list messages, update flags and version tag
self.versionTag++;
@ -108,6 +144,7 @@ Syncer.sync = function*(account)
self.versionTag = 1;
}
process.stderr.write(account.email+'/'+box.name+': full resync\n');
process.stderr.write('\rsynchronizing 0');
yield* self.runFetch('1:'+maxUid, {}, boxId, 'updateFlags');
process.stderr.write('\n');
@ -191,14 +228,14 @@ Syncer.onMessage = function*(msg, seqnum, boxId, processor)
self.pending.push([ msgrow, attrs ]);
self.parsed++;
if (!self.paused && self.parsed >= 20)
if (!self.paused && self.parsed >= 100)
{
// ГОРШОЧЕК, НЕ ВАРИ!!! И так уже кучу сообщений прочитал из сокета, хорош!
self.srv._parser._ignoreReadable = true;
self.paused = true;
}
if (self.pending.length >= 20)
if (self.pending.length >= 100)
{
var m = self.pending;
self.pending = [];
@ -212,7 +249,7 @@ Syncer.onMessage = function*(msg, seqnum, boxId, processor)
err = e;
}
self.parsed -= m.length;
if (self.paused && self.parsed < 20)
if (self.paused && self.parsed < 100)
{
self.paused = false;
self.srv._parser._ignoreReadable = false;