likeopera-backend/ImapManager.js

241 lines
6.5 KiB
JavaScript
Raw Normal View History

2016-09-11 22:04:37 +03:00
const gen = require('gen-thread');
const Imap = require('imap');
module.exports = ImapManager;
/**
 * Keeps track of IMAP connections and serializes access to them.
 *
 * Every map below except `accounts` is keyed by the connection key, which is
 * accountId concatenated with an optional per-connection suffix.
 */
function ImapManager()
{
    // Allow calling without `new`
    if (!(this instanceof ImapManager))
        return new ImapManager();
    // accountId -> settings object handed to the Imap constructor
    this.accounts = {};
    // connection key -> open connection object
    this.connections = {};
    // connection key -> true while somebody holds the connection
    this.busy = {};
    // connection key -> name of the mailbox currently opened on the connection
    this.selected = {};
    // connection key -> array of callbacks of callers waiting for the connection
    this.queue = {};
    // connection key -> callback invoked when the connection is released and nobody waits for it
    this.onIdle = {};
    // connection key -> callback invoked when an idle connection is taken again
    this.onStopIdle = {};
}
/**
 * Register (or replace) the connection settings of an account.
 *
 * @param accountId  key under which the settings are stored
 * @param settings   settings object; passed to the Imap constructor when a
 *                   connection for this account is created
 */
ImapManager.prototype.setServer = function(accountId, settings)
{
    const { accounts } = this;
    accounts[accountId] = settings;
};
/**
 * Acquire the connection identified by accountId + connKey, creating it on first use.
 *
 * This is a generator meant to be driven by gen-thread: it yields around
 * gen.cb() / gen.ef() callbacks. While a connection is marked busy, further
 * callers are parked in its queue until releaseConnection() wakes them up.
 *
 * @param accountId   key of the settings registered with setServer()
 * @param boxName     optional mailbox to open before the connection is returned
 *                    (opened with `true` as the second openBox argument)
 * @param connKey     optional suffix that allows several connections per account
 * @param onIdle      remembered only when a new connection is created; see releaseConnection()
 * @param onStopIdle  remembered only when a new connection is created; called with
 *                    (accountId, connection) when an idle connection is acquired again
 * @returns the connection, marked as busy
 * @throws whatever gen.ef() raises for a failed connect or openBox
 *         (assumes gen.ef() throws into the generator on a truthy first argument,
 *         as the openBox calls already rely on)
 *
 * NOTE(review): two callers asking for the same not-yet-existing key at the same
 * time will still each create a connection; that is not addressed here.
 */
ImapManager.prototype.getConnection = function*(accountId, boxName, connKey, onIdle, onStopIdle)
{
    const self = this;
    connKey = accountId+(connKey||'');
    if (self.connections[connKey])
    {
        // A connection is idle only if it is neither held nor being waited for.
        // A busy connection never went through onIdle, so "stop idle" must not be
        // reported after merely waiting in the queue.
        const wasIdle = !self.busy[connKey] && self.queue[connKey].length == 0;
        if (self.busy[connKey])
        {
            // wait for the queue to finish
            yield self.queue[connKey].push(gen.cb());
        }
        if (wasIdle && self.onStopIdle[connKey])
        {
            // run "stop idle" callback
            self.onStopIdle[connKey](accountId, self.connections[connKey]);
        }
        if (boxName && self.selected[connKey] != boxName)
        {
            // select different box
            yield self.connections[connKey].openBox(boxName, true, gen.ef());
            self.selected[connKey] = boxName;
        }
        self.busy[connKey] = true;
        return self.connections[connKey];
    }
    const srv = new Imap(self.accounts[accountId]);
    // Resume on 'ready', fail on 'error'. Without an 'error' listener a failed
    // connect would surface as an unhandled 'error' event instead of an exception
    // in the caller. Each listener removes the other so the thread is resumed once.
    const connected = gen.ef();
    const onReady = function()
    {
        srv.removeListener('error', onError);
        connected(null);
    };
    const onError = function(err)
    {
        srv.removeListener('ready', onReady);
        connected(err);
    };
    srv.once('ready', onReady);
    srv.once('error', onError);
    yield srv.connect();
    yield srv._enqueue('ENABLE QRESYNC', gen.cb());
    // Monkey-patch node-imap to support VANISHED responses
    const oldUT = srv._parser._resUntagged;
    srv._parser._resUntagged = function()
    {
        // Emit 'vanish' with a list of [from, to] / [single] string pairs
        const m = /^\* VANISHED( \(EARLIER\))? ([\d:,]+)/.exec(this._buffer);
        if (m)
        {
            srv.emit('vanish', m[2].split(/,/).map(s => s.split(':')));
        }
        oldUT.apply(this);
    };
    srv.on('close', function()
    {
        // Only forget the registration if it still belongs to this connection;
        // a newer connection may already be stored under the same key.
        if (self.connections[connKey] === srv)
        {
            delete self.connections[connKey];
            // the next connection starts with no mailbox selected
            delete self.selected[connKey];
        }
        // NOTE(review): self.srv is never assigned in this file; kept as is.
        if (self.srv == srv)
            delete self.srv;
    });
    // Drop any mailbox name remembered from a previous connection under this key
    delete self.selected[connKey];
    if (boxName)
    {
        yield srv.openBox(boxName, true, gen.ef());
        self.selected[connKey] = boxName;
    }
    self.connections[connKey] = srv;
    self.busy[connKey] = true;
    self.queue[connKey] = [];
    self.onIdle[connKey] = onIdle;
    self.onStopIdle[connKey] = onStopIdle;
    return srv;
};
/**
 * Give back a connection obtained through getConnection().
 *
 * - If callers are waiting in the queue, the first one is woken up.
 * - Otherwise, with allowClose set, the connection is ended and all bookkeeping
 *   for the key is removed.
 * - Otherwise the onIdle callback stored for the key (if any) is called with
 *   (accountId, connection).
 *
 * Releasing a key that has no connection any more (never created, already
 * closed, or dropped by the server) only clears leftover bookkeeping.
 *
 * @param accountId   account the connection belongs to
 * @param connKey     optional suffix, same value as passed to getConnection()
 * @param allowClose  when truthy and nobody is waiting, close the connection
 */
