likeopera-backend/Syncer.js

428 lines
15 KiB
JavaScript

const gen = require('gen-thread');
const Imap = require('imap');
const ImapManager = require('./ImapManager.js');
module.exports = Syncer;
function Syncer(pg)
{
this.syncInProgress = false;
this.pg = pg;
this.imap = new ImapManager();
}
Syncer.prototype.init = function*(cfg)
{
for (var i = 0; i < cfg.accounts.length; i++)
yield* this.addAccount(cfg.accounts[i]);
yield* this.loadAccounts();
}
Syncer.prototype.syncAll = function*()
{
this.syncInProgress = true;
for (var id in this.accounts)
yield* this.syncAccount(this.accounts[id]);
this.syncInProgress = false;
}
Syncer.prototype.addAccount = function*(account)
{
var self = this;
var [ row ] = yield this.pg.select('id').from('accounts').where({ email: account.email }).rows(gen.ef());
if (row.length)
{
row = row[0];
}
else
{
[ row ] = yield this.pg.insert('accounts', {
name: account.name,
email: account.email,
settings: {
imap: account.imap
}
}).returning('*').row(gen.ef());
}
return row.id;
}
Syncer.prototype.loadAccounts = function*()
{
let [ rows ] = yield this.pg.select('*').from('accounts').rows(gen.ef());
this.accounts = {};
for (var i = 0; i < rows.length; i++)
{
this.accounts[rows[i].id] = rows[i];
this.imap.setServer(rows[i].id, rows[i].settings.imap);
}
}
Syncer.prototype.syncAccount = function*(account)
{
var self = this;
var accountId;
var [ rows ] = yield this.pg.select('id').from('accounts').where({ email: account.email }).rows(gen.ef());
if (rows[0] && rows[0].id)
accountId = rows[0].id;
else
{
var [ row ] = yield this.pg.insert('accounts', {
name: account.name,
email: account.email,
settings: {
imap: account.imap
}
}).returning('id').row(gen.ef());
accountId = row.id;
}
var srv = yield* self.imap.getConnection(accountId, null, 'S');
var [ boxes ] = yield srv.getBoxes(gen.ef());
for (var k in boxes)
{
var boxKind = (boxes[k].special_use_attrib || '').replace('\\', '').toLowerCase();
yield* self.syncBox(srv, accountId, k, boxKind, true);
}
yield* self.runIdle(accountId, srv);
self.imap.releaseConnection(accountId, 'S');
}
Syncer.prototype.syncBox = function*(srv, accountId, boxName, boxKind, doFull)
{
var [ boxStatus ] = yield srv.openBox(boxName, true, gen.ef());
// IMAP sync: http://tools.ietf.org/html/rfc4549
var [ boxRow ] = yield this.pg.select('*').from('folders')
.where({ account_id: accountId, name: boxStatus.name }).rows(gen.ef());
if (boxRow.length)
{
boxRow = boxRow[0];
if (boxRow.uidvalidity != boxStatus.uidvalidity)
{
yield* this.deleteMessages(this.pg.sql.and({ folder_id: boxRow.id }, this.pg.sql('uid is not null')));
boxRow.uidvalidity = boxStatus.uidvalidity;
}
}
else
{
[ boxRow ] = yield this.pg.insert('folders', {
name: boxStatus.name,
uidvalidity: boxStatus.uidvalidity,
account_id: accountId,
highestmodseq: 0,
kind: boxKind||''
//unread_count: boxStatus.messages.new,
//total_count: boxStatus.messages.total,
}).returning('id').row(gen.ef());
}
// fetch new messages
var missing = [];
var [ maxUid ] = yield this.pg.select('MAX(uid)').from('messages')
.where({ folder_id: boxRow.id }).val(gen.ef());
if (boxStatus.highestmodseq)
{
process.stderr.write(this.accounts[accountId].email+'/'+boxRow.name+': quick resync\n');
yield* this.quickResync(srv, boxRow.id, maxUid, boxRow.highestmodseq, missing);
boxRow.highestmodseq = boxStatus.highestmodseq;
}
else if (doFull && maxUid)
{
// list messages, update flags and version tag
process.stderr.write(this.accounts[accountId].email+'/'+boxRow.name+': full resync\n');
yield* this.fullResync(srv, boxRow.id, maxUid, missing);
}
missing.push((maxUid ? maxUid+1 : 1)+':*');
yield* this.imap.runFetch(srv, missing, {
size: true,
bodies: 'HEADER'
}, (messages, state) => this.saveMessages(messages, boxRow.id, state));
yield this.pg.update('folders', {
uidvalidity: boxRow.uidvalidity,
highestmodseq: boxRow.highestmodseq||0
}).where({ id: boxRow.id }).run(gen.ef());
}
Syncer.prototype.runIdle = function*(accountId, srv)
{
var self = this;
yield srv.openBox('INBOX', true, gen.ef());
srv.on('uidvalidity', function(uidvalidity)
{
// uidvalidity changes (FUUUU) remove everything
});
srv.on('mail', function(count)
{
// <count> new messages arrived while idling, fetch them
gen.run(function*()
{
var srv = yield* self.imap.getConnection(accountId, null, 'S');
yield* self.syncBox(srv, accountId, 'INBOX');
self.imap.releaseConnection(accountId, 'S');
});
});
srv.on('vanish', function(uids)
{
// messages expunged by uids
console.log([ 'VANISH', uids ]);
});
srv.on('expunge', function(seqno)
{
// message expunged by (FUUUU) sequence number
console.log(arguments);
});
}
Syncer.prototype.fullResync = function*(srv, boxId, maxUid, missing)
{
var [ flags ] = yield this.pg.select('uid, flags').from('messages').where({ folder_id: boxId }).rows(gen.ef());
flags = flags.reduce((o, row) => { o[row.uid] = row.flags; return o; }, {});
var updateFlags = [];
process.stderr.write('\rsynchronizing 0');
yield* this.imap.runFetch(
srv, '1:'+maxUid, {},
(messages, state) => this.queueFlags(messages, boxId, state),
{ flags: flags, updateFlags: updateFlags, missing: missing||[] }
);
process.stderr.write('\n');
yield* this.updateFlags(boxId, updateFlags);
// delete messages removed from IMAP server
flags = Object.keys(flags);
if (flags.length)
yield* this.deleteMessages(this.pg.sql.and({ folder_id: boxId }, this.pg.sql.in('uid', flags)));
}
Syncer.prototype.queueFlags = function*(messages, boxId, fetchState)
{
for (var i = 0; i < messages.length; i++)
{
var m = messages[i][0];
if (!fetchState.flags[m.uid])
fetchState.missing.push(m.uid);
else
{
if (fetchState.flags[m.uid].join(',') != m.flags.join(','))
fetchState.updateFlags.push({ uid: m.uid, flags: toPgArray(m.flags) });
delete fetchState.flags[m.uid];
}
}
fetchState.synced += messages.length;
process.stderr.write('\rsynchronizing '+fetchState.synced);
}
Syncer.prototype.updateFlags = function*(boxId, updateFlags, checkMissing)
{
if (updateFlags.length)
{
var sql = this.pg.update('messages m', { flags: this.pg.sql('t.flags::varchar(255)[]') })
.from('('+this.pg.sql.values(updateFlags)+') AS t (uid, flags)')
.where({ 'm.folder_id': boxId }).where(this.pg.sql('m.uid=t.uid'));
if (checkMissing)
{
var [ updated ] = yield sql.returning('m.uid').rows(gen.ef());
var missing = {};
for (var i = 0; i < updateFlags.length; i++)
missing[updateFlags[i].uid] = true;
for (var i = 0; i < updated.length; i++)
delete missing[updated[i].uid];
return Object.keys(missing);
}
else
yield sql.run(gen.ef());
}
return [];
}
Syncer.prototype.quickResync = function*(srv, boxId, maxUid, changedSince, missing)
{
var updateFlags = [];
var vanished = [];
var onVanish = function(dias)
{
vanished = vanished.concat(vanished, dias);
};
srv.on('vanish', onVanish);
yield* this.imap.runFetch(
srv, '1:'+maxUid, { modifiers: { changedsince: changedSince+' VANISHED' } },
(messages, state) => queueQuickFlags(messages, boxId, state), { updateFlags: updateFlags }
);
srv.removeListener('vanish', onVanish);
var checkedMissing = yield* this.updateFlags(boxId, updateFlags, missing && true);
if (missing)
missing.push.apply(missing, checkedMissing);
if (vanished.length)
{
let lst = [], dia = [];
for (let i = 0; i < vanished.length; i++)
{
if (vanished[i][1])
dia.push('uid >= '+vanished[i][0]+' AND uid <= '+vanished[i][1]);
else
lst.push(vanished[i][0]);
}
if (lst.length)
dia.push('uid IN ('+lst.join(',')+')');
yield* this.deleteMessages(this.pg.sql.and({ folder_id: boxId }, this.pg.sql('('+dia.join(' OR ')+')')));
}
}
Syncer.prototype.queueQuickFlags = function*(messages, boxId, fetchState)
{
for (var i = 0; i < messages.length; i++)
{
var m = messages[i][0];
fetchState.updateFlags.push({ uid: m.uid, flags: toPgArray(m.flags) });
}
}
Syncer.prototype.deleteMessages = function*(where)
{
yield this.pg.update('threads', { first_msg: null })
.where(this.pg.sql('first_msg IN ('+this.pg.select('id').from('messages').where(where)+')'))
.run(gen.ef());
yield this.pg.delete('messages').where(where).run(gen.ef());
yield this.pg.update('threads',
{ first_msg: this.pg.sql('('+
this.pg.select('id').from('messages').where({ thread_id: this.pg.sql('threads.id') }).orderBy('time').limit(1)
+')') }).where(this.pg.sql('first_msg IS NULL')).run(gen.ef());
yield this.pg.delete('threads').where(this.pg.sql('first_msg IS NULL')).run(gen.ef());
}
Syncer.prototype.saveMessages = function*(messages, boxId)
{
var self = this;
yield gen.throttle(2);
var uids = messages.map(m => m[1].uid);
var [ exist ] = yield this.pg.select('uid, flags').from('messages')
.where({ folder_id: boxId }).where(this.pg.sql.in('uid', uids)).rows(gen.ef());
uids = {};
for (var i = 0; i < exist.length; i++)
uids[exist[i].uid] = true;
for (var i = 0; i < messages.length; i++)
if (!uids[messages[i][1].uid])
yield* this.addMessage(boxId, messages[i][0], messages[i][1]);
}
Syncer.prototype.addMessage = function*(boxId, msgrow, attrs)
{
var self = this;
var pgtx, end_transaction;
try
{
[ pgtx, end_transaction ] = yield this.pg.transaction(gen.cb(), function(e) { if (e) throw e; });
var header = Imap.parseHeader(msgrow.headers);
for (var i in header)
for (var k = 0; k < header[i].length; k++)
header[i][k] = header[i][k].replace(/\x00/g, '');
header.from = header.from && splitEmails(header.from[0])[0];
header.replyto = header['reply-to'] && splitEmails(header['reply-to'][0])[0];
var re = /(<[^>]*>)/;
header.references = (header.references && header.references[0] || '').split(re).filter(a => a.match(re));
if (header.references.length)
{
if (header.references.length > 10)
header.references = [ header.references[0] ].concat(header.references.slice(header.references.length-9));
if (!header['in-reply-to'] || !header['in-reply-to'][0])
header['in-reply-to'] = [ header.references[header.references.length-1] ];
else if (header.references[header.references.length-1] != header['in-reply-to'][0])
header.references.push(header['in-reply-to'][0]);
}
if (header.date)
{
var t = Date.parse(header.date[0]);
if (!isNaN(t))
header.date = new Date(t);
else
header.date = null;
}
if (!header.date)
header.date = new Date(attrs.date);
msgrow.folder_id = boxId;
msgrow.from_email = header.from && header.from.email || '';
msgrow.from_name = header.from && header.from.name || '';
msgrow.replyto_email = header.replyto && header.replyto.email || '';
msgrow.replyto_name = header.replyto && header.replyto.name || '';
msgrow.to_list = header.to && header.to[0] || '';
msgrow.cc_list = header.cc && header.cc[0] || '';
msgrow.bcc_list = header.bcc && header.bcc[0] || '';
msgrow.subject = header.subject && header.subject[0] || '';
msgrow.messageid = header['message-id'] && header['message-id'][0] || '';
msgrow.inreplyto = header['in-reply-to'] && header['in-reply-to'][0] || '';
msgrow.inreplyto = msgrow.inreplyto.replace(/^[\s\S]*(<[^>]*>)[\s\S]*$/, '$1');
msgrow.time = header.date;
msgrow.flags = toPgArray(msgrow.flags);
msgrow.refs = toPgArray(header.references);
var thisIsFirst = false;
if (header.references.length)
{
let [ threadId ] = yield pgtx.select('MAX(thread_id)').from('messages')
.where(this.pg.sql.in('messageid', header.references)).val(gen.ef());
if (!threadId)
{
[ threadId ] = yield pgtx.select('MAX(thread_id)').from('messages')
.where(new this.pg.sql.Binary('@>', 'refs', toPgArray([msgrow.messageid]))).val(gen.ef());
if (threadId)
thisIsFirst = true;
}
msgrow.thread_id = threadId;
}
console.log(msgrow.time+' '+msgrow.from_email+' '+msgrow.subject);
[ msgrow.id ] = yield pgtx.insert('messages', msgrow).returning('id').val(gen.ef());
if (!msgrow.thread_id)
{
[ msgrow.thread_id ] = yield pgtx.insert('threads', {
first_msg: msgrow.id,
msg_count: 1
}).returning('id').val(gen.ef());
yield pgtx.update('messages', { thread_id: msgrow.thread_id }).where({ id: msgrow.id }).run(gen.ef());
}
else
{
let upd = pgtx.update('threads', { msg_count: this.pg.sql('msg_count+1') });
if (thisIsFirst)
upd.first_msg = msgrow.id;
yield upd.where({ id: msgrow.threadId }).run(gen.ef());
}
end_transaction();
}
catch (e0)
{
if (end_transaction)
end_transaction();
throw e0;
}
}
function splitEmails(s)
{
var re = /^[\s,]*(?:(?:["'](.*?)["']|([^<]+))\s*<([^>]+)>|<?([^<>]+)>?)/; // '
var m, r = [];
while (m = re.exec(s))
{
s = s.substr(m[0].length);
r.push({ name: (m[1]||m[2]||'').trim(), email: (m[3]||m[4]||'').trim() });
}
return r;
}
function toPgArray(a)
{
a = JSON.stringify(a);
return '{'+a.substring(1, a.length-1)+'}';
}