/*
Repository viewer header captured together with the source; kept as a comment so the file parses.

likeopera-backend/operetta.js

663 lines
22 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.
*/

// TODO: Получать, парсить и хранить тела писем (и, вероятно, вложения) + индексировать тексты
// TODO: Группировка писем
// TODO: Висеть в виде демона и сразу получать новые письма (IDLE)
// TODO: Сделать веб-сервер
// TODO: Сделать подписки на новые сообщения по вебсокетам
// TODO: Чего я ещё хотел - интеграцию с maillog'ом и серверным спамфильтром
/**
* Нужные методы API:
* - список аккаунтов и папок
* - список сообщений в папке
* - содержимое сообщения
* - поиск по тексту
* - список тредов в папке, с сообщениями
* - проверить почту
* - пометить прочтённым
* - переместить
* - удалить
* - подсказка адресов To
* - отправить сообщение
*
* В определённом плане получается тупость - получается, что дублируешь
* функционал самого почтового сервера. Но шо ж с ним поделаешь, если он "ни ф силах"...
* Ведь по сути-то, MTA от такой штуки нужен только 1 метод: "добавить сообщение в папку".
*/
require('heapdump');
const gen = require("gen-thread");
const Imap = require('imap');
const iconv = require('iconv-lite');
const MailParser = require('mailparser').MailParser;
const bricks = require('pg-bricks');
const htmlawed = require('htmlawed');
const express = require('express');
const express_session = require('express-session');
const bodyparser = require('body-parser');
const multer = require('multer');
// NOTE(review): this turns off TLS certificate verification for every TLS
// connection made by this process (the IMAP connections included), presumably so
// servers with self-signed certificates are accepted — confirm it is still needed.
process.env.NODE_TLS_REJECT_UNAUTHORIZED = "0";
// Local configuration; keys read in this file: pg, sessionSecret, login, password, accounts.
var cfg = require('./cfg.json');
var pg = bricks.configure(cfg.pg);
// Single object holding the web application plus the IMAP sync state and the
// methods (Syncer.sync, Syncer.runFetch, ...) attached to it below.
var Syncer = {
};
Syncer.app = express();
// Parses application/x-www-form-urlencoded bodies (the POST /auth login form).
Syncer.app.use(bodyparser.urlencoded({ extended: false }));
// Sessions; req.session.auth (set by POST /auth) marks a logged-in client.
// NOTE(review): the fallback secret is published in the source, so cookie
// signatures give no protection unless cfg.sessionSecret is set.
Syncer.app.use(express_session({
secret: cfg.sessionSecret || '1083581xm1l3s1l39k',
resave: false,
saveUninitialized: false
}));
// Serves the minimal login form; it posts back to POST /auth.
Syncer.app.get('/auth', (req, res) =>
{
    var loginForm = '<form action="/auth" method="post"><input name="login" /> <input name="password" type="password" /> <input type="submit" /></form>';
    return res.type('html').send(loginForm);
});
// Checks the submitted login/password against cfg.login / cfg.password.
// Replies { ok: true } and marks the session as authenticated on success,
// { ok: false } otherwise, and 400 when there is no parsed body.
Syncer.app.post('/auth', function(req, res)
{
    if (!req.body)
        return res.sendStatus(400);
    // Strict string comparison. With `==`, unset cfg.login/cfg.password would match
    // missing form fields (undefined == undefined) and let anyone log in.
    var configured = cfg.login != null && cfg.password != null;
    var ok = configured &&
        typeof req.body.login == 'string' && req.body.login === String(cfg.login) &&
        typeof req.body.password == 'string' && req.body.password === String(cfg.password);
    if (!ok)
        return res.send({ ok: false });
    // Issue a fresh session id on login so a pre-planted session id cannot be
    // promoted to an authenticated one (session fixation).
    req.session.regenerate(function(err)
    {
        if (err)
            return res.sendStatus(500);
        req.session.auth = true;
        res.send({ ok: true });
    });
});
// Lists all accounts, each with its folders and their total/unread message counts.
Syncer.app.get('/folders', genRequest(function*(req, res)
{
    if (!(req.session && req.session.auth))
        return res.sendStatus(401);
    let [ accounts ] = yield pg.select('id, name, email').from('accounts').rows(gen.ef());
    let folderQuery = pg.select(
        'id, account_id, name,'+
        ' (select count(*) from messages m where m.folder_id=f.id) total_count,'+
        ' (select count(*) from messages m where m.folder_id=f.id and (flags @> array[\'unread\']::varchar(255)[])) unread_count'
    ).from('folders f').orderBy('account_id, name');
    let [ folders ] = yield folderQuery.rows(gen.ef());
    // Group folders by owning account, preserving the query's ordering.
    let byAccount = {};
    for (let folder of folders)
        (byAccount[folder.account_id] = byAccount[folder.account_id] || []).push(folder);
    for (let account of accounts)
        account.folders = byAccount[account.id] || [];
    return res.send({ accounts: accounts });
}));
// Lists message rows of one folder.
// Query: folderId (required), limit (default 50), offset (default 0).
// Responds 400 when folderId is missing or limit/offset are not non-negative integers.
Syncer.app.get('/messages', genRequest(function*(req, res)
{
    // Parses an optional non-negative integer query parameter; null when malformed.
    function toCount(value, fallback)
    {
        if (value === undefined || value === '')
            return fallback;
        return /^\d+$/.test(String(value)) ? Number(value) : null;
    }
    if (!req.session || !req.session.auth)
        return res.sendStatus(401);
    var folderId = req.query.folderId;
    if (!folderId)
        return res.status(400).send('Need `folderId` query parameter');
    // Validate before handing the values to the query builder: raw query strings
    // (or arrays from repeated parameters) would otherwise reach the SQL text.
    var limit = toCount(req.query.limit, 50);
    var offset = toCount(req.query.offset, 0);
    if (limit === null || offset === null)
        return res.status(400).send('`limit` and `offset` must be non-negative integers');
    // NOTE(review): there is no ORDER BY, so LIMIT/OFFSET pages are not guaranteed
    // to be stable between requests.
    var [ msgs ] = yield pg.select('*').from('messages').where({ folder_id: folderId })
        .limit(limit).offset(offset).rows(gen.ef());
    return res.send({ messages: msgs });
}));
// Returns one message (query: msgId) together with its folder name and account id.
// When the body has not been stored yet, it is fetched from IMAP first.
Syncer.app.get('/message', genRequest(function*(req, res)
{
    var self = Syncer;
    if (!req.session || !req.session.auth)
        return res.sendStatus(401);
    var msgId = req.query.msgId;
    if (!msgId)
        return res.status(400).send('Need `msgId` query parameter');
    var [ msg ] = yield pg.select('m.*, f.name folder_name, f.account_id')
        .from('messages m').join('folders f', 'f.id=m.folder_id')
        .where({ 'm.id': msgId }).row(gen.ef());
    if (!msg)
        return res.send({ error: 'not-found' });
    // Body already stored: answer straight from the database.
    if (msg.body_html || msg.body_text)
        return res.send({ msg: msg });
    // Otherwise fetch the whole message; getBody() parses it, stores the body
    // columns and returns them.
    yield* self.getConnection(msg.account_id, msg.folder_name);
    var fetched;
    try
    {
        // NOTE(review): runFetch() fetches through self.srv (the sync connection),
        // not the connection obtained above, and shares its batch state with sync().
        fetched = yield* self.runFetch(msg.uid, { bodies: '' }, msg.folder_id, 'getBody');
    }
    finally
    {
        // Always hand the connection back, or later requests for this account block.
        self.releaseConnection(msg.account_id);
    }
    var upd = fetched && fetched[0];
    return res.send({ msg: { ...msg, ...upd } });
}));
/**
 * runFetch() processor: parses full message sources and stores their bodies.
 *
 * @param messages array of [msgrow, attrs] pairs; msgrow.headers holds the
 *                 fetched source (parseMessage() stores the fetched part there)
 * @param boxId    folder id of the messages
 * @returns [ upd ] with the stored columns when exactly one message was given
 */
