Compare commits
1 Commits
developmen
...
bugfix/S3C
Author | SHA1 | Date |
---|---|---|
Jonathan Gramain | 94c10e4383 |
1
index.js
1
index.js
|
@ -28,6 +28,7 @@ module.exports = {
|
||||||
LRUCache: require('./lib/algos/cache/LRUCache'),
|
LRUCache: require('./lib/algos/cache/LRUCache'),
|
||||||
},
|
},
|
||||||
stream: {
|
stream: {
|
||||||
|
SerialStream: require('./lib/algos/stream/SerialStream'),
|
||||||
MergeStream: require('./lib/algos/stream/MergeStream'),
|
MergeStream: require('./lib/algos/stream/MergeStream'),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
|
|
@ -0,0 +1,56 @@
|
||||||
|
const stream = require('stream');
|
||||||
|
|
||||||
|
class SerialStream extends stream.Readable {
|
||||||
|
constructor(stream1, stream2) {
|
||||||
|
super({ objectMode: true });
|
||||||
|
|
||||||
|
this._streams = [stream1, stream2];
|
||||||
|
this._currentStream = stream1;
|
||||||
|
this._streamToResume = null;
|
||||||
|
|
||||||
|
stream1.on('data', item => this._onItem(stream1, item));
|
||||||
|
stream1.once('end', () => this._onEndStream1());
|
||||||
|
stream1.once('error', err => this._onError(stream1, err));
|
||||||
|
}
|
||||||
|
|
||||||
|
_read() {
|
||||||
|
if (this._streamToResume) {
|
||||||
|
this._streamToResume.resume();
|
||||||
|
this._streamToResume = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_destroy(err, callback) {
|
||||||
|
this._currentStream.destroy();
|
||||||
|
if (this._currentStream === this._streams[0]) {
|
||||||
|
this._streams[1].destroy();
|
||||||
|
}
|
||||||
|
callback();
|
||||||
|
}
|
||||||
|
|
||||||
|
_onItem(myStream, item) {
|
||||||
|
if (!this.push(item)) {
|
||||||
|
myStream.pause();
|
||||||
|
this._streamToResume = myStream;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_onEndStream1() {
|
||||||
|
// stream1 is done, now move on with data from stream2
|
||||||
|
const stream2 = this._streams[1];
|
||||||
|
stream2.on('data', item => this._onItem(stream2, item));
|
||||||
|
stream2.once('end', () => this._onEnd());
|
||||||
|
stream2.once('error', err => this._onError(stream2, err));
|
||||||
|
}
|
||||||
|
|
||||||
|
_onEnd() {
|
||||||
|
this.push(null);
|
||||||
|
}
|
||||||
|
|
||||||
|
_onError(myStream, err) {
|
||||||
|
this.emit('error', err);
|
||||||
|
this._destroy(err, () => {});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports = SerialStream;
|
|
@ -1,38 +1,6 @@
|
||||||
const assert = require('assert');
|
const assert = require('assert');
|
||||||
const stream = require('stream');
|
|
||||||
const MergeStream = require('../../../../lib/algos/stream/MergeStream');
|
const MergeStream = require('../../../../lib/algos/stream/MergeStream');
|
||||||
|
const Streamify = require('./Streamify');
|
||||||
class Streamify extends stream.Readable {
|
|
||||||
constructor(objectsToSend, errorAtEnd) {
|
|
||||||
super({ objectMode: true });
|
|
||||||
this._remaining = Array.from(objectsToSend);
|
|
||||||
this._remaining.reverse();
|
|
||||||
this._errorAtEnd = errorAtEnd || false;
|
|
||||||
this._ended = false;
|
|
||||||
this._destroyed = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
_read() {
|
|
||||||
process.nextTick(() => {
|
|
||||||
while (this._remaining.length > 0) {
|
|
||||||
const item = this._remaining.pop();
|
|
||||||
if (!this.push(item)) {
|
|
||||||
return undefined;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (this._errorAtEnd) {
|
|
||||||
return this.emit('error', new Error('OOPS'));
|
|
||||||
}
|
|
||||||
this._ended = true;
|
|
||||||
return this.push(null);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
_destroy(err, callback) {
|
|
||||||
this._destroyed = true;
|
|
||||||
callback();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
function readAll(stream, usePauseResume, cb) {
|
function readAll(stream, usePauseResume, cb) {
|
||||||
const result = [];
|
const result = [];
|
||||||
|
@ -168,12 +136,16 @@ describe('MergeStream', () => {
|
||||||
`${testCasePretty(testCase, false)}` +
|
`${testCasePretty(testCase, false)}` +
|
||||||
`${usePauseResume ? ' with pause/resume' : ''}` +
|
`${usePauseResume ? ' with pause/resume' : ''}` +
|
||||||
`${errorAtEnd ? ' with error' : ''}`;
|
`${errorAtEnd ? ' with error' : ''}`;
|
||||||
|
const testDescRev =
|
||||||
|
`${testCasePretty(testCase, true)}` +
|
||||||
|
`${usePauseResume ? ' with pause/resume' : ''}` +
|
||||||
|
`${errorAtEnd ? ' with error' : ''}`;
|
||||||
it(`should cover ${testDesc}`, done => {
|
it(`should cover ${testDesc}`, done => {
|
||||||
testMergeStreamWithIntegers(
|
testMergeStreamWithIntegers(
|
||||||
testCase.stream1, testCase.stream2,
|
testCase.stream1, testCase.stream2,
|
||||||
usePauseResume, errorAtEnd, done);
|
usePauseResume, errorAtEnd, done);
|
||||||
});
|
});
|
||||||
it(`should cover ${testDesc}`, done => {
|
it(`should cover ${testDescRev}`, done => {
|
||||||
testMergeStreamWithIntegers(
|
testMergeStreamWithIntegers(
|
||||||
testCase.stream2, testCase.stream1,
|
testCase.stream2, testCase.stream1,
|
||||||
usePauseResume, errorAtEnd, done);
|
usePauseResume, errorAtEnd, done);
|
||||||
|
|
|
@ -0,0 +1,153 @@
|
||||||
|
const assert = require('assert');
|
||||||
|
const SerialStream = require('../../../../lib/algos/stream/SerialStream');
|
||||||
|
const Streamify = require('./Streamify');
|
||||||
|
|
||||||
|
function readAll(stream, usePauseResume, cb) {
|
||||||
|
const result = [];
|
||||||
|
stream.on('data', item => {
|
||||||
|
result.push(item);
|
||||||
|
if (usePauseResume) {
|
||||||
|
stream.pause();
|
||||||
|
setTimeout(() => stream.resume(), 1);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
stream.once('end', () => cb(null, result));
|
||||||
|
stream.once('error', err => cb(err));
|
||||||
|
}
|
||||||
|
|
||||||
|
function testSerialStreamWithIntegers(contents1, contents2,
|
||||||
|
usePauseResume, errorAtEnd, cb) {
|
||||||
|
const expectedItems = contents1.concat(contents2);
|
||||||
|
const serialStream = new SerialStream(
|
||||||
|
new Streamify(contents1, errorAtEnd)
|
||||||
|
.on('error', () => {}),
|
||||||
|
new Streamify(contents2)
|
||||||
|
.on('error', () => {}));
|
||||||
|
readAll(serialStream, usePauseResume, (err, readItems) => {
|
||||||
|
if (errorAtEnd) {
|
||||||
|
assert(err);
|
||||||
|
} else {
|
||||||
|
assert.ifError(err);
|
||||||
|
assert.deepStrictEqual(readItems, expectedItems);
|
||||||
|
}
|
||||||
|
cb();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
function testCasePretty(testCase, reversed) {
|
||||||
|
const desc1 = JSON.stringify(
|
||||||
|
reversed ? testCase.stream2 : testCase.stream1);
|
||||||
|
const desc2 = JSON.stringify(
|
||||||
|
reversed ? testCase.stream1 : testCase.stream2);
|
||||||
|
return `${desc1} concatenated with ${desc2}`;
|
||||||
|
}
|
||||||
|
|
||||||
|
describe('SerialStream', () => {
|
||||||
|
[
|
||||||
|
{
|
||||||
|
stream1: [],
|
||||||
|
stream2: [],
|
||||||
|
},
|
||||||
|
{
|
||||||
|
stream1: [0],
|
||||||
|
stream2: [],
|
||||||
|
},
|
||||||
|
{
|
||||||
|
stream1: [0, 1, 2, 3, 4],
|
||||||
|
stream2: [],
|
||||||
|
},
|
||||||
|
{
|
||||||
|
stream1: [0],
|
||||||
|
stream2: [1],
|
||||||
|
},
|
||||||
|
{
|
||||||
|
stream1: [1, 2, 3, 4, 5],
|
||||||
|
stream2: [6],
|
||||||
|
},
|
||||||
|
{
|
||||||
|
stream1: [1, 2, 3, 4, 5],
|
||||||
|
stream2: [6, 7, 8, 9, 10],
|
||||||
|
},
|
||||||
|
].forEach(testCase => {
|
||||||
|
[false, true].forEach(usePauseResume => {
|
||||||
|
[false, true].forEach(errorAtEnd => {
|
||||||
|
const testDesc =
|
||||||
|
`${testCasePretty(testCase, false)}` +
|
||||||
|
`${usePauseResume ? ' with pause/resume' : ''}` +
|
||||||
|
`${errorAtEnd ? ' with error' : ''}`;
|
||||||
|
const testDescRev =
|
||||||
|
`${testCasePretty(testCase, true)}` +
|
||||||
|
`${usePauseResume ? ' with pause/resume' : ''}` +
|
||||||
|
`${errorAtEnd ? ' with error' : ''}`;
|
||||||
|
it(`should cover ${testDesc}`, done => {
|
||||||
|
testSerialStreamWithIntegers(
|
||||||
|
testCase.stream1, testCase.stream2,
|
||||||
|
usePauseResume, errorAtEnd, done);
|
||||||
|
});
|
||||||
|
it(`should cover ${testDescRev}`, done => {
|
||||||
|
testSerialStreamWithIntegers(
|
||||||
|
testCase.stream2, testCase.stream1,
|
||||||
|
usePauseResume, errorAtEnd, done);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
[100, 1000, 10000, 100000].forEach(nbEntries => {
|
||||||
|
[false, true].forEach(usePauseResume => {
|
||||||
|
[false, true].forEach(errorAtEnd => {
|
||||||
|
if ((!usePauseResume && !errorAtEnd) || nbEntries <= 1000) {
|
||||||
|
const fixtureDesc =
|
||||||
|
`${usePauseResume ? ' with pause/resume' : ''}` +
|
||||||
|
`${errorAtEnd ? ' with error' : ''}`;
|
||||||
|
it(`${nbEntries} entries${fixtureDesc}`,
|
||||||
|
function bigConcatSequential(done) {
|
||||||
|
this.timeout(10000);
|
||||||
|
const stream1 = [];
|
||||||
|
const stream2 = [];
|
||||||
|
for (let i = 0; i < nbEntries / 2; ++i) {
|
||||||
|
stream1.push(i);
|
||||||
|
}
|
||||||
|
for (let i = nbEntries / 2; i < nbEntries; ++i) {
|
||||||
|
stream2.push(i);
|
||||||
|
}
|
||||||
|
testSerialStreamWithIntegers(
|
||||||
|
stream1, stream2, usePauseResume, errorAtEnd, done);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
// with 3 items per input stream, we reach the end of stream even
|
||||||
|
// though destroy() has been called (due to buffering), while with
|
||||||
|
// 100 items input streams are aborted before emitting the 'end'
|
||||||
|
// event, so it's useful to test both cases
|
||||||
|
[3, 100].forEach(nbItemsPerStream => {
|
||||||
|
it(`destroy() should destroy both inner streams with ${nbItemsPerStream} items per stream`,
|
||||||
|
done => {
|
||||||
|
const stream1 = new Streamify(new Array(nbItemsPerStream).fill()
|
||||||
|
.map((e, i) => i));
|
||||||
|
const stream2 = new Streamify(new Array(nbItemsPerStream).fill()
|
||||||
|
.map((e, i) => nbItemsPerStream + i));
|
||||||
|
const serialStream = new SerialStream(stream1, stream2);
|
||||||
|
serialStream.on('data', item => {
|
||||||
|
if (item === 5) {
|
||||||
|
serialStream.destroy();
|
||||||
|
const s1ended = stream1._ended;
|
||||||
|
const s2ended = stream2._ended;
|
||||||
|
setTimeout(() => {
|
||||||
|
if (!s1ended) {
|
||||||
|
assert(stream1._destroyed);
|
||||||
|
}
|
||||||
|
if (!s2ended) {
|
||||||
|
assert(stream2._destroyed);
|
||||||
|
}
|
||||||
|
done();
|
||||||
|
}, 10);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
serialStream.once('error', err => {
|
||||||
|
assert.fail(`unexpected error: ${err.message}`);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
|
@ -0,0 +1,35 @@
|
||||||
|
const stream = require('stream');
|
||||||
|
|
||||||
|
class Streamify extends stream.Readable {
|
||||||
|
constructor(objectsToSend, errorAtEnd) {
|
||||||
|
super({ objectMode: true });
|
||||||
|
this._remaining = Array.from(objectsToSend);
|
||||||
|
this._remaining.reverse();
|
||||||
|
this._errorAtEnd = errorAtEnd || false;
|
||||||
|
this._ended = false;
|
||||||
|
this._destroyed = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
_read() {
|
||||||
|
process.nextTick(() => {
|
||||||
|
while (this._remaining.length > 0) {
|
||||||
|
const item = this._remaining.pop();
|
||||||
|
if (!this.push(item)) {
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (this._errorAtEnd) {
|
||||||
|
return this.emit('error', new Error('OOPS'));
|
||||||
|
}
|
||||||
|
this._ended = true;
|
||||||
|
return this.push(null);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
_destroy(err, callback) {
|
||||||
|
this._destroyed = true;
|
||||||
|
callback();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports = Streamify;
|
Loading…
Reference in New Issue