ImapManager.prototype.releaseConnection = function(accountId, connKey, allowClose)
{
    const self = this;
    connKey = accountId+(connKey||'');
    const srv = self.connections[connKey];
    const waiting = self.queue[connKey];
    if (waiting && waiting.length)
    {
        // hand the connection over to the next waiting caller
        self.busy[connKey] = false;
        (waiting.shift())();
        return;
    }
    if (allowClose || !srv)
    {
        // srv is missing when the connection is already gone; nothing to end then
        if (srv)
            srv.end();
        delete self.connections[connKey];
        delete self.busy[connKey];
        delete self.queue[connKey];
        delete self.selected[connKey];
        delete self.onIdle[connKey];
        delete self.onStopIdle[connKey];
        return;
    }
    self.busy[connKey] = false;
    if (self.onIdle[connKey])
        self.onIdle[connKey](accountId, srv);
};
/**
 * Run srv.fetch(what, params) and pass every fetched message through onMessage().
 *
 * Generator meant to be driven by gen-thread. It is resumed once the fetch has
 * emitted 'end' and the counter of parsed-but-unprocessed messages is back at zero.
 *
 * @param srv        connection object providing fetch()
 * @param what       first argument for srv.fetch()
 * @param params     second argument for srv.fetch()
 * @param processor  called as processor(messages, fetchState) and run through gen.run();
 *                   a truthy result is concatenated to the collected results
 * @param args       optional extra properties copied into fetchState; the
 *                   bookkeeping fields set below take precedence over them
 * @returns the collected results when there are any, otherwise undefined
 */
ImapManager.prototype.runFetch = function*(srv, what, params, processor, args)
{
    const manager = this;
    const fetcher = srv.fetch(what, params);
    const fetchState = {
        ...args,
        parsed: 0,
        paused: false,
        synced: 0,
        pending: [],
        results: [],
        srv: srv
    };
    // callback that resumes this generator
    let resume;
    // set once the fetch has emitted 'end'
    let fetchEnded;

    // Resume the generator when the fetch is over and nothing is left in flight
    const checkFinish = () =>
    {
        if (fetchEnded && fetchState.parsed <= 0)
            resume();
    };

    // Account for the final partial batch after the processor is done with it
    const saveLast = (batchResult) =>
    {
        if (batchResult)
            fetchState.results = fetchState.results.concat(batchResult);
        fetchState.parsed -= fetchState.pending.length;
        fetchState.pending = [];
        checkFinish();
    };

    fetcher.on('message', (msg, seqnum) =>
    {
        const onFailure = (e) => { checkFinish(); throw e; };
        gen.run(manager.onMessage(fetchState, msg, seqnum, processor), checkFinish, onFailure);
    });
    resume = gen.cb();
    yield fetcher.once('end', () =>
    {
        fetchEnded = true;
        if (fetchState.parsed <= 0)
        {
            resume();
            return;
        }
        if (fetchState.pending.length > 0)
        {
            // flush the messages that did not fill a whole batch
            const onFailure = (e) => { saveLast(); throw e; };
            gen.run(processor(fetchState.pending, fetchState), saveLast, onFailure);
        }
    });
    return fetchState.results.length > 0 ? fetchState.results : undefined;
};
/**
 * Handle one message of a running fetch: parse it, queue it in
 * fetchState.pending and, once a full batch has accumulated, hand the batch
 * to the processor.
 *
 * fetchState.parsed counts messages that are parsed but not yet processed.
 * When it reaches the batch size (and fetchState.nopause is not set), reading
 * from the connection is suspended through the parser's _ignoreReadable flag;
 * it is resumed after a batch completes and the counter is below the batch
 * size again.
 *
 * @param fetchState  shared state object created by runFetch()
 * @param msg         message object emitted by the fetch
 * @param seqnum      sequence number emitted together with the message
 * @param processor   called as processor(batch, fetchState) and run through gen.run();
 *                    a truthy result is concatenated to fetchState.results
 * @throws rethrows whatever is thrown into the generator while a batch is processed
 */
