Compare commits
1 Commits
development
...
bugfix/S3C
Author | SHA1 | Date |
---|---|---|
Jonathan Gramain | 94c10e4383 |
1
index.js
1
index.js
|
@ -28,6 +28,7 @@ module.exports = {
|
|||
LRUCache: require('./lib/algos/cache/LRUCache'),
|
||||
},
|
||||
stream: {
|
||||
SerialStream: require('./lib/algos/stream/SerialStream'),
|
||||
MergeStream: require('./lib/algos/stream/MergeStream'),
|
||||
},
|
||||
},
|
||||
|
|
|
@ -0,0 +1,56 @@
|
|||
const stream = require('stream');
|
||||
|
||||
class SerialStream extends stream.Readable {
    /**
     * Readable object stream that concatenates two object streams
     * serially: it emits every item of {@link stream1}, then, once
     * stream1 has ended, every item of {@link stream2}.
     *
     * @param {stream.Readable} stream1 - first object stream to drain
     * @param {stream.Readable} stream2 - second object stream; it is
     *   only listened to after stream1 has emitted 'end'
     */
    constructor(stream1, stream2) {
        super({ objectMode: true });

        this._streams = [stream1, stream2];
        // stream currently being consumed; switched to stream2 once
        // stream1 ends, so that _destroy() only aborts streams that
        // have not already completed
        this._currentStream = stream1;
        // inner stream paused by back-pressure, to resume on _read()
        this._streamToResume = null;

        stream1.on('data', item => this._onItem(stream1, item));
        stream1.once('end', () => this._onEndStream1());
        stream1.once('error', err => this._onError(stream1, err));
    }

    _read() {
        // consumer is ready again: resume the inner stream that was
        // paused when push() signaled back-pressure
        if (this._streamToResume) {
            this._streamToResume.resume();
            this._streamToResume = null;
        }
    }

    _destroy(err, callback) {
        this._currentStream.destroy();
        // if stream1 is still the active stream, stream2 has not been
        // started yet and must be aborted as well
        if (this._currentStream === this._streams[0]) {
            this._streams[1].destroy();
        }
        callback();
    }

    _onItem(myStream, item) {
        if (!this.push(item)) {
            // consumer is saturated: pause the inner stream until the
            // consumer calls _read() again
            myStream.pause();
            this._streamToResume = myStream;
        }
    }

    _onEndStream1() {
        // stream1 is done, now move on with data from stream2
        const stream2 = this._streams[1];
        // BUGFIX: track the active stream; previously _currentStream
        // stayed on stream1 forever, so _destroy() always destroyed
        // the already-ended stream1 and its conditional was dead code
        this._currentStream = stream2;
        stream2.on('data', item => this._onItem(stream2, item));
        stream2.once('end', () => this._onEnd());
        stream2.once('error', err => this._onError(stream2, err));
    }

    _onEnd() {
        // signal end-of-stream to the consumer
        this.push(null);
    }

