add transaction & workaround for memory leak

master
Vitaliy Filippov 2016-07-19 12:46:45 +03:00
parent 251f596564
commit 7bb64ee02f
2 changed files with 22 additions and 9 deletions

View File

@ -1,6 +1,7 @@
process.env.NODE_TLS_REJECT_UNAUTHORIZED = "0";
var cfg = require('./cfg.json');
require('heapdump');
var gen = require('gen-thread');
var Imap = require('imap');
var inspect = require('util').inspect;
@ -114,7 +115,17 @@ function* main(NEXT, account)
});
yield msg.once('end', NEXT.cb());
yield NEXT.throttle(5);
// Workaround memory leak in node-imap
// TODO: send pull request
if (srv._curReq.fetchCache)
delete srv._curReq.fetchCache[seqnum];
/*var cached = 0;
for (var i in srv._curReq.fetchCache)
cached++;
console.log('msg '+seqnum+': '+cached+' in cache');*/
var [ pgtx, end_transaction ] = yield pg.transaction(NEXT.cb(), function(e) { if (e) throw e; });
//yield NEXT.throttle(20);
var header = Imap.parseHeader(msgrow.headers);
for (var i in header)
@ -144,12 +155,12 @@ function* main(NEXT, account)
msgrow.inreplyto = references[references.length-1];
else if (references[references.length-1] != msgrow.inreplyto)
references.push(msgrow.inreplyto);
var [ err, threadId ] = yield pg.select('MAX(thread_id)').from('messages')
var [ err, threadId ] = yield pgtx.select('MAX(thread_id)').from('messages')
.where(pg.sql.in('messageid', references)).val(NEXT.cb());
if (err) throw new Error(''+err);
if (!threadId)
{
var [ err, threadId ] = yield pg.select('MAX(thread_id)').from('messages')
var [ err, threadId ] = yield pgtx.select('MAX(thread_id)').from('messages')
.where(new pg.sql.Binary('@>', 'refs', toPgArray([msgrow.messageid]))).val(NEXT.cb());
if (err) throw new Error(''+err);
}
@ -157,7 +168,7 @@ function* main(NEXT, account)
{
try
{
var [ err ] = yield pg.update('threads', { msg_count: pg.sql('msg_count+1') })
var [ err ] = yield pgtx.update('threads', { msg_count: pg.sql('msg_count+1') })
.where({ id: threadId }).run(NEXT.cb());
if (err) throw new Error(''+err);
}
@ -183,22 +194,24 @@ function* main(NEXT, account)
msgrow.flags = msgrow.flags || flagNum[attrs.flags[i].toLowerCase()];
msgrow.flags = (msgrow.flags & ~8) | (msgrow.flags & 8 ? 0 : 8); // invert "\seen" (unread) flag
console.log(msgrow.time+' '+msgrow.from_email+' '+msgrow.subject);
var [ err, id ] = yield pg.raw(
pg.insert('messages', msgrow)+' ON CONFLICT (folder_id, uid) DO UPDATE SET flags=excluded.flags RETURNING id'
var [ err, id ] = yield pgtx.raw(
pgtx.insert('messages', msgrow)+' ON CONFLICT (folder_id, uid) DO UPDATE SET flags=excluded.flags RETURNING id'
).val(NEXT.cb());
if (err) throw new Error(''+err);
msgrow.id = id;
if (!msgrow.thread_id)
{
var [ err, thread_id ] = yield pg.insert('threads', {
var [ err, thread_id ] = yield pgtx.insert('threads', {
first_msg: msgrow.id,
msg_count: 1
}).returning('id').val(NEXT.cb());
if (err) throw new Error(''+err);
msgrow.thread_id = thread_id;
var [ err, row ] = yield pg.update('messages', { thread_id: msgrow.thread_id }).where({ id: msgrow.id }).run(NEXT.cb());
var [ err, row ] = yield pgtx.update('messages', { thread_id: msgrow.thread_id }).where({ id: msgrow.id }).run(NEXT.cb());
if (err) throw new Error(''+err);
}
end_transaction();
});
});
yield f.once('end', NEXT.cb());

2
run.sh
View File

@ -1,2 +1,2 @@
node_modules/.bin/babel operetta.js > operetta.c.js
nodejs operetta.c.js
nodejs --max_old_space_size=100 operetta.c.js