likeopera-backend/operetta.js

331 lines
12 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

// SECURITY NOTE: setting NODE_TLS_REJECT_UNAUTHORIZED to "0" turns off TLS
// certificate verification for every TLS connection this process makes.
// NOTE(review): presumably here so that IMAP servers with self-signed
// certificates can be reached — confirm, and prefer a per-connection option.
process.env.NODE_TLS_REJECT_UNAUTHORIZED = "0";
// Local configuration. This file reads cfg.pg (database connection settings)
// and cfg.accounts (list of mail accounts) from it.
var cfg = require('./cfg.json');
// Loaded only for its side effects; the return value is not used in this file.
require('heapdump');
// Generator-based coroutine helper; used below via gen.run, gen.cb, gen.ef,
// gen.throttle and gen.runParallel.
var gen = require('gen-thread');
// IMAP client (used as `new Imap(...)` and `Imap.parseHeader(...)`).
var Imap = require('imap');
// NOTE(review): `inspect` is not referenced anywhere in the visible code.
var inspect = require('util').inspect;
// Earlier direct `pg` / `pg-native` pool setup, kept commented out; pg-bricks is used instead.
//var pg;
//try { require('pg-native'); pg = require('pg').native; }
//catch(e) { pg = require('pg'); }
//var pg_pool = new pg.Pool(cfg.pg);
var bricks = require('pg-bricks');
// Query-builder handle shared by every database call in this file.
// The connection URL is assembled by plain string concatenation from cfg.pg.
// NOTE(review): user and password are not URL-encoded here, so credentials
// containing characters such as '@', ':' or '/' would presumably break the URL — confirm.
var pg = bricks.configure('postgresql://'+cfg.pg.user+':'+cfg.pg.password+'@'+(cfg.pg.host||'')+':'+cfg.pg.port+'/'+cfg.pg.database);
/**
 * Splits an address-list header value (e.g. the value of `From` or `Reply-To`)
 * into individual entries.
 *
 * Each entry may be preceded by whitespace and/or commas and has one of the forms
 *   "Display Name" <user@host>     (single or double quotes)
 *   Display Name <user@host>
 *   <user@host>   or   user@host
 *
 * Known limitation: a list of bare addresses without angle brackets
 * ("a@x, b@y") is captured as ONE entry, because the bare form accepts commas.
 *
 * @param {string} s - raw header value; null/undefined are treated as empty
 * @returns {Array<{name: string, email: string}>} entries in input order;
 *          `name` is '' when the entry has no display name
 */
