Fix message parsing workflow

master
Vitaliy Filippov 2016-07-17 21:30:32 +03:00
parent f85a1591fe
commit 251f596564
1 changed file with 45 additions and 43 deletions

View File

@@ -40,8 +40,8 @@ function toPgArray(a)
function* main(NEXT, account) function* main(NEXT, account)
{ {
var accountId; var accountId;
var [ err, rows ] = yield pg.select('id').from('accounts').where({ email: account.email }).rows(NEXT); var [ err, rows ] = yield pg.select('id').from('accounts').where({ email: account.email }).rows(NEXT.cb());
if (err) throw err; if (err) throw new Error(''+err);
if (rows[0] && rows[0].id) if (rows[0] && rows[0].id)
accountId = rows[0].id; accountId = rows[0].id;
else else
@@ -52,21 +52,21 @@ function* main(NEXT, account)
settings: { settings: {
imap: account.imap imap: account.imap
} }
}).returning('id').row(NEXT); }).returning('id').row(NEXT.cb());
if (err) throw err; if (err) throw new Error(''+err);
accountId = row.id; accountId = row.id;
} }
var srv = new Imap(account.imap); var srv = new Imap(account.imap);
srv.once('ready', NEXT); srv.once('ready', NEXT.cb());
yield srv.connect(); yield srv.connect();
var [ err, boxes ] = yield srv.getBoxes(NEXT); var [ err, boxes ] = yield srv.getBoxes(NEXT.cb());
for (var k in boxes) for (var k in boxes)
{ {
var [ err, box ] = yield srv.openBox(k, true, NEXT); var [ err, box ] = yield srv.openBox(k, true, NEXT.cb());
if (err) throw err; if (err) throw new Error(''+err);
var boxId; var boxId;
var [ err, rows ] = yield pg.update('folders', { uidvalidity: box.uidvalidity, unread_count: box.messages.new }) var [ err, rows ] = yield pg.update('folders', { uidvalidity: box.uidvalidity, unread_count: box.messages.new })
.where({ account_id: accountId, name: box.name }).returning('id').rows(NEXT); .where({ account_id: accountId, name: box.name }).returning('id').rows(NEXT.cb());
if (err) throw new Error(''+err); if (err) throw new Error(''+err);
if (rows[0] && rows[0].id) if (rows[0] && rows[0].id)
{ {
@@ -82,8 +82,8 @@ function* main(NEXT, account)
account_id: accountId, account_id: accountId,
unread_count: box.messages.new, unread_count: box.messages.new,
// total_count: box.messages.count // total_count: box.messages.count
}).returning('id').row(NEXT); }).returning('id').row(NEXT.cb());
if (err) throw err; if (err) throw new Error(''+err);
boxId = row.id; boxId = row.id;
} }
var f = srv.fetch('1:*', { var f = srv.fetch('1:*', {
@@ -94,20 +94,27 @@ function* main(NEXT, account)
{ {
gen.run(function*(NEXT) gen.run(function*(NEXT)
{ {
yield NEXT.throttle(5);
var msgrow = {}; var msgrow = {};
var [ stream, info ] = yield msg.on('body', NEXT); var attrs;
var buffer = ''; msg.on('body', function(stream, info)
stream.on('data', function(chunk)
{ {
buffer += chunk.toString('utf8'); var buffer = '';
stream.on('data', function(chunk)
{
buffer += chunk.toString('utf8');
});
stream.once('end', function()
{
msgrow.body = '';
msgrow.headers = buffer;
});
}); });
yield stream.once('end', NEXT); msg.once('attributes', function(a) {
msgrow.headers = buffer; attrs = a;
msgrow.body = ''; });
var [ attrs ] = yield msg.once('attributes', NEXT); yield msg.once('end', NEXT.cb());
yield msg.once('end', NEXT);
yield NEXT.throttle(5);
var header = Imap.parseHeader(msgrow.headers); var header = Imap.parseHeader(msgrow.headers);
for (var i in header) for (var i in header)
@@ -129,7 +136,6 @@ function* main(NEXT, account)
var re = /(<[^>]*>)/; var re = /(<[^>]*>)/;
var references = (header.references && header.references[0] || '').split(re).filter(a => a.match(re)); var references = (header.references && header.references[0] || '').split(re).filter(a => a.match(re));
msgrow.refs = toPgArray(references); msgrow.refs = toPgArray(references);
console.log(seqnum+' 1');
if (references.length) if (references.length)
{ {
if (references.length > 10) if (references.length > 10)
@@ -139,28 +145,26 @@ function* main(NEXT, account)
else if (references[references.length-1] != msgrow.inreplyto) else if (references[references.length-1] != msgrow.inreplyto)
references.push(msgrow.inreplyto); references.push(msgrow.inreplyto);
var [ err, threadId ] = yield pg.select('MAX(thread_id)').from('messages') var [ err, threadId ] = yield pg.select('MAX(thread_id)').from('messages')
.where(pg.sql.in('messageid', references)).val(NEXT); .where(pg.sql.in('messageid', references)).val(NEXT.cb());
if (err) throw err; if (err) throw new Error(''+err);
if (!threadId) if (!threadId)
{ {
var [ err, threadId ] = yield pg.select('MAX(thread_id)').from('messages') var [ err, threadId ] = yield pg.select('MAX(thread_id)').from('messages')
.where(new pg.sql.Binary('@>', 'refs', toPgArray([msgrow.messageid]))).val(NEXT); .where(new pg.sql.Binary('@>', 'refs', toPgArray([msgrow.messageid]))).val(NEXT.cb());
if (err) throw err; if (err) throw new Error(''+err);
} }
if (threadId) if (threadId)
{ {
console.log(seqnum+' 2');
try try
{ {
var [ err, threadId ] = yield pg.update('threads', { msg_count: pg.sql('msg_count+1') }) var [ err ] = yield pg.update('threads', { msg_count: pg.sql('msg_count+1') })
.where({ id: threadId }).run(NEXT); .where({ id: threadId }).run(NEXT.cb());
if (err) throw new Error(''+err); if (err) throw new Error(''+err);
} }
catch (e) catch (e)
{ {
throw new Error(''+e); throw new Error(''+e);
} }
console.log(seqnum+' 3');
} }
msgrow.thread_id = threadId; msgrow.thread_id = threadId;
} }
@@ -179,28 +183,26 @@ function* main(NEXT, account)
msgrow.flags = msgrow.flags || flagNum[attrs.flags[i].toLowerCase()]; msgrow.flags = msgrow.flags || flagNum[attrs.flags[i].toLowerCase()];
msgrow.flags = (msgrow.flags & ~8) | (msgrow.flags & 8 ? 0 : 8); // invert "\seen" (unread) flag msgrow.flags = (msgrow.flags & ~8) | (msgrow.flags & 8 ? 0 : 8); // invert "\seen" (unread) flag
console.log(msgrow.time+' '+msgrow.from_email+' '+msgrow.subject); console.log(msgrow.time+' '+msgrow.from_email+' '+msgrow.subject);
console.log(pg.insert('messages', msgrow)+''); var [ err, id ] = yield pg.raw(
var [ err, id ] = yield pg.raw(pg.insert('messages', msgrow)+' ON CONFLICT (folder_id, uid) DO UPDATE SET flags=excluded.flags RETURNING id').val(NEXT); pg.insert('messages', msgrow)+' ON CONFLICT (folder_id, uid) DO UPDATE SET flags=excluded.flags RETURNING id'
if (err) throw err; ).val(NEXT.cb());
if (err) throw new Error(''+err);
msgrow.id = id; msgrow.id = id;
if (!msgrow.thread_id) if (!msgrow.thread_id)
{ {
console.log(seqnum+' 4');
var [ err, thread_id ] = yield pg.insert('threads', { var [ err, thread_id ] = yield pg.insert('threads', {
first_msg: msgrow.id, first_msg: msgrow.id,
msg_count: 1 msg_count: 1
}).returning('id').val(NEXT); }).returning('id').val(NEXT.cb());
if (err) throw err; if (err) throw new Error(''+err);
msgrow.thread_id = thread_id; msgrow.thread_id = thread_id;
console.log(seqnum+' 5'); var [ err, row ] = yield pg.update('messages', { thread_id: msgrow.thread_id }).where({ id: msgrow.id }).run(NEXT.cb());
var [ err, row ] = yield pg.update('messages', { thread_id: msgrow.thread_id }).where({ id: msgrow.id }).run(NEXT);
if (err) throw new Error(''+err); if (err) throw new Error(''+err);
console.log(seqnum+' 6');
} }
}); });
}); });
yield f.on('end', NEXT); yield f.once('end', NEXT.cb());
yield srv.closeBox(NEXT); yield srv.closeBox(NEXT.cb());
} }
srv.end(); srv.end();
} }