likeopera-backend/operetta.js

408 lines
14 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

// TODO: Run as a daemon and receive new mail immediately (IMAP IDLE)
// TODO: Build a web server for updating the view
// TODO: Implement subscriptions to new messages over WebSockets

// WARNING: setting NODE_TLS_REJECT_UNAUTHORIZED to "0" turns off TLS certificate
// verification for the whole process, i.e. for every TLS connection made below.
process.env.NODE_TLS_REJECT_UNAUTHORIZED = "0";
// Configuration file; this module reads cfg.pg.{user,password,host,port,database}
// and cfg.accounts from it.
var cfg = require('./cfg.json');
// Loaded only for its side effects (the return value is not used).
require('heapdump');
var gen = require('gen-thread');
var Imap = require('imap');
var inspect = require('util').inspect;
var iconv = require('iconv-lite');
var bricks = require('pg-bricks');
// Connection URL is assembled by plain concatenation: user and password are
// not URL-encoded, and a missing host becomes an empty string.
var pg = bricks.configure('postgresql://'+cfg.pg.user+':'+cfg.pg.password+'@'+(cfg.pg.host||'')+':'+cfg.pg.port+'/'+cfg.pg.database);
/**
 * Split an address-list header value (From, Reply-To, To, ...) into entries.
 *
 * Recognized forms, separated by commas and/or whitespace:
 *   "Quoted Name" <addr>, 'Quoted Name' <addr>, Unquoted Name <addr>,
 *   <addr> and a bare addr.
 *
 * @param {string} s  Raw header value; null/undefined/'' yield an empty list.
 * @returns {Array<{name: string, email: string}>} Parsed entries in input
 *   order. `name` is '' when the entry has no display name. Entries without
 *   an address (e.g. produced by trailing separators) are dropped.
 */
