From 40fd67114a968b824f1a3a515e1a833360b70b9e Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Mon, 25 Jul 2016 00:59:04 +0300 Subject: [PATCH] slightly structure code, save flags as text, remove disappearing messages --- db.sql | 11 +- operetta.js | 419 ++++++++++++++++++++++++++++------------------------ 2 files changed, 232 insertions(+), 198 deletions(-) diff --git a/db.sql b/db.sql index 1420ee3..80605c2 100644 --- a/db.sql +++ b/db.sql @@ -1,3 +1,5 @@ +create extension if not exists btree_gin; + create table accounts ( id serial not null primary key, name varchar(255) not null, @@ -25,7 +27,7 @@ create table messages ( id serial not null primary key, thread_id int, folder_id int not null, - uid int not null, + uid int, messageid varchar(1000) not null, inreplyto varchar(1000) not null, refs varchar(1000)[] not null, @@ -40,16 +42,17 @@ create table messages ( headers text not null, body text not null, time timestamptz not null, - flags smallint not null, + flags varchar(255)[] not null, foreign key (folder_id) references folders (id) on delete cascade on update cascade ); create unique index messages_unique on messages (folder_id, uid); +create index messages_flags on messages using gin (folder_id, flags); create index messages_messageid on messages (messageid); create index messages_refs on messages using gin (refs); create table threads ( id serial not null primary key, - first_msg int not null, + first_msg int, msg_count int not null default 1, foreign key (first_msg) references messages (id) on delete restrict on update cascade ); @@ -57,3 +60,5 @@ create table threads ( alter table messages add foreign key (thread_id) references threads (id) on delete restrict on update cascade; --create table tt as with recursive t (id, messageid, upperid, uppermsg) as (select (array_agg(m1.id))[0], m1.messageid, (array_agg(m1.id))[1], m1.messageid from messages m1 left join messages m2 on m1.messageid!='' and m1.inreplyto!='' and m2.messageid=m1.inreplyto where m2.id is null group by m1.messageid union select m1.id, m1.messageid, t.upperid, t.uppermsg from messages m1 inner join t on m1.inreplyto!='' and m1.inreplyto=t.messageid where m1.messageid!='') select * from t; + +--alter table messages alter flags type varchar(255)[] using (case when flags&1=1 then array['recent'] else array[]::varchar(255)[] end) || (case when flags&2=2 then array['flagged'] else array[]::varchar(255)[] end) || (case when flags&4=4 then array['answered'] else array[]::varchar(255)[] end) || (case when flags&8=8 then array['unread'] else array[]::varchar(255)[] end); diff --git a/operetta.js b/operetta.js index 7a15881..64070b5 100644 --- a/operetta.js +++ b/operetta.js @@ -13,13 +13,6 @@ var inspect = require('util').inspect; var bricks = require('pg-bricks'); var pg = bricks.configure('postgresql://'+cfg.pg.user+':'+cfg.pg.password+'@'+(cfg.pg.host||'')+':'+cfg.pg.port+'/'+cfg.pg.database); -var flagNum = { - '\\recent': 1, - '\\flagged': 2, - '\\answered': 4, - '\\seen': 8, -}; - function splitEmails(s) { var re = /^[\s,]*(?:(?:["'](.*?)["']|([^<]+))\s*<([^>]+)>|]+)>?)/; // ' @@ -38,8 +31,12 @@ function toPgArray(a) return '{'+a.substring(1, a.length-1)+'}'; } -function* main(account) +var Syncer = { +}; + +Syncer.sync = function*(account) { + var self = this; var accountId; var [ rows ] = yield pg.select('id').from('accounts').where({ email: account.email }).rows(gen.ef()); if (rows[0] && rows[0].id) @@ -56,6 +53,7 @@ function* main(account) accountId = row.id; } var srv = new Imap(account.imap); + self.srv = srv; srv.once('ready', gen.cb()); yield srv.connect(); var [ boxes ] = yield srv.getBoxes(gen.ef()); @@ -63,13 +61,23 @@ function* main(account) { var [ box ] = yield srv.openBox(k, true, gen.ef()); var boxId; - var [ rows ] = yield pg.update('folders', { uidvalidity: box.uidvalidity, unread_count: box.messages.new }) - .where({ account_id: accountId, name: box.name }).returning('id').rows(gen.ef()); - if (rows[0] && rows[0].id) + // IMAP sync: http://tools.ietf.org/html/rfc4549 + var [ row ] = yield pg.select('*').from('folders').where({ account_id: accountId, name: box.name }).row(gen.ef()); + if (row) { - // IMAP sync: http://tools.ietf.org/html/rfc4549 - // TODO: check old uidvalidity - boxId = rows[0].id; + boxId = row.id; + if (row.uidvalidity != box.uidvalidity) + { + yield pg.delete('messages').where({ folder_id: row.id }) + .where(pg.sql('uid is not null')).run(gen.ef()); + } + else + { + yield pg.update('messages', { flags: pg.sql('(flags || array[\'deleted\']::varchar(255)[])') }).where({ folder_id: row.id }) + .where(pg.sql('uid is not null')).run(gen.ef()); + } + yield pg.update('folders', { uidvalidity: box.uidvalidity, unread_count: box.messages.new }) + .where({ id: row.id }).run(gen.ef()); } else { @@ -87,201 +95,36 @@ function* main(account) bodies: 'HEADER' }); - var parsed = 0, paused = false, synced = 0; - var messages = []; - - function* parseMessage(msg, seqnum, boxId) - { - var msgrow = {}; - var attrs; - msg.on('body', function(stream, info) - { - var buffer = ''; - stream.on('data', function(chunk) - { - buffer += chunk.toString('utf8'); - }); - stream.once('end', function() - { - msgrow.body = ''; - msgrow.headers = buffer; - }); - }); - msg.once('attributes', function(a) { - attrs = a; - }); - yield msg.once('end', gen.cb()); - msgrow.uid = attrs.uid; - msgrow.folder_id = boxId; - msgrow.flags = 0; - for (var i = 0; i < attrs.flags.length; i++) - msgrow.flags = msgrow.flags || flagNum[attrs.flags[i].toLowerCase()]; - msgrow.flags = (msgrow.flags & ~8) | (msgrow.flags & 8 ? 0 : 8); // invert "\seen" (unread) flag - return [ msgrow, attrs ]; - } - - function* saveMessages(messages, boxId) - { - var err; - yield gen.throttle(3); - try - { - var uids = []; - var uk = {}; - for (var i = 0; i < messages.length; i++) - { - uids.push(messages[i][1].uid); - uk[messages[i][1].uid] = messages[i]; - } - var [ exist ] = yield pg.select('uid, flags').from('messages').where({ folder_id: boxId }).where(pg.sql.in('uid', uids)).rows(gen.ef()); - for (var i = 0; i < exist.length; i++) - { - var fl = uk[exist[i].uid][0].flags; - if (fl != exist[i].flags) - yield pg.update('messages', { flags: fl }).where({ folder_id: boxId, uid: exist[i].uid }).run(gen.ef()); - delete uk[exist[i].uid]; - } - synced += exist.length; - process.stderr.write('\rsynchronizing '+synced); - var run = []; - for (var i in uk) - run.push(addMessage(uk[i][0], uk[i][1])); - if (run.length) - { - process.stderr.write('\n'); - yield gen.runParallel(run, gen.cb()); - } - } - catch (e) - { - err = e; - } - parsed -= messages.length; - if (paused && parsed < 20) - { - paused = false; - srv._parser._ignoreReadable = false; - process.nextTick(srv._parser._cbReadable); - } - if (err) - throw err; - } - - function* addMessage(msgrow, attrs) - { - var pgtx, end_transaction; - try - { - [ pgtx, end_transaction ] = yield pg.transaction(gen.cb(), function(e) { if (e) throw e; }); - - var header = Imap.parseHeader(msgrow.headers); - for (var i in header) - for (var k = 0; k < header[i].length; k++) - header[i][k] = header[i][k].replace(/\x00/g, ''); - header.from = header.from && splitEmails(header.from[0])[0]; - header.replyto = header['reply-to'] && splitEmails(header['reply-to'][0])[0]; - var re = /(<[^>]*>)/; - header.references = (header.references && header.references[0] || '').split(re).filter(a => a.match(re)); - if (header.references.length) - { - if (header.references.length > 10) - header.references = [ header.references[0] ].concat(header.references.slice(header.references.length-9)); - if (!header['in-reply-to'] || !header['in-reply-to'][0]) - header['in-reply-to'] = [ header.references[header.references.length-1] ]; - else if (header.references[header.references.length-1] != header['in-reply-to'][0]) - header.references.push(header['in-reply-to'][0]); - } - if (header.date) - { - var t = Date.parse(header.date[0]); - if (!isNaN(t)) - header.date = new Date(t); - } - if (!header.date) - header.date = new Date(attrs.date); - - msgrow.from_email = header.from && header.from.email || ''; - msgrow.from_name = header.from && header.from.name || ''; - msgrow.replyto_email = header.replyto && header.replyto.email || ''; - msgrow.replyto_name = header.replyto && header.replyto.name || ''; - msgrow.to_list = header.to && header.to[0] || ''; - msgrow.cc_list = header.cc && header.cc[0] || ''; - msgrow.bcc_list = header.bcc && header.bcc[0] || ''; - msgrow.subject = header.subject && header.subject[0] || ''; - msgrow.messageid = header['message-id'] && header['message-id'][0] || ''; - msgrow.inreplyto = header['in-reply-to'] && header['in-reply-to'][0] || ''; - msgrow.inreplyto = msgrow.inreplyto.replace(/^[\s\S]*(<[^>]*>)[\s\S]*$/, '$1'); - msgrow.time = header.date; - msgrow.refs = toPgArray(header.references); - if (header.references.length) - { - var [ threadId ] = yield pgtx.select('MAX(thread_id)').from('messages') - .where(pg.sql.in('messageid', header.references)).val(gen.ef()); - if (!threadId) - { - [ threadId ] = yield pgtx.select('MAX(thread_id)').from('messages') - .where(new pg.sql.Binary('@>', 'refs', toPgArray([msgrow.messageid]))).val(gen.ef()); - } - if (threadId) - { - try - { - yield pgtx.update('threads', { msg_count: pg.sql('msg_count+1') }) - .where({ id: threadId }).run(gen.ef()); - } - catch (e) - { - throw new Error(''+e); - } - } - msgrow.thread_id = threadId; - } - console.log(msgrow.time+' '+msgrow.from_email+' '+msgrow.subject); - [ msgrow.id ] = yield pgtx.insert('messages', msgrow).returning('id').val(gen.ef()); - if (!msgrow.thread_id) - { - [ msgrow.thread_id ] = yield pgtx.insert('threads', { - first_msg: msgrow.id, - msg_count: 1 - }).returning('id').val(gen.ef()); - yield pgtx.update('messages', { thread_id: msgrow.thread_id }).where({ id: msgrow.id }).run(gen.ef()); - } - - end_transaction(); - } - catch (e0) - { - if (end_transaction) - end_transaction(); - throw e0; - } - } + self.parsed = 0; + self.paused = false; + self.synced = 0; + self.pending = []; f.on('message', function(msg, seqnum) { gen.run(function*() { - var [ msgrow, attrs ] = yield* parseMessage(msg, seqnum, boxId); + var [ msgrow, attrs ] = yield* self.parseMessage(msg, seqnum, boxId); // Workaround memory leak in node-imap // TODO: send pull request if (srv._curReq.fetchCache) delete srv._curReq.fetchCache[seqnum]; - messages.push([ msgrow, attrs ]); - parsed++; - if (!paused && parsed >= 20) + self.pending.push([ msgrow, attrs ]); + self.parsed++; + if (!self.paused && self.parsed >= 20) { // ГОРШОЧЕК, НЕ ВАРИ!!! И так уже кучу сообщений прочитал из сокета, хорош! srv._parser._ignoreReadable = true; - paused = true; + self.paused = true; } - if (messages.length >= 20) + if (self.pending.length >= 20) { - var m = messages; - messages = []; - yield gen.run(saveMessages(m, boxId), gen.cb()); + var m = self.pending; + self.pending = []; + yield gen.run(self.saveMessages(m, boxId), gen.cb()); } }); }); @@ -289,13 +132,199 @@ function* main(account) yield f.once('end', function() { process.stderr.write('\n'); - if (messages.length > 0) - gen.run(saveMessages(messages, boxId)); + if (self.pending.length > 0) + gen.run(self.saveMessages(self.pending, boxId)); + self.pending = []; cb(); }); + console.log(boxId); + yield pg.update('threads', { first_msg: null }) + .where(pg.sql('first_msg IN ('+ + pg.select('id').from('messages').where({ folder_id: boxId }) + .where(pg.sql('(flags @> array[\'deleted\']::varchar(255)[])')) + +')')).run(gen.ef()); + yield pg.delete('messages').where({ folder_id: boxId }) + .where(pg.sql('(flags @> array[\'deleted\']::varchar(255)[])')).run(gen.ef()); + yield pg.update('threads', + { first_msg: pg.sql('('+ + pg.select('MIN(id)').from('messages').where({ thread_id: pg.sql('threads.id') })+')' + ) }).where(pg.sql('first_msg IS NULL')).run(gen.ef()); + yield pg.delete('threads').where(pg.sql('first_msg IS NULL')).run(gen.ef()); yield srv.closeBox(gen.cb()); } srv.end(); + self.srv = null; } -gen.run(main(cfg.accounts[0]), function() { process.exit() }); +Syncer.parseMessage = function*(msg, seqnum, boxId) +{ + var msgrow = {}; + var attrs; + msg.on('body', function(stream, info) + { + var buffer = ''; + stream.on('data', function(chunk) + { + buffer += chunk.toString('utf8'); + }); + stream.once('end', function() + { + msgrow.body = ''; + msgrow.headers = buffer; + }); + }); + msg.once('attributes', function(a) { + attrs = a; + }); + yield msg.once('end', gen.cb()); + msgrow.uid = attrs.uid; + msgrow.folder_id = boxId; + msgrow.flags = attrs.flags.map(f => f[0] == '\\' ? f.toLowerCase().replace(/^\\/, '') : f.replace(/^\$*/, '$')); + var nf = msgrow.flags.filter(f => f != 'seen'); + msgrow.flags = nf.length == msgrow.flags.length ? nf.concat(['unread']) : nf; + return [ msgrow, attrs ]; +} + +Syncer.saveMessages = function*(messages, boxId) +{ + var self = this; + var err; + yield gen.throttle(2); + try + { + var uids = []; + var uk = {}; + for (var i = 0; i < messages.length; i++) + { + uids.push(messages[i][1].uid); + uk[messages[i][1].uid] = messages[i]; + } + yield pg.update('messages', { flags: pg.sql('array_remove(flags, \'deleted\')') }) + .where({ folder_id: boxId }).where(pg.sql.in('uid', uids)).run(gen.ef()); + var [ exist ] = yield pg.select('uid, flags').from('messages') + .where({ folder_id: boxId }).where(pg.sql.in('uid', uids)).rows(gen.ef()); + for (var i = 0; i < exist.length; i++) + { + var fl = uk[exist[i].uid][0].flags; + if (fl.sort().join('\\') != exist[i].flags.sort().join('\\')) + yield pg.update('messages', { flags: toPgArray(fl) }).where({ folder_id: boxId, uid: exist[i].uid }).run(gen.ef()); + delete uk[exist[i].uid]; + } + self.synced += exist.length; + process.stderr.write('\rsynchronizing '+self.synced); + var run = []; + for (var i in uk) + run.push(this.addMessage(uk[i][0], uk[i][1])); + if (run.length) + { + process.stderr.write('\n'); + yield gen.runParallel(run, gen.cb()); + } + } + catch (e) + { + err = e; + } + self.parsed -= messages.length; + if (self.paused && self.parsed < 20) + { + self.paused = false; + self.srv._parser._ignoreReadable = false; + process.nextTick(self.srv._parser._cbReadable); + } + if (err) + throw err; +} + +Syncer.addMessage = function*(msgrow, attrs) +{ + var pgtx, end_transaction; + try + { + [ pgtx, end_transaction ] = yield pg.transaction(gen.cb(), function(e) { if (e) throw e; }); + + var header = Imap.parseHeader(msgrow.headers); + for (var i in header) + for (var k = 0; k < header[i].length; k++) + header[i][k] = header[i][k].replace(/\x00/g, ''); + header.from = header.from && splitEmails(header.from[0])[0]; + header.replyto = header['reply-to'] && splitEmails(header['reply-to'][0])[0]; + var re = /(<[^>]*>)/; + header.references = (header.references && header.references[0] || '').split(re).filter(a => a.match(re)); + if (header.references.length) + { + if (header.references.length > 10) + header.references = [ header.references[0] ].concat(header.references.slice(header.references.length-9)); + if (!header['in-reply-to'] || !header['in-reply-to'][0]) + header['in-reply-to'] = [ header.references[header.references.length-1] ]; + else if (header.references[header.references.length-1] != header['in-reply-to'][0]) + header.references.push(header['in-reply-to'][0]); + } + if (header.date) + { + var t = Date.parse(header.date[0]); + if (!isNaN(t)) + header.date = new Date(t); + } + if (!header.date) + header.date = new Date(attrs.date); + + msgrow.from_email = header.from && header.from.email || ''; + msgrow.from_name = header.from && header.from.name || ''; + msgrow.replyto_email = header.replyto && header.replyto.email || ''; + msgrow.replyto_name = header.replyto && header.replyto.name || ''; + msgrow.to_list = header.to && header.to[0] || ''; + msgrow.cc_list = header.cc && header.cc[0] || ''; + msgrow.bcc_list = header.bcc && header.bcc[0] || ''; + msgrow.subject = header.subject && header.subject[0] || ''; + msgrow.messageid = header['message-id'] && header['message-id'][0] || ''; + msgrow.inreplyto = header['in-reply-to'] && header['in-reply-to'][0] || ''; + msgrow.inreplyto = msgrow.inreplyto.replace(/^[\s\S]*(<[^>]*>)[\s\S]*$/, '$1'); + msgrow.time = header.date; + msgrow.flags = toPgArray(msgrow.flags); + msgrow.refs = toPgArray(header.references); + if (header.references.length) + { + var [ threadId ] = yield pgtx.select('MAX(thread_id)').from('messages') + .where(pg.sql.in('messageid', header.references)).val(gen.ef()); + if (!threadId) + { + [ threadId ] = yield pgtx.select('MAX(thread_id)').from('messages') + .where(new pg.sql.Binary('@>', 'refs', toPgArray([msgrow.messageid]))).val(gen.ef()); + } + if (threadId) + { + try + { + yield pgtx.update('threads', { msg_count: pg.sql('msg_count+1') }) + .where({ id: threadId }).run(gen.ef()); + } + catch (e) + { + throw new Error(''+e); + } + } + msgrow.thread_id = threadId; + } + console.log(msgrow.time+' '+msgrow.from_email+' '+msgrow.subject); + [ msgrow.id ] = yield pgtx.insert('messages', msgrow).returning('id').val(gen.ef()); + if (!msgrow.thread_id) + { + [ msgrow.thread_id ] = yield pgtx.insert('threads', { + first_msg: msgrow.id, + msg_count: 1 + }).returning('id').val(gen.ef()); + yield pgtx.update('messages', { thread_id: msgrow.thread_id }).where({ id: msgrow.id }).run(gen.ef()); + } + + end_transaction(); + } + catch (e0) + { + if (end_transaction) + end_transaction(); + throw e0; + } +} + +gen.run(Syncer.sync(cfg.accounts[0]), function() { process.exit() });