diff --git a/example.js b/example.js index 91c0e58..45ca30a 100644 --- a/example.js +++ b/example.js @@ -1,44 +1,47 @@ var gen = require('./gen-thread.js'); -function* test(thread) +function* test() { console.log('start'); - console.log([ 'next', yield setTimeout(function() { thread('zhopa', 123); }, 500) ]); + var cb = gen.unsafe(); + console.log([ 'next', yield setTimeout(function() { cb('zhopa', 123); }, 500) ]); var args = yield gen.runParallel([ - function*(thread) + (function*() { - yield setTimeout(function() { thread('callback 1'); }, 500); + var cb = gen.unsafe(); + yield setTimeout(function() { cb('callback 1'); }, 500); return 'result 1'; - }, - function*(thread) + })(), + (function*() { - yield setTimeout(function() { thread('callback 2'); }, 500); + var cb = gen.unsafe(); + yield setTimeout(function() { cb('callback 2'); }, 500); return 'result 2'; - } - ], thread); + })() + ], gen.cb()); console.log('abc'); console.log(args); return 'result'; } -function* test_throttle(thread) +function* test_throttle() { - yield thread.throttle(5); + yield gen.throttle(5); console.log('at most 5'); - yield setTimeout(thread, 1000); + yield setTimeout(gen.cb(), 1000); console.log('continue in another generator'); - yield* other_gen(thread); // same as 'yield gen.run(other_gen, null, thread)' + yield* other_gen(); // same as 'yield gen.run(other_gen, gen.cb())' } -function* other_gen(thread) +function* other_gen() { - yield setTimeout(thread, 1000); + yield setTimeout(gen.cb(), 1000); console.log('finished in another generator'); } -function* test_throw(thread) +function* test_throw() { - var cb = thread.errorfirst(); + var cb = gen.errorfirst(); try { yield setTimeout(function() { cb(new Error()); }, 500); @@ -47,13 +50,13 @@ function* test_throw(thread) { console.log('Catched '+e.stack); } - console.log(yield setTimeout(thread.cb(), 500)); + console.log(yield setTimeout(gen.cb(), 500)); console.log('sleep'); - console.log(yield setTimeout(thread.cb(), 500)); + console.log(yield setTimeout(gen.cb(), 500)); console.log('continue'); } -gen.run(test, null, function(result) { console.log(result); }); +gen.run(test, function(result) { console.log(result); }); for (var i = 0; i < 15; i++) gen.run(test_throttle); diff --git a/index.js b/index.js index c788d1c..83dcf5b 100644 --- a/index.js +++ b/index.js @@ -1,20 +1,40 @@ // Yet Another Hack to fight node.js callback hell: generator-based coroutines // Distinctive features: -// - simple to use: does not require modifications of existing callback or promise based code +// - simple to use: does not require promisification of existing callback-based code // - safely checks control flow module.exports.run = runThread; module.exports.runParallel = runParallel; -function runThread(generator, arg, finishCallback) +var current; + +module.exports.unsafe = function() +{ + return current; +}; + +module.exports.callback = module.exports.cb = function() +{ + return threadCallback.call(current); +}; + +module.exports.errorfirst = module.exports.ef = function() +{ + return errorFirst.call(current); +}; + +module.exports.throttle = function(count) +{ + return throttleThread.call(current, count); +}; + +function runThread(generator, onsuccess, onerror) { var thread = function() { continueThread.apply(thread, arguments) }; - thread.throttle = throttleThread; - thread.cb = threadCallback.bind(thread); - thread.ef = thread.errorfirst = errorFirst.bind(thread); - thread._gen = generator(thread, arg); + thread._gen = generator.next ? generator : generator(); thread._finishThrottleQueue = finishThrottleQueue.bind(thread); - thread._finishCallback = finishCallback; + thread._onsuccess = onsuccess; + thread._onerror = onerror; thread(); return thread; } @@ -28,28 +48,35 @@ function continueThread() function callGen(thread, method, arg) { var v; + current = thread; try { v = thread._gen[method](arg); } catch (e) { - v = { done: 1, error: e }; + v = { error: e }; } - if (v.done) + current = null; + if (v.done || v.error) { // generator finished thread._done = true; process.nextTick(thread._finishThrottleQueue); } if (v.error) - throw v.error; - if (v.done && thread._finishCallback) - thread._finishCallback(v.value); - if (typeof v.value == 'object' && v.value.then) + { + if (thread._onerror) + thread._onerror(v.error); + else + throw v.error; + } + else if (v.done && thread._onsuccess) + thread._onsuccess(v.value); + else if (typeof v.value == 'object' && v.value.then) { // check if v.value is a Promise - var cb = thread.cb(); + var cb = threadCallback.call(current); v.value.then(cb, function(error) { callGen(thread, 'throw', error); @@ -107,10 +134,10 @@ function throttleThread(count) if (this.throttleData.queue.length < count) { this.throttleData.queue.push(this); - process.nextTick(this.cb()); + process.nextTick(threadCallback.call(this)); } else - this.throttleData.pending.push([ this, this.cb(), count ]); + this.throttleData.pending.push([ this, threadCallback.call(this), count ]); } function finishThrottleQueue() @@ -136,16 +163,20 @@ function finishThrottleQueue() function runParallel(threads, done) { var results = []; + var errors = []; var resultCount = 0; - var allDone = function(i, result) + var allDone = function(i, result, error) { - if (!results[i]) + if (!results[i] && !errors[i]) { - results[i] = result; + if (error) + errors[i] = error; + else + results[i] = result; resultCount++; if (resultCount == threads.length) - done(results); + done(results, errors); } }; - threads.map((t, i) => runThread(t, null, function(result) { allDone(i, result); })); + threads.map((t, i) => runThread(t, function(result) { allDone(i, result); }, function(error) { allDone(i, null, error); })); }