Syncer.getBody = function*(messages, boxId)
{
    for (var i = 0; i < messages.length; i++)
    {
        let msg = messages[i];
        // A MailParser parses exactly one message, so each message gets its own
        // parser (reusing one also piled up 'end' listeners).
        let p = new MailParser({ streamAttachments: false, defaultCharset: 'windows-1251' });
        p.on('end', gen.cb());
        p.write(msg[0].headers);
        let parsed = yield p.end();
        // gen.cb() resumes with the callback's argument list; the mail object is first.
        let obj = Array.isArray(parsed) ? parsed[0] : parsed;
        delete msg[0].headers;
        // Text-only mails have no HTML part.
        let html = obj.html ? htmlawed.sanitize(obj.html, { safe: 1, elements: '* +style' }) : null;
        let upd = { body_text: obj.text, body_html: html };
        // Tag-free copy of the HTML: <style> blocks (possibly multi-line, matched
        // lazily so separate blocks are handled one by one) and all tags are removed.
        upd.body_html_text = html === null ? null : html.replace(/<style[^>]*>[\s\S]*?<\/style\s*>|<\/?[^>]*>/g, '');
        yield pg.update('messages m', upd).where({ folder_id: boxId, uid: msg[0].uid }).run(gen.ef());
        if (messages.length == 1)
            return [ upd ];
    }
};
/**
 * Returns an exclusive IMAP connection for the account, opening it (and
 * selecting boxName read-only, if given) on first use. While another caller
 * holds the connection, this waits until releaseConnection() is called.
 * Every successful call must be paired with releaseConnection(accountId).
 */