    _onError(myStream, err) {
        // forward the inner stream error, then abort remaining streams
        this.emit('error', err);
        this._destroy(err, () => {});
    }
}
|
||||
|
||||
module.exports = SerialStream;
|
|
@ -1,38 +1,6 @@
|
|||
const assert = require('assert');
|
||||
const stream = require('stream');
|
||||
const MergeStream = require('../../../../lib/algos/stream/MergeStream');
|
||||
|
||||
class Streamify extends stream.Readable {
|
||||
constructor(objectsToSend, errorAtEnd) {
|
||||
super({ objectMode: true });
|
||||
this._remaining = Array.from(objectsToSend);
|
||||
this._remaining.reverse();
|
||||
this._errorAtEnd = errorAtEnd || false;
|
||||
this._ended = false;
|
||||
this._destroyed = false;
|
||||
}
|
||||
|
||||
_read() {
|
||||
process.nextTick(() => {
|
||||
while (this._remaining.length > 0) {
|
||||
const item = this._remaining.pop();
|
||||
if (!this.push(item)) {
|
||||
return undefined;
|
||||
}
|
||||
}
|
||||
if (this._errorAtEnd) {
|
||||
return this.emit('error', new Error('OOPS'));
|
||||
}
|
||||
this._ended = true;
|
||||
return this.push(null);
|
||||
});
|
||||
}
|
||||
|
||||
_destroy(err, callback) {
|
||||
this._destroyed = true;
|
||||
callback();
|
||||
}
|
||||
}
|
||||
const Streamify = require('./Streamify');
|
||||
|
||||
function readAll(stream, usePauseResume, cb) {
|
||||
const result = [];
|
||||
|
@ -168,12 +136,16 @@ describe('MergeStream', () => {
|
|||
`${testCasePretty(testCase, false)}` +
|
||||
`${usePauseResume ? ' with pause/resume' : ''}` +
|
||||
`${errorAtEnd ? ' with error' : ''}`;
|
||||
const testDescRev =
|
||||
`${testCasePretty(testCase, true)}` +
|
||||
`${usePauseResume ? ' with pause/resume' : ''}` +
|
||||
`${errorAtEnd ? ' with error' : ''}`;
|
||||
it(`should cover ${testDesc}`, done => {
|
||||
testMergeStreamWithIntegers(
|
||||
testCase.stream1, testCase.stream2,
|
||||
usePauseResume, errorAtEnd, done);
|
||||
});
|
||||
it(`should cover ${testDesc}`, done => {
|
||||
it(`should cover ${testDescRev}`, done => {
|
||||
testMergeStreamWithIntegers(
|
||||
testCase.stream2, testCase.stream1,
|
||||
usePauseResume, errorAtEnd, done);
|
||||
|
|
|
@ -0,0 +1,153 @@
|
|||
const assert = require('assert');
|
||||
const SerialStream = require('../../../../lib/algos/stream/SerialStream');
|
||||
const Streamify = require('./Streamify');
|
||||
|
||||
/**
 * Drain a readable object stream to exhaustion, collecting items.
 *
 * @param {stream.Readable} readable - stream to read from
 * @param {boolean} usePauseResume - when true, briefly pause the
 *   stream after each received item to exercise back-pressure
 * @param {function} cb - callback(err, items): invoked with the
 *   collected items on 'end', or with the error on 'error'
 * @return {undefined}
 */
function readAll(readable, usePauseResume, cb) {
    const collected = [];
    const onItem = item => {
        collected.push(item);
        if (usePauseResume) {
            readable.pause();
            setTimeout(() => readable.resume(), 1);
        }
    };
    readable.on('data', onItem);
    readable.once('end', () => cb(null, collected));
    readable.once('error', err => cb(err));
}
|
||||
|
||||
/**
 * Check that a SerialStream over two Streamify sources yields the
 * concatenation of both input arrays (or an error when requested).
 *
 * @param {Array} contents1 - items for the first inner stream
 * @param {Array} contents2 - items for the second inner stream
 * @param {boolean} usePauseResume - exercise pause/resume while reading
 * @param {boolean} errorAtEnd - make the first inner stream error out
 *   after sending its items; an error is then expected from readAll
 * @param {function} cb - called with no argument once assertions pass
 * @return {undefined}
 */
function testSerialStreamWithIntegers(contents1, contents2,
                                      usePauseResume, errorAtEnd, cb) {
    const expectedItems = contents1.concat(contents2);
    // swallow inner-stream errors: SerialStream forwards them itself
    const source1 = new Streamify(contents1, errorAtEnd)
          .on('error', () => {});
    const source2 = new Streamify(contents2)
          .on('error', () => {});
    const serialStream = new SerialStream(source1, source2);
    readAll(serialStream, usePauseResume, (err, readItems) => {
        if (errorAtEnd) {
            assert(err);
        } else {
            assert.ifError(err);
            assert.deepStrictEqual(readItems, expectedItems);
        }
        cb();
    });
}
|
||||
|
||||
/**
 * Build a human-readable description of a test case's two input
 * arrays, optionally swapping their order.
 *
 * @param {object} testCase - fixture with `stream1` and `stream2`
 *   arrays
 * @param {boolean} reversed - when true, describe stream2 first
 * @return {string} description of the concatenation under test
 */
function testCasePretty(testCase, reversed) {
    const [first, second] = reversed
          ? [testCase.stream2, testCase.stream1]
          : [testCase.stream1, testCase.stream2];
    return `${JSON.stringify(first)} concatenated with ${JSON.stringify(second)}`;
}
|
||||
|
||||
describe('SerialStream', () => {
    // Cross-product of small fixtures (empty / singleton / multi-item
    // inputs) x pause-resume x error injection; each fixture is also
    // run with the two inputs swapped (testDescRev).
    [
        {
            stream1: [],
            stream2: [],
        },
        {
            stream1: [0],
            stream2: [],
        },
        {
            stream1: [0, 1, 2, 3, 4],
            stream2: [],
        },
        {
            stream1: [0],
            stream2: [1],
        },
        {
            stream1: [1, 2, 3, 4, 5],
            stream2: [6],
        },
        {
            stream1: [1, 2, 3, 4, 5],
            stream2: [6, 7, 8, 9, 10],
        },
    ].forEach(testCase => {
        [false, true].forEach(usePauseResume => {
            [false, true].forEach(errorAtEnd => {
                const testDesc =
                    `${testCasePretty(testCase, false)}` +
                    `${usePauseResume ? ' with pause/resume' : ''}` +
                    `${errorAtEnd ? ' with error' : ''}`;
                const testDescRev =
                    `${testCasePretty(testCase, true)}` +
                    `${usePauseResume ? ' with pause/resume' : ''}` +
                    `${errorAtEnd ? ' with error' : ''}`;
                it(`should cover ${testDesc}`, done => {
                    testSerialStreamWithIntegers(
                        testCase.stream1, testCase.stream2,
                        usePauseResume, errorAtEnd, done);
                });
                // same fixture, inputs swapped
                it(`should cover ${testDescRev}`, done => {
                    testSerialStreamWithIntegers(
                        testCase.stream2, testCase.stream1,
                        usePauseResume, errorAtEnd, done);
                });
            });
        });
    });
    // Larger volume tests; the expensive pause/resume and error
    // variants are restricted to the smaller entry counts.
    [100, 1000, 10000, 100000].forEach(nbEntries => {
        [false, true].forEach(usePauseResume => {
            [false, true].forEach(errorAtEnd => {
                if ((!usePauseResume && !errorAtEnd) || nbEntries <= 1000) {
                    const fixtureDesc =
                        `${usePauseResume ? ' with pause/resume' : ''}` +
                        `${errorAtEnd ? ' with error' : ''}`;
                    it(`${nbEntries} entries${fixtureDesc}`,
                    function bigConcatSequential(done) {
                        // large runs can exceed mocha's default 2s
                        this.timeout(10000);
                        // first half of the integers goes to stream1,
                        // second half to stream2
                        const stream1 = [];
                        const stream2 = [];
                        for (let i = 0; i < nbEntries / 2; ++i) {
                            stream1.push(i);
                        }
                        for (let i = nbEntries / 2; i < nbEntries; ++i) {
                            stream2.push(i);
                        }
                        testSerialStreamWithIntegers(
                            stream1, stream2, usePauseResume, errorAtEnd, done);
                    });
                }
            });
        });
    });
    // with 3 items per input stream, we reach the end of stream even
    // though destroy() has been called (due to buffering), while with
    // 100 items input streams are aborted before emitting the 'end'
    // event, so it's useful to test both cases
    [3, 100].forEach(nbItemsPerStream => {
        it(`destroy() should destroy both inner streams with ${nbItemsPerStream} items per stream`,
        done => {
            // stream1 carries [0..n-1], stream2 carries [n..2n-1]
            const stream1 = new Streamify(new Array(nbItemsPerStream).fill()
                .map((e, i) => i));
            const stream2 = new Streamify(new Array(nbItemsPerStream).fill()
                .map((e, i) => nbItemsPerStream + i));
            const serialStream = new SerialStream(stream1, stream2);
            serialStream.on('data', item => {
                if (item === 5) {
                    serialStream.destroy();
                    // snapshot 'ended' flags at destroy time: a stream
                    // that already ended need not be destroyed
                    const s1ended = stream1._ended;
                    const s2ended = stream2._ended;
                    // give destroy() a tick to propagate before
                    // asserting on the inner streams' state
                    setTimeout(() => {
                        if (!s1ended) {
                            assert(stream1._destroyed);
                        }
                        if (!s2ended) {
                            assert(stream2._destroyed);
                        }
                        done();
                    }, 10);
                }
            });
            serialStream.once('error', err => {
                assert.fail(`unexpected error: ${err.message}`);
            });
        });
    });
});
|
|
@ -0,0 +1,35 @@
|
|||
const stream = require('stream');
|
||||
|
||||
class Streamify extends stream.Readable {
    /**
     * Readable object stream replaying a fixed list of items, used as
     * a test fixture.
     *
     * Exposes two flags read by the tests: `_ended` (set once the end
     * of stream has been signaled) and `_destroyed` (set once
     * _destroy() has run).
     *
     * @param {Array} objectsToSend - items to emit, in order
     * @param {boolean} [errorAtEnd=false] - when true, emit an 'error'
     *   event instead of ending after all items have been sent
     */
    constructor(objectsToSend, errorAtEnd) {
        super({ objectMode: true });
        // private copy, reversed so that pop() yields original order
        this._remaining = Array.from(objectsToSend);
        this._remaining.reverse();
        this._errorAtEnd = errorAtEnd || false;
        this._ended = false;
        this._destroyed = false;
    }

    _read() {
        // deliver asynchronously, as a real source would
        process.nextTick(() => {
            for (;;) {
                if (this._remaining.length === 0) {
                    break;
                }
                const nextItem = this._remaining.pop();
                if (!this.push(nextItem)) {
                    // back-pressure: wait for the next _read() call
                    return undefined;
                }
            }
            if (this._errorAtEnd) {
                return this.emit('error', new Error('OOPS'));
            }
            this._ended = true;
            return this.push(null);
        });
    }

    _destroy(err, callback) {
        // record destruction so tests can assert on it
        this._destroyed = true;
        callback();
    }
}
|
||||
|
||||
module.exports = Streamify;
|
Loading…
Reference in New Issue