From 6c553f51f1759c5f8a96ab78a73e4ac11d3e4717 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Wed, 20 Jul 2016 01:09:27 +0300 Subject: [PATCH] Slightly structure gen-thread code --- index.js | 192 +++++++++++++++++++++++++++++-------------------------- 1 file changed, 100 insertions(+), 92 deletions(-) diff --git a/index.js b/index.js index 0d0014c..382c803 100644 --- a/index.js +++ b/index.js @@ -1,109 +1,117 @@ +// Yet Another Hack to fight node.js callback hell: generator-based coroutines + module.exports.run = runThread; module.exports.runParallel = runParallel; -var q = []; -var pending = []; - -function finishq() +function runThread(main, arg, done) { - for (var i = 0; i < q.length; i++) + var thread = function() { continueThread.apply(thread) }; + thread._gens = [ main(thread, arg) ]; + thread.throttle = throttleThread; + thread.cb = threadCallback.bind(thread); + thread._finishThrottleQueue = finishThrottleQueue.bind(thread); + thread._finishCallback = done; + thread(); + return thread; +} + +function continueThread() +{ + // pass parameters as yield result + var pass = Array.prototype.slice.call(arguments, 0); + var v; + try { - if (q[i]._done) + v = this._gens[0].next(pass); + } + catch (e) + { + v = { done: 1, error: e }; + } + if (v.done) + { + // generator finished + this._gens.shift(); + if (this._gens.length) { - q.splice(i, 1); + // return to previous generator + this(v.value); + return; + } + } + if (typeof v.value == 'object' && + v.value.constructor.constructor == this._gens[0].constructor.constructor) + { + // another generator instance returned - add it to stack and call + this._gens.unshift(v.value); + this(); + return; + } + if (!this._gens.length) + { + this._done = true; + process.nextTick(this._finishThrottleQueue); + } + if (v.error) + throw v.error; + if (!this._gens.length && this._finishCallback) + this._finishCallback(v.value); +} + +function threadCallback() +{ + var thread = this; + var fn = function() + { + if (thread._current != fn) + { + throw new Error('Broken control flow! Callback'+ + thread._current._stack.replace(/^\s*Error\s*at Function\.thread\.cb\s*\([^)]*\)/, '')+ + '\nmust be called to resume thread, but this one is called instead:'+ + fn._stack.replace(/^\s*Error\s*at Function\.thread\.cb\s*\([^)]*\)/, '')+'\n--' + ); + } + return thread.apply(thread, arguments); + }; + fn._stack = new Error().stack; + thread._current = fn; + return fn; +} + +function throttleThread(count) +{ + if (!this.throttleData) + this.throttleData = this._gens[0].__proto__._genThreadThrottle = this._gens[0].__proto__._genThreadThrottle || { queue: [], pending: [] }; + this._finishThrottleQueue(); + if (this.throttleData.queue.length < count) + { + this.throttleData.queue.push(this); + process.nextTick(this.cb()); + } + else + this.throttleData.pending.push([ this, this.cb(), count ]); +} + +function finishThrottleQueue() +{ + if (!this.throttleData) + return; + for (var i = 0; i < this.throttleData.queue.length; i++) + { + if (this.throttleData.queue[i]._done) + { + this.throttleData.queue.splice(i, 1); i--; } } - while (pending.length > 0 && q.length < pending[0][2]) + while (this.throttleData.pending.length > 0 && this.throttleData.queue.length < this.throttleData.pending[0][2]) { - var t = pending.shift(); - q.push(t[0]); + var t = this.throttleData.pending.shift(); + this.throttleData.queue.push(t[0]); process.nextTick(t[1]); } } -var tid = 0; -function runThread(main, arg, done) -{ - var thread = function() - { - // pass parameters as yield result - var pass = Array.prototype.slice.call(arguments, 0); - var v; - try - { - v = thread.gens[0].next(pass); - } - catch (e) - { - v = { done: 1, error: e }; - } - if (v.done) - { - // generator finished - thread.gens.shift(); - if (thread.gens.length) - { - // return to previous generator - thread(v.value); - return; - } - } - if (typeof v.value == 'object' && - v.value.constructor.constructor == thread.gens[0].constructor.constructor) - { - // another generator instance returned - add it to stack and call - thread.gens.unshift(v.value); - thread(); - return; - } - if (!thread.gens.length) - { - thread._done = true; - process.nextTick(finishq); - } - if (v.error) - throw v.error; - if (!thread.gens.length && done) - done(v.value); - }; - thread.id = tid++; - thread.gens = [ main(thread, arg) ]; - thread.throttle = function(count) - { - finishq(); - if (q.length < count) - { - q.push(thread); - process.nextTick(thread.cb()); - } - else - { - pending.push([ thread, thread.cb(), count ]); - } - }; - thread.cb = function() - { - var fn = function() - { - if (thread._current != fn) - { - throw new Error('Broken control flow! Callback'+ - thread._current._stack.replace(/^\s*Error\s*at Function\.thread\.cb\s*\([^)]*\)/, '')+ - '\nmust be called to resume thread, but this one is called instead:'+ - fn._stack.replace(/^\s*Error\s*at Function\.thread\.cb\s*\([^)]*\)/, '')+'\n--' - ); - } - return thread.apply(thread, arguments); - }; - fn._stack = new Error().stack; - thread._current = fn; - return fn; - }; - thread(); - return thread; -} - function runParallel(threads, done) { var results = [];