Syncer.getConnection = function*(accountId, boxName)
{
    var self = this;
    // Per-account connection state. The Syncer literal does not create these
    // maps, so initialise them on first use.
    self.connections = self.connections || {};
    self.busy = self.busy || {};
    self.queue = self.queue || {};
    self.selected = self.selected || {};
    var srv = self.connections[accountId];
    if (srv)
    {
        // Another caller holds the connection: wait until releaseConnection() wakes us.
        if (self.busy[accountId])
            yield self.queue[accountId].push(gen.cb());
        self.busy[accountId] = true;
        if (boxName && self.selected[accountId] != boxName)
        {
            try
            {
                yield srv.openBox(boxName, true, gen.ef());
            }
            catch (e)
            {
                // Do not leave the connection marked busy forever.
                self.releaseConnection(accountId);
                throw e;
            }
            self.selected[accountId] = boxName;
        }
        return srv;
    }
    // NOTE(review): concurrent first calls for the same account each open their own
    // connection, because it is only registered after connecting.
    var imapCfg = self.accounts && self.accounts[accountId] && self.accounts[accountId].imap;
    if (!imapCfg)
    {
        // Fall back to the IMAP settings sync() stores in accounts.settings
        // (assumes that column comes back as a parsed object — TODO confirm).
        let [ acc ] = yield pg.select('settings').from('accounts').where({ id: accountId }).row(gen.ef());
        imapCfg = acc && acc.settings && acc.settings.imap;
        if (!imapCfg)
            throw new Error('No IMAP settings for account '+accountId);
    }
    srv = new Imap(imapCfg);
    srv.once('ready', gen.cb());
    // FIXME handle connection errors
    yield srv.connect();
    yield srv._enqueue('ENABLE QRESYNC', gen.cb());
    // Monkey-patch node-imap to support VANISHED responses: each response adds its
    // UID ranges (['5'] or ['7', '9']) to self.vanished when a caller collects them.
    var oldUT = srv._parser._resUntagged;
    srv._parser._resUntagged = function()
    {
        var m;
        if ((m = /^\* VANISHED( \(EARLIER\))? ([\d:,]+)/.exec(this._buffer)) && self.vanished)
            self.vanished.push(...m[2].split(/,/).map(s => s.split(':')));
        oldUT.apply(this);
    };
    srv.on('close', function()
    {
        delete self.connections[accountId];
        delete self.selected[accountId];
        if (self.srv == srv)
            delete self.srv;
    });
    self.connections[accountId] = srv;
    self.busy[accountId] = true;
    self.queue[accountId] = [];
    if (boxName)
    {
        try
        {
            yield srv.openBox(boxName, true, gen.ef());
        }
        catch (e)
        {
            self.releaseConnection(accountId);
            throw e;
        }
        self.selected[accountId] = boxName;
    }
    return srv;
}
/**
 * Marks the account's connection as free and wakes the next caller waiting in
 * getConnection(), if any. Safe to call for an account that never connected.
 */
Syncer.releaseConnection = function(accountId)
{
    var self = this;
    if (self.busy)
        self.busy[accountId] = false;
    var waiting = self.queue && self.queue[accountId];
    // The woken waiter marks the connection busy again itself.
    if (waiting && waiting.length)
        (waiting.shift())();
}
/**
 * Synchronises one configured account: ensures its `accounts` row exists,
 * connects over IMAP with QRESYNC enabled and synchronises every top-level
 * folder returned by getBoxes(). The connection is closed even on failure.
 *
 * @param account configuration entry with name, email and imap (node-imap options)
 */
