diff --git a/operetta.js b/operetta.js index c2eac81..4dcf18e 100644 --- a/operetta.js +++ b/operetta.js @@ -41,34 +41,30 @@ function toPgArray(a) function* main(NEXT, account) { var accountId; - var [ err, rows ] = yield pg.select('id').from('accounts').where({ email: account.email }).rows(NEXT.cb()); - if (err) throw new Error(''+err); + var [ rows ] = yield pg.select('id').from('accounts').where({ email: account.email }).rows(NEXT.ef()); if (rows[0] && rows[0].id) accountId = rows[0].id; else { - var [ err, row ] = yield pg.insert('accounts', { + var [ row ] = pg.insert('accounts', { name: account.name, email: account.email, settings: { imap: account.imap } - }).returning('id').row(NEXT.cb()); - if (err) throw new Error(''+err); + }).returning('id').row(NEXT.ef()); accountId = row.id; } var srv = new Imap(account.imap); srv.once('ready', NEXT.cb()); yield srv.connect(); - var [ err, boxes ] = yield srv.getBoxes(NEXT.cb()); + var [ boxes ] = yield srv.getBoxes(NEXT.ef()); for (var k in boxes) { - var [ err, box ] = yield srv.openBox(k, true, NEXT.cb()); - if (err) throw new Error(''+err); + var [ box ] = yield srv.openBox(k, true, NEXT.ef()); var boxId; - var [ err, rows ] = yield pg.update('folders', { uidvalidity: box.uidvalidity, unread_count: box.messages.new }) - .where({ account_id: accountId, name: box.name }).returning('id').rows(NEXT.cb()); - if (err) throw new Error(''+err); + var [ rows ] = yield pg.update('folders', { uidvalidity: box.uidvalidity, unread_count: box.messages.new }) + .where({ account_id: accountId, name: box.name }).returning('id').rows(NEXT.ef()); if (rows[0] && rows[0].id) { // IMAP sync: http://tools.ietf.org/html/rfc4549 @@ -77,14 +73,13 @@ function* main(NEXT, account) } else { - var [ err, row ] = yield pg.insert('folders', { + var [ row ] = yield pg.insert('folders', { name: box.name, uidvalidity: box.uidvalidity, account_id: accountId, unread_count: box.messages.new, // total_count: box.messages.count - }).returning('id').row(NEXT.cb()); - if (err) throw new Error(''+err); + }).returning('id').row(NEXT.ef()); boxId = row.id; } var f = srv.fetch('1:*', { @@ -120,10 +115,6 @@ function* main(NEXT, account) // TODO: send pull request if (srv._curReq.fetchCache) delete srv._curReq.fetchCache[seqnum]; - /*var cached = 0; - for (var i in srv._curReq.fetchCache) - cached++; - console.log('msg '+seqnum+': '+cached+' in cache');*/ parsed++; if (!paused && parsed > 20) @@ -132,100 +123,110 @@ function* main(NEXT, account) srv._parser._ignoreReadable = true; paused = true; } - try { - var [ pgtx, end_transaction ] = yield pg.transaction(NEXT.cb(), function(e) { if (e) throw e; }); - //yield NEXT.throttle(20); - - var header = Imap.parseHeader(msgrow.headers); - for (var i in header) - for (var k = 0; k < header[i].length; k++) - header[i][k] = header[i][k].replace(/\x00/g, ''); - header.from = header.from && splitEmails(header.from[0])[0]; - msgrow.from_email = header.from && header.from.email || ''; - msgrow.from_name = header.from && header.from.name || ''; - header.replyto = header['reply-to'] && splitEmails(header['reply-to'][0])[0]; - msgrow.replyto_email = header.replyto && header.replyto.email || ''; - msgrow.replyto_name = header.replyto && header.replyto.name || ''; - msgrow.to_list = header.to && header.to[0] || ''; - msgrow.cc_list = header.cc && header.cc[0] || ''; - msgrow.bcc_list = header.bcc && header.bcc[0] || ''; - msgrow.subject = header.subject && header.subject[0] || ''; - msgrow.messageid = header['message-id'] && header['message-id'][0] || ''; - msgrow.inreplyto = header['in-reply-to'] && header['in-reply-to'][0] || ''; - msgrow.inreplyto = msgrow.inreplyto.replace(/^[\s\S]*(<[^>]*>)[\s\S]*$/, '$1'); - var re = /(<[^>]*>)/; - var references = (header.references && header.references[0] || '').split(re).filter(a => a.match(re)); - msgrow.refs = toPgArray(references); - if (references.length) + var pgtx, end_transaction; + try { - if (references.length > 10) - references = [ references[0] ].concat(references.slice(references.count-9)); - if (!msgrow.inreplyto) - msgrow.inreplyto = references[references.length-1]; - else if (references[references.length-1] != msgrow.inreplyto) - references.push(msgrow.inreplyto); - var [ err, threadId ] = yield pgtx.select('MAX(thread_id)').from('messages') - .where(pg.sql.in('messageid', references)).val(NEXT.cb()); - if (err) throw new Error(''+err); - if (!threadId) + [ pgtx, end_transaction ] = yield pg.transaction(NEXT.cb(), function(e) { if (e) throw e; }); + + msgrow.uid = attrs.uid; + msgrow.folder_id = boxId; + msgrow.flags = 0; + for (var i = 0; i < attrs.flags.length; i++) + msgrow.flags = msgrow.flags || flagNum[attrs.flags[i].toLowerCase()]; + msgrow.flags = (msgrow.flags & ~8) | (msgrow.flags & 8 ? 0 : 8); // invert "\seen" (unread) flag + + var [ exists ] = yield pgtx.select('id').from('messages').where({ folder_id: msgrow.folder_id, uid: msgrow.uid }).rows(NEXT.ef()); + if (exists.length) { - var [ err, threadId ] = yield pgtx.select('MAX(thread_id)').from('messages') - .where(new pg.sql.Binary('@>', 'refs', toPgArray([msgrow.messageid]))).val(NEXT.cb()); - if (err) throw new Error(''+err); + process.stderr.write('\rsynchronizing '+msgrow.uid+'...'); + yield pgtx.update('messages', { flags: msgrow.flags }).where({ folder_id: msgrow.folder_id, uid: msgrow.uid }).run(NEXT.ef()); } - if (threadId) + else { - try + var header = Imap.parseHeader(msgrow.headers); + for (var i in header) + for (var k = 0; k < header[i].length; k++) + header[i][k] = header[i][k].replace(/\x00/g, ''); + header.from = header.from && splitEmails(header.from[0])[0]; + header.replyto = header['reply-to'] && splitEmails(header['reply-to'][0])[0]; + var re = /(<[^>]*>)/; + header.references = (header.references && header.references[0] || '').split(re).filter(a => a.match(re)); + if (header.references.length) { - var [ err ] = yield pgtx.update('threads', { msg_count: pg.sql('msg_count+1') }) - .where({ id: threadId }).run(NEXT.cb()); - if (err) throw new Error(''+err); + if (header.references.length > 10) + header.references = [ header.references[0] ].concat(header.references.slice(header.references.length-9)); + if (!header['in-reply-to'] || !header['in-reply-to'][0]) + header['in-reply-to'] = [ header.references[header.references.length-1] ]; + else if (header.references[header.references.length-1] != header['in-reply-to'][0]) + header.references.push(header['in-reply-to'][0]); } - catch (e) + if (header.date) { - throw new Error(''+e); + var t = Date.parse(header.date[0]); + if (!isNaN(t)) + header.date = new Date(t); + } + if (!header.date) + header.date = new Date(attrs.date); + + msgrow.from_email = header.from && header.from.email || ''; + msgrow.from_name = header.from && header.from.name || ''; + msgrow.replyto_email = header.replyto && header.replyto.email || ''; + msgrow.replyto_name = header.replyto && header.replyto.name || ''; + msgrow.to_list = header.to && header.to[0] || ''; + msgrow.cc_list = header.cc && header.cc[0] || ''; + msgrow.bcc_list = header.bcc && header.bcc[0] || ''; + msgrow.subject = header.subject && header.subject[0] || ''; + msgrow.messageid = header['message-id'] && header['message-id'][0] || ''; + msgrow.inreplyto = header['in-reply-to'] && header['in-reply-to'][0] || ''; + msgrow.inreplyto = msgrow.inreplyto.replace(/^[\s\S]*(<[^>]*>)[\s\S]*$/, '$1'); + msgrow.time = header.date; + msgrow.refs = toPgArray(header.references); + if (header.references.length) + { + var [ threadId ] = yield pgtx.select('MAX(thread_id)').from('messages') + .where(pg.sql.in('messageid', header.references)).val(NEXT.ef()); + if (!threadId) + { + [ threadId ] = yield pgtx.select('MAX(thread_id)').from('messages') + .where(new pg.sql.Binary('@>', 'refs', toPgArray([msgrow.messageid]))).val(NEXT.ef()); + } + if (threadId) + { + try + { + yield pgtx.update('threads', { msg_count: pg.sql('msg_count+1') }) + .where({ id: threadId }).run(NEXT.ef()); + } + catch (e) + { + throw new Error(''+e); + } + } + msgrow.thread_id = threadId; + } + console.log(msgrow.time+' '+msgrow.from_email+' '+msgrow.subject); + [ msgrow.id ] = yield pgtx.insert('messages', msgrow).returning('id').val(NEXT.ef()); + if (!msgrow.thread_id) + { + [ msgrow.thread_id ] = yield pgtx.insert('threads', { + first_msg: msgrow.id, + msg_count: 1 + }).returning('id').val(NEXT.ef()); + yield pgtx.update('messages', { thread_id: msgrow.thread_id }).where({ id: msgrow.id }).run(NEXT.ef()); } } - msgrow.thread_id = threadId; - } - if (header.date) - { - var t = Date.parse(header.date[0]); - if (!isNaN(t)) - msgrow.time = new Date(t); - } - if (!msgrow.time) - msgrow.time = new Date(attrs.date); - msgrow.uid = attrs.uid; - msgrow.folder_id = boxId; - msgrow.flags = 0; - for (var i = 0; i < attrs.flags.length; i++) - msgrow.flags = msgrow.flags || flagNum[attrs.flags[i].toLowerCase()]; - msgrow.flags = (msgrow.flags & ~8) | (msgrow.flags & 8 ? 0 : 8); // invert "\seen" (unread) flag - console.log(msgrow.time+' '+msgrow.from_email+' '+msgrow.subject); - var [ err, id ] = yield pgtx.raw( - pgtx.insert('messages', msgrow)+' ON CONFLICT (folder_id, uid) DO UPDATE SET flags=excluded.flags RETURNING id' - ).val(NEXT.cb()); - if (err) throw new Error(''+err); - msgrow.id = id; - if (!msgrow.thread_id) - { - var [ err, thread_id ] = yield pgtx.insert('threads', { - first_msg: msgrow.id, - msg_count: 1 - }).returning('id').val(NEXT.cb()); - if (err) throw new Error(''+err); - msgrow.thread_id = thread_id; - var [ err, row ] = yield pgtx.update('messages', { thread_id: msgrow.thread_id }).where({ id: msgrow.id }).run(NEXT.cb()); - if (err) throw new Error(''+err); - } - end_transaction(); - - } catch (e0) { + end_transaction(); + } + catch (e0) + { + if (end_transaction) + end_transaction(); throw e0; } + parsed--; if (paused && parsed <= 10) {