Much faster resync with uidlist

master
Vitaliy Filippov 2016-07-25 02:05:23 +03:00
parent 40fd67114a
commit a0bb2f2fae
1 changed file with 126 additions and 96 deletions

View File

@@ -5,10 +5,6 @@ require('heapdump');
var gen = require('gen-thread');
var Imap = require('imap');
var inspect = require('util').inspect;
//var pg;
//try { require('pg-native'); pg = require('pg').native; }
//catch(e) { pg = require('pg'); }
//var pg_pool = new pg.Pool(cfg.pg);
var bricks = require('pg-bricks');
var pg = bricks.configure('postgresql://'+cfg.pg.user+':'+cfg.pg.password+'@'+(cfg.pg.host||'')+':'+cfg.pg.port+'/'+cfg.pg.database);
@@ -73,7 +69,7 @@ Syncer.sync = function*(account)
}
else
{
yield pg.update('messages', { flags: pg.sql('(flags || array[\'deleted\']::varchar(255)[])') }).where({ folder_id: row.id })
yield pg.update('messages', { flags: pg.sql('(array_remove(flags, \'deleted\') || array[\'deleted\']::varchar(255)[])') }).where({ folder_id: row.id })
.where(pg.sql('uid is not null')).run(gen.ef());
}
yield pg.update('folders', { uidvalidity: box.uidvalidity, unread_count: box.messages.new })
@@ -90,54 +86,13 @@ Syncer.sync = function*(account)
}).returning('id').row(gen.ef());
boxId = row.id;
}
var f = srv.fetch('1:*', {
size: true,
bodies: 'HEADER'
});
self.parsed = 0;
self.paused = false;
self.synced = 0;
self.pending = [];
// list messages, update flags
process.stderr.write('\rsynchronizing 0');
yield* self.runFetch('1:*', {}, boxId, 'updateFlags');
process.stderr.write('\n');
f.on('message', function(msg, seqnum)
{
gen.run(function*()
{
var [ msgrow, attrs ] = yield* self.parseMessage(msg, seqnum, boxId);
// Workaround memory leak in node-imap
// TODO: send pull request
if (srv._curReq.fetchCache)
delete srv._curReq.fetchCache[seqnum];
self.pending.push([ msgrow, attrs ]);
self.parsed++;
if (!self.paused && self.parsed >= 20)
{
// ГОРШОЧЕК, НЕ ВАРИ!!! И так уже кучу сообщений прочитал из сокета, хорош!
srv._parser._ignoreReadable = true;
self.paused = true;
}
if (self.pending.length >= 20)
{
var m = self.pending;
self.pending = [];
yield gen.run(self.saveMessages(m, boxId), gen.cb());
}
});
});
var cb = gen.cb();
yield f.once('end', function()
{
process.stderr.write('\n');
if (self.pending.length > 0)
gen.run(self.saveMessages(self.pending, boxId));
self.pending = [];
cb();
});
console.log(boxId);
// delete messages removed from IMAP server
yield pg.update('threads', { first_msg: null })
.where(pg.sql('first_msg IN ('+
pg.select('id').from('messages').where({ folder_id: boxId })
@@ -150,12 +105,103 @@ Syncer.sync = function*(account)
pg.select('MIN(id)').from('messages').where({ thread_id: pg.sql('threads.id') })+')'
) }).where(pg.sql('first_msg IS NULL')).run(gen.ef());
yield pg.delete('threads').where(pg.sql('first_msg IS NULL')).run(gen.ef());
// fetch new messages
var [ maxUid ] = yield pg.select('MAX(uid)').from('messages').where({ folder_id: boxId }).val(gen.ef());
maxUid = maxUid ? maxUid+1 : 1;
yield* self.runFetch(maxUid+':*', {
size: true,
bodies: 'HEADER'
}, boxId, 'saveMessages');
yield srv.closeBox(gen.cb());
}
srv.end();
self.srv = null;
}
// Fetch the message range `what` from the currently selected mailbox and feed
// the parsed messages, in batches, to this[processor] — 'updateFlags' or
// 'saveMessages' (see the two call sites in Syncer.sync).
// Suspends (via gen.cb()) until every fetched message has been processed.
Syncer.runFetch = function*(what, params, boxId, processor)
{
	var self = this;
	var f = self.srv.fetch(what, params);
	// Shared state, also mutated by onMessage():
	//   parsed  - messages received but not yet persisted
	//   pending - parsed [msgrow, attrs] pairs waiting to be batch-saved
	//   paused  - whether the IMAP parser's readable handler is suppressed
	self.parsed = 0;
	self.paused = false;
	self.synced = 0;
	self.pending = [];
	var cb, wait;
	f.on('message', function(msg, seqnum)
	{
		// Success and error paths both run checkFinish() so that a failed
		// message cannot leave the suspended generator waiting forever.
		gen.run(self.onMessage(msg, seqnum, boxId, processor), checkFinish, checkFinish);
	});
	cb = gen.cb();
	yield f.once('end', function()
	{
		// 'end' can arrive while onMessage() generators are still in flight;
		// `wait` tells checkFinish() it may now resume the caller.
		wait = true;
		if (self.parsed <= 0)
			cb();
		else if (self.pending.length > 0)
			gen.run(self[processor](self.pending, boxId), saveLast, saveLast);
		// NOTE(review): when parsed > 0 and pending is empty here, completion
		// relies on the in-flight onMessage() calls reaching checkFinish();
		// confirm a message cannot finish parsing after 'end' with parsed
		// still at 0 (which would resume the caller prematurely).
	});
	// Flush the final partial batch after 'end', then try to finish.
	function saveLast()
	{
		self.parsed -= self.pending.length;
		self.pending = [];
		checkFinish();
	}
	// Resume the suspended runFetch generator once all work is drained.
	function checkFinish()
	{
		if (self.parsed <= 0 && wait)
		{
			wait = false;
			cb();
		}
	}
};
// Handle one fetched IMAP message: parse it, queue it, and persist the queue
// through this[processor] whenever a full batch of 20 has accumulated.
// Applies backpressure on the node-imap parser while too many parsed
// messages are still waiting to be written to the database.
Syncer.onMessage = function*(msg, seqnum, boxId, processor)
{
	var self = this;
	var [ msgrow, attrs ] = yield* self.parseMessage(msg, seqnum, boxId);
	self.pending.push([ msgrow, attrs ]);
	self.parsed++;
	if (self.parsed >= 20 && !self.paused)
	{
		// Enough unsaved messages already read from the socket — stop the
		// parser from consuming more until the backlog drains.
		self.srv._parser._ignoreReadable = true;
		self.paused = true;
	}
	if (self.pending.length < 20)
		return;
	// A full batch is ready: hand it off and start a fresh queue.
	var batch = self.pending;
	self.pending = [];
	try
	{
		yield gen.run(self[processor](batch, boxId), gen.cb());
	}
	finally
	{
		// Runs on success and on error alike: account for the saved batch
		// and resume the parser once the backlog is small enough again.
		self.parsed -= batch.length;
		if (self.paused && self.parsed < 20)
		{
			self.paused = false;
			self.srv._parser._ignoreReadable = false;
			process.nextTick(self.srv._parser._cbReadable);
		}
	}
}
Syncer.parseMessage = function*(msg, seqnum, boxId)
{
var msgrow = {};
@@ -182,58 +228,42 @@ Syncer.parseMessage = function*(msg, seqnum, boxId)
msgrow.flags = attrs.flags.map(f => f[0] == '\\' ? f.toLowerCase().replace(/^\\/, '') : f.replace(/^\$*/, '$'));
var nf = msgrow.flags.filter(f => f != 'seen');
msgrow.flags = nf.length == msgrow.flags.length ? nf.concat(['unread']) : nf;
// Workaround memory leak in node-imap
// TODO: send pull request
if (this.srv._curReq.fetchCache)
delete this.srv._curReq.fetchCache[seqnum];
return [ msgrow, attrs ];
}
// Update the stored flags for a batch of already-known messages in one
// UPDATE ... FROM (VALUES ...) statement.
// `messages` is an array of [ msgrow, attrs ] pairs from parseMessage();
// only msgrow.uid and msgrow.flags are used here.
Syncer.updateFlags = function*(messages, boxId)
{
	yield gen.throttle(3);
	var self = this;
	var rows = messages.map(m => ({
		uid: m[0].uid,
		flags: toPgArray(m[0].flags)
	}));
	// TODO check if something is missing
	// FIX: this query was started without `yield`, so the generator never
	// waited for it — errors from gen.ef() fired into an orphaned resume
	// callback and `synced` was reported before the UPDATE completed. Every
	// other query in this file yields on .run(gen.ef()).
	yield pg.update('messages m', { flags: pg.sql('t.flags::varchar(255)[]') })
		.from('('+pg.sql.values(rows)+') AS t (uid, flags)')
		.where({ 'm.folder_id': boxId }).where(pg.sql('m.uid=t.uid')).run(gen.ef());
	self.synced += messages.length;
	process.stderr.write('\rsynchronizing '+self.synced);
}
// Persist a batch of fetched messages: clear the 'deleted' marker for UIDs
// that reappeared, update flags of rows whose flags changed, and add rows
// for UIDs not yet in the database.
// NOTE(review): this text comes from a diff view whose +/- markers were
// stripped. Everything from `self.parsed -= messages.length;` below the
// catch block onward duplicates logic that now lives in onMessage() (the
// unpause bookkeeping) and re-runs an older, second dedup pass (`uids` and
// `exist` are declared twice) — it is almost certainly the removed (-) side
// of the commit concatenated after the added (+) side. Confirm against the
// repository before treating this body as the real post-commit function.
Syncer.saveMessages = function*(messages, boxId)
{
	var self = this;
	var err;
	yield gen.throttle(2);
	try
	{
		// uk maps uid -> [ msgrow, attrs ] for the whole batch.
		var uids = [];
		var uk = {};
		for (var i = 0; i < messages.length; i++)
		{
			uids.push(messages[i][1].uid);
			uk[messages[i][1].uid] = messages[i];
		}
		// These UIDs exist on the server, so drop any stale 'deleted' flag.
		yield pg.update('messages', { flags: pg.sql('array_remove(flags, \'deleted\')') })
			.where({ folder_id: boxId }).where(pg.sql.in('uid', uids)).run(gen.ef());
		var [ exist ] = yield pg.select('uid, flags').from('messages')
			.where({ folder_id: boxId }).where(pg.sql.in('uid', uids)).rows(gen.ef());
		// Rows already in the DB: rewrite flags only when they differ;
		// whatever remains in uk afterwards is new and must be inserted.
		for (var i = 0; i < exist.length; i++)
		{
			var fl = uk[exist[i].uid][0].flags;
			if (fl.sort().join('\\') != exist[i].flags.sort().join('\\'))
				yield pg.update('messages', { flags: toPgArray(fl) }).where({ folder_id: boxId, uid: exist[i].uid }).run(gen.ef());
			delete uk[exist[i].uid];
		}
		self.synced += exist.length;
		process.stderr.write('\rsynchronizing '+self.synced);
		// Insert the genuinely new messages in parallel.
		var run = [];
		for (var i in uk)
			run.push(this.addMessage(uk[i][0], uk[i][1]));
		if (run.length)
		{
			process.stderr.write('\n');
			yield gen.runParallel(run, gen.cb());
		}
	}
	catch (e)
	{
		err = e;
	}
	// NOTE(review): suspected pre-commit (-) lines start here — see header.
	self.parsed -= messages.length;
	if (self.paused && self.parsed < 20)
	{
		self.paused = false;
		self.srv._parser._ignoreReadable = false;
		process.nextTick(self.srv._parser._cbReadable);
	}
	if (err)
		throw err;
	var uids = messages.map(m => m[1].uid);
	var [ exist ] = yield pg.select('uid, flags').from('messages')
		.where({ folder_id: boxId }).where(pg.sql.in('uid', uids)).rows(gen.ef());
	uids = {};
	for (var i = 0; i < exist.length; i++)
		uids[exist[i].uid] = true;
	for (var i = 0; i < messages.length; i++)
		if (!uids[messages[i][1].uid])
			yield* this.addMessage(messages[i][0], messages[i][1]);
}
Syncer.addMessage = function*(msgrow, attrs)