Compare commits

...

1 Commits

Author SHA1 Message Date
Jonathan Gramain 94c10e4383 S3C-2987 helper class algo.stream.SerialStream
This helper concatenates two object streams into a single stream.

Will be used by the listing code in RepdServer to handle v0mig
migration stage where two listing on two different ranges need to be
done one after the other.
2020-07-09 16:30:14 -07:00
5 changed files with 251 additions and 34 deletions

View File

@ -28,6 +28,7 @@ module.exports = {
LRUCache: require('./lib/algos/cache/LRUCache'), LRUCache: require('./lib/algos/cache/LRUCache'),
}, },
stream: { stream: {
SerialStream: require('./lib/algos/stream/SerialStream'),
MergeStream: require('./lib/algos/stream/MergeStream'), MergeStream: require('./lib/algos/stream/MergeStream'),
}, },
}, },

View File

@ -0,0 +1,56 @@
const stream = require('stream');
class SerialStream extends stream.Readable {
constructor(stream1, stream2) {
super({ objectMode: true });
this._streams = [stream1, stream2];
this._currentStream = stream1;
this._streamToResume = null;
stream1.on('data', item => this._onItem(stream1, item));
stream1.once('end', () => this._onEndStream1());
stream1.once('error', err => this._onError(stream1, err));
}
_read() {
if (this._streamToResume) {
this._streamToResume.resume();
this._streamToResume = null;
}
}
_destroy(err, callback) {
this._currentStream.destroy();
if (this._currentStream === this._streams[0]) {
this._streams[1].destroy();
}
callback();
}
_onItem(myStream, item) {
if (!this.push(item)) {
myStream.pause();
this._streamToResume = myStream;
}
}
_onEndStream1() {
// stream1 is done, now move on with data from stream2
const stream2 = this._streams[1];
stream2.on('data', item => this._onItem(stream2, item));
stream2.once('end', () => this._onEnd());
stream2.once('error', err => this._onError(stream2, err));
}
_onEnd() {
this.push(null);
}
_onError(myStream, err) {
this.emit('error', err);
this._destroy(err, () => {});
}
}
module.exports = SerialStream;

View File

