likeopera-backend/ImapManager.js

262 lines
7.5 KiB
JavaScript
Raw Normal View History

2016-09-11 22:04:37 +03:00
const Imap = require('imap');
class ImapManager
2016-09-11 22:04:37 +03:00
{
constructor()
{
this.accounts = {};
this.connections = {};
this.busy = {};
this.selected = {};
this.queue = {};
this.onIdle = {};
this.onStopIdle = {};
}
2016-09-11 22:04:37 +03:00
setServer(accountId, settings)
{
this.accounts[accountId] = settings;
}
2016-09-11 22:04:37 +03:00
async getConnection(accountId, boxName, connKey, onIdle, onStopIdle)
2016-09-11 22:04:37 +03:00
{
connKey = accountId+(connKey||'');
if (this.connections[connKey])
{
let stoppingIdle = this.queue[connKey].length == 0;
if (this.busy[connKey])
{
// wait for the queue to finish
2019-05-10 01:26:31 +03:00
await new Promise((r, e) =>
{
this.queue[connKey].push(r);
});
}
if (stoppingIdle && this.onStopIdle[connKey])
{
// run "stop idle" callback
this.onStopIdle[connKey](accountId, this.connections[connKey]);
}
if (boxName && this.selected[connKey] != boxName)
{
// select different box
await new Promise((r, e) => this.connections[connKey].openBox(boxName, false, r));
this.selected[connKey] = boxName;
}
this.busy[connKey] = true;
return this.connections[connKey];
}
2019-05-10 01:26:31 +03:00
let srv = new Imap(this.accounts[accountId]);
// FIXME handle connection errors
await new Promise((r, e) =>
{
srv.once('ready', r);
srv.connect();
});
await new Promise((r, e) => srv._enqueue('ENABLE QRESYNC', r));
// Monkey-patch node-imap to support VANISHED responses
2019-05-10 01:26:31 +03:00
let oldUT = srv._parser._resUntagged;
srv._parser._resUntagged = function()
2016-09-11 22:04:37 +03:00
{
2019-05-10 01:26:31 +03:00
let m;
if (m = /^\* VANISHED( \(EARLIER\))? ([\d:,]+)/.exec(this._buffer))
{
srv.emit('vanish', m[2].split(/,/).map(s => s.split(':')));
}
oldUT.apply(this);
};
srv.on('close', () =>
{
delete this.connections[connKey];
if (this.srv == srv)
{
this.srv = null;
}
});
2016-09-11 22:04:37 +03:00
if (boxName)
{
await new Promise((r, e) => srv.openBox(boxName, false, r));
this.selected[connKey] = boxName;
}
2016-09-11 22:04:37 +03:00
this.connections[connKey] = srv;
this.busy[connKey] = true;
this.queue[connKey] = [];
this.onIdle[connKey] = onIdle;
this.onStopIdle[connKey] = onStopIdle;
return srv;
}
2016-09-11 22:04:37 +03:00
releaseConnection(accountId, connKey, allowClose)
2016-09-11 22:04:37 +03:00
{
connKey = accountId + (connKey||'');
this.busy[connKey] = false;
if (this.queue[connKey].length)
{
(this.queue[connKey].shift())();
}
else if (allowClose)
2016-09-11 22:04:37 +03:00
{
this.connections[connKey].end();
delete this.connections[connKey];
delete this.busy[connKey];
delete this.queue[connKey];
delete this.selected[connKey];
2016-09-11 22:04:37 +03:00
}
else
{
if (this.onIdle[connKey])
this.onIdle[connKey](accountId, this.connections[connKey]);
}
}
2016-09-11 22:04:37 +03:00
async runFetch(srv, what, params, processor, args)
2016-09-11 22:04:37 +03:00
{
let f = srv.fetch(what, params);
2016-09-11 22:04:37 +03:00
let fetchState = {
...(args||{}),
paused: false,
synced: 0,
parsing: 0,
pending: [],
results: [],
srv: srv,
end: false,
};
2016-09-11 22:04:37 +03:00
let wait;
2016-09-11 22:04:37 +03:00
await new Promise((resolve, reject) =>
{
let end = () =>
{
if (!fetchState.pending.length)
{
2019-05-13 16:23:53 +03:00
resolve();
}
else
{
let m = fetchState.pending;
fetchState.pending = [];
processor(m, fetchState)
.then(results =>
{
if (results)
{
fetchState.results = fetchState.results.concat(results);
}
resolve();
})
2019-05-17 16:57:37 +03:00
.catch(reject);
}
};
2016-09-11 22:04:37 +03:00
f.on('message', (msg, seqnum) =>
{
this.onMessage(fetchState, msg, seqnum, processor)
.then(() =>
{
if (fetchState.end && !fetchState.parsing)
{
end();
}
})
2019-05-18 03:01:13 +03:00
.catch(reject);
});
2016-09-11 22:04:37 +03:00
f.once('end', () =>
{
fetchState.end = true;
if (!fetchState.parsing)
{
end();
}
});
});
2016-09-11 22:04:37 +03:00
return fetchState.results;
2016-09-11 22:04:37 +03:00
}
async onMessage(fetchState, msg, seqnum, processor)
2016-09-11 22:04:37 +03:00
{
let msgrow, attrs;
fetchState.parsing++;
try
{
[ msgrow, attrs ] = await this.parseMessage(msg, seqnum);
}
catch (e)
{
fetchState.parsing--;
throw e;
}
fetchState.parsing--;
// Workaround memory leak in node-imap
// TODO: send pull request
if (fetchState.srv._curReq && fetchState.srv._curReq.fetchCache)
2016-09-11 22:04:37 +03:00
{
delete fetchState.srv._curReq.fetchCache[seqnum];
2016-09-11 22:04:37 +03:00
}
fetchState.pending.push([ msgrow, attrs ]);
if (!fetchState.paused && fetchState.pending.length >= 100 && !fetchState.nopause)
2016-09-11 22:04:37 +03:00
{
// ГОРШОЧЕК, НЕ ВАРИ!!! И так уже кучу сообщений прочитал из сокета, хорош!
fetchState.srv._parser._ignoreReadable = true;
fetchState.paused = true;
2016-09-11 22:04:37 +03:00
}
if (fetchState.pending.length >= 100)
2016-09-11 22:04:37 +03:00
{
let m = fetchState.pending;
fetchState.pending = [];
let result = await processor(m, fetchState);
if (result)
{
fetchState.results = fetchState.results.concat(result);
}
if (fetchState.paused)
{
fetchState.paused = false;
fetchState.srv._parser._ignoreReadable = false;
process.nextTick(fetchState.srv._parser._cbReadable);
}
2016-09-11 22:04:37 +03:00
}
}
async parseMessage(msg, seqnum)
2016-09-11 22:04:37 +03:00
{
let msgrow = {};
let attrs;
msg.on('body', function(stream, info)
2016-09-11 22:04:37 +03:00
{
let buffer;
stream.on('data', function(chunk)
{
if (!buffer)
buffer = chunk;
else
buffer = Buffer.concat([ buffer, chunk ]);
});
stream.once('end', function()
{
msgrow.headers = buffer;
});
2016-09-11 22:04:37 +03:00
});
msg.once('attributes', function(a)
2016-09-11 22:04:37 +03:00
{
attrs = a;
2016-09-11 22:04:37 +03:00
});
await new Promise((r, e) => msg.once('end', r));
msgrow.uid = attrs.uid;
msgrow.flags = attrs.flags.map(f => f[0] == '\\' ? f.toLowerCase().replace(/^\\/, '') : f.replace(/^\$*/, '$')).sort();
return [ msgrow, attrs ];
}
2016-09-11 22:04:37 +03:00
}
2019-05-10 01:26:31 +03:00
module.exports = ImapManager;