Syncer.sync = function*(account)
{
    var self = this;
    var accountId;
    var [ accRows ] = yield pg.select('id').from('accounts').where({ email: account.email }).rows(gen.ef());
    if (accRows[0] && accRows[0].id)
        accountId = accRows[0].id;
    else
    {
        let [ inserted ] = yield pg.insert('accounts', {
            name: account.name,
            email: account.email,
            settings: {
                imap: account.imap
            }
        }).returning('id').row(gen.ef());
        accountId = inserted.id;
    }
    // The VANISHED hook installed below pushes into this list, so it must exist
    // before the first server response arrives.
    self.vanished = [];
    var srv = new Imap(account.imap);
    self.srv = srv;
    srv.once('ready', gen.cb());
    yield srv.connect();
    try
    {
        yield srv._enqueue('ENABLE QRESYNC', gen.cb());
        // Monkey-patch node-imap to support VANISHED responses: each response adds
        // its UID ranges (['5'] or ['7', '9']) to self.vanished, flattened.
        var oldUT = srv._parser._resUntagged;
        srv._parser._resUntagged = function()
        {
            var m;
            if (m = /^\* VANISHED( \(EARLIER\))? ([\d:,]+)/.exec(this._buffer))
                self.vanished.push(...m[2].split(/,/).map(s => s.split(':')));
            oldUT.apply(this);
        };
        var [ boxes ] = yield srv.getBoxes(gen.ef());
        for (var k in boxes)
            yield* syncBox(k);
    }
    finally
    {
        // Close the connection even when a folder fails to synchronise.
        srv.end();
        self.srv = null;
    }

    // Synchronises one folder (IMAP sync: http://tools.ietf.org/html/rfc4549):
    // QRESYNC quick resync when a HIGHESTMODSEQ is stored, full flag resync
    // otherwise, then fetches headers of new/missing messages.
    function* syncBox(boxName)
    {
        var [ box ] = yield srv.openBox(boxName, true, gen.ef());
        var boxId;
        var changedSince = 0;
        var [ known ] = yield pg.select('*').from('folders')
            .where({ account_id: accountId, name: box.name }).rows(gen.ef());
        self.versionTag = 0;
        if (known.length)
        {
            boxId = known[0].id;
            if (known[0].uidvalidity != box.uidvalidity)
            {
                // UIDVALIDITY changed: every stored UID is stale. Drop those messages
                // (repairing threads, like the other deletions) and re-fetch from
                // scratch; the stored HIGHESTMODSEQ no longer applies either.
                yield* self.deleteMessages(pg.sql.and({ folder_id: boxId }, pg.sql('uid is not null')));
            }
            else
            {
                changedSince = known[0].highestmodseq;
                [ self.versionTag ] = yield pg.select('MAX(vertag)').from('messages')
                    .where({ folder_id: boxId }).val(gen.ef());
                self.versionTag = self.versionTag || 0;
            }
        }
        else
        {
            let [ inserted ] = yield pg.insert('folders', {
                name: box.name,
                uidvalidity: box.uidvalidity,
                account_id: accountId,
                highestmodseq: 0,
            }).returning('id').row(gen.ef());
            boxId = inserted.id;
        }
        self.missing = [];
        var [ maxUid ] = yield pg.select('MAX(uid)').from('messages').where({ folder_id: boxId }).val(gen.ef());
        // Both resync modes need stored messages; without any, '1:'+maxUid would be '1:null'.
        if (changedSince && maxUid)
        {
            // Quick resync: only messages changed since the stored modseq, plus VANISHED UIDs.
            process.stderr.write(account.email+'/'+box.name+': quick resync\n');
            self.vanished = [];
            yield* self.runFetch('1:'+maxUid, { modifiers: { changedsince: changedSince+' VANISHED' } }, boxId, 'updateFlags');
            if (self.vanished.length)
                yield* self.deleteMessages(pg.sql.and({ folder_id: boxId }, pg.sql(vanishedCondition(self.vanished))));
        }
        else if (maxUid)
        {
            // Full resync: rewrite the flags of every stored message with a new version
            // tag; messages left with an older tag were removed from the IMAP server.
            self.versionTag++;
            if (self.versionTag >= 0x7fffffff)
            {
                yield pg.update('messages', { vertag: 0 }).where({ folder_id: boxId }).run(gen.ef());
                self.versionTag = 1;
            }
            process.stderr.write(account.email+'/'+box.name+': full resync\n');
            process.stderr.write('\rsynchronizing 0');
            yield* self.runFetch('1:'+maxUid, {}, boxId, 'updateFlags');
            process.stderr.write('\n');
            yield* self.deleteMessages(pg.sql.and({ folder_id: boxId }, pg.sql('uid is not null'), pg.sql.lt('vertag', self.versionTag)));
        }
        // Fetch new messages (above maxUid) plus those updateFlags() found missing.
        // Number(): MAX(uid) may come back as a string for bigint columns.
        self.missing.push((maxUid ? Number(maxUid)+1 : 1)+':*');
        yield* self.runFetch(self.missing, {
            size: true,
            bodies: 'HEADER'
        }, boxId, 'saveMessages');
        yield pg.update('folders', {
            uidvalidity: box.uidvalidity,
            highestmodseq: box.highestmodseq||0
        }).where({ id: boxId }).run(gen.ef());
        yield srv.closeBox(gen.cb());
    }

    // Builds an SQL condition matching the UIDs of the collected VANISHED ranges.
    // The values come from the /[\d:,]+/ match above, so they are digits only.
    function vanishedCondition(ranges)
    {
        var single = [];
        var parts = [];
        for (var i = 0; i < ranges.length; i++)
        {
            var r = ranges[i];
            if (r[1])
            {
                // RFC 3501 seq-range: "4:2" denotes the same set as "2:4".
                var a = Number(r[0]), b = Number(r[1]);
                parts.push('uid >= '+Math.min(a, b)+' AND uid <= '+Math.max(a, b));
            }
            else
                single.push(Number(r[0]));
        }
        if (single.length)
            parts.push('uid IN ('+single.join(',')+')');
        return '('+parts.join(' OR ')+')';
    }
}
/**
 * Deletes the messages matching `where` and repairs the threads table:
 * 1. threads whose first_msg is one of the doomed messages get first_msg = NULL;
 * 2. the messages are deleted;
 * 3. threads with first_msg NULL get it recomputed as their earliest remaining
 *    message (ORDER BY time LIMIT 1);
 * 4. every thread whose first_msg is still NULL (no messages left) is deleted.
 *
 * @param where pg-bricks condition on the messages table; it is also embedded
 *              as text into the sub-select of step 1
 * NOTE(review): threads.msg_count is not adjusted here — confirm whether it should be.
 */
