gen-thread/index.js

123 lines
3.0 KiB
JavaScript
Raw Normal View History

2016-06-27 16:03:06 +03:00
// Public API of the module: `run` starts a single generator-based thread,
// `runParallel` starts several and reports once all of them have finished.
module.exports.run = runThread;
module.exports.runParallel = runParallel;
2016-07-17 00:35:35 +03:00
// Threads currently admitted through thread.throttle(); finished ones are pruned by finishq().
var q = [];
// Throttled threads still waiting for a slot, stored as [ thread, resume callback, limit ].
var pending = [];
/**
 * Prune finished threads from the run queue and admit waiting ones.
 *
 * Every entry of `q` whose `_done` flag is set is removed. Afterwards entries
 * are taken from the head of `pending` (each one is `[ thread, callback, limit ]`)
 * for as long as `q` holds fewer threads than the limit of the head entry:
 * the thread is appended to `q` and its callback is scheduled with
 * process.nextTick(), so it never runs synchronously inside this call.
 */
function finishq()
{
    // Walk backwards so that splicing does not disturb the indices still to be visited.
    for (var i = q.length - 1; i >= 0; i--)
    {
        if (q[i]._done)
        {
            q.splice(i, 1);
        }
    }
    // Admission is strictly FIFO: only the limit of the first waiting entry is consulted.
    while (pending.length > 0 && q.length < pending[0][2])
    {
        var next = pending.shift();
        q.push(next[0]);
        process.nextTick(next[1]);
    }
}
// Counter from which runThread() assigns a sequential `id` to every thread it creates.
var tid = 0;
2016-06-27 16:03:06 +03:00
/**
 * Start a generator-based "thread".
 *
 * `main(thread, arg)` is called to create the root generator, which is then
 * driven until it suspends or finishes. The generator suspends itself with
 * `yield` and is resumed by calling `thread(...)` (usually through a callback
 * obtained from `thread.cb()`); the call arguments are delivered to the
 * generator as an array, as the result of the `yield` expression.
 *
 * Yielding another generator object runs it as a nested call: it is pushed on
 * top of `thread.gens`, and once it finishes the yielding generator is resumed
 * with `[ returnValue ]`. If the nested generator throws, the exception is
 * thrown into the yielding generator at its `yield`, so it can be caught there.
 *
 * @param {Function} main generator function, invoked as main(thread, arg)
 * @param {*} arg value handed to `main` as its second argument
 * @param {Function} [done] called with the return value of the root generator
 *   once the whole thread has finished without an error
 * @returns {Function} the thread function; it carries `id`, `gens`, `throttle`
 *   and `cb`, plus `_done` once the thread has finished
 * @throws whatever escapes the root generator; it is rethrown to whoever
 *   resumed the thread (or to the caller of runThread during the first run)
 */
function runThread(main, arg, done)
{
    // True when `value` is a generator object of the same kind as `gen`.
    // null and objects without a usable `constructor` are ordinary yielded values.
    var isNestedGenerator = function(value, gen)
    {
        if (value === null || typeof value != 'object')
        {
            return false;
        }
        var ctor = value.constructor;
        return ctor != null && ctor.constructor == gen.constructor.constructor;
    };

    // Drive the generator stack: call `method` ('next' or 'throw') with `payload`
    // on the top generator and keep going until the thread suspends or finishes.
    var step = function(method, payload)
    {
        var v;
        for (;;)
        {
            try
            {
                v = thread.gens[0][method](payload);
            }
            catch (e)
            {
                // `failed` is tracked separately so that throwing a falsy value still counts
                v = { done: true, failed: true, error: e };
            }
            if (v.done)
            {
                // generator finished
                thread.gens.shift();
                if (!thread.gens.length)
                {
                    break;
                }
                // return to previous generator
                if (v.failed)
                {
                    method = 'throw';
                    payload = v.error;
                }
                else
                {
                    method = 'next';
                    payload = [ v.value ];
                }
            }
            else if (isNestedGenerator(v.value, thread.gens[0]))
            {
                // another generator instance yielded - add it to stack and call
                thread.gens.unshift(v.value);
                method = 'next';
                payload = [];
            }
            else
            {
                // ordinary yield: stay suspended until thread() is called again
                return;
            }
        }
        // the root generator is finished: let finishq() drop this thread from the queue
        thread._done = true;
        process.nextTick(finishq);
        if (v.failed)
        {
            throw v.error;
        }
        if (done)
        {
            done(v.value);
        }
    };

    var thread = function()
    {
        // pass parameters as yield result
        step('next', Array.prototype.slice.call(arguments, 0));
    };
    thread.id = tid++;
    thread.gens = [ main(thread, arg) ];

    // Ask for one of `count` slots: if fewer than `count` threads are queued the
    // thread is resumed on the next tick, otherwise it waits in `pending` until
    // finishq() admits it. Presumably meant to be used as `yield thread.throttle(n)`.
    thread.throttle = function(count)
    {
        finishq();
        if (q.length < count)
        {
            q.push(thread);
            process.nextTick(thread.cb());
        }
        else
        {
            pending.push([ thread, thread.cb(), count ]);
        }
    };

    // Create a callback that resumes the thread with the arguments it receives.
    // Only the most recently created callback may be used; calling an older one
    // throws, reporting the stacks at which both callbacks were created.
    thread.cb = function()
    {
        var fn = function()
        {
            if (thread._current != fn)
            {
                throw new Error('Broken control flow! Callback'+
                    thread._current._stack.replace(/^\s*Error\s*at Function\.thread\.cb\s*\([^)]*\)/, '')+
                    '\nmust be called to resume thread, but this one is called instead:'+
                    fn._stack.replace(/^\s*Error\s*at Function\.thread\.cb\s*\([^)]*\)/, '')+'\n--'
                );
            }
            return thread.apply(thread, arguments);
        };
        // remember where the callback was created, for the diagnostic above
        fn._stack = new Error().stack;
        thread._current = fn;
        return fn;
    };

    // run until the first suspension point
    thread();
    return thread;
}
/**
 * Start several threads with runThread() and report once all have finished.
 *
 * Each generator function is started with `null` as its argument. The return
 * value of thread number `i` is stored at `results[i]`, so the order of the
 * results follows the order of `threads`, not the order of completion.
 *
 * @param {Function[]} threads generator functions to start
 * @param {Function} [done] called once with the array of results after every
 *   thread has finished; called immediately with `[]` when `threads` is empty
 */
function runParallel(threads, done)
{
    var results = [];
    // Completion is tracked separately from the results, so that falsy results
    // (undefined, 0, null, '') are recorded exactly once like any other value.
    var reported = [];
    var remaining = threads.length;
    if (!remaining)
    {
        if (done)
        {
            done(results);
        }
        return;
    }
    threads.forEach(function(main, i)
    {
        runThread(main, null, function(result)
        {
            if (reported[i])
            {
                return;
            }
            reported[i] = true;
            results[i] = result;
            remaining--;
            if (remaining == 0 && done)
            {
                done(results);
            }
        });
    });
}