diff --git a/operetta.js b/operetta.js index 3f51ec6..d7d0f29 100644 --- a/operetta.js +++ b/operetta.js @@ -1,6 +1,7 @@ process.env.NODE_TLS_REJECT_UNAUTHORIZED = "0"; var cfg = require('./cfg.json'); +require('heapdump'); var gen = require('gen-thread'); var Imap = require('imap'); var inspect = require('util').inspect; @@ -114,7 +115,17 @@ function* main(NEXT, account) }); yield msg.once('end', NEXT.cb()); - yield NEXT.throttle(5); + // Workaround memory leak in node-imap + // TODO: send pull request + if (srv._curReq.fetchCache) + delete srv._curReq.fetchCache[seqnum]; + /*var cached = 0; + for (var i in srv._curReq.fetchCache) + cached++; + console.log('msg '+seqnum+': '+cached+' in cache');*/ + + var [ pgtx, end_transaction ] = yield pg.transaction(NEXT.cb(), function(e) { if (e) throw e; }); + //yield NEXT.throttle(20); var header = Imap.parseHeader(msgrow.headers); for (var i in header) @@ -144,12 +155,12 @@ function* main(NEXT, account) msgrow.inreplyto = references[references.length-1]; else if (references[references.length-1] != msgrow.inreplyto) references.push(msgrow.inreplyto); - var [ err, threadId ] = yield pg.select('MAX(thread_id)').from('messages') + var [ err, threadId ] = yield pgtx.select('MAX(thread_id)').from('messages') .where(pg.sql.in('messageid', references)).val(NEXT.cb()); if (err) throw new Error(''+err); if (!threadId) { - var [ err, threadId ] = yield pg.select('MAX(thread_id)').from('messages') + var [ err, threadId ] = yield pgtx.select('MAX(thread_id)').from('messages') .where(new pg.sql.Binary('@>', 'refs', toPgArray([msgrow.messageid]))).val(NEXT.cb()); if (err) throw new Error(''+err); } @@ -157,7 +168,7 @@ function* main(NEXT, account) { try { - var [ err ] = yield pg.update('threads', { msg_count: pg.sql('msg_count+1') }) + var [ err ] = yield pgtx.update('threads', { msg_count: pg.sql('msg_count+1') }) .where({ id: threadId }).run(NEXT.cb()); if (err) throw new Error(''+err); } @@ -183,22 +194,24 @@ function* main(NEXT, account) msgrow.flags = msgrow.flags || flagNum[attrs.flags[i].toLowerCase()]; msgrow.flags = (msgrow.flags & ~8) | (msgrow.flags & 8 ? 0 : 8); // invert "\seen" (unread) flag console.log(msgrow.time+' '+msgrow.from_email+' '+msgrow.subject); - var [ err, id ] = yield pg.raw( - pg.insert('messages', msgrow)+' ON CONFLICT (folder_id, uid) DO UPDATE SET flags=excluded.flags RETURNING id' + var [ err, id ] = yield pgtx.raw( + pgtx.insert('messages', msgrow)+' ON CONFLICT (folder_id, uid) DO UPDATE SET flags=excluded.flags RETURNING id' ).val(NEXT.cb()); if (err) throw new Error(''+err); msgrow.id = id; if (!msgrow.thread_id) { - var [ err, thread_id ] = yield pg.insert('threads', { + var [ err, thread_id ] = yield pgtx.insert('threads', { first_msg: msgrow.id, msg_count: 1 }).returning('id').val(NEXT.cb()); if (err) throw new Error(''+err); msgrow.thread_id = thread_id; - var [ err, row ] = yield pg.update('messages', { thread_id: msgrow.thread_id }).where({ id: msgrow.id }).run(NEXT.cb()); + var [ err, row ] = yield pgtx.update('messages', { thread_id: msgrow.thread_id }).where({ id: msgrow.id }).run(NEXT.cb()); if (err) throw new Error(''+err); } + + end_transaction(); }); }); yield f.once('end', NEXT.cb()); diff --git a/run.sh b/run.sh index 0b0d182..88f216a 100755 --- a/run.sh +++ b/run.sh @@ -1,2 +1,2 @@ node_modules/.bin/babel operetta.js > operetta.c.js -nodejs operetta.c.js +nodejs --max_old_space_size=100 operetta.c.js