Syncer.deleteMessages = function*(where)
{
// Debug output of the condition's SQL text.
console.log(where+'');
yield pg.update('threads', { first_msg: null })
.where(pg.sql('first_msg IN ('+pg.select('id').from('messages').where(where)+')'))
.run(gen.ef());
yield pg.delete('messages').where(where).run(gen.ef());
yield pg.update('threads',
{ first_msg: pg.sql('('+
pg.select('id').from('messages').where({ thread_id: pg.sql('threads.id') }).orderBy('time').limit(1)
+')') }).where(pg.sql('first_msg IS NULL')).run(gen.ef());
yield pg.delete('threads').where(pg.sql('first_msg IS NULL')).run(gen.ef());
}
/**
 * Runs an IMAP FETCH on self.srv and hands the fetched messages, in batches, to
 * the generator method self[processor](batch, boxId) (e.g. 'updateFlags',
 * 'saveMessages', 'getBody').
 *
 * @param what      message set passed to self.srv.fetch()
 * @param params    node-imap fetch options
 * @param boxId     id of the folder row the messages belong to
 * @param processor name of the Syncer generator method that processes a batch
 * @returns the results collected in self.results when there are any,
 *          otherwise undefined
 *
 * The batching state (parsed, paused, synced, pending, results) lives on the
 * Syncer object and is reset at the start of every call, so overlapping calls
 * would interfere with each other.
 */
Syncer.runFetch = function*(what, params, boxId, processor)
{
var self = this;
var f = self.srv.fetch(what, params);
self.parsed = 0;
self.paused = false;
self.synced = 0;
self.pending = [];
self.results = [];
var cb, wait;
// Each fetched message is parsed and batched by onMessage(); checkFinish is
// passed to gen.run() for when that handler completes.
// NOTE(review): the error handler re-throws inside a callback, outside this
// generator — confirm how gen-thread reports such errors.
f.on('message', function(msg, seqnum)
{
gen.run(self.onMessage(msg, seqnum, boxId, processor), checkFinish, function(e) { checkFinish(); throw e; });
});
// `cb` resumes this generator once the fetch has ended and no parsed message
// is still outstanding (self.parsed <= 0).
cb = gen.cb();
yield f.once('end', function()
{
wait = true;
if (self.parsed <= 0)
cb();
else if (self.pending.length > 0)
{
// Process the final, partially filled batch.
// NOTE(review): `result` is whatever gen.run() returns, which may not be the
// processor's return value — confirm the last batch's result is not lost.
var result = gen.run(self[processor](self.pending, boxId), saveLast, function(e) { saveLast(); throw e; });
if (result)
self.results = self.results.concat(result);
}
});
if (self.results.length > 0)
{
let r = self.results;
delete self.results;
return r;
}
// Final batch done: remove it from the outstanding count and try to finish.
function saveLast()
{
self.parsed -= self.pending.length;
self.pending = [];
checkFinish();
}
// Resumes the generator once 'end' was seen and nothing is outstanding.
function checkFinish()
{
if (self.parsed <= 0 && wait)
cb();
}
};
/**
 * Per-message handler used by runFetch(): parses one fetched message, appends
 * [msgrow, attrs] to self.pending and, once 100 messages are pending, runs
 * self[processor] on that batch.
 *
 * Flow control: when 100 or more parsed messages are outstanding (self.parsed),
 * reading from the IMAP socket is paused through node-imap parser internals
 * (_ignoreReadable) and resumed after a processed batch brings the count below 100.
 * A processor error is re-thrown only after the counters and the pause state
 * have been restored.
 */