function splitEmails(s)
{
    const re = /^[\s,]*(?:(?:["'](.*?)["']|([^<]+))\s*<([^>]+)>|<?([^<>]+)>?)/; // '
    const result = [];
    // Work on a local copy; guard against null/undefined, which used to throw.
    let rest = s == null ? '' : String(s);
    let m;
    // Every successful match consumes at least one character, so this terminates.
    while ((m = re.exec(rest)))
    {
        rest = rest.slice(m[0].length);
        const email = (m[3] || m[4] || '').trim();
        // Leftover separators (e.g. the ", " after the last address) match the
        // bare-address alternative through backtracking; they are not addresses.
        if (!/[^\s,]/.test(email))
            continue;
        result.push({ name: (m[1] || m[2] || '').trim(), email: email });
    }
    return result;
}
/**
 * Serialises a JavaScript array as a PostgreSQL array literal, e.g.
 * ['seen', '$x'] -> {"seen","$x"}.
 *
 * Strings are always double-quoted, with only `"` and `\` backslash-escaped.
 * JSON escaping must not be reused here: in an array literal a backslash makes
 * the following character literal, so a JSON "\n" would be read back as "n".
 * Nested arrays become nested braces. Numbers and booleans are written bare;
 * null/undefined elements are written as `null`.
 *
 * @param {Array} a - values to serialise; null/undefined yields '{}', any
 *                    other non-array value is treated as a one-element array
 * @returns {string} array literal suitable for passing as a query parameter
 */
function toPgArray(a)
{
    const quote = (text) => '"' + text.replace(/["\\]/g, '\\$&') + '"';
    const element = (value) =>
    {
        if (Array.isArray(value))
            return toPgArray(value);
        if (typeof value === 'string')
            return quote(value);
        const json = JSON.stringify(value);
        // undefined, functions and symbols have no JSON form
        if (json === undefined)
            return 'null';
        if (value !== null && typeof value === 'object')
        {
            // Objects with a string JSON form (e.g. Date) are stored as that string,
            // anything else as its JSON text.
            const plain = JSON.parse(json);
            return quote(typeof plain === 'string' ? plain : json);
        }
        // numbers, booleans, null (NaN/Infinity serialise as null)
        return json;
    };
    if (a == null)
        return '{}';
    const items = Array.isArray(a) ? a : [ a ];
    // Array.from also visits holes in sparse arrays (as undefined -> null).
    return '{' + Array.from(items, element).join(',') + '}';
}
// Namespace object holding the synchronisation routines. While syncing, the
// routines keep their shared state on it (srv, parsed, paused, synced, pending).
var Syncer = {
};

/**
 * Synchronises the mailboxes of one account into the database.
 *
 * Steps, as a gen-thread coroutine:
 *  1. Find the `accounts` row by e-mail, inserting it if it does not exist.
 *  2. Connect to the IMAP server and list its mailboxes.
 *  3. For each mailbox key returned by getBoxes: open it, create or update the
 *     `folders` row, fetch the headers of all messages and hand them to
 *     saveMessages() in batches of 20.
 *  4. Remove messages that are still flagged 'deleted' afterwards and repair
 *     the `threads` rows that pointed at them.
 *
 * NOTE(review): only the keys of the object returned by getBoxes() are
 * visited; nested child mailboxes do not appear to be traversed — confirm.
 *
 * @param {{name: string, email: string, imap: Object}} account - account
 *        settings; `imap` is passed unchanged to the Imap constructor
 */
Syncer.sync = function*(account)
{
    var self = this;
    var accountId;
    var [ rows ] = yield pg.select('id').from('accounts').where({ email: account.email }).rows(gen.ef());
    if (rows[0] && rows[0].id)
        accountId = rows[0].id;
    else
    {
        // Fixed: this insert was not yielded, so the destructuring ran on the
        // synchronous return value instead of on the inserted row.
        var [ row ] = yield pg.insert('accounts', {
            name: account.name,
            email: account.email,
            settings: {
                imap: account.imap
            }
        }).returning('id').row(gen.ef());
        accountId = row.id;
    }
    var srv = new Imap(account.imap);
    // Kept on the object so that saveMessages() can resume the paused parser.
    self.srv = srv;
    // Register the 'ready' listener first, then yield until it is called.
    srv.once('ready', gen.cb());
    yield srv.connect();
    var [ boxes ] = yield srv.getBoxes(gen.ef());
    for (var k in boxes)
    {
        // Second argument `true` opens the mailbox read-only (node-imap openBox).
        var [ box ] = yield srv.openBox(k, true, gen.ef());
        var boxId;
        // IMAP sync: http://tools.ietf.org/html/rfc4549
        var [ row ] = yield pg.select('*').from('folders').where({ account_id: accountId, name: box.name }).row(gen.ef());
        if (row)
        {
            boxId = row.id;
            if (row.uidvalidity != box.uidvalidity)
            {
                // UIDVALIDITY changed: stored UIDs are meaningless, drop them all.
                yield pg.delete('messages').where({ folder_id: row.id })
                    .where(pg.sql('uid is not null')).run(gen.ef());
            }
            else
            {
                // Mark every stored message as 'deleted'. saveMessages() removes
                // the flag again for each UID the server still reports, so what
                // remains flagged after the fetch no longer exists on the server.
                yield pg.update('messages', { flags: pg.sql('(flags || array[\'deleted\']::varchar(255)[])') }).where({ folder_id: row.id })
                    .where(pg.sql('uid is not null')).run(gen.ef());
            }
            yield pg.update('folders', { uidvalidity: box.uidvalidity, unread_count: box.messages.new })
                .where({ id: row.id }).run(gen.ef());
        }
        else
        {
            var [ row ] = yield pg.insert('folders', {
                name: box.name,
                uidvalidity: box.uidvalidity,
                account_id: accountId,
                unread_count: box.messages.new,
                //total_count: box.messages.count
            }).returning('id').row(gen.ef());
            boxId = row.id;
        }
        // Fetch size and headers (no bodies) of every message, by sequence number.
        var f = srv.fetch('1:*', {
            size: true,
            bodies: 'HEADER'
        });
        self.parsed = 0;      // messages parsed but not yet saved
        self.paused = false;  // whether the IMAP parser is currently held back
        self.synced = 0;      // already-known messages processed, for the progress line
        self.pending = [];    // parsed [ msgrow, attrs ] pairs waiting for the next batch
        f.on('message', function(msg, seqnum)
        {
            gen.run(function*()
            {
                var [ msgrow, attrs ] = yield* self.parseMessage(msg, seqnum, boxId);
                // Workaround memory leak in node-imap
                // TODO: send pull request
                if (srv._curReq.fetchCache)
                    delete srv._curReq.fetchCache[seqnum];
                self.pending.push([ msgrow, attrs ]);
                self.parsed++;
                if (!self.paused && self.parsed >= 20)
                {
                    // "Little pot, stop cooking!" - plenty of messages have already been
                    // read from the socket; hold the parser back until saveMessages()
                    // has caught up (it clears this flag again).
                    srv._parser._ignoreReadable = true;
                    self.paused = true;
                }
                if (self.pending.length >= 20)
                {
                    var m = self.pending;
                    self.pending = [];
                    yield gen.run(self.saveMessages(m, boxId), gen.cb());
                }
            });
        });
        var cb = gen.cb();
        yield f.once('end', function()
        {
            process.stderr.write('\n');
            // NOTE(review): the final batch is started here without waiting for it,
            // and cb() resumes the coroutine right away, so the cleanup queries below
            // may run before this batch has been saved — confirm and, if so, wait for it.
            if (self.pending.length > 0)
                gen.run(self.saveMessages(self.pending, boxId));
            self.pending = [];
            cb();
        });
        console.log(boxId);
        // Detach threads whose first message is about to be removed...
        yield pg.update('threads', { first_msg: null })
            .where(pg.sql('first_msg IN ('+
                pg.select('id').from('messages').where({ folder_id: boxId })
                    .where(pg.sql('(flags @> array[\'deleted\']::varchar(255)[])'))
                +')')).run(gen.ef());
        // ...remove the messages still flagged 'deleted'...
        yield pg.delete('messages').where({ folder_id: boxId })
            .where(pg.sql('(flags @> array[\'deleted\']::varchar(255)[])')).run(gen.ef());
        // ...point detached threads at their lowest remaining message id...
        yield pg.update('threads',
            { first_msg: pg.sql('('+
                pg.select('MIN(id)').from('messages').where({ thread_id: pg.sql('threads.id') })+')'
            ) }).where(pg.sql('first_msg IS NULL')).run(gen.ef());
        // ...and drop threads that have no messages left.
        yield pg.delete('threads').where(pg.sql('first_msg IS NULL')).run(gen.ef());
        yield srv.closeBox(gen.cb());
    }
    srv.end();
    self.srv = null;
}
/**
 * Collects the header text and attributes of one fetched message and turns
 * them into a partial `messages` row.
 *
 * IMAP system flags (those starting with a backslash) are lower-cased and
 * lose the backslash; every other flag is normalised to a single leading '$'.
 * 'seen' is dropped from the result, and 'unread' is added when 'seen' was
 * not present.
 *
 * @param {Object} msg - message emitter from the fetch ('body', 'attributes', 'end' events)
 * @param {number} seqnum - sequence number of the message (not used here)
 * @param {number} boxId - id of the folder row the message belongs to
 * @returns {Array} pair [ msgrow, attrs ]
 */
Syncer.parseMessage = function*(msg, seqnum, boxId)
{
    const record = {};
    let attributes;

    msg.on('body', (stream, info) =>
    {
        const pieces = [];
        stream.on('data', (chunk) =>
        {
            pieces.push(chunk.toString('utf8'));
        });
        stream.once('end', () =>
        {
            record.body = '';
            record.headers = pieces.join('');
        });
    });
    msg.once('attributes', (received) =>
    {
        attributes = received;
    });

    // Suspend until the message has been delivered completely.
    yield msg.once('end', gen.cb());

    record.uid = attributes.uid;
    record.folder_id = boxId;

    const normalized = attributes.flags.map((flag) =>
    {
        if (flag[0] == '\\')
            return flag.toLowerCase().replace(/^\\/, '');
        return flag.replace(/^\$*/, '$');
    });
    const withoutSeen = normalized.filter((flag) => flag != 'seen');
    const wasSeen = withoutSeen.length != normalized.length;
    record.flags = wasSeen ? withoutSeen : withoutSeen.concat(['unread']);

    return [ record, attributes ];
}
/**
 * Stores one batch of parsed messages.
 *
 * For the UIDs in the batch: clears the 'deleted' marker, updates the flags
 * of rows that already exist when they differ, and inserts the remaining
 * messages through addMessage(). Afterwards the batch is subtracted from the
 * `parsed` counter and, if the IMAP parser was paused and fewer than 20
 * messages are outstanding, the parser is resumed. This bookkeeping runs even
 * when storing failed; the error is rethrown afterwards.
 *
 * @param {Array} messages - [ msgrow, attrs ] pairs as produced by parseMessage()
 * @param {number} boxId - id of the folder row the messages belong to
 */
Syncer.saveMessages = function*(messages, boxId)
{
    const syncer = this;
    let failure;
    yield gen.throttle(2);
    try
    {
        // Index the batch by UID.
        const uidList = [];
        const byUid = {};
        for (const entry of messages)
        {
            const uid = entry[1].uid;
            uidList.push(uid);
            byUid[uid] = entry;
        }

        // These UIDs are still on the server: remove the 'deleted' marker.
        yield pg.update('messages', { flags: pg.sql('array_remove(flags, \'deleted\')') })
            .where({ folder_id: boxId }).where(pg.sql.in('uid', uidList)).run(gen.ef());

        const [ known ] = yield pg.select('uid, flags').from('messages')
            .where({ folder_id: boxId }).where(pg.sql.in('uid', uidList)).rows(gen.ef());
        for (const stored of known)
        {
            const serverFlags = byUid[stored.uid][0].flags;
            const serverKey = serverFlags.sort().join('\\');
            const storedKey = stored.flags.sort().join('\\');
            if (serverKey != storedKey)
            {
                yield pg.update('messages', { flags: toPgArray(serverFlags) })
                    .where({ folder_id: boxId, uid: stored.uid }).run(gen.ef());
            }
            // Whatever stays in byUid is new and has to be inserted.
            delete byUid[stored.uid];
        }
        syncer.synced += known.length;
        process.stderr.write('\rsynchronizing '+syncer.synced);

        const inserts = Object.keys(byUid).map(
            (uid) => syncer.addMessage(byUid[uid][0], byUid[uid][1])
        );
        if (inserts.length)
        {
            process.stderr.write('\n');
            yield gen.runParallel(inserts, gen.cb());
        }
    }
    catch (e)
    {
        failure = e;
    }

    // Flow control: account for the batch and resume the parser if possible.
    syncer.parsed -= messages.length;
    if (syncer.paused && syncer.parsed < 20)
    {
        syncer.paused = false;
        syncer.srv._parser._ignoreReadable = false;
        process.nextTick(syncer.srv._parser._cbReadable);
    }
    if (failure)
        throw failure;
}
/**
 * Inserts one new message inside a database transaction and attaches it to a
 * thread.
 *
 * The raw header text is parsed with Imap.parseHeader, NUL characters are
 * stripped, and sender, reply-to, recipients, subject, ids and date are
 * copied into the row. The thread is looked up through the References /
 * In-Reply-To chain; when none is found, a new thread is created with this
 * message as its first message.
 *
 * @param {Object} msgrow - partial row from parseMessage() (headers, uid,
 *        folder_id, flags); it is completed in place and receives `id` and `thread_id`
 * @param {Object} attrs - IMAP attributes of the message; `attrs.date` is
 *        used when the Date header is missing or cannot be parsed
 * @throws rethrows any error raised while parsing or storing the message
 */
Syncer.addMessage = function*(msgrow, attrs)
{
    var pgtx, end_transaction;
    try
    {
        [ pgtx, end_transaction ] = yield pg.transaction(gen.cb(), function(e) { if (e) throw e; });
        var header = Imap.parseHeader(msgrow.headers);
        // Strip NUL characters from every header value before storing.
        for (var i in header)
            for (var k = 0; k < header[i].length; k++)
                header[i][k] = header[i][k].replace(/\x00/g, '');
        header.from = header.from && splitEmails(header.from[0])[0];
        header.replyto = header['reply-to'] && splitEmails(header['reply-to'][0])[0];
        // Keep only the <message-id> tokens of the References header.
        var re = /(<[^>]*>)/;
        header.references = (header.references && header.references[0] || '').split(re).filter(a => a.match(re));
        if (header.references.length)
        {
            // Cap the chain at 10 entries: the first one plus the last nine.
            if (header.references.length > 10)
                header.references = [ header.references[0] ].concat(header.references.slice(header.references.length-9));
            // Without In-Reply-To, the last reference is taken as the parent;
            // otherwise make sure the parent is the last element of the chain.
            if (!header['in-reply-to'] || !header['in-reply-to'][0])
                header['in-reply-to'] = [ header.references[header.references.length-1] ];
            else if (header.references[header.references.length-1] != header['in-reply-to'][0])
                header.references.push(header['in-reply-to'][0]);
        }
        // Fixed: a Date header that cannot be parsed used to stay in place as a
        // string array, which skipped the fallback and ended up in msgrow.time.
        // Now both a missing and an unparseable header fall back to attrs.date.
        var parsedTime = header.date ? Date.parse(header.date[0]) : NaN;
        header.date = isNaN(parsedTime) ? new Date(attrs.date) : new Date(parsedTime);
        msgrow.from_email = header.from && header.from.email || '';
        msgrow.from_name = header.from && header.from.name || '';
        msgrow.replyto_email = header.replyto && header.replyto.email || '';
        msgrow.replyto_name = header.replyto && header.replyto.name || '';
        msgrow.to_list = header.to && header.to[0] || '';
        msgrow.cc_list = header.cc && header.cc[0] || '';
        msgrow.bcc_list = header.bcc && header.bcc[0] || '';
        msgrow.subject = header.subject && header.subject[0] || '';
        msgrow.messageid = header['message-id'] && header['message-id'][0] || '';
        msgrow.inreplyto = header['in-reply-to'] && header['in-reply-to'][0] || '';
        // Reduce In-Reply-To to its last <...> token, if it has one.
        msgrow.inreplyto = msgrow.inreplyto.replace(/^[\s\S]*(<[^>]*>)[\s\S]*$/, '$1');
        msgrow.time = header.date;
        msgrow.flags = toPgArray(msgrow.flags);
        msgrow.refs = toPgArray(header.references);
        if (header.references.length)
        {
            // First choice: a stored message that this one references.
            var [ threadId ] = yield pgtx.select('MAX(thread_id)').from('messages')
                .where(pg.sql.in('messageid', header.references)).val(gen.ef());
            if (!threadId)
            {
                // Second choice: a stored message that references this one.
                [ threadId ] = yield pgtx.select('MAX(thread_id)').from('messages')
                    .where(new pg.sql.Binary('@>', 'refs', toPgArray([msgrow.messageid]))).val(gen.ef());
            }
            if (threadId)
            {
                try
                {
                    yield pgtx.update('threads', { msg_count: pg.sql('msg_count+1') })
                        .where({ id: threadId }).run(gen.ef());
                }
                catch (e)
                {
                    throw new Error(''+e);
                }
            }
            msgrow.thread_id = threadId;
        }
        console.log(msgrow.time+' '+msgrow.from_email+' '+msgrow.subject);
        [ msgrow.id ] = yield pgtx.insert('messages', msgrow).returning('id').val(gen.ef());
        if (!msgrow.thread_id)
        {
            // No existing thread: start a new one with this message as the first.
            [ msgrow.thread_id ] = yield pgtx.insert('threads', {
                first_msg: msgrow.id,
                msg_count: 1
            }).returning('id').val(gen.ef());
            yield pgtx.update('messages', { thread_id: msgrow.thread_id }).where({ id: msgrow.id }).run(gen.ef());
        }
        end_transaction();
    }
    catch (e0)
    {
        // NOTE(review): end_transaction() is called the same way on success and on
        // failure; confirm whether this rolls the transaction back or commits it.
        if (end_transaction)
            end_transaction();
        throw e0;
    }
}
// Entry point: synchronise the first configured account, then terminate the
// process from the completion callback.
gen.run(
    Syncer.sync(cfg.accounts[0]),
    function exitWhenDone()
    {
        process.exit();
    }
);