diff --git a/operetta.js b/operetta.js index e92bf3f..3f51ec6 100644 --- a/operetta.js +++ b/operetta.js @@ -40,8 +40,8 @@ function toPgArray(a) function* main(NEXT, account) { var accountId; - var [ err, rows ] = yield pg.select('id').from('accounts').where({ email: account.email }).rows(NEXT); - if (err) throw err; + var [ err, rows ] = yield pg.select('id').from('accounts').where({ email: account.email }).rows(NEXT.cb()); + if (err) throw new Error(''+err); if (rows[0] && rows[0].id) accountId = rows[0].id; else @@ -52,21 +52,21 @@ function* main(NEXT, account) settings: { imap: account.imap } - }).returning('id').row(NEXT); - if (err) throw err; + }).returning('id').row(NEXT.cb()); + if (err) throw new Error(''+err); accountId = row.id; } var srv = new Imap(account.imap); - srv.once('ready', NEXT); + srv.once('ready', NEXT.cb()); yield srv.connect(); - var [ err, boxes ] = yield srv.getBoxes(NEXT); + var [ err, boxes ] = yield srv.getBoxes(NEXT.cb()); for (var k in boxes) { - var [ err, box ] = yield srv.openBox(k, true, NEXT); - if (err) throw err; + var [ err, box ] = yield srv.openBox(k, true, NEXT.cb()); + if (err) throw new Error(''+err); var boxId; var [ err, rows ] = yield pg.update('folders', { uidvalidity: box.uidvalidity, unread_count: box.messages.new }) - .where({ account_id: accountId, name: box.name }).returning('id').rows(NEXT); + .where({ account_id: accountId, name: box.name }).returning('id').rows(NEXT.cb()); if (err) throw new Error(''+err); if (rows[0] && rows[0].id) { @@ -82,8 +82,8 @@ function* main(NEXT, account) account_id: accountId, unread_count: box.messages.new, // total_count: box.messages.count - }).returning('id').row(NEXT); - if (err) throw err; + }).returning('id').row(NEXT.cb()); + if (err) throw new Error(''+err); boxId = row.id; } var f = srv.fetch('1:*', { @@ -94,20 +94,27 @@ function* main(NEXT, account) { gen.run(function*(NEXT) { - yield NEXT.throttle(5); - var msgrow = {}; - var [ stream, info ] = yield msg.on('body', NEXT); - var buffer = ''; - stream.on('data', function(chunk) + var attrs; + msg.on('body', function(stream, info) { - buffer += chunk.toString('utf8'); + var buffer = ''; + stream.on('data', function(chunk) + { + buffer += chunk.toString('utf8'); + }); + stream.once('end', function() + { + msgrow.body = ''; + msgrow.headers = buffer; + }); }); - yield stream.once('end', NEXT); - msgrow.headers = buffer; - msgrow.body = ''; - var [ attrs ] = yield msg.once('attributes', NEXT); - yield msg.once('end', NEXT); + msg.once('attributes', function(a) { + attrs = a; + }); + yield msg.once('end', NEXT.cb()); + + yield NEXT.throttle(5); var header = Imap.parseHeader(msgrow.headers); for (var i in header) @@ -129,7 +136,6 @@ function* main(NEXT, account) var re = /(<[^>]*>)/; var references = (header.references && header.references[0] || '').split(re).filter(a => a.match(re)); msgrow.refs = toPgArray(references); - console.log(seqnum+' 1'); if (references.length) { if (references.length > 10) @@ -139,28 +145,26 @@ function* main(NEXT, account) else if (references[references.length-1] != msgrow.inreplyto) references.push(msgrow.inreplyto); var [ err, threadId ] = yield pg.select('MAX(thread_id)').from('messages') - .where(pg.sql.in('messageid', references)).val(NEXT); - if (err) throw err; + .where(pg.sql.in('messageid', references)).val(NEXT.cb()); + if (err) throw new Error(''+err); if (!threadId) { var [ err, threadId ] = yield pg.select('MAX(thread_id)').from('messages') - .where(new pg.sql.Binary('@>', 'refs', toPgArray([msgrow.messageid]))).val(NEXT); - if (err) throw err; + .where(new pg.sql.Binary('@>', 'refs', toPgArray([msgrow.messageid]))).val(NEXT.cb()); + if (err) throw new Error(''+err); } if (threadId) { - console.log(seqnum+' 2'); try { - var [ err, threadId ] = yield pg.update('threads', { msg_count: pg.sql('msg_count+1') }) - .where({ id: threadId }).run(NEXT); + var [ err ] = yield pg.update('threads', { msg_count: pg.sql('msg_count+1') }) + .where({ id: threadId }).run(NEXT.cb()); if (err) throw new Error(''+err); } catch (e) { throw new Error(''+e); } - console.log(seqnum+' 3'); } msgrow.thread_id = threadId; } @@ -179,28 +183,26 @@ function* main(NEXT, account) msgrow.flags = msgrow.flags || flagNum[attrs.flags[i].toLowerCase()]; msgrow.flags = (msgrow.flags & ~8) | (msgrow.flags & 8 ? 0 : 8); // invert "\seen" (unread) flag console.log(msgrow.time+' '+msgrow.from_email+' '+msgrow.subject); - console.log(pg.insert('messages', msgrow)+''); - var [ err, id ] = yield pg.raw(pg.insert('messages', msgrow)+' ON CONFLICT (folder_id, uid) DO UPDATE SET flags=excluded.flags RETURNING id').val(NEXT); - if (err) throw err; + var [ err, id ] = yield pg.raw( + pg.insert('messages', msgrow)+' ON CONFLICT (folder_id, uid) DO UPDATE SET flags=excluded.flags RETURNING id' + ).val(NEXT.cb()); + if (err) throw new Error(''+err); msgrow.id = id; if (!msgrow.thread_id) { - console.log(seqnum+' 4'); var [ err, thread_id ] = yield pg.insert('threads', { first_msg: msgrow.id, msg_count: 1 - }).returning('id').val(NEXT); - if (err) throw err; - msgrow.thread_id = thread_id; - console.log(seqnum+' 5'); - var [ err, row ] = yield pg.update('messages', { thread_id: msgrow.thread_id }).where({ id: msgrow.id }).run(NEXT); + }).returning('id').val(NEXT.cb()); + if (err) throw new Error(''+err); + msgrow.thread_id = thread_id; + var [ err, row ] = yield pg.update('messages', { thread_id: msgrow.thread_id }).where({ id: msgrow.id }).run(NEXT.cb()); if (err) throw new Error(''+err); - console.log(seqnum+' 6'); } }); }); - yield f.on('end', NEXT); - yield srv.closeBox(NEXT); + yield f.once('end', NEXT.cb()); + yield srv.closeBox(NEXT.cb()); } srv.end(); }