Syncer.onMessage = function*(msg, seqnum, boxId, processor)
{
var self = this;
var [ msgrow, attrs ] = yield* self.parseMessage(msg, seqnum, boxId);
self.pending.push([ msgrow, attrs ]);
self.parsed++;
if (!self.paused && self.parsed >= 100)
{
// "Little pot, stop cooking!!!" (fairy-tale reference): we have already read
// plenty of messages from the socket, that's enough for now!
self.srv._parser._ignoreReadable = true;
self.paused = true;
}
if (self.pending.length >= 100)
{
// Take the full batch; messages arriving meanwhile start a new one.
var m = self.pending;
self.pending = [];
var err;
var result;
try
{
result = yield gen.run(self[processor](m, boxId), gen.cb());
if (result)
self.results = self.results.concat(result);
}
catch (e)
{
err = e;
}
self.parsed -= m.length;
if (self.paused && self.parsed < 100)
{
// Resume reading; _cbReadable presumably restarts node-imap's read loop.
self.paused = false;
self.srv._parser._ignoreReadable = false;
process.nextTick(self.srv._parser._cbReadable);
}
if (err)
throw err;
}
}
/**
 * Collects one node-imap fetch message into a database row skeleton.
 *
 * @param msg    node-imap message emitter ('body', 'attributes', 'end' events)
 * @param seqnum sequence number, used to clean node-imap's fetch cache
 * @param boxId  folder id stored in the row
 * @returns [ msgrow, attrs ]: msgrow has headers (text of the fetched part, if
 *          any), uid, folder_id and normalised flags; attrs are the raw IMAP attributes
 */
Syncer.parseMessage = function*(msg, seqnum, boxId)
{
    var self = this;
    var msgrow = {};
    var attrs;
    msg.on('body', function(stream, info)
    {
        // Collect chunks and join them once at the end (repeated concat is quadratic).
        var chunks = [];
        stream.on('data', function(chunk)
        {
            chunks.push(chunk);
        });
        stream.once('end', function()
        {
            // An empty part emits no 'data' events; concat([]) gives an empty buffer.
            var buffer = Buffer.concat(chunks);
            var b = buffer.toString('utf8');
            // NOTE(review): '<27>' looks like a code viewer's rendering of an invisible
            // or ambiguous character (for example U+FFFD or ESC); the literal is kept
            // exactly as found — confirm against the repository.
            if (b.indexOf('<27>') >= 0)
            {
                // Re-decode with the declared charset (surrounding quotes stripped),
                // defaulting to windows-1251.
                let enc = /Content-type:\s*[^;\n]*;\s*charset="?([^"\s;]+)/i.exec(b);
                enc = enc ? enc[1] : 'windows-1251';
                // Best effort: an unknown charset keeps the UTF-8 decoding.
                try { b = iconv.decode(buffer, enc); }
                catch (e) {}
            }
            // Cut the text at the first NUL character.
            if (b.indexOf('\0') >= 0)
                b = b.slice(0, b.indexOf('\0'));
            msgrow.headers = b;
        });
    });
    msg.once('attributes', function(a)
    {
        attrs = a;
    });
    yield msg.once('end', gen.cb());
    msgrow.uid = attrs.uid;
    msgrow.folder_id = boxId;
    // System flags (\Seen) become lower-case words ('seen'); keywords get one '$' prefix.
    msgrow.flags = attrs.flags.map(f => f[0] == '\\' ? f.toLowerCase().replace(/^\\/, '') : f.replace(/^\$*/, '$'));
    // Stored as 'unread' instead of 'seen': drop 'seen', add 'unread' when it was absent.
    var nf = msgrow.flags.filter(f => f != 'seen');
    msgrow.flags = nf.length == msgrow.flags.length ? nf.concat(['unread']) : nf;
    // Workaround memory leak in node-imap
    // TODO: send pull request
    if (self.srv._curReq && self.srv._curReq.fetchCache)
        delete self.srv._curReq.fetchCache[seqnum];
    return [ msgrow, attrs ];
}
/**
 * runFetch() processor: writes the fetched flags and the current version tag
 * to the stored rows of this folder in one UPDATE ... FROM (VALUES ...).
 * UIDs the UPDATE did not touch are not stored yet and are queued in
 * self.missing; progress is reported on stderr.
 */
