Slightly structure gen-thread code

master
Vitaliy Filippov 2016-07-20 01:09:27 +03:00
parent dce1046222
commit 6c553f51f1
1 changed files with 100 additions and 92 deletions

116
index.js
View File

@ -1,38 +1,28 @@
// Yet Another Hack to fight node.js callback hell: generator-based coroutines
module.exports.run = runThread; module.exports.run = runThread;
module.exports.runParallel = runParallel; module.exports.runParallel = runParallel;
var q = [];
var pending = [];
function finishq()
{
for (var i = 0; i < q.length; i++)
{
if (q[i]._done)
{
q.splice(i, 1);
i--;
}
}
while (pending.length > 0 && q.length < pending[0][2])
{
var t = pending.shift();
q.push(t[0]);
process.nextTick(t[1]);
}
}
var tid = 0;
function runThread(main, arg, done) function runThread(main, arg, done)
{ {
var thread = function() var thread = function() { continueThread.apply(thread) };
thread._gens = [ main(thread, arg) ];
thread.throttle = throttleThread;
thread.cb = threadCallback.bind(thread);
thread._finishThrottleQueue = finishThrottleQueue.bind(thread);
thread._finishCallback = done;
thread();
return thread;
}
function continueThread()
{ {
// pass parameters as yield result // pass parameters as yield result
var pass = Array.prototype.slice.call(arguments, 0); var pass = Array.prototype.slice.call(arguments, 0);
var v; var v;
try try
{ {
v = thread.gens[0].next(pass); v = this._gens[0].next(pass);
} }
catch (e) catch (e)
{ {
@ -41,49 +31,36 @@ function runThread(main, arg, done)
if (v.done) if (v.done)
{ {
// generator finished // generator finished
thread.gens.shift(); this._gens.shift();
if (thread.gens.length) if (this._gens.length)
{ {
// return to previous generator // return to previous generator
thread(v.value); this(v.value);
return; return;
} }
} }
if (typeof v.value == 'object' && if (typeof v.value == 'object' &&
v.value.constructor.constructor == thread.gens[0].constructor.constructor) v.value.constructor.constructor == this._gens[0].constructor.constructor)
{ {
// another generator instance returned - add it to stack and call // another generator instance returned - add it to stack and call
thread.gens.unshift(v.value); this._gens.unshift(v.value);
thread(); this();
return; return;
} }
if (!thread.gens.length) if (!this._gens.length)
{ {
thread._done = true; this._done = true;
process.nextTick(finishq); process.nextTick(this._finishThrottleQueue);
} }
if (v.error) if (v.error)
throw v.error; throw v.error;
if (!thread.gens.length && done) if (!this._gens.length && this._finishCallback)
done(v.value); this._finishCallback(v.value);
};
thread.id = tid++;
thread.gens = [ main(thread, arg) ];
thread.throttle = function(count)
{
finishq();
if (q.length < count)
{
q.push(thread);
process.nextTick(thread.cb());
} }
else
{ function threadCallback()
pending.push([ thread, thread.cb(), count ]);
}
};
thread.cb = function()
{ {
var thread = this;
var fn = function() var fn = function()
{ {
if (thread._current != fn) if (thread._current != fn)
@ -99,9 +76,40 @@ function runThread(main, arg, done)
fn._stack = new Error().stack; fn._stack = new Error().stack;
thread._current = fn; thread._current = fn;
return fn; return fn;
}; }
thread();
return thread; function throttleThread(count)
{
if (!this.throttleData)
this.throttleData = this._gens[0].__proto__._genThreadThrottle = this._gens[0].__proto__._genThreadThrottle || { queue: [], pending: [] };
this._finishThrottleQueue();
if (this.throttleData.queue.length < count)
{
this.throttleData.queue.push(this);
process.nextTick(this.cb());
}
else
this.throttleData.pending.push([ this, this.cb(), count ]);
}
function finishThrottleQueue()
{
if (!this.throttleData)
return;
for (var i = 0; i < this.throttleData.queue.length; i++)
{
if (this.throttleData.queue[i]._done)
{
this.throttleData.queue.splice(i, 1);
i--;
}
}
while (this.throttleData.pending.length > 0 && this.throttleData.queue.length < this.throttleData.pending[0][2])
{
var t = this.throttleData.pending.shift();
this.throttleData.queue.push(t[0]);
process.nextTick(t[1]);
}
} }
function runParallel(threads, done) function runParallel(threads, done)