const gen = require('gen-thread'); const Imap = require('imap'); const ImapManager = require('./ImapManager.js'); module.exports = Syncer; function Syncer(pg) { this.syncInProgress = false; this.pg = pg; this.imap = new ImapManager(); this.runIdle = this.runIdle.bind(this); this.stopIdle = this.stopIdle.bind(this); } Syncer.prototype.init = function*(cfg) { for (var i = 0; i < cfg.accounts.length; i++) yield* this.addAccount(cfg.accounts[i]); yield* this.loadAccounts(); } Syncer.prototype.syncAll = function*() { this.syncInProgress = true; for (var id in this.accounts) yield* this.syncAccount(this.accounts[id]); this.syncInProgress = false; } Syncer.prototype.addAccount = function*(account) { var self = this; var [ row ] = yield this.pg.select('id').from('accounts').where({ email: account.email }).rows(gen.ef()); if (row.length) { row = row[0]; yield this.pg.update('accounts', { settings: { imap: account.imap, folders: account.folders } }).where({ id: row.id }).run(gen.ef()); } else { [ row ] = yield this.pg.insert('accounts', { name: account.name, email: account.email, settings: { imap: account.imap } }).returning('*').row(gen.ef()); } return row.id; } Syncer.prototype.loadAccounts = function*() { let [ rows ] = yield this.pg.select('*').from('accounts').rows(gen.ef()); this.accounts = {}; for (var i = 0; i < rows.length; i++) { this.accounts[rows[i].id] = rows[i]; this.imap.setServer(rows[i].id, rows[i].settings.imap); } } Syncer.prototype.getSyncConnection = function*(accountId, boxName) { var srv = yield* this.imap.getConnection(accountId, null, 'S', this.runIdle, this.stopIdle); return srv; } Syncer.prototype.idleUidvalidity = function(accountId, uidvalidity) { // uidvalidity changes (FUUUU) remove everything and resync } Syncer.prototype.idleMail = function(accountId, count) { // new messages arrived while idling, fetch them var self = this; gen.run(function*() { var srv = yield* self.getSyncConnection(accountId); yield* self.syncBox(srv, accountId, 'INBOX'); self.releaseSyncConnection(accountId); }); } Syncer.prototype.idleVanish = function(accountId, uids) { // messages expunged by uids var self = this; gen.run(function*() { let [ boxId ] = yield* self.pg.select('id').from('folders') .where({ name: 'INBOX', account_id: accountId }).val(gen.ef()); yield* self.deleteVanished(boxId, uids); }); } Syncer.prototype.idleExpunge = function(accountId, seqno) { // message expunged by (FUUUU) sequence number(s?) var self = this; gen.run(function*() { var srv = yield* self.getSyncConnection(accountId); yield* self.syncBox(srv, accountId, 'INBOX'); self.releaseSyncConnection(accountId); }); } Syncer.prototype.runIdle = function(accountId, srv) { var self = this; if (!srv._idleCallbacks) { srv._idleCallbacks = { uidvalidity: this.idleUidvalidity.bind(this, accountId), mail: this.idleMail.bind(this, accountId), vanish: this.idleVanish.bind(this, accountId), expunge: this.idleExpunge.bind(this, accountId) } } for (var i in srv._idleCallbacks) { srv.on(i, srv._idleCallbacks[i]); } srv.openBox('INBOX', true); } Syncer.prototype.stopIdle = function(accountId, srv) { for (var i in srv._idleCallbacks) { srv.removeListener(i, srv._idleCallbacks[i]); } } Syncer.prototype.releaseSyncConnection = function*(accountId, boxName) { this.imap.releaseConnection(accountId, 'S'); } Syncer.prototype.syncAccount = function*(account) { var self = this; var accountId; var [ rows ] = yield this.pg.select('id').from('accounts').where({ email: account.email }).rows(gen.ef()); if (rows[0] && rows[0].id) accountId = rows[0].id; else { var [ row ] = yield this.pg.insert('accounts', { name: account.name, email: account.email, settings: { imap: account.imap } }).returning('id').row(gen.ef()); accountId = row.id; } var srv = yield* self.getSyncConnection(accountId); var [ boxes ] = yield srv.getBoxes(gen.ef()); for (var k in boxes) { var boxKind = (boxes[k].special_use_attrib || '').replace('\\', '').toLowerCase(); yield* self.syncBox(srv, accountId, k, boxKind, true); } self.releaseSyncConnection(accountId); } Syncer.prototype.syncBox = function*(srv, accountId, boxName, boxKind, doFull) { var [ boxStatus ] = yield srv.openBox(boxName, true, gen.ef()); // IMAP sync: http://tools.ietf.org/html/rfc4549 var [ boxRow ] = yield this.pg.select('*').from('folders') .where({ account_id: accountId, name: boxStatus.name }).rows(gen.ef()); if (boxRow.length) { boxRow = boxRow[0]; if (boxRow.uidvalidity != boxStatus.uidvalidity) { yield* this.deleteMessages(this.pg.sql.and({ folder_id: boxRow.id }, this.pg.sql('uid is not null'))); boxRow.uidvalidity = boxStatus.uidvalidity; } } else { [ boxRow ] = yield this.pg.insert('folders', { name: boxStatus.name, uidvalidity: boxStatus.uidvalidity, account_id: accountId, highestmodseq: 0, kind: boxKind||'' //unread_count: boxStatus.messages.new, //total_count: boxStatus.messages.total, }).returning('id').row(gen.ef()); } // fetch new messages var missing = []; var [ maxUid ] = yield this.pg.select('MAX(uid)').from('messages') .where({ folder_id: boxRow.id }).val(gen.ef()); if (boxStatus.highestmodseq) { process.stderr.write(this.accounts[accountId].email+'/'+boxRow.name+': quick resync\n'); yield* this.quickResync(srv, boxRow.id, maxUid, boxRow.highestmodseq, missing); boxRow.highestmodseq = boxStatus.highestmodseq; } else if (doFull && maxUid) { // list messages, update flags and version tag process.stderr.write(this.accounts[accountId].email+'/'+boxRow.name+': full resync\n'); yield* this.fullResync(srv, boxRow.id, maxUid, missing); } missing.push((maxUid ? maxUid+1 : 1)+':*'); yield* this.imap.runFetch(srv, missing, { size: true, bodies: 'HEADER' }, (messages, state) => this.saveMessages(messages, boxRow.id, state)); yield this.pg.update('folders', { uidvalidity: boxRow.uidvalidity, highestmodseq: boxRow.highestmodseq||0 }).where({ id: boxRow.id }).run(gen.ef()); } Syncer.prototype.fullResync = function*(srv, boxId, maxUid, missing) { var [ flags ] = yield this.pg.select('uid, flags').from('messages').where({ folder_id: boxId }).rows(gen.ef()); flags = flags.reduce((o, row) => { o[row.uid] = row.flags; return o; }, {}); var updateFlags = []; process.stderr.write('\rsynchronizing 0'); yield* this.imap.runFetch( srv, '1:'+maxUid, {}, (messages, state) => this.queueFlags(messages, boxId, state), { flags: flags, updateFlags: updateFlags, missing: missing||[] } ); process.stderr.write('\n'); yield* this.updateFlags(boxId, updateFlags); // delete messages removed from IMAP server flags = Object.keys(flags); if (flags.length) yield* this.deleteMessages(this.pg.sql.and({ folder_id: boxId }, this.pg.sql.in('uid', flags))); } Syncer.prototype.queueFlags = function*(messages, boxId, fetchState) { for (var i = 0; i < messages.length; i++) { var m = messages[i][0]; if (!fetchState.flags[m.uid]) fetchState.missing.push(m.uid); else { if (fetchState.flags[m.uid].join(',') != m.flags.join(',')) fetchState.updateFlags.push({ uid: m.uid, flags: toPgArray(m.flags) }); delete fetchState.flags[m.uid]; } } fetchState.synced += messages.length; process.stderr.write('\rsynchronizing '+fetchState.synced); } Syncer.prototype.updateFlags = function*(boxId, updateFlags, checkMissing) { if (updateFlags.length) { var sql = this.pg.update('messages m', { flags: this.pg.sql('t.flags::varchar(255)[]') }) .from('('+this.pg.sql.values(updateFlags)+') AS t (uid, flags)') .where({ 'm.folder_id': boxId }).where(this.pg.sql('m.uid=t.uid')); if (checkMissing) { var [ updated ] = yield sql.returning('m.uid').rows(gen.ef()); var missing = {}; for (var i = 0; i < updateFlags.length; i++) missing[updateFlags[i].uid] = true; for (var i = 0; i < updated.length; i++) delete missing[updated[i].uid]; return Object.keys(missing); } else yield sql.run(gen.ef()); } return []; } Syncer.prototype.quickResync = function*(srv, boxId, maxUid, changedSince, missing) { var updateFlags = []; var vanished = []; var onVanish = function(dias) { vanished = vanished.concat(vanished, dias); }; srv.on('vanish', onVanish); yield* this.imap.runFetch( srv, '1:'+maxUid, { modifiers: { changedsince: changedSince+' VANISHED' } }, (messages, state) => this.queueQuickFlags(messages, boxId, state), { updateFlags: updateFlags } ); srv.removeListener('vanish', onVanish); var checkedMissing = yield* this.updateFlags(boxId, updateFlags, missing && true); if (missing) missing.push.apply(missing, checkedMissing); if (vanished.length) { yield* this.deleteVanished(boxId, vanished); } } Syncer.prototype.deleteVanished = function*(boxId, vanished) { let lst = [], dia = []; for (let i = 0; i < vanished.length; i++) { if (vanished[i][1]) dia.push('uid >= '+vanished[i][0]+' AND uid <= '+vanished[i][1]); else lst.push(vanished[i][0]); } if (lst.length) dia.push('uid IN ('+lst.join(',')+')'); yield* this.deleteMessages(this.pg.sql.and({ folder_id: boxId }, this.pg.sql('('+dia.join(' OR ')+')'))); } Syncer.prototype.queueQuickFlags = function*(messages, boxId, fetchState) { for (var i = 0; i < messages.length; i++) { var m = messages[i][0]; fetchState.updateFlags.push({ uid: m.uid, flags: toPgArray(m.flags) }); } } Syncer.prototype.deleteMessages = function*(where) { yield this.pg.update('threads', { first_msg: null }) .where(this.pg.sql('first_msg IN ('+this.pg.select('id').from('messages').where(where)+')')) .run(gen.ef()); yield this.pg.delete('messages').where(where).run(gen.ef()); yield this.pg.update('threads', { first_msg: this.pg.sql('('+ this.pg.select('id').from('messages').where({ thread_id: this.pg.sql('threads.id') }).orderBy('time').limit(1) +')') }).where(this.pg.sql('first_msg IS NULL')).run(gen.ef()); yield this.pg.delete('threads').where(this.pg.sql('first_msg IS NULL')).run(gen.ef()); } Syncer.prototype.saveMessages = function*(messages, boxId) { var self = this; yield gen.throttle(2); var uids = messages.map(m => m[1].uid); var [ exist ] = yield this.pg.select('uid, flags').from('messages') .where({ folder_id: boxId }).where(this.pg.sql.in('uid', uids)).rows(gen.ef()); uids = {}; for (var i = 0; i < exist.length; i++) uids[exist[i].uid] = true; for (var i = 0; i < messages.length; i++) if (!uids[messages[i][1].uid]) yield* this.addMessage(boxId, messages[i][0], messages[i][1]); } Syncer.prototype.addMessage = function*(boxId, msgrow, attrs) { var self = this; var pgtx, end_transaction; try { [ pgtx, end_transaction ] = yield this.pg.transaction(gen.cb(), function(e) { if (e) throw e; }); var header = Imap.parseHeader(msgrow.headers); for (var i in header) for (var k = 0; k < header[i].length; k++) header[i][k] = header[i][k].replace(/\x00/g, ''); header.from = header.from && splitEmails(header.from[0])[0]; header.replyto = header['reply-to'] && splitEmails(header['reply-to'][0])[0]; var re = /(<[^>]*>)/; header.references = (header.references && header.references[0] || '').split(re).filter(a => a.match(re)); if (header.references.length) { if (header.references.length > 10) header.references = [ header.references[0] ].concat(header.references.slice(header.references.length-9)); if (!header['in-reply-to'] || !header['in-reply-to'][0]) header['in-reply-to'] = [ header.references[header.references.length-1] ]; else if (header.references[header.references.length-1] != header['in-reply-to'][0]) header.references.push(header['in-reply-to'][0]); } if (header.date) { var t = Date.parse(header.date[0]); if (!isNaN(t)) header.date = new Date(t); else header.date = null; } if (!header.date) header.date = new Date(attrs.date); msgrow.folder_id = boxId; msgrow.from_email = header.from && header.from.email || ''; msgrow.from_name = header.from && header.from.name || ''; msgrow.replyto_email = header.replyto && header.replyto.email || ''; msgrow.replyto_name = header.replyto && header.replyto.name || ''; msgrow.to_list = header.to && header.to[0] || ''; msgrow.cc_list = header.cc && header.cc[0] || ''; msgrow.bcc_list = header.bcc && header.bcc[0] || ''; msgrow.subject = header.subject && header.subject[0] || ''; msgrow.messageid = header['message-id'] && header['message-id'][0] || ''; msgrow.inreplyto = header['in-reply-to'] && header['in-reply-to'][0] || ''; msgrow.inreplyto = msgrow.inreplyto.replace(/^[\s\S]*(<[^>]*>)[\s\S]*$/, '$1'); msgrow.time = header.date; msgrow.flags = toPgArray(msgrow.flags); msgrow.refs = toPgArray(header.references); var thisIsFirst = false; if (header.references.length) { let [ threadId ] = yield pgtx.select('MAX(thread_id)').from('messages') .where(this.pg.sql.in('messageid', header.references)).val(gen.ef()); if (!threadId) { [ threadId ] = yield pgtx.select('MAX(thread_id)').from('messages') .where(new this.pg.sql.Binary('@>', 'refs', toPgArray([msgrow.messageid]))).val(gen.ef()); if (threadId) thisIsFirst = true; } msgrow.thread_id = threadId; } console.log(msgrow.time+' '+msgrow.from_email+' '+msgrow.subject); [ msgrow.id ] = yield pgtx.insert('messages', msgrow).returning('id').val(gen.ef()); if (!msgrow.thread_id) { [ msgrow.thread_id ] = yield pgtx.insert('threads', { first_msg: msgrow.id, msg_count: 1 }).returning('id').val(gen.ef()); yield pgtx.update('messages', { thread_id: msgrow.thread_id }).where({ id: msgrow.id }).run(gen.ef()); } else { let upd = pgtx.update('threads', { msg_count: this.pg.sql('msg_count+1') }); if (thisIsFirst) upd.first_msg = msgrow.id; yield upd.where({ id: msgrow.threadId }).run(gen.ef()); } end_transaction(); } catch (e0) { if (end_transaction) end_transaction(); throw e0; } } function splitEmails(s) { var re = /^[\s,]*(?:(?:["'](.*?)["']|([^<]+))\s*<([^>]+)>|]+)>?)/; // ' var m, r = []; while (m = re.exec(s)) { s = s.substr(m[0].length); r.push({ name: (m[1]||m[2]||'').trim(), email: (m[3]||m[4]||'').trim() }); } return r; } function toPgArray(a) { a = JSON.stringify(a); return '{'+a.substring(1, a.length-1)+'}'; }