diff --git a/example.js b/example.js index 454ba23..b197136 100644 --- a/example.js +++ b/example.js @@ -21,8 +21,6 @@ function* test(thread) return 'result'; } -gen.run(test, null, function(result) { console.log(result); }); - function* test_throttle(thread) { yield thread.throttle(5); @@ -38,5 +36,25 @@ function* other_gen(thread) console.log('finished in another generator'); } -for (var i = 0; i < 15; i++) - gen.run(test_throttle); +function* test_throw(thread) +{ + var cb = thread.errorfirst(); + try + { + yield setTimeout(function() { cb(new Error()); }, 500); + } + catch (e) + { + console.log('Catched '+e.stack); + } + console.log(yield setTimeout(thread.cb(), 500)); + console.log('sleep'); + console.log(yield setTimeout(thread.cb(), 500)); + console.log('continue'); +} + +//gen.run(test, null, function(result) { console.log(result); }); + +//for (var i = 0; i < 15; i++) gen.run(test_throttle); + +gen.run(test_throw); diff --git a/index.js b/index.js index a0cb8a8..55ed2c8 100644 --- a/index.js +++ b/index.js @@ -1,7 +1,7 @@ // Yet Another Hack to fight node.js callback hell: generator-based coroutines // Distinctive features: -// - simple to use: does not require modifications of existing callback-based code -// - checks control flow safely +// - simple to use: does not require modifications of existing callback or promise based code +// - safely checks control flow module.exports.run = runThread; module.exports.runParallel = runParallel; @@ -11,6 +11,7 @@ function runThread(generator, arg, finishCallback) var thread = function() { continueThread.apply(thread) }; thread.throttle = throttleThread; thread.cb = threadCallback.bind(thread); + thread.errorfirst = errorFirst.bind(thread); thread._gen = generator(thread, arg); thread._finishThrottleQueue = finishThrottleQueue.bind(thread); thread._finishCallback = finishCallback; @@ -21,11 +22,15 @@ function runThread(generator, arg, finishCallback) function continueThread() { // pass parameters as yield result - var pass = Array.prototype.slice.call(arguments, 0); + callGen(this, 'next', Array.prototype.slice.call(arguments, 0)); +} + +function callGen(thread, method, arg) +{ var v; try { - v = this._gen.next(pass); + v = thread._gen[method](arg); } catch (e) { @@ -34,13 +39,22 @@ function continueThread() if (v.done) { // generator finished - this._done = true; - process.nextTick(this._finishThrottleQueue); + thread._done = true; + process.nextTick(thread._finishThrottleQueue); } if (v.error) throw v.error; - if (v.done && this._finishCallback) - this._finishCallback(v.value); + if (v.done && thread._finishCallback) + thread._finishCallback(v.value); + if (typeof v.value == 'object' && v.value.then) + { + // check if v.value is a Promise + var cb = thread.cb(); + v.value.then(cb, function(error) + { + callGen(thread, 'throw', error); + }); + } } function threadCallback() @@ -63,6 +77,28 @@ function threadCallback() return fn; } +function errorFirst() +{ + var thread = this; + var fn = function() + { + if (thread._current != fn) + { + throw new Error('Broken control flow! Callback'+ + thread._current._stack.replace(/^\s*Error\s*at Function\.thread\.cb\s*\([^)]*\)/, '')+ + '\nmust be called to resume thread, but this one is called instead:'+ + fn._stack.replace(/^\s*Error\s*at Function\.thread\.cb\s*\([^)]*\)/, '')+'\n--' + ); + } + if (arguments[0]) + return callGen(thread, 'throw', arguments[0]); + return callGen(thread, Array.prototype.slice.call(arguments, 0)); + }; + fn._stack = new Error().stack; + thread._current = fn; + return fn; +} + function throttleThread(count) { if (!this.throttleData)