@ -1,38 +1,6 @@
const assert = require('assert'); const assert = require('assert');
const stream = require('stream');
const MergeStream = require('../../../../lib/algos/stream/MergeStream'); const MergeStream = require('../../../../lib/algos/stream/MergeStream');
const Streamify = require('./Streamify');
class Streamify extends stream.Readable {
constructor(objectsToSend, errorAtEnd) {
super({ objectMode: true });
this._remaining = Array.from(objectsToSend);
this._remaining.reverse();
this._errorAtEnd = errorAtEnd || false;
this._ended = false;
this._destroyed = false;
}
_read() {
process.nextTick(() => {
while (this._remaining.length > 0) {
const item = this._remaining.pop();
if (!this.push(item)) {
return undefined;
}
}
if (this._errorAtEnd) {
return this.emit('error', new Error('OOPS'));
}
this._ended = true;
return this.push(null);
});
}
_destroy(err, callback) {
this._destroyed = true;
callback();
}
}
function readAll(stream, usePauseResume, cb) { function readAll(stream, usePauseResume, cb) {
const result = []; const result = [];
@ -168,12 +136,16 @@ describe('MergeStream', () => {
`${testCasePretty(testCase, false)}` + `${testCasePretty(testCase, false)}` +
`${usePauseResume ? ' with pause/resume' : ''}` + `${usePauseResume ? ' with pause/resume' : ''}` +
`${errorAtEnd ? ' with error' : ''}`; `${errorAtEnd ? ' with error' : ''}`;
const testDescRev =
`${testCasePretty(testCase, true)}` +
`${usePauseResume ? ' with pause/resume' : ''}` +
`${errorAtEnd ? ' with error' : ''}`;
it(`should cover ${testDesc}`, done => { it(`should cover ${testDesc}`, done => {
testMergeStreamWithIntegers( testMergeStreamWithIntegers(
testCase.stream1, testCase.stream2, testCase.stream1, testCase.stream2,
usePauseResume, errorAtEnd, done); usePauseResume, errorAtEnd, done);
}); });
it(`should cover ${testDesc}`, done => { it(`should cover ${testDescRev}`, done => {
testMergeStreamWithIntegers( testMergeStreamWithIntegers(
testCase.stream2, testCase.stream1, testCase.stream2, testCase.stream1,
usePauseResume, errorAtEnd, done); usePauseResume, errorAtEnd, done);

View File

@ -0,0 +1,153 @@
const assert = require('assert');
const SerialStream = require('../../../../lib/algos/stream/SerialStream');
const Streamify = require('./Streamify');
function readAll(stream, usePauseResume, cb) {
const result = [];
stream.on('data', item => {
result.push(item);
if (usePauseResume) {
stream.pause();
setTimeout(() => stream.resume(), 1);
}
});
stream.once('end', () => cb(null, result));
stream.once('error', err => cb(err));
}
function testSerialStreamWithIntegers(contents1, contents2,
usePauseResume, errorAtEnd, cb) {
const expectedItems = contents1.concat(contents2);
const serialStream = new SerialStream(
new Streamify(contents1, errorAtEnd)
.on('error', () => {}),
new Streamify(contents2)
.on('error', () => {}));
readAll(serialStream, usePauseResume, (err, readItems) => {
if (errorAtEnd) {
assert(err);
} else {
assert.ifError(err);
assert.deepStrictEqual(readItems, expectedItems);
}
cb();
});
}
function testCasePretty(testCase, reversed) {
const desc1 = JSON.stringify(
reversed ? testCase.stream2 : testCase.stream1);
const desc2 = JSON.stringify(
reversed ? testCase.stream1 : testCase.stream2);
return `${desc1} concatenated with ${desc2}`;
}
describe('SerialStream', () => {
[
{
stream1: [],
stream2: [],
},
{
stream1: [0],
stream2: [],
},
{
stream1: [0, 1, 2, 3, 4],
stream2: [],
},
{
stream1: [0],
stream2: [1],
},
{
stream1: [1, 2, 3, 4, 5],
stream2: [6],
},
{
stream1: [1, 2, 3, 4, 5],
stream2: [6, 7, 8, 9, 10],
},
].forEach(testCase => {
[false, true].forEach(usePauseResume => {
[false, true].forEach(errorAtEnd => {
const testDesc =
`${testCasePretty(testCase, false)}` +
`${usePauseResume ? ' with pause/resume' : ''}` +
`${errorAtEnd ? ' with error' : ''}`;
const testDescRev =
`${testCasePretty(testCase, true)}` +
`${usePauseResume ? ' with pause/resume' : ''}` +
`${errorAtEnd ? ' with error' : ''}`;
it(`should cover ${testDesc}`, done => {
testSerialStreamWithIntegers(
testCase.stream1, testCase.stream2,
usePauseResume, errorAtEnd, done);
});
it(`should cover ${testDescRev}`, done => {
testSerialStreamWithIntegers(
testCase.stream2, testCase.stream1,
usePauseResume, errorAtEnd, done);
});
});
});
});
[100, 1000, 10000, 100000].forEach(nbEntries => {
[false, true].forEach(usePauseResume => {
[false, true].forEach(errorAtEnd => {
if ((!usePauseResume && !errorAtEnd) || nbEntries <= 1000) {
const fixtureDesc =
`${usePauseResume ? ' with pause/resume' : ''}` +
`${errorAtEnd ? ' with error' : ''}`;
it(`${nbEntries} entries${fixtureDesc}`,
function bigConcatSequential(done) {
this.timeout(10000);
const stream1 = [];
const stream2 = [];
for (let i = 0; i < nbEntries / 2; ++i) {
stream1.push(i);
}
for (let i = nbEntries / 2; i < nbEntries; ++i) {
stream2.push(i);
}
testSerialStreamWithIntegers(
stream1, stream2, usePauseResume, errorAtEnd, done);
});
}
});
});
});
// with 3 items per input stream, we reach the end of stream even
// though destroy() has been called (due to buffering), while with
// 100 items input streams are aborted before emitting the 'end'
// event, so it's useful to test both cases
[3, 100].forEach(nbItemsPerStream => {
it(`destroy() should destroy both inner streams with ${nbItemsPerStream} items per stream`,
done => {
const stream1 = new Streamify(new Array(nbItemsPerStream).fill()
.map((e, i) => i));
const stream2 = new Streamify(new Array(nbItemsPerStream).fill()
.map((e, i) => nbItemsPerStream + i));
const serialStream = new SerialStream(stream1, stream2);
serialStream.on('data', item => {
if (item === 5) {
serialStream.destroy();
const s1ended = stream1._ended;
const s2ended = stream2._ended;
setTimeout(() => {
if (!s1ended) {
assert(stream1._destroyed);
}
if (!s2ended) {
assert(stream2._destroyed);
}
done();
}, 10);
}
});
serialStream.once('error', err => {
assert.fail(`unexpected error: ${err.message}`);
});
});
});
});

View File

@ -0,0 +1,35 @@
const stream = require('stream');
class Streamify extends stream.Readable {
constructor(objectsToSend, errorAtEnd) {
super({ objectMode: true });
this._remaining = Array.from(objectsToSend);
this._remaining.reverse();
this._errorAtEnd = errorAtEnd || false;
this._ended = false;
this._destroyed = false;
}
_read() {
process.nextTick(() => {
while (this._remaining.length > 0) {
const item = this._remaining.pop();
if (!this.push(item)) {
return undefined;
}
}
if (this._errorAtEnd) {
return this.emit('error', new Error('OOPS'));
}
this._ended = true;
return this.push(null);
});
}
_destroy(err, callback) {
this._destroyed = true;
callback();
}
}
module.exports = Streamify;