Throttle incoming stream to fight memory usage (74752 messages fetched ok with ~50mb memory usage)
parent
7bb64ee02f
commit
a66867072d
21
operetta.js
21
operetta.js
|
@ -91,6 +91,7 @@ function* main(NEXT, account)
|
|||
size: true,
|
||||
bodies: 'HEADER'
|
||||
});
|
||||
var parsed = 0, paused = false;
|
||||
f.on('message', function(msg, seqnum)
|
||||
{
|
||||
gen.run(function*(NEXT)
|
||||
|
@ -124,6 +125,15 @@ function* main(NEXT, account)
|
|||
cached++;
|
||||
console.log('msg '+seqnum+': '+cached+' in cache');*/
|
||||
|
||||
parsed++;
|
||||
if (!paused && parsed > 20)
|
||||
{
|
||||
// ГОРШОЧЕК, НЕ ВАРИ!!! И так уже кучу сообщений прочитал из сокета, хорош!
|
||||
srv._parser._ignoreReadable = true;
|
||||
paused = true;
|
||||
}
|
||||
try {
|
||||
|
||||
var [ pgtx, end_transaction ] = yield pg.transaction(NEXT.cb(), function(e) { if (e) throw e; });
|
||||
//yield NEXT.throttle(20);
|
||||
|
||||
|
@ -212,6 +222,17 @@ function* main(NEXT, account)
|
|||
}
|
||||
|
||||
end_transaction();
|
||||
|
||||
} catch (e0) {
|
||||
throw e0;
|
||||
}
|
||||
parsed--;
|
||||
if (paused && parsed <= 10)
|
||||
{
|
||||
paused = false;
|
||||
srv._parser._ignoreReadable = false;
|
||||
process.nextTick(srv._parser._cbReadable);
|
||||
}
|
||||
});
|
||||
});
|
||||
yield f.once('end', NEXT.cb());
|
||||
|
|
Loading…
Reference in New Issue