diff --git a/operetta.js b/operetta.js index a00072e..8bf9b21 100644 --- a/operetta.js +++ b/operetta.js @@ -42,8 +42,13 @@ process.env.NODE_TLS_REJECT_UNAUTHORIZED = "0"; var cfg = require('./cfg.json'); var pg = bricks.configure(cfg.pg); +pg._pg.types.setTypeParser(1082, 'text', val => val); // НЕ ПАРСИТЬ ДАТЫ ! ! ! var Syncer = { + selected: {}, + connections: {}, + busy: {}, + queue: {} }; Syncer.app = express(); @@ -108,7 +113,7 @@ Syncer.app.get('/messages', genRequest(function*(req, res) var limit = req.query.limit || 50; var offset = req.query.offset || 0; var [ msgs ] = yield pg.select('*').from('messages').where({ folder_id: folderId }) - .limit(limit).offset(offset).rows(gen.ef()); + .orderBy('time desc').limit(limit).offset(offset).rows(gen.ef()); return res.send({ messages: msgs }); })); @@ -119,17 +124,19 @@ Syncer.app.get('/message', genRequest(function*(req, res) return res.sendStatus(401); var msgId = req.query.msgId; var [ msg ] = yield pg.select('m.*, f.name folder_name, f.account_id') - .from('messages m').join('folders f', 'f.id=m.folder_id') + .from('messages m').join('folders f', pg.sql('f.id=m.folder_id')) .where({ 'm.id': msgId }).row(gen.ef()); if (!msg) return res.send({ error: 'not-found' }); + delete msg.text_index; if (!msg.body_html && !msg.body_text) { - var srv = yield* self.getConnection(msg.account_id, boxName); - var [ upd ] = yield* self.runFetch(msg.uid, { bodies: '' }, msg.folder_id, 'getBody'); - self.releaseConnection(accountId); + var srv = yield* self.getConnection(msg.account_id, msg.folder_name); + var [ upd ] = yield* self.runFetch(srv, msg.uid, { bodies: '' }, msg.folder_id, 'getBody'); + self.releaseConnection(msg.account_id); return res.send({ msg: { ...msg, ...upd } }); } + return res.send({ msg: msg }); })); Syncer.getBody = function*(messages, boxId) @@ -141,10 +148,9 @@ Syncer.getBody = function*(messages, boxId) let msg = messages[i]; p.on('end', gen.cb()); p.write(msg[0].headers); - let obj = yield p.end(); - delete msg[0].headers; - obj.html = htmlawed.sanitize(obj.html, { safe: 1, elements: '* +style' }); - let upd = { body_text: obj.text, body_html: obj.html }; + let [ obj ] = yield p.end(); + obj.html = htmlawed.sanitize(obj.html||'', { safe: 1, elements: '* +style' }); + let upd = { body_text: obj.text||'', body_html: obj.html }; upd.body_html_text = obj.html.replace(/]*>.*<\/style\s*>|<\/?[^>]*>/g, ''); yield pg.update('messages m', upd).where({ folder_id: boxId, uid: msg[0].uid }).run(gen.ef()); if (messages.length == 1) @@ -152,23 +158,33 @@ Syncer.getBody = function*(messages, boxId) } }; -Syncer.getConnection = function*(accountId, boxName) +Syncer.getConnection = function*(accountId, boxName, connKey) { var self = this; - if (self.connections[accountId]) + connKey = accountId+(connKey||''); + if (self.connections[connKey]) { - if (self.busy[accountId]) - yield self.queue[accountId].push(gen.cb()); - if (boxName && self.selected[accountId] != boxName) + if (self.busy[connKey]) + yield self.queue[connKey].push(gen.cb()); + if (boxName && self.selected[connKey] != boxName) { yield srv.openBox(boxName, true, gen.ef()); - self.selected[accountId] = boxName; + self.selected[connKey] = boxName; } - self.busy[accountId] = true; - return self.connections[accountId]; + self.busy[connKey] = true; + return self.connections[connKey]; } - var srv = new Imap(self.accounts[accountId].imap); + if (!self.accounts) + { + self.accounts = {}; + let [ rows ] = yield pg.select('*').from('accounts').rows(gen.ef()); + for (var i = 0; i < rows.length; i++) + { + self.accounts[rows[i].id] = rows[i]; + } + } + var srv = new Imap(self.accounts[accountId].settings.imap); srv.once('ready', gen.cb()); // FIXME handle connection errors @@ -181,13 +197,13 @@ Syncer.getConnection = function*(accountId, boxName) { var m; if (m = /^\* VANISHED( \(EARLIER\))? ([\d:,]+)/.exec(this._buffer)) - self.vanished.push(m[2].split(/,/).map(s => s.split(':'))); + self.vanished = self.vanished.concat(self.vanished, m[2].split(/,/).map(s => s.split(':'))); oldUT.apply(this); }; srv.on('close', function() { - delete self.connections[accountId]; + delete self.connections[connKey]; if (self.srv == srv) delete self.srv; }); @@ -195,21 +211,30 @@ Syncer.getConnection = function*(accountId, boxName) if (boxName) { yield srv.openBox(boxName, true, gen.ef()); - self.selected[accountId] = boxName; + self.selected[connKey] = boxName; } - self.connections[accountId] = srv; - self.busy[accountId] = true; - self.queue[accountId] = []; + self.connections[connKey] = srv; + self.busy[connKey] = true; + self.queue[connKey] = []; return srv; } -Syncer.releaseConnection = function(accountId) +Syncer.releaseConnection = function(accountId, connKey, allowClose) { var self = this; - self.busy[accountId] = false; - if (self.queue[accountId].length) - (self.queue[accountId].shift())(); + connKey = accountId+(connKey||''); + self.busy[connKey] = false; + if (self.queue[connKey].length) + (self.queue[connKey].shift())(); + else if (allowClose) + { + self.connections[connKey].end(); + delete self.connections[connKey]; + delete self.busy[connKey]; + delete self.queue[connKey]; + delete self.selected[connKey]; + } } Syncer.sync = function*(account) @@ -230,23 +255,8 @@ Syncer.sync = function*(account) }).returning('id').row(gen.ef()); accountId = row.id; } - var srv = new Imap(account.imap); - self.srv = srv; - - srv.once('ready', gen.cb()); - yield srv.connect(); - yield srv._enqueue('ENABLE QRESYNC', gen.cb()); - - // Monkey-patch node-imap to support VANISHED responses - var oldUT = srv._parser._resUntagged; - srv._parser._resUntagged = function() - { - var m; - if (m = /^\* VANISHED( \(EARLIER\))? ([\d:,]+)/.exec(this._buffer)) - self.vanished.push(m[2].split(/,/).map(s => s.split(':'))); - oldUT.apply(this); - }; + var srv = yield* self.getConnection(accountId, null, 'S'); var [ boxes ] = yield srv.getBoxes(gen.ef()); for (var k in boxes) { @@ -292,7 +302,7 @@ Syncer.sync = function*(account) { process.stderr.write(account.email+'/'+box.name+': quick resync\n'); self.vanished = []; - yield* self.runFetch('1:'+maxUid, { modifiers: { changedsince: changedSince+' VANISHED' } }, boxId, 'updateFlags'); + yield* self.runFetch(srv, '1:'+maxUid, { modifiers: { changedsince: changedSince+' VANISHED' } }, boxId, 'updateFlags'); if (self.vanished.length) { let lst = []; @@ -321,7 +331,7 @@ Syncer.sync = function*(account) process.stderr.write(account.email+'/'+box.name+': full resync\n'); process.stderr.write('\rsynchronizing 0'); - yield* self.runFetch('1:'+maxUid, {}, boxId, 'updateFlags'); + yield* self.runFetch(srv, '1:'+maxUid, {}, boxId, 'updateFlags'); process.stderr.write('\n'); // delete messages removed from IMAP server @@ -330,7 +340,7 @@ Syncer.sync = function*(account) // fetch new messages self.missing.push((maxUid ? maxUid+1 : 1)+':*'); - yield* self.runFetch(self.missing, { + yield* self.runFetch(srv, self.missing, { size: true, bodies: 'HEADER' }, boxId, 'saveMessages'); @@ -340,11 +350,8 @@ Syncer.sync = function*(account) //unread_count: box.messages.new, highestmodseq: box.highestmodseq||0 }).where({ id: row.id }).run(gen.ef()); - - yield srv.closeBox(gen.cb()); } - srv.end(); - self.srv = null; + self.releaseConnection(accountId, 'S'); } Syncer.deleteMessages = function*(where) @@ -361,94 +368,97 @@ Syncer.deleteMessages = function*(where) yield pg.delete('threads').where(pg.sql('first_msg IS NULL')).run(gen.ef()); } -Syncer.runFetch = function*(what, params, boxId, processor) +Syncer.runFetch = function*(srv, what, params, boxId, processor) { var self = this; - var f = self.srv.fetch(what, params); + var f = srv.fetch(what, params); - self.parsed = 0; - self.paused = false; - self.synced = 0; - self.pending = []; - self.results = []; + var fetchState = { + parsed: 0, + paused: false, + synced: 0, + pending: [], + results: [], + srv: srv + }; var cb, wait; f.on('message', function(msg, seqnum) { - gen.run(self.onMessage(msg, seqnum, boxId, processor), checkFinish, function(e) { checkFinish(); throw e; }); + gen.run(self.onMessage(fetchState, msg, seqnum, boxId, processor), checkFinish, function(e) { checkFinish(); throw e; }); }); cb = gen.cb(); yield f.once('end', function() { wait = true; - if (self.parsed <= 0) + if (fetchState.parsed <= 0) cb(); - else if (self.pending.length > 0) - { - var result = gen.run(self[processor](self.pending, boxId), saveLast, function(e) { saveLast(); throw e; }); - if (result) - self.results = self.results.concat(result); - } + else if (fetchState.pending.length > 0) + gen.run(self[processor](fetchState.pending, boxId, fetchState), saveLast, function(e) { saveLast(); throw e; }); }); - if (self.results.length > 0) + if (fetchState.results.length > 0) { - let r = self.results; - delete self.results; - return r; + return fetchState.results; } - function saveLast() + function saveLast(r) { - self.parsed -= self.pending.length; - self.pending = []; + if (r) + fetchState.results = fetchState.results.concat(r); + fetchState.parsed -= fetchState.pending.length; + fetchState.pending = []; checkFinish(); } function checkFinish() { - if (self.parsed <= 0 && wait) + if (fetchState.parsed <= 0 && wait) cb(); } }; -Syncer.onMessage = function*(msg, seqnum, boxId, processor) +Syncer.onMessage = function*(fetchState, msg, seqnum, boxId, processor) { var self = this; var [ msgrow, attrs ] = yield* self.parseMessage(msg, seqnum, boxId); + // Workaround memory leak in node-imap + // TODO: send pull request + if (fetchState.srv._curReq && fetchState.srv._curReq.fetchCache) + delete fetchState.srv._curReq.fetchCache[seqnum]; - self.pending.push([ msgrow, attrs ]); - self.parsed++; - if (!self.paused && self.parsed >= 100) + fetchState.pending.push([ msgrow, attrs ]); + fetchState.parsed++; + if (!fetchState.paused && fetchState.parsed >= 100) { // ГОРШОЧЕК, НЕ ВАРИ!!! И так уже кучу сообщений прочитал из сокета, хорош! - self.srv._parser._ignoreReadable = true; - self.paused = true; + fetchState.srv._parser._ignoreReadable = true; + fetchState.paused = true; } - if (self.pending.length >= 100) + if (fetchState.pending.length >= 100) { - var m = self.pending; - self.pending = []; + var m = fetchState.pending; + fetchState.pending = []; var err; var result; try { - result = yield gen.run(self[processor](m, boxId), gen.cb()); + result = yield gen.run(self[processor](m, boxId, fetchState), gen.cb()); if (result) - self.results = self.results.concat(result); + fetchState.results = fetchState.results.concat(result); } catch (e) { err = e; } - self.parsed -= m.length; - if (self.paused && self.parsed < 100) + fetchState.parsed -= m.length; + if (fetchState.paused && fetchState.parsed < 100) { - self.paused = false; - self.srv._parser._ignoreReadable = false; - process.nextTick(self.srv._parser._cbReadable); + fetchState.paused = false; + fetchState.srv._parser._ignoreReadable = false; + process.nextTick(fetchState.srv._parser._cbReadable); } if (err) throw err; @@ -493,14 +503,10 @@ Syncer.parseMessage = function*(msg, seqnum, boxId) msgrow.flags = attrs.flags.map(f => f[0] == '\\' ? f.toLowerCase().replace(/^\\/, '') : f.replace(/^\$*/, '$')); var nf = msgrow.flags.filter(f => f != 'seen'); msgrow.flags = nf.length == msgrow.flags.length ? nf.concat(['unread']) : nf; - // Workaround memory leak in node-imap - // TODO: send pull request - if (this.srv._curReq && this.srv._curReq.fetchCache) - delete this.srv._curReq.fetchCache[seqnum]; return [ msgrow, attrs ]; } -Syncer.updateFlags = function*(messages, boxId) +Syncer.updateFlags = function*(messages, boxId, fetchState) { yield gen.throttle(3); var self = this; @@ -517,8 +523,8 @@ Syncer.updateFlags = function*(messages, boxId) for (i = 0; i < messages.length; i++) if (!uh[messages[i][0].uid]) self.missing.push(messages[i][0].uid); - self.synced += messages.length; - process.stderr.write('\rsynchronizing '+self.synced); + fetchState.synced += messages.length; + process.stderr.write('\rsynchronizing '+fetchState.synced); } Syncer.saveMessages = function*(messages, boxId) @@ -640,7 +646,7 @@ gen.run(function*() function genRequest(fn) { - return (req, res) => gen.run(fn(req, res), null, e => res.status(500).send('Internal Error: '+e)); + return (req, res) => gen.run(fn(req, res), null, e => res.status(500).send('Internal Error: '+e.stack)); } function splitEmails(s)