Syncer.updateFlags = function*(messages, boxId)
{
    yield gen.throttle(3);
    var self = this;
    // Key order (uid, flags) must match the "AS t (uid, flags)" column list.
    var values = messages.map(function(pair)
    {
        return { uid: pair[0].uid, flags: toPgArray(pair[0].flags) };
    });
    var [ updated ] = yield pg.update('messages m', { flags: pg.sql('t.flags::varchar(255)[]'), vertag: self.versionTag })
        .from('('+pg.sql.values(values)+') AS t (uid, flags)')
        .where({ 'm.folder_id': boxId }).where(pg.sql('m.uid=t.uid')).returning('m.uid').rows(gen.ef());
    // Compare as strings, like object-key lookups would.
    var touched = new Set(updated.map(row => String(row.uid)));
    messages.forEach(function(pair)
    {
        if (!touched.has(String(pair[0].uid)))
            self.missing.push(pair[0].uid);
    });
    self.synced += messages.length;
    process.stderr.write('\rsynchronizing '+self.synced);
}
/**
 * runFetch() processor: stores every fetched message of the batch that is not
 * yet in the folder, one by one through addMessage().
 */
Syncer.saveMessages = function*(messages, boxId)
{
    yield gen.throttle(2);
    var uidList = messages.map(pair => pair[1].uid);
    var [ exist ] = yield pg.select('uid, flags').from('messages')
        .where({ folder_id: boxId }).where(pg.sql.in('uid', uidList)).rows(gen.ef());
    // Compare as strings, like object-key lookups would.
    var stored = new Set(exist.map(row => String(row.uid)));
    for (let [ msgrow, attrs ] of messages)
    {
        if (!stored.has(String(attrs.uid)))
            yield* this.addMessage(msgrow, attrs);
    }
}
/**
 * Inserts one message (header fields parsed from msgrow.headers) and attaches
 * it to a thread, all inside a single transaction.
 *
 * Threading: the message joins the thread of a stored message it references.
 * Failing that, it joins the thread of a stored message that references it,
 * becoming that thread's first message. Otherwise a new thread is created.
 *
 * @param msgrow row skeleton from parseMessage(); completed and inserted
 * @param attrs  IMAP attributes (attrs.date is the fallback message time)
 * @throws whatever the queries throw; the transaction is then rolled back
 */
