diff --git a/operetta.js b/operetta.js index 64070b5..b4dc491 100644 --- a/operetta.js +++ b/operetta.js @@ -5,10 +5,6 @@ require('heapdump'); var gen = require('gen-thread'); var Imap = require('imap'); var inspect = require('util').inspect; -//var pg; -//try { require('pg-native'); pg = require('pg').native; } -//catch(e) { pg = require('pg'); } -//var pg_pool = new pg.Pool(cfg.pg); var bricks = require('pg-bricks'); var pg = bricks.configure('postgresql://'+cfg.pg.user+':'+cfg.pg.password+'@'+(cfg.pg.host||'')+':'+cfg.pg.port+'/'+cfg.pg.database); @@ -73,7 +69,7 @@ Syncer.sync = function*(account) } else { - yield pg.update('messages', { flags: pg.sql('(flags || array[\'deleted\']::varchar(255)[])') }).where({ folder_id: row.id }) + yield pg.update('messages', { flags: pg.sql('(array_remove(flags, \'deleted\') || array[\'deleted\']::varchar(255)[])') }).where({ folder_id: row.id }) .where(pg.sql('uid is not null')).run(gen.ef()); } yield pg.update('folders', { uidvalidity: box.uidvalidity, unread_count: box.messages.new }) @@ -90,54 +86,13 @@ Syncer.sync = function*(account) }).returning('id').row(gen.ef()); boxId = row.id; } - var f = srv.fetch('1:*', { - size: true, - bodies: 'HEADER' - }); - self.parsed = 0; - self.paused = false; - self.synced = 0; - self.pending = []; + // list messages, update flags + process.stderr.write('\rsynchronizing 0'); + yield* self.runFetch('1:*', {}, boxId, 'updateFlags'); + process.stderr.write('\n'); - f.on('message', function(msg, seqnum) - { - gen.run(function*() - { - var [ msgrow, attrs ] = yield* self.parseMessage(msg, seqnum, boxId); - - // Workaround memory leak in node-imap - // TODO: send pull request - if (srv._curReq.fetchCache) - delete srv._curReq.fetchCache[seqnum]; - - self.pending.push([ msgrow, attrs ]); - self.parsed++; - if (!self.paused && self.parsed >= 20) - { - // ГОРШОЧЕК, НЕ ВАРИ!!! И так уже кучу сообщений прочитал из сокета, хорош! - srv._parser._ignoreReadable = true; - self.paused = true; - } - - if (self.pending.length >= 20) - { - var m = self.pending; - self.pending = []; - yield gen.run(self.saveMessages(m, boxId), gen.cb()); - } - }); - }); - var cb = gen.cb(); - yield f.once('end', function() - { - process.stderr.write('\n'); - if (self.pending.length > 0) - gen.run(self.saveMessages(self.pending, boxId)); - self.pending = []; - cb(); - }); - console.log(boxId); + // delete messages removed from IMAP server yield pg.update('threads', { first_msg: null }) .where(pg.sql('first_msg IN ('+ pg.select('id').from('messages').where({ folder_id: boxId }) @@ -150,12 +105,103 @@ Syncer.sync = function*(account) pg.select('MIN(id)').from('messages').where({ thread_id: pg.sql('threads.id') })+')' ) }).where(pg.sql('first_msg IS NULL')).run(gen.ef()); yield pg.delete('threads').where(pg.sql('first_msg IS NULL')).run(gen.ef()); + + // fetch new messages + var [ maxUid ] = yield pg.select('MAX(uid)').from('messages').where({ folder_id: boxId }).val(gen.ef()); + maxUid = maxUid ? maxUid+1 : 1; + yield* self.runFetch(maxUid+':*', { + size: true, + bodies: 'HEADER' + }, boxId, 'saveMessages'); + yield srv.closeBox(gen.cb()); } srv.end(); self.srv = null; } +Syncer.runFetch = function*(what, params, boxId, processor) +{ + var self = this; + var f = self.srv.fetch(what, params); + + self.parsed = 0; + self.paused = false; + self.synced = 0; + self.pending = []; + + var cb, wait; + f.on('message', function(msg, seqnum) + { + gen.run(self.onMessage(msg, seqnum, boxId, processor), checkFinish, checkFinish); + }); + + cb = gen.cb(); + yield f.once('end', function() + { + wait = true; + if (self.parsed <= 0) + cb(); + else if (self.pending.length > 0) + gen.run(self[processor](self.pending, boxId), saveLast, saveLast); + }); + + function saveLast() + { + self.parsed -= self.pending.length; + self.pending = []; + checkFinish(); + } + + function checkFinish() + { + if (self.parsed <= 0 && wait) + { + wait = false; + cb(); + } + } +}; + +Syncer.onMessage = function*(msg, seqnum, boxId, processor) +{ + var self = this; + var [ msgrow, attrs ] = yield* self.parseMessage(msg, seqnum, boxId); + + self.pending.push([ msgrow, attrs ]); + self.parsed++; + if (!self.paused && self.parsed >= 20) + { + // ГОРШОЧЕК, НЕ ВАРИ!!! И так уже кучу сообщений прочитал из сокета, хорош! + self.srv._parser._ignoreReadable = true; + self.paused = true; + } + + if (self.pending.length >= 20) + { + var m = self.pending; + self.pending = []; + var err; + try + { + yield gen.run(self[processor](m, boxId), gen.cb()); + } + catch (e) + { + err = e; + } + self.parsed -= m.length; + if (self.paused && self.parsed < 20) + { + self.paused = false; + self.srv._parser._ignoreReadable = false; + process.nextTick(self.srv._parser._cbReadable); + } + if (err) + throw err; + } +} + Syncer.parseMessage = function*(msg, seqnum, boxId) { var msgrow = {}; @@ -182,58 +228,42 @@ Syncer.parseMessage = function*(msg, seqnum, boxId) msgrow.flags = attrs.flags.map(f => f[0] == '\\' ? f.toLowerCase().replace(/^\\/, '') : f.replace(/^\$*/, '$')); var nf = msgrow.flags.filter(f => f != 'seen'); msgrow.flags = nf.length == msgrow.flags.length ? nf.concat(['unread']) : nf; + // Workaround memory leak in node-imap + // TODO: send pull request + if (this.srv._curReq.fetchCache) + delete this.srv._curReq.fetchCache[seqnum]; return [ msgrow, attrs ]; } +Syncer.updateFlags = function*(messages, boxId) +{ + yield gen.throttle(3); + var self = this; + var rows = messages.map(m => ({ + uid: m[0].uid, + flags: toPgArray(m[0].flags) + })); + // TODO check if something is missing + pg.update('messages m', { flags: pg.sql('t.flags::varchar(255)[]') }) + .from('('+pg.sql.values(rows)+') AS t (uid, flags)') + .where({ 'm.folder_id': boxId }).where(pg.sql('m.uid=t.uid')).run(gen.ef()); + self.synced += messages.length; + process.stderr.write('\rsynchronizing '+self.synced); +} + Syncer.saveMessages = function*(messages, boxId) { var self = this; - var err; yield gen.throttle(2); - try - { - var uids = []; - var uk = {}; - for (var i = 0; i < messages.length; i++) - { - uids.push(messages[i][1].uid); - uk[messages[i][1].uid] = messages[i]; - } - yield pg.update('messages', { flags: pg.sql('array_remove(flags, \'deleted\')') }) - .where({ folder_id: boxId }).where(pg.sql.in('uid', uids)).run(gen.ef()); - var [ exist ] = yield pg.select('uid, flags').from('messages') - .where({ folder_id: boxId }).where(pg.sql.in('uid', uids)).rows(gen.ef()); - for (var i = 0; i < exist.length; i++) - { - var fl = uk[exist[i].uid][0].flags; - if (fl.sort().join('\\') != exist[i].flags.sort().join('\\')) - yield pg.update('messages', { flags: toPgArray(fl) }).where({ folder_id: boxId, uid: exist[i].uid }).run(gen.ef()); - delete uk[exist[i].uid]; - } - self.synced += exist.length; - process.stderr.write('\rsynchronizing '+self.synced); - var run = []; - for (var i in uk) - run.push(this.addMessage(uk[i][0], uk[i][1])); - if (run.length) - { - process.stderr.write('\n'); - yield gen.runParallel(run, gen.cb()); - } - } - catch (e) - { - err = e; - } - self.parsed -= messages.length; - if (self.paused && self.parsed < 20) - { - self.paused = false; - self.srv._parser._ignoreReadable = false; - process.nextTick(self.srv._parser._cbReadable); - } - if (err) - throw err; + var uids = messages.map(m => m[1].uid); + var [ exist ] = yield pg.select('uid, flags').from('messages') + .where({ folder_id: boxId }).where(pg.sql.in('uid', uids)).rows(gen.ef()); + uids = {}; + for (var i = 0; i < exist.length; i++) + uids[exist[i].uid] = true; + for (var i = 0; i < messages.length; i++) + if (!uids[messages[i][1].uid]) + yield* this.addMessage(messages[i][0], messages[i][1]); } Syncer.addMessage = function*(msgrow, attrs)