function splitEmails(s)
{
    if (!s)
        return [];
    s = String(s);
    // After skipping leading separators the alternatives are tried in order:
    //   1. a quoted or unquoted display name followed by <addr>;
    //   2. a bare addr, optionally wrapped in <>. It stops at a comma so that
    //      "a@x, b@y" is split into two entries instead of being one "email".
    var re = /^[\s,]*(?:(?:["'](.*?)["']|([^<]+))\s*<([^>]+)>|<?([^<>,]+)>?)/; // '
    var m, r = [];
    // Every alternative consumes at least one character, so the loop terminates.
    while ((m = re.exec(s)))
    {
        s = s.slice(m[0].length);
        var email = (m[3]||m[4]||'').trim();
        // Whitespace-only leftovers match alternative 2 with an empty address.
        if (email)
            r.push({ name: (m[1]||m[2]||'').trim(), email: email });
    }
    return r;
}
/**
 * Serialize a JavaScript array as a PostgreSQL array literal, e.g.
 * ['seen', 'unread'] -> {"seen","unread"}.
 *
 * Strings are double-quoted with only `"` and `\` backslash-escaped, which is
 * the escaping PostgreSQL array input defines. JSON escapes such as \n or
 * \uXXXX must not be used: PostgreSQL takes a backslash-escaped character
 * literally, so "\n" would be stored as the letter "n".
 *
 * @param {Array} a  Values to serialize; nested arrays become nested literals.
 * @returns {string} The array literal. null/undefined elements become NULL,
 *   numbers and booleans are emitted unquoted, anything else is quoted text.
 */
function toPgArray(a)
{
    const element = (v) =>
    {
        if (v === null || v === undefined)
            return 'NULL';
        if (Array.isArray(v))
            return toPgArray(v);
        if (typeof v === 'number' || typeof v === 'boolean')
            return String(v);
        return '"'+String(v).replace(/["\\]/g, '\\$&')+'"';
    };
    return '{'+a.map(element).join(',')+'}';
}
// Holder object for the synchronizer; its generator methods are assigned
// to it one by one further down in this file.
var Syncer = {};
// Synchronize one mail account into PostgreSQL.
//
// Steps: look up (or create) the `accounts` row by e-mail address, connect to
// the IMAP server described by account.imap, then for every key of the object
// returned by getBoxes(): open the box, reconcile the `folders` row, refresh
// flags of already stored messages, delete messages that were not seen on the
// server, and finally fetch the headers of new messages.
//
// account: object with `name`, `email` and `imap` (passed to the Imap constructor).
// Runs as a gen-thread coroutine; presumably gen.cb()/gen.ef() create callbacks
// that resume this generator with the callback's results - confirm against
// the gen-thread documentation.
Syncer.sync = function*(account)
{
var self = this;
var accountId;
// Find the account by e-mail address, inserting it on the first sync.
var [ rows ] = yield pg.select('id').from('accounts').where({ email: account.email }).rows(gen.ef());
if (rows[0] && rows[0].id)
accountId = rows[0].id;
else
{
var [ row ] = yield pg.insert('accounts', {
name: account.name,
email: account.email,
settings: {
imap: account.imap
}
}).returning('id').row(gen.ef());
accountId = row.id;
}
var srv = new Imap(account.imap);
// Kept on self so that runFetch/onMessage/parseMessage can reach the connection.
self.srv = srv;
// The 'ready' listener is registered before connect(); the yield below
// resumes once that callback fires.
srv.once('ready', gen.cb());
yield srv.connect();
var [ boxes ] = yield srv.getBoxes(gen.ef());
// NOTE(review): only the keys of `boxes` itself are visited; if getBoxes()
// nests child mailboxes inside the entries they are not synchronized here.
for (var k in boxes)
{
// NOTE(review): the second argument is presumably node-imap's read-only flag.
var [ box ] = yield srv.openBox(k, true, gen.ef());
var boxId;
// IMAP sync: http://tools.ietf.org/html/rfc4549
// NOTE: despite its name, `row` holds the array of matching rows here.
var [ row ] = yield pg.select('*').from('folders')
.where({ account_id: accountId, name: box.name }).rows(gen.ef());
self.versionTag = 0;
if (row.length)
{
row = row[0];
boxId = row.id;
if (row.uidvalidity != box.uidvalidity)
{
// UIDVALIDITY changed: stored UIDs are no longer meaningful, so drop
// every stored message of this folder that has a uid.
yield pg.delete('messages').where({ folder_id: row.id })
.where(pg.sql('uid is not null')).run(gen.ef());
}
else
{
// Continue from the highest version tag already stored for this folder.
[ self.versionTag ] = yield pg.select('MAX(vertag)').from('messages')
.where({ folder_id: row.id }).val(gen.ef());
self.versionTag = self.versionTag || 0;
}
yield pg.update('folders', { uidvalidity: box.uidvalidity, unread_count: box.messages.new })
.where({ id: row.id }).run(gen.ef());
}
else
{
[ row ] = yield pg.insert('folders', {
name: box.name,
uidvalidity: box.uidvalidity,
account_id: accountId,
unread_count: box.messages.new,
// total_count: box.messages.count
}).returning('id').row(gen.ef());
boxId = row.id;
}
// UIDs reported by the server but absent from the database; filled in by
// updateFlags and fetched together with the new messages below.
self.missing = [];
var [ maxUid ] = yield pg.select('MAX(uid)').from('messages').where({ folder_id: boxId }).val(gen.ef());
if (maxUid)
{
// list messages, update flags and version tag
self.versionTag++;
if (self.versionTag >= 0x7fffffff)
{
// Version tag is about to exceed 0x7fffffff: reset all tags of this
// folder to 0 and restart numbering at 1.
yield pg.update('messages', { vertag: 0 }).where({ folder_id: boxId }).run(gen.ef());
self.versionTag = 1;
}
process.stderr.write('\rsynchronizing 0');
// updateFlags stamps every message still present on the server with the
// current version tag.
yield* self.runFetch('1:'+maxUid, {}, boxId, 'updateFlags');
process.stderr.write('\n');
// delete messages removed from IMAP server
// (those whose vertag stayed below the current tag). Threads pointing at
// such messages first get first_msg cleared ...
yield pg.update('threads', { first_msg: null })
.where(pg.sql('first_msg IN ('+
pg.select('id').from('messages')
.where({ folder_id: boxId })
.where(pg.sql('uid is not null'))
.where(pg.sql.lt('vertag', self.versionTag))
+')')).run(gen.ef());
yield pg.delete('messages')
.where({ folder_id: boxId })
.where(pg.sql('uid is not null'))
.where(pg.sql.lt('vertag', self.versionTag)).run(gen.ef());
// ... then first_msg is recomputed as the smallest remaining message id
// of the thread, and threads left without any message are deleted.
yield pg.update('threads',
{ first_msg: pg.sql('('+
pg.select('MIN(id)').from('messages').where({ thread_id: pg.sql('threads.id') })+')'
) }).where(pg.sql('first_msg IS NULL')).run(gen.ef());
yield pg.delete('threads').where(pg.sql('first_msg IS NULL')).run(gen.ef());
}
// fetch new messages
// (everything above the highest stored UID, plus the UIDs collected in self.missing)
self.missing.push((maxUid ? maxUid+1 : 1)+':*');
yield* self.runFetch(self.missing, {
size: true,
bodies: 'HEADER'
}, boxId, 'saveMessages');
yield srv.closeBox(gen.cb());
}
// NOTE(review): the connection is only closed on the success path; an
// exception thrown above leaves it open.
srv.end();
self.srv = null;
}
// Run one IMAP fetch and feed every received message through `processor`.
//
// what:      message set passed to srv.fetch() (a range string or an array).
// params:    fetch options passed to srv.fetch().
// boxId:     folders.id of the currently open box, handed to the processor.
// processor: name of the Syncer method ('updateFlags' or 'saveMessages' in
//            this file) that receives batches of [ msgrow, attrs ] pairs.
//
// Bookkeeping lives on `self` and is reset on every call:
//   parsed  - messages parsed but not yet handed off and processed,
//   pending - parsed messages waiting to be batched,
//   paused  - whether onMessage has paused the node-imap parser,
//   synced  - progress counter printed by updateFlags.
Syncer.runFetch = function*(what, params, boxId, processor)
{
var self = this;
var f = self.srv.fetch(what, params);
self.parsed = 0;
self.paused = false;
self.synced = 0;
self.pending = [];
var cb, wait;
// Each message is handled by its own onMessage coroutine; when it finishes
// (or fails) checkFinish() tests whether the whole fetch is complete.
f.on('message', function(msg, seqnum)
{
gen.run(self.onMessage(msg, seqnum, boxId, processor), checkFinish, function(e) { checkFinish(); throw e; });
});
// `cb` is what eventually resumes this generator; it is created before the
// yield below and invoked from the 'end' handler or from checkFinish().
cb = gen.cb();
yield f.once('end', function()
{
// The server has sent everything; from now on checkFinish() may complete.
wait = true;
if (self.parsed <= 0)
cb();
else if (self.pending.length > 0)
// Flush the last, incomplete batch.
gen.run(self[processor](self.pending, boxId), saveLast, function(e) { saveLast(); throw e; });
});
// Called after the final partial batch was processed (or failed).
function saveLast()
{
self.parsed -= self.pending.length;
self.pending = [];
checkFinish();
}
// Resume the generator once the fetch has ended and nothing is in flight.
function checkFinish()
{
if (self.parsed <= 0 && wait)
cb();
}
};
// Handle one message of the running fetch: parse it, queue it in
// self.pending and, once 20 messages are queued, hand the batch to
// self[processor]. Also applies backpressure to the IMAP parser.
//
// msg, seqnum: message object and sequence number from the fetch 'message' event.
// boxId:       folders.id passed through to parseMessage and the processor.
// processor:   name of the Syncer method that processes a batch.
// Rethrows an error raised by the processor after the counters were updated.
Syncer.onMessage = function*(msg, seqnum, boxId, processor)
{
var self = this;
var [ msgrow, attrs ] = yield* self.parseMessage(msg, seqnum, boxId);
self.pending.push([ msgrow, attrs ]);
self.parsed++;
if (!self.paused && self.parsed >= 20)
{
// "Little pot, stop cooking!" We have already read plenty of messages from
// the socket, that is enough for now.
// NOTE(review): _parser._ignoreReadable is a private node-imap field;
// presumably setting it stops the parser from consuming socket data.
self.srv._parser._ignoreReadable = true;
self.paused = true;
}
if (self.pending.length >= 20)
{
// Take ownership of the current batch so that messages arriving
// meanwhile start a new one.
var m = self.pending;
self.pending = [];
var err;
try
{
yield gen.run(self[processor](m, boxId), gen.cb());
}
catch (e)
{
// Remember the error; counters and the pause state are fixed up first.
err = e;
}
self.parsed -= m.length;
if (self.paused && self.parsed < 20)
{
// Backlog is small again: let the parser continue and poke its
// 'readable' handler on the next tick.
self.paused = false;
self.srv._parser._ignoreReadable = false;
process.nextTick(self.srv._parser._cbReadable);
}
if (err)
throw err;
}
}
/**
 * Collect one fetched IMAP message into a database row skeleton.
 *
 * Waits for the message's 'end' event, gathering the body stream (decoded to
 * text and stored in `headers`) and the 'attributes' event payload.
 *
 * @param {object} msg     Message object from the fetch 'message' event.
 * @param {number} seqnum  Sequence number of the message within the fetch.
 * @param {*}      boxId   folders.id stored into `folder_id`.
 * @returns {Array} `[ msgrow, attrs ]`: msgrow has `uid`, `folder_id`, `flags`
 *   and, when a body part was requested, `body` ('') and `headers`; attrs is
 *   the raw attributes object.
 */
Syncer.parseMessage = function*(msg, seqnum, boxId)
{
    var msgrow = {};
    var attrs;
    msg.on('body', function(stream, info)
    {
        // Collect the chunks and join them once: this avoids re-copying the
        // buffer on every chunk and also works for a stream with no data.
        var chunks = [];
        stream.on('data', function(chunk)
        {
            chunks.push(chunk);
        });
        stream.once('end', function()
        {
            var buffer = Buffer.concat(chunks);
            msgrow.body = '';
            var b = buffer.toString('utf8');
            // U+FFFD in the decoded text means the bytes were not valid UTF-8:
            // retry with the charset declared in Content-Type, falling back
            // to windows-1251.
            if (b.indexOf('\uFFFD') >= 0)
            {
                let enc = /Content-type:\s*[^;\n]*;\s*charset=(\S+)/i.exec(b);
                enc = enc ? enc[1] : 'windows-1251';
                // Best effort: with an unsupported charset keep the UTF-8 text.
                try { b = iconv.decode(buffer, enc); }
                catch (e) {}
            }
            // Cut the text at the first NUL character.
            if (b.indexOf('\0') >= 0)
                b = b.substr(0, b.indexOf('\0'));
            msgrow.headers = b;
        });
    });
    msg.once('attributes', function(a) {
        attrs = a;
    });
    yield msg.once('end', gen.cb());
    msgrow.uid = attrs.uid;
    msgrow.folder_id = boxId;
    // System flags ("\Seen") are lower-cased without the backslash; keywords
    // are normalized to have exactly one leading "$".
    msgrow.flags = attrs.flags.map(f => f[0] == '\\' ? f.toLowerCase().replace(/^\\/, '') : f.replace(/^\$*/, '$'));
    // Store "unread" instead of "seen": drop 'seen', and add 'unread' when
    // the message did not carry it.
    var nf = msgrow.flags.filter(f => f != 'seen');
    msgrow.flags = nf.length == msgrow.flags.length ? nf.concat(['unread']) : nf;
    // Workaround memory leak in node-imap
    // TODO: send pull request
    if (this.srv._curReq && this.srv._curReq.fetchCache)
        delete this.srv._curReq.fetchCache[seqnum];
    return [ msgrow, attrs ];
}
/**
 * Write the flags of a batch of messages to the database and stamp the
 * matching rows with the current version tag. UIDs of the batch that matched
 * no row are appended to this.missing. Advances the progress counter and
 * prints it to stderr.
 *
 * @param {Array} messages  Batch of `[ msgrow, attrs ]` pairs.
 * @param {*}     boxId     folders.id the messages belong to.
 */
Syncer.updateFlags = function*(messages, boxId)
{
    yield gen.throttle(3);
    var syncer = this;
    var flagRows = messages.map(function(pair)
    {
        return { uid: pair[0].uid, flags: toPgArray(pair[0].flags) };
    });
    // One UPDATE ... FROM (VALUES ...) for the whole batch.
    var [ touched ] = yield pg
        .update('messages m', { flags: pg.sql('t.flags::varchar(255)[]'), vertag: syncer.versionTag })
        .from('('+pg.sql.values(flagRows)+') AS t (uid, flags)')
        .where({ 'm.folder_id': boxId })
        .where(pg.sql('m.uid=t.uid'))
        .returning('m.uid')
        .rows(gen.ef());
    // UIDs that were actually updated, used as lookup keys.
    var known = {};
    for (var hit of touched)
        known[hit.uid] = true;
    for (var pair of messages)
    {
        var uid = pair[0].uid;
        if (!known[uid])
            syncer.missing.push(uid);
    }
    syncer.synced += messages.length;
    process.stderr.write('\rsynchronizing '+syncer.synced);
}
/**
 * Store those messages of a batch whose UID is not yet present in the folder.
 *
 * @param {Array} messages  Batch of `[ msgrow, attrs ]` pairs.
 * @param {*}     boxId     folders.id the messages belong to.
 */
Syncer.saveMessages = function*(messages, boxId)
{
    yield gen.throttle(2);
    var fetchedUids = messages.map(function(pair) { return pair[1].uid; });
    var [ stored ] = yield pg.select('uid, flags')
        .from('messages')
        .where({ folder_id: boxId })
        .where(pg.sql.in('uid', fetchedUids))
        .rows(gen.ef());
    // UIDs already in the database, used as lookup keys.
    var known = {};
    stored.forEach(function(r) { known[r.uid] = true; });
    for (var pair of messages)
    {
        if (known[pair[1].uid])
            continue;
        yield* this.addMessage(pair[0], pair[1]);
    }
}
/**
 * Insert one new message and attach it to a thread, inside a transaction.
 *
 * The raw header text is parsed, address/subject/reference columns are
 * derived from it, and the thread is resolved as follows:
 *   1. the thread of any stored message whose Message-ID is in References;
 *   2. otherwise the thread of any stored message that lists this
 *      Message-ID in its refs (this message then becomes the thread's
 *      first message);
 *   3. otherwise a new thread is created.
 *
 * @param {object} msgrow  Row skeleton from parseMessage (`headers`, `flags`,
 *   `uid`, `folder_id`); it is mutated and receives `id` and `thread_id`.
 * @param {object} attrs   IMAP attributes; `attrs.date` is the fallback when
 *   the Date header is missing or unparsable.
 */
Syncer.addMessage = function*(msgrow, attrs)
{
    var self = this;
    var pgtx, end_transaction;
    try
    {
        [ pgtx, end_transaction ] = yield pg.transaction(gen.cb(), function(e) { if (e) throw e; });
        var header = Imap.parseHeader(msgrow.headers);
        // Strip NUL characters from every header value.
        for (var i in header)
            for (var k = 0; k < header[i].length; k++)
                header[i][k] = header[i][k].replace(/\x00/g, '');
        header.from = header.from && splitEmails(header.from[0])[0];
        header.replyto = header['reply-to'] && splitEmails(header['reply-to'][0])[0];
        // References becomes a list of "<message-id>" tokens.
        var re = /(<[^>]*>)/;
        header.references = (header.references && header.references[0] || '').split(re).filter(a => a.match(re));
        if (header.references.length)
        {
            // Keep at most 10 references: the first one plus the last nine.
            if (header.references.length > 10)
                header.references = [ header.references[0] ].concat(header.references.slice(header.references.length-9));
            // In-Reply-To and the last reference are made to agree.
            if (!header['in-reply-to'] || !header['in-reply-to'][0])
                header['in-reply-to'] = [ header.references[header.references.length-1] ];
            else if (header.references[header.references.length-1] != header['in-reply-to'][0])
                header.references.push(header['in-reply-to'][0]);
        }
        if (header.date)
        {
            var t = Date.parse(header.date[0]);
            if (!isNaN(t))
                header.date = new Date(t);
            else
                header.date = null;
        }
        if (!header.date)
            header.date = new Date(attrs.date);
        msgrow.from_email = header.from && header.from.email || '';
        msgrow.from_name = header.from && header.from.name || '';
        msgrow.replyto_email = header.replyto && header.replyto.email || '';
        msgrow.replyto_name = header.replyto && header.replyto.name || '';
        msgrow.to_list = header.to && header.to[0] || '';
        msgrow.cc_list = header.cc && header.cc[0] || '';
        msgrow.bcc_list = header.bcc && header.bcc[0] || '';
        msgrow.subject = header.subject && header.subject[0] || '';
        msgrow.messageid = header['message-id'] && header['message-id'][0] || '';
        msgrow.inreplyto = header['in-reply-to'] && header['in-reply-to'][0] || '';
        // Reduce In-Reply-To to a single "<message-id>" token.
        msgrow.inreplyto = msgrow.inreplyto.replace(/^[\s\S]*(<[^>]*>)[\s\S]*$/, '$1');
        msgrow.time = header.date;
        msgrow.flags = toPgArray(msgrow.flags);
        msgrow.refs = toPgArray(header.references);
        msgrow.vertag = self.versionTag;
        var thisIsFirst = false;
        if (header.references.length)
        {
            let [ threadId ] = yield pgtx.select('MAX(thread_id)').from('messages')
                .where(pg.sql.in('messageid', header.references)).val(gen.ef());
            if (!threadId)
            {
                // No referenced message is stored; look for stored messages
                // that reference this one instead.
                [ threadId ] = yield pgtx.select('MAX(thread_id)').from('messages')
                    .where(new pg.sql.Binary('@>', 'refs', toPgArray([msgrow.messageid]))).val(gen.ef());
                if (threadId)
                    thisIsFirst = true;
            }
            msgrow.thread_id = threadId;
        }
        console.log(msgrow.time+' '+msgrow.from_email+' '+msgrow.subject);
        [ msgrow.id ] = yield pgtx.insert('messages', msgrow).returning('id').val(gen.ef());
        if (!msgrow.thread_id)
        {
            [ msgrow.thread_id ] = yield pgtx.insert('threads', {
                first_msg: msgrow.id,
                msg_count: 1
            }).returning('id').val(gen.ef());
            yield pgtx.update('messages', { thread_id: msgrow.thread_id }).where({ id: msgrow.id }).run(gen.ef());
        }
        else
        {
            // The changed columns are collected before the query is built, so
            // that first_msg really ends up in the SET clause, and the row is
            // selected by msgrow.thread_id (the property assigned above).
            let threadChanges = { msg_count: pg.sql('msg_count+1') };
            if (thisIsFirst)
                threadChanges.first_msg = msgrow.id;
            yield pgtx.update('threads', threadChanges).where({ id: msgrow.thread_id }).run(gen.ef());
        }
        end_transaction();
    }
    catch (e0)
    {
        // NOTE(review): the transaction callback is invoked without the error
        // here, which presumably commits the statements executed so far
        // instead of rolling them back - confirm against pg-bricks.
        if (end_transaction)
            end_transaction();
        throw e0;
    }
}
// Entry point: synchronize the configured accounts one after another,
// then terminate the process.
gen.run(function*()
{
    for (const account of cfg.accounts)
    {
        yield* Syncer.sync(account);
    }
    process.exit();
});