Syncer.addMessage = function*(msgrow, attrs)
{
    var self = this;
    var pgtx, end_transaction;
    // Set before rolling back, so the completion callback does not re-throw the
    // error this generator is already propagating to its caller.
    var failure = null;
    try
    {
        [ pgtx, end_transaction ] = yield pg.transaction(gen.cb(), function(e) { if (e && e !== failure) throw e; });
        var header = Imap.parseHeader(msgrow.headers);
        // Strip NUL characters (presumably because PostgreSQL text cannot hold them).
        for (var i in header)
            for (var k = 0; k < header[i].length; k++)
                header[i][k] = header[i][k].replace(/\x00/g, '');
        header.from = header.from && splitEmails(header.from[0])[0];
        header.replyto = header['reply-to'] && splitEmails(header['reply-to'][0])[0];
        // References: the <message-id> tokens, capped to the first one plus the last 9;
        // In-Reply-To is filled from / appended to them.
        var re = /(<[^>]*>)/;
        header.references = (header.references && header.references[0] || '').split(re).filter(a => a.match(re));
        if (header.references.length)
        {
            if (header.references.length > 10)
                header.references = [ header.references[0] ].concat(header.references.slice(header.references.length-9));
            if (!header['in-reply-to'] || !header['in-reply-to'][0])
                header['in-reply-to'] = [ header.references[header.references.length-1] ];
            else if (header.references[header.references.length-1] != header['in-reply-to'][0])
                header.references.push(header['in-reply-to'][0]);
        }
        // Message time: the Date header when it parses, otherwise the IMAP date.
        if (header.date)
        {
            var t = Date.parse(header.date[0]);
            header.date = isNaN(t) ? null : new Date(t);
        }
        if (!header.date)
            header.date = new Date(attrs.date);
        msgrow.from_email = header.from && header.from.email || '';
        msgrow.from_name = header.from && header.from.name || '';
        msgrow.replyto_email = header.replyto && header.replyto.email || '';
        msgrow.replyto_name = header.replyto && header.replyto.name || '';
        msgrow.to_list = header.to && header.to[0] || '';
        msgrow.cc_list = header.cc && header.cc[0] || '';
        msgrow.bcc_list = header.bcc && header.bcc[0] || '';
        msgrow.subject = header.subject && header.subject[0] || '';
        msgrow.messageid = header['message-id'] && header['message-id'][0] || '';
        msgrow.inreplyto = header['in-reply-to'] && header['in-reply-to'][0] || '';
        msgrow.inreplyto = msgrow.inreplyto.replace(/^[\s\S]*(<[^>]*>)[\s\S]*$/, '$1');
        msgrow.time = header.date;
        msgrow.flags = toPgArray(msgrow.flags);
        msgrow.refs = toPgArray(header.references);
        msgrow.vertag = self.versionTag;
        var thisIsFirst = false;
        var threadId = null;
        if (header.references.length)
        {
            [ threadId ] = yield pgtx.select('MAX(thread_id)').from('messages')
                .where(pg.sql.in('messageid', header.references)).val(gen.ef());
        }
        // Also for messages without References (thread roots): a stored reply that
        // references this message means this one belongs first in that thread.
        if (!threadId && msgrow.messageid)
        {
            [ threadId ] = yield pgtx.select('MAX(thread_id)').from('messages')
                .where(new pg.sql.Binary('@>', 'refs', toPgArray([msgrow.messageid]))).val(gen.ef());
            if (threadId)
                thisIsFirst = true;
        }
        if (threadId)
            msgrow.thread_id = threadId;
        console.log(msgrow.time+' '+msgrow.from_email+' '+msgrow.subject);
        [ msgrow.id ] = yield pgtx.insert('messages', msgrow).returning('id').val(gen.ef());
        if (!msgrow.thread_id)
        {
            [ msgrow.thread_id ] = yield pgtx.insert('threads', {
                first_msg: msgrow.id,
                msg_count: 1
            }).returning('id').val(gen.ef());
            yield pgtx.update('messages', { thread_id: msgrow.thread_id }).where({ id: msgrow.id }).run(gen.ef());
        }
        else
        {
            // first_msg must be part of the SET values, not a property of the builder.
            let set = { msg_count: pg.sql('msg_count+1') };
            if (thisIsFirst)
                set.first_msg = msgrow.id;
            yield pgtx.update('threads', set).where({ id: msgrow.thread_id }).run(gen.ef());
        }
    }
    catch (e0)
    {
        if (end_transaction)
        {
            // Passing the error rolls back; committing would keep partial work.
            failure = e0;
            end_transaction(e0);
        }
        throw e0;
    }
    end_transaction();
}
// Start the HTTP API, then synchronise the configured accounts one after another.
Syncer.app.listen(8057);
gen.run(function*()
{
    for (let account of cfg.accounts)
        yield* Syncer.sync(account);
});
/**
 * Adapts a generator request handler to an Express (req, res) handler run by
 * gen-thread. Errors are logged and answered with HTTP 500 — unless a response
 * was already started, in which case sending again would itself throw.
 */
function genRequest(fn)
{
    return (req, res) => gen.run(fn(req, res), null, e =>
    {
        console.error(e);
        if (res.headersSent)
            return;
        res.status(500).send('Internal Error: '+e);
    });
}
/**
 * Splits an address header value into { name, email } entries.
 * Accepted forms per entry: `"Name" <addr>`, `Name <addr>`, `<addr>`, `addr`.
 * A bare address ends at a comma, so "a@x, b@y" yields two entries. An unquoted
 * display name may still contain commas ("Doe, John <j@x>").
 */
function splitEmails(s)
{
    var re = /^[\s,]*(?:(?:["'](.*?)["']|([^<]+))\s*<([^>]+)>|<?([^<>,\s][^<>,]*)>?)/; // '
    var m, r = [];
    // Every alternative consumes at least one character, so the loop terminates.
    while (m = re.exec(s))
    {
        s = s.substr(m[0].length);
        r.push({ name: (m[1]||m[2]||'').trim(), email: (m[3]||m[4]||'').trim() });
    }
    return r;
}
/**
 * Formats an array as a PostgreSQL array literal: every element is double-quoted
 * with `\` and `"` backslash-escaped, and null/undefined become NULL.
 * (JSON escapes such as \n or \u0001 are not understood by the array parser,
 * so other characters are passed through verbatim.)
 */
function toPgArray(a)
{
    return '{'+a.map(v => v == null ? 'NULL' : '"'+String(v).replace(/[\\"]/g, '\\$&')+'"').join(',')+'}';
}