ImapManager.prototype.onMessage = function*(fetchState, msg, seqnum, processor)
{
    // Batch size for the processor, and the backlog at which socket reads are paused
    const BATCH_SIZE = 100;
    const self = this;
    const [ msgrow, attrs ] = yield* self.parseMessage(msg, seqnum);
    // Workaround memory leak in node-imap
    // TODO: send pull request
    if (fetchState.srv._curReq && fetchState.srv._curReq.fetchCache)
        delete fetchState.srv._curReq.fetchCache[seqnum];
    fetchState.pending.push([ msgrow, attrs ]);
    fetchState.parsed++;
    if (!fetchState.paused && fetchState.parsed >= BATCH_SIZE && !fetchState.nopause)
    {
        // "Little pot, stop cooking!" - plenty of messages have been read from
        // the socket already, so stop reading more for now
        fetchState.srv._parser._ignoreReadable = true;
        fetchState.paused = true;
    }
    if (fetchState.pending.length < BATCH_SIZE)
        return;
    // Detach the batch first so that messages arriving meanwhile start a new one
    const batch = fetchState.pending;
    fetchState.pending = [];
    try
    {
        const result = yield gen.run(processor(batch, fetchState), gen.cb());
        if (result)
            fetchState.results = fetchState.results.concat(result);
    }
    finally
    {
        // Bookkeeping must happen whether or not the processor failed,
        // otherwise the fetch would stay paused and never finish
        fetchState.parsed -= batch.length;
        if (fetchState.paused && fetchState.parsed < BATCH_SIZE)
        {
            fetchState.paused = false;
            fetchState.srv._parser._ignoreReadable = false;
            process.nextTick(fetchState.srv._parser._cbReadable);
        }
    }
};
/**
 * Collect the body and attributes of one fetched message.
 *
 * Generator meant to be driven by gen-thread; it is resumed by the message's
 * 'end' event.
 *
 * The returned row contains:
 *  - headers: the raw bytes received on the message's body stream
 *             (undefined when the stream delivered no data)
 *  - uid:     attrs.uid
 *  - flags:   normalized, sorted flag names. Flags starting with a backslash are
 *             lowercased with the backslash removed, all other flags get
 *             exactly one leading '$'. 'seen' is dropped, and 'unread' is added
 *             when 'seen' was absent.
 *
 * @param msg     message object emitted by the fetch
 * @param seqnum  sequence number of the message (not used here)
 * @returns [ msgrow, attrs ]
 */
ImapManager.prototype.parseMessage = function*(msg, seqnum)
{
    const msgrow = {};
    let attrs;
    msg.on('body', function(stream, info)
    {
        // Gather the chunks and join them once at the end instead of
        // re-copying the whole buffer on every chunk
        const chunks = [];
        stream.on('data', function(chunk)
        {
            chunks.push(chunk);
        });
        stream.once('end', function()
        {
            msgrow.headers = chunks.length > 1 ? Buffer.concat(chunks) : chunks[0];
        });
    });
    msg.once('attributes', function(a)
    {
        attrs = a;
    });
    yield msg.once('end', gen.cb());
    msgrow.uid = attrs.uid;
    const flags = attrs.flags.map(f => f[0] == '\\' ? f.toLowerCase().replace(/^\\/, '') : f.replace(/^\$*/, '$'));
    const visible = flags.filter(f => f != 'seen');
    // nothing was filtered out => the message has not been seen
    const unseen = visible.length == flags.length;
    msgrow.flags = (unseen ? visible.concat(['unread']) : visible).sort();
    return [ msgrow, attrs ];
};