const Imap = require('imap'); class ImapManager { constructor() { this.accounts = {}; this.connections = {}; this.busy = {}; this.selected = {}; this.queue = {}; this.onIdle = {}; this.onStopIdle = {}; } setServer(accountId, settings) { this.accounts[accountId] = settings; } async getConnection(accountId, boxName, connKey, onIdle, onStopIdle) { connKey = accountId+(connKey||''); if (this.connections[connKey]) { let stoppingIdle = this.queue[connKey].length == 0; if (this.busy[connKey]) { // wait for the queue to finish await new Promise((r, e) => { this.queue[connKey].push(r); }); } if (stoppingIdle && this.onStopIdle[connKey]) { // run "stop idle" callback this.onStopIdle[connKey](accountId, this.connections[connKey]); } if (boxName && this.selected[connKey] != boxName) { // select different box await new Promise((r, e) => this.connections[connKey].openBox(boxName, false, r)); this.selected[connKey] = boxName; } this.busy[connKey] = true; return this.connections[connKey]; } let srv = new Imap(this.accounts[accountId]); // FIXME handle connection errors await new Promise((r, e) => { srv.once('ready', r); srv.connect(); }); await new Promise((r, e) => srv._enqueue('ENABLE QRESYNC', r)); // Monkey-patch node-imap to support VANISHED responses let oldUT = srv._parser._resUntagged; srv._parser._resUntagged = function() { let m; if (m = /^\* VANISHED( \(EARLIER\))? ([\d:,]+)/.exec(this._buffer)) { srv.emit('vanish', m[2].split(/,/).map(s => s.split(':'))); } oldUT.apply(this); }; srv.on('close', () => { delete this.connections[connKey]; if (this.srv == srv) { this.srv = null; } }); if (boxName) { await new Promise((r, e) => srv.openBox(boxName, false, r)); this.selected[connKey] = boxName; } this.connections[connKey] = srv; this.busy[connKey] = true; this.queue[connKey] = []; this.onIdle[connKey] = onIdle; this.onStopIdle[connKey] = onStopIdle; return srv; } releaseConnection(accountId, connKey, allowClose) { connKey = accountId + (connKey||''); this.busy[connKey] = false; if (this.queue[connKey].length) { (this.queue[connKey].shift())(); } else if (allowClose) { this.connections[connKey].end(); delete this.connections[connKey]; delete this.busy[connKey]; delete this.queue[connKey]; delete this.selected[connKey]; } else { if (this.onIdle[connKey]) this.onIdle[connKey](accountId, this.connections[connKey]); } } async runFetch(srv, what, params, processor, args) { let f = srv.fetch(what, params); let fetchState = { ...(args||{}), paused: false, synced: 0, parsing: 0, pending: [], results: [], srv: srv, end: false, }; let wait; await new Promise((resolve, reject) => { let end = () => { if (!fetchState.pending.length) { resolve(); } else { let m = fetchState.pending; fetchState.pending = []; processor(m, fetchState) .then(results => { if (results) { fetchState.results = fetchState.results.concat(results); } resolve(); }) .catch(reject); } }; f.on('message', (msg, seqnum) => { this.onMessage(fetchState, msg, seqnum, processor) .then(() => { if (fetchState.end && !fetchState.parsing) { end(); } }) .catch(reject); }); f.once('end', () => { fetchState.end = true; if (!fetchState.parsing) { end(); } }); }); return fetchState.results; } async onMessage(fetchState, msg, seqnum, processor) { let msgrow, attrs; fetchState.parsing++; try { [ msgrow, attrs ] = await this.parseMessage(msg, seqnum); } catch (e) { fetchState.parsing--; throw e; } fetchState.parsing--; // Workaround memory leak in node-imap // TODO: send pull request if (fetchState.srv._curReq && fetchState.srv._curReq.fetchCache) { delete fetchState.srv._curReq.fetchCache[seqnum]; } fetchState.pending.push([ msgrow, attrs ]); if (!fetchState.paused && fetchState.pending.length >= 100 && !fetchState.nopause) { // ГОРШОЧЕК, НЕ ВАРИ!!! И так уже кучу сообщений прочитал из сокета, хорош! fetchState.srv._parser._ignoreReadable = true; fetchState.paused = true; } if (fetchState.pending.length >= 100) { let m = fetchState.pending; fetchState.pending = []; let result = await processor(m, fetchState); if (result) { fetchState.results = fetchState.results.concat(result); } if (fetchState.paused) { fetchState.paused = false; fetchState.srv._parser._ignoreReadable = false; process.nextTick(fetchState.srv._parser._cbReadable); } } } async parseMessage(msg, seqnum) { let msgrow = {}; let attrs; msg.on('body', function(stream, info) { let buffer; stream.on('data', function(chunk) { if (!buffer) buffer = chunk; else buffer = Buffer.concat([ buffer, chunk ]); }); stream.once('end', function() { msgrow.headers = buffer; }); }); msg.once('attributes', function(a) { attrs = a; }); await new Promise((r, e) => msg.once('end', r)); msgrow.uid = attrs.uid; msgrow.flags = attrs.flags.map(f => f[0] == '\\' ? f.toLowerCase().replace(/^\\/, '') : f.replace(/^\$*/, '$')).sort(); return [ msgrow, attrs ]; } } module.exports = ImapManager;