OK, basic api methods now work

master
Vitaliy Filippov 2016-08-28 12:40:29 +03:00
parent 875dee61c3
commit 6c31561b9b
1 changed files with 104 additions and 98 deletions

View File

@ -42,8 +42,13 @@ process.env.NODE_TLS_REJECT_UNAUTHORIZED = "0";
var cfg = require('./cfg.json'); var cfg = require('./cfg.json');
var pg = bricks.configure(cfg.pg); var pg = bricks.configure(cfg.pg);
pg._pg.types.setTypeParser(1082, 'text', val => val); // НЕ ПАРСИТЬ ДАТЫ ! ! !
var Syncer = { var Syncer = {
selected: {},
connections: {},
busy: {},
queue: {}
}; };
Syncer.app = express(); Syncer.app = express();
@ -108,7 +113,7 @@ Syncer.app.get('/messages', genRequest(function*(req, res)
var limit = req.query.limit || 50; var limit = req.query.limit || 50;
var offset = req.query.offset || 0; var offset = req.query.offset || 0;
var [ msgs ] = yield pg.select('*').from('messages').where({ folder_id: folderId }) var [ msgs ] = yield pg.select('*').from('messages').where({ folder_id: folderId })
.limit(limit).offset(offset).rows(gen.ef()); .orderBy('time desc').limit(limit).offset(offset).rows(gen.ef());
return res.send({ messages: msgs }); return res.send({ messages: msgs });
})); }));
@ -119,17 +124,19 @@ Syncer.app.get('/message', genRequest(function*(req, res)
return res.sendStatus(401); return res.sendStatus(401);
var msgId = req.query.msgId; var msgId = req.query.msgId;
var [ msg ] = yield pg.select('m.*, f.name folder_name, f.account_id') var [ msg ] = yield pg.select('m.*, f.name folder_name, f.account_id')
.from('messages m').join('folders f', 'f.id=m.folder_id') .from('messages m').join('folders f', pg.sql('f.id=m.folder_id'))
.where({ 'm.id': msgId }).row(gen.ef()); .where({ 'm.id': msgId }).row(gen.ef());
if (!msg) if (!msg)
return res.send({ error: 'not-found' }); return res.send({ error: 'not-found' });
delete msg.text_index;
if (!msg.body_html && !msg.body_text) if (!msg.body_html && !msg.body_text)
{ {
var srv = yield* self.getConnection(msg.account_id, boxName); var srv = yield* self.getConnection(msg.account_id, msg.folder_name);
var [ upd ] = yield* self.runFetch(msg.uid, { bodies: '' }, msg.folder_id, 'getBody'); var [ upd ] = yield* self.runFetch(srv, msg.uid, { bodies: '' }, msg.folder_id, 'getBody');
self.releaseConnection(accountId); self.releaseConnection(msg.account_id);
return res.send({ msg: { ...msg, ...upd } }); return res.send({ msg: { ...msg, ...upd } });
} }
return res.send({ msg: msg });
})); }));
Syncer.getBody = function*(messages, boxId) Syncer.getBody = function*(messages, boxId)
@ -141,10 +148,9 @@ Syncer.getBody = function*(messages, boxId)
let msg = messages[i]; let msg = messages[i];
p.on('end', gen.cb()); p.on('end', gen.cb());
p.write(msg[0].headers); p.write(msg[0].headers);
let obj = yield p.end(); let [ obj ] = yield p.end();
delete msg[0].headers; obj.html = htmlawed.sanitize(obj.html||'', { safe: 1, elements: '* +style' });
obj.html = htmlawed.sanitize(obj.html, { safe: 1, elements: '* +style' }); let upd = { body_text: obj.text||'', body_html: obj.html };
let upd = { body_text: obj.text, body_html: obj.html };
upd.body_html_text = obj.html.replace(/<style[^>]*>.*<\/style\s*>|<\/?[^>]*>/g, ''); upd.body_html_text = obj.html.replace(/<style[^>]*>.*<\/style\s*>|<\/?[^>]*>/g, '');
yield pg.update('messages m', upd).where({ folder_id: boxId, uid: msg[0].uid }).run(gen.ef()); yield pg.update('messages m', upd).where({ folder_id: boxId, uid: msg[0].uid }).run(gen.ef());
if (messages.length == 1) if (messages.length == 1)
@ -152,23 +158,33 @@ Syncer.getBody = function*(messages, boxId)
} }
}; };
Syncer.getConnection = function*(accountId, boxName) Syncer.getConnection = function*(accountId, boxName, connKey)
{ {
var self = this; var self = this;
if (self.connections[accountId]) connKey = accountId+(connKey||'');
if (self.connections[connKey])
{ {
if (self.busy[accountId]) if (self.busy[connKey])
yield self.queue[accountId].push(gen.cb()); yield self.queue[connKey].push(gen.cb());
if (boxName && self.selected[accountId] != boxName) if (boxName && self.selected[connKey] != boxName)
{ {
yield srv.openBox(boxName, true, gen.ef()); yield srv.openBox(boxName, true, gen.ef());
self.selected[accountId] = boxName; self.selected[connKey] = boxName;
} }
self.busy[accountId] = true; self.busy[connKey] = true;
return self.connections[accountId]; return self.connections[connKey];
} }
var srv = new Imap(self.accounts[accountId].imap); if (!self.accounts)
{
self.accounts = {};
let [ rows ] = yield pg.select('*').from('accounts').rows(gen.ef());
for (var i = 0; i < rows.length; i++)
{
self.accounts[rows[i].id] = rows[i];
}
}
var srv = new Imap(self.accounts[accountId].settings.imap);
srv.once('ready', gen.cb()); srv.once('ready', gen.cb());
// FIXME handle connection errors // FIXME handle connection errors
@ -181,13 +197,13 @@ Syncer.getConnection = function*(accountId, boxName)
{ {
var m; var m;
if (m = /^\* VANISHED( \(EARLIER\))? ([\d:,]+)/.exec(this._buffer)) if (m = /^\* VANISHED( \(EARLIER\))? ([\d:,]+)/.exec(this._buffer))
self.vanished.push(m[2].split(/,/).map(s => s.split(':'))); self.vanished = self.vanished.concat(self.vanished, m[2].split(/,/).map(s => s.split(':')));
oldUT.apply(this); oldUT.apply(this);
}; };
srv.on('close', function() srv.on('close', function()
{ {
delete self.connections[accountId]; delete self.connections[connKey];
if (self.srv == srv) if (self.srv == srv)
delete self.srv; delete self.srv;
}); });
@ -195,21 +211,30 @@ Syncer.getConnection = function*(accountId, boxName)
if (boxName) if (boxName)
{ {
yield srv.openBox(boxName, true, gen.ef()); yield srv.openBox(boxName, true, gen.ef());
self.selected[accountId] = boxName; self.selected[connKey] = boxName;
} }
self.connections[accountId] = srv; self.connections[connKey] = srv;
self.busy[accountId] = true; self.busy[connKey] = true;
self.queue[accountId] = []; self.queue[connKey] = [];
return srv; return srv;
} }
Syncer.releaseConnection = function(accountId) Syncer.releaseConnection = function(accountId, connKey, allowClose)
{ {
var self = this; var self = this;
self.busy[accountId] = false; connKey = accountId+(connKey||'');
if (self.queue[accountId].length) self.busy[connKey] = false;
(self.queue[accountId].shift())(); if (self.queue[connKey].length)
(self.queue[connKey].shift())();
else if (allowClose)
{
self.connections[connKey].end();
delete self.connections[connKey];
delete self.busy[connKey];
delete self.queue[connKey];
delete self.selected[connKey];
}
} }
Syncer.sync = function*(account) Syncer.sync = function*(account)
@ -230,23 +255,8 @@ Syncer.sync = function*(account)
}).returning('id').row(gen.ef()); }).returning('id').row(gen.ef());
accountId = row.id; accountId = row.id;
} }
var srv = new Imap(account.imap);
self.srv = srv;
srv.once('ready', gen.cb());
yield srv.connect();
yield srv._enqueue('ENABLE QRESYNC', gen.cb());
// Monkey-patch node-imap to support VANISHED responses
var oldUT = srv._parser._resUntagged;
srv._parser._resUntagged = function()
{
var m;
if (m = /^\* VANISHED( \(EARLIER\))? ([\d:,]+)/.exec(this._buffer))
self.vanished.push(m[2].split(/,/).map(s => s.split(':')));
oldUT.apply(this);
};
var srv = yield* self.getConnection(accountId, null, 'S');
var [ boxes ] = yield srv.getBoxes(gen.ef()); var [ boxes ] = yield srv.getBoxes(gen.ef());
for (var k in boxes) for (var k in boxes)
{ {
@ -292,7 +302,7 @@ Syncer.sync = function*(account)
{ {
process.stderr.write(account.email+'/'+box.name+': quick resync\n'); process.stderr.write(account.email+'/'+box.name+': quick resync\n');
self.vanished = []; self.vanished = [];
yield* self.runFetch('1:'+maxUid, { modifiers: { changedsince: changedSince+' VANISHED' } }, boxId, 'updateFlags'); yield* self.runFetch(srv, '1:'+maxUid, { modifiers: { changedsince: changedSince+' VANISHED' } }, boxId, 'updateFlags');
if (self.vanished.length) if (self.vanished.length)
{ {
let lst = []; let lst = [];
@ -321,7 +331,7 @@ Syncer.sync = function*(account)
process.stderr.write(account.email+'/'+box.name+': full resync\n'); process.stderr.write(account.email+'/'+box.name+': full resync\n');
process.stderr.write('\rsynchronizing 0'); process.stderr.write('\rsynchronizing 0');
yield* self.runFetch('1:'+maxUid, {}, boxId, 'updateFlags'); yield* self.runFetch(srv, '1:'+maxUid, {}, boxId, 'updateFlags');
process.stderr.write('\n'); process.stderr.write('\n');
// delete messages removed from IMAP server // delete messages removed from IMAP server
@ -330,7 +340,7 @@ Syncer.sync = function*(account)
// fetch new messages // fetch new messages
self.missing.push((maxUid ? maxUid+1 : 1)+':*'); self.missing.push((maxUid ? maxUid+1 : 1)+':*');
yield* self.runFetch(self.missing, { yield* self.runFetch(srv, self.missing, {
size: true, size: true,
bodies: 'HEADER' bodies: 'HEADER'
}, boxId, 'saveMessages'); }, boxId, 'saveMessages');
@ -340,11 +350,8 @@ Syncer.sync = function*(account)
//unread_count: box.messages.new, //unread_count: box.messages.new,
highestmodseq: box.highestmodseq||0 highestmodseq: box.highestmodseq||0
}).where({ id: row.id }).run(gen.ef()); }).where({ id: row.id }).run(gen.ef());
yield srv.closeBox(gen.cb());
} }
srv.end(); self.releaseConnection(accountId, 'S');
self.srv = null;
} }
Syncer.deleteMessages = function*(where) Syncer.deleteMessages = function*(where)
@ -361,94 +368,97 @@ Syncer.deleteMessages = function*(where)
yield pg.delete('threads').where(pg.sql('first_msg IS NULL')).run(gen.ef()); yield pg.delete('threads').where(pg.sql('first_msg IS NULL')).run(gen.ef());
} }
Syncer.runFetch = function*(what, params, boxId, processor) Syncer.runFetch = function*(srv, what, params, boxId, processor)
{ {
var self = this; var self = this;
var f = self.srv.fetch(what, params); var f = srv.fetch(what, params);
self.parsed = 0; var fetchState = {
self.paused = false; parsed: 0,
self.synced = 0; paused: false,
self.pending = []; synced: 0,
self.results = []; pending: [],
results: [],
srv: srv
};
var cb, wait; var cb, wait;
f.on('message', function(msg, seqnum) f.on('message', function(msg, seqnum)
{ {
gen.run(self.onMessage(msg, seqnum, boxId, processor), checkFinish, function(e) { checkFinish(); throw e; }); gen.run(self.onMessage(fetchState, msg, seqnum, boxId, processor), checkFinish, function(e) { checkFinish(); throw e; });
}); });
cb = gen.cb(); cb = gen.cb();
yield f.once('end', function() yield f.once('end', function()
{ {
wait = true; wait = true;
if (self.parsed <= 0) if (fetchState.parsed <= 0)
cb(); cb();
else if (self.pending.length > 0) else if (fetchState.pending.length > 0)
{ gen.run(self[processor](fetchState.pending, boxId, fetchState), saveLast, function(e) { saveLast(); throw e; });
var result = gen.run(self[processor](self.pending, boxId), saveLast, function(e) { saveLast(); throw e; });
if (result)
self.results = self.results.concat(result);
}
}); });
if (self.results.length > 0) if (fetchState.results.length > 0)
{ {
let r = self.results; return fetchState.results;
delete self.results;
return r;
} }
function saveLast() function saveLast(r)
{ {
self.parsed -= self.pending.length; if (r)
self.pending = []; fetchState.results = fetchState.results.concat(r);
fetchState.parsed -= fetchState.pending.length;
fetchState.pending = [];
checkFinish(); checkFinish();
} }
function checkFinish() function checkFinish()
{ {
if (self.parsed <= 0 && wait) if (fetchState.parsed <= 0 && wait)
cb(); cb();
} }
}; };
Syncer.onMessage = function*(msg, seqnum, boxId, processor) Syncer.onMessage = function*(fetchState, msg, seqnum, boxId, processor)
{ {
var self = this; var self = this;
var [ msgrow, attrs ] = yield* self.parseMessage(msg, seqnum, boxId); var [ msgrow, attrs ] = yield* self.parseMessage(msg, seqnum, boxId);
// Workaround memory leak in node-imap
// TODO: send pull request
if (fetchState.srv._curReq && fetchState.srv._curReq.fetchCache)
delete fetchState.srv._curReq.fetchCache[seqnum];
self.pending.push([ msgrow, attrs ]); fetchState.pending.push([ msgrow, attrs ]);
self.parsed++; fetchState.parsed++;
if (!self.paused && self.parsed >= 100) if (!fetchState.paused && fetchState.parsed >= 100)
{ {
// ГОРШОЧЕК, НЕ ВАРИ!!! И так уже кучу сообщений прочитал из сокета, хорош! // ГОРШОЧЕК, НЕ ВАРИ!!! И так уже кучу сообщений прочитал из сокета, хорош!
self.srv._parser._ignoreReadable = true; fetchState.srv._parser._ignoreReadable = true;
self.paused = true; fetchState.paused = true;
} }
if (self.pending.length >= 100) if (fetchState.pending.length >= 100)
{ {
var m = self.pending; var m = fetchState.pending;
self.pending = []; fetchState.pending = [];
var err; var err;
var result; var result;
try try
{ {
result = yield gen.run(self[processor](m, boxId), gen.cb()); result = yield gen.run(self[processor](m, boxId, fetchState), gen.cb());
if (result) if (result)
self.results = self.results.concat(result); fetchState.results = fetchState.results.concat(result);
} }
catch (e) catch (e)
{ {
err = e; err = e;
} }
self.parsed -= m.length; fetchState.parsed -= m.length;
if (self.paused && self.parsed < 100) if (fetchState.paused && fetchState.parsed < 100)
{ {
self.paused = false; fetchState.paused = false;
self.srv._parser._ignoreReadable = false; fetchState.srv._parser._ignoreReadable = false;
process.nextTick(self.srv._parser._cbReadable); process.nextTick(fetchState.srv._parser._cbReadable);
} }
if (err) if (err)
throw err; throw err;
@ -493,14 +503,10 @@ Syncer.parseMessage = function*(msg, seqnum, boxId)
msgrow.flags = attrs.flags.map(f => f[0] == '\\' ? f.toLowerCase().replace(/^\\/, '') : f.replace(/^\$*/, '$')); msgrow.flags = attrs.flags.map(f => f[0] == '\\' ? f.toLowerCase().replace(/^\\/, '') : f.replace(/^\$*/, '$'));
var nf = msgrow.flags.filter(f => f != 'seen'); var nf = msgrow.flags.filter(f => f != 'seen');
msgrow.flags = nf.length == msgrow.flags.length ? nf.concat(['unread']) : nf; msgrow.flags = nf.length == msgrow.flags.length ? nf.concat(['unread']) : nf;
// Workaround memory leak in node-imap
// TODO: send pull request
if (this.srv._curReq && this.srv._curReq.fetchCache)
delete this.srv._curReq.fetchCache[seqnum];
return [ msgrow, attrs ]; return [ msgrow, attrs ];
} }
Syncer.updateFlags = function*(messages, boxId) Syncer.updateFlags = function*(messages, boxId, fetchState)
{ {
yield gen.throttle(3); yield gen.throttle(3);
var self = this; var self = this;
@ -517,8 +523,8 @@ Syncer.updateFlags = function*(messages, boxId)
for (i = 0; i < messages.length; i++) for (i = 0; i < messages.length; i++)
if (!uh[messages[i][0].uid]) if (!uh[messages[i][0].uid])
self.missing.push(messages[i][0].uid); self.missing.push(messages[i][0].uid);
self.synced += messages.length; fetchState.synced += messages.length;
process.stderr.write('\rsynchronizing '+self.synced); process.stderr.write('\rsynchronizing '+fetchState.synced);
} }
Syncer.saveMessages = function*(messages, boxId) Syncer.saveMessages = function*(messages, boxId)
@ -640,7 +646,7 @@ gen.run(function*()
function genRequest(fn) function genRequest(fn)
{ {
return (req, res) => gen.run(fn(req, res), null, e => res.status(500).send('Internal Error: '+e)); return (req, res) => gen.run(fn(req, res), null, e => res.status(500).send('Internal Error: '+e.stack));
} }
function splitEmails(s) function splitEmails(s)