likeopera-backend/ImapManager.js

241 lines
6.5 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

const gen = require('gen-thread');
const Imap = require('imap');
module.exports = ImapManager;
function ImapManager()
{
this.accounts = {};
this.connections = {};
this.busy = {};
this.selected = {};
this.queue = {};
this.onIdle = {};
this.onStopIdle = {};
}
ImapManager.prototype.setServer = function(accountId, settings)
{
this.accounts[accountId] = settings;
}
ImapManager.prototype.getConnection = function*(accountId, boxName, connKey, onIdle, onStopIdle)
{
var self = this;
connKey = accountId+(connKey||'');
if (self.connections[connKey])
{
let stoppingIdle = self.queue[connKey].length == 0;
if (self.busy[connKey])
{
// wait for the queue to finish
yield self.queue[connKey].push(gen.cb());
}
if (stoppingIdle && self.onStopIdle[connKey])
{
// run "stop idle" callback
self.onStopIdle[connKey](accountId, self.connections[connKey]);
}
if (boxName && self.selected[connKey] != boxName)
{
// select different box
yield self.connections[connKey].openBox(boxName, true, gen.ef());
self.selected[connKey] = boxName;
}
self.busy[connKey] = true;
return self.connections[connKey];
}
var srv = new Imap(self.accounts[accountId]);
srv.once('ready', gen.cb());
// FIXME handle connection errors
yield srv.connect();
yield srv._enqueue('ENABLE QRESYNC', gen.cb());
// Monkey-patch node-imap to support VANISHED responses
var oldUT = srv._parser._resUntagged;
srv._parser._resUntagged = function()
{
var m;
if (m = /^\* VANISHED( \(EARLIER\))? ([\d:,]+)/.exec(this._buffer))
{
srv.emit('vanish', m[2].split(/,/).map(s => s.split(':')));
}
oldUT.apply(this);
};
srv.on('close', function()
{
delete self.connections[connKey];
if (self.srv == srv)
delete self.srv;
});
if (boxName)
{
yield srv.openBox(boxName, true, gen.ef());
self.selected[connKey] = boxName;
}
self.connections[connKey] = srv;
self.busy[connKey] = true;
self.queue[connKey] = [];
self.onIdle[connKey] = onIdle;
self.onStopIdle[connKey] = onStopIdle;
return srv;
}
ImapManager.prototype.releaseConnection = function(accountId, connKey, allowClose)
{
var self = this;
connKey = accountId+(connKey||'');
self.busy[connKey] = false;
if (self.queue[connKey].length)
{
(self.queue[connKey].shift())();
}
else if (allowClose)
{
self.connections[connKey].end();
delete self.connections[connKey];
delete self.busy[connKey];
delete self.queue[connKey];
delete self.selected[connKey];
}
else
{
if (self.onIdle[connKey])
self.onIdle[connKey](accountId, self.connections[connKey]);
}
}
ImapManager.prototype.runFetch = function*(srv, what, params, processor, args)
{
var self = this;
var f = srv.fetch(what, params);
var fetchState = {
...(args||{}),
parsed: 0,
paused: false,
synced: 0,
pending: [],
results: [],
srv: srv
};
var cb, wait;
f.on('message', function(msg, seqnum)
{
gen.run(self.onMessage(fetchState, msg, seqnum, processor), checkFinish, function(e) { checkFinish(); throw e; });
});
cb = gen.cb();
yield f.once('end', function()
{
wait = true;
if (fetchState.parsed <= 0)
cb();
else if (fetchState.pending.length > 0)
gen.run(processor(fetchState.pending, fetchState), saveLast, function(e) { saveLast(); throw e; });
});
if (fetchState.results.length > 0)
{
return fetchState.results;
}
function saveLast(r)
{
if (r)
fetchState.results = fetchState.results.concat(r);
fetchState.parsed -= fetchState.pending.length;
fetchState.pending = [];
checkFinish();
}
function checkFinish()
{
if (fetchState.parsed <= 0 && wait)
cb();
}
};
ImapManager.prototype.onMessage = function*(fetchState, msg, seqnum, processor)
{
var self = this;
var [ msgrow, attrs ] = yield* self.parseMessage(msg, seqnum);
// Workaround memory leak in node-imap
// TODO: send pull request
if (fetchState.srv._curReq && fetchState.srv._curReq.fetchCache)
delete fetchState.srv._curReq.fetchCache[seqnum];
fetchState.pending.push([ msgrow, attrs ]);
fetchState.parsed++;
if (!fetchState.paused && fetchState.parsed >= 100 && !fetchState.nopause)
{
// ГОРШОЧЕК, НЕ ВАРИ!!! И так уже кучу сообщений прочитал из сокета, хорош!
fetchState.srv._parser._ignoreReadable = true;
fetchState.paused = true;
}
if (fetchState.pending.length >= 100)
{
var m = fetchState.pending;
fetchState.pending = [];
var err;
var result;
try
{
result = yield gen.run(processor(m, fetchState), gen.cb());
if (result)
fetchState.results = fetchState.results.concat(result);
}
catch (e)
{
err = e;
}
fetchState.parsed -= m.length;
if (fetchState.paused && fetchState.parsed < 100)
{
fetchState.paused = false;
fetchState.srv._parser._ignoreReadable = false;
process.nextTick(fetchState.srv._parser._cbReadable);
}
if (err)
throw err;
}
}
ImapManager.prototype.parseMessage = function*(msg, seqnum)
{
var msgrow = {};
var attrs;
msg.on('body', function(stream, info)
{
var buffer;
stream.on('data', function(chunk)
{
if (!buffer)
buffer = chunk;
else
buffer = Buffer.concat([ buffer, chunk ]);
});
stream.once('end', function()
{
msgrow.headers = buffer;
});
});
msg.once('attributes', function(a) {
attrs = a;
});
yield msg.once('end', gen.cb());
msgrow.uid = attrs.uid;
msgrow.flags = attrs.flags.map(f => f[0] == '\\' ? f.toLowerCase().replace(/^\\/, '') : f.replace(/^\$*/, '$'));
var nf = msgrow.flags.filter(f => f != 'seen');
nf = nf.length == msgrow.flags.length ? nf.concat(['unread']) : nf;
msgrow.flags = nf.sort();
return [ msgrow, attrs ];
}