Compare commits

...

13 Commits

Author SHA1 Message Date
Jonathan Gramain 5f2a4d8e3b [exp] MD-712 implement synchronized M and V in delimiterVersions 2020-05-08 18:17:15 -07:00
Jonathan Gramain 3fd9995b98 bugfix: MD-712 tooling class to merge two sorted streams
Create class MergeStream to merge two readable sorted streams into one
readable stream, providing a comparison function.

This class is used to implement listing in bucket versioning key
format v1, that requires listing master keys and version keys
synchronously.
2020-05-08 15:16:59 -07:00
Jonathan Gramain db53330cf8 [exp] MD-712 fix MPU listing with delimiter 2020-05-06 17:16:22 -07:00
Jonathan Gramain 9f3b2b12c7 [exp] MD-712 fix MPU listing 2020-05-06 17:04:24 -07:00
Jonathan Gramain 9a7c2192e4 [exp] MD-712 tune listing for v1 2020-05-06 14:24:28 -07:00
Jonathan Gramain b5e3b28bca [exp] MD-712 fixes in listing algo 2020-05-06 12:49:05 -07:00
Jonathan Gramain 8e5a97860d [exp] MD-712 extension.filter() now gets the raw listing key 2020-05-06 12:12:29 -07:00
Jonathan Gramain de87c5bbec [exp] MD-712 support v0 and v1 in listing algos 2020-05-06 10:22:55 -07:00
Jonathan Gramain 63e507b3dd [exp] MD-712 re-add genMDParams() in delimiterMaster.js 2020-05-05 14:23:48 -07:00
Jonathan Gramain c8826ecd27 [exp] MD-712 restore delimiter.js to its original state 2020-05-05 12:35:23 -07:00
Jonathan Gramain 14b994b6be [exp] MD-712 remove case of delete marker in delimiterMaster: no more repair task 2020-05-05 12:35:23 -07:00
Jonathan Gramain 1dd93eb95b [exp] MD-712 update for new master/version prefix 2020-05-05 12:35:23 -07:00
Jonathan Gramain bfd65c0467 MD-712 wip
MD-712 wip

MD-712 wip

MD-712 wip

MD-712 remove DelimiterMaster.filter(): rely on Delimiter.filter() instead

MD-712 restore a simplified version of DelimiterMaster.filter() with a specific check to skip repaired delete markers
2020-05-05 12:35:23 -07:00
10 changed files with 536 additions and 66 deletions

View File

@ -27,6 +27,9 @@ module.exports = {
cache: {
LRUCache: require('./lib/algos/cache/LRUCache'),
},
stream: {
MergeStream: require('./lib/algos/stream/MergeStream'),
},
},
policies: {
evaluators: require('./lib/policyEvaluator/evaluator.js'),

View File

@ -2,6 +2,8 @@
const { inc, checkLimit, FILTER_END, FILTER_ACCEPT } = require('./tools');
const DEFAULT_MAX_KEYS = 1000;
const VSConst = require('../../versioning/constants').VersioningConstants;
const { DbPrefixes, BucketVersioningKeyFormat } = VSConst;
function numberDefault(num, defaultNum) {
const parsedNum = Number.parseInt(num, 10);
@ -19,8 +21,9 @@ class MultipartUploads {
* @param {RequestLogger} logger - The logger of the request
* @return {undefined}
*/
constructor(params, logger) {
constructor(params, vFormat, logger) {
this.params = params;
this.vFormat = vFormat;
this.CommonPrefixes = [];
this.Uploads = [];
this.IsTruncated = false;
@ -54,6 +57,18 @@ class MultipartUploads {
}
params.lt = inc(this.params.prefix);
}
if ([BucketVersioningKeyFormat.v1,
BucketVersioningKeyFormat.v1mig].includes(this.vFormat)) {
if (params.gt !== undefined) {
params.gt = `${DbPrefixes.Master}${params.gt}`;
}
if (params.gte !== undefined) {
params.gte = `${DbPrefixes.Master}${params.gte}`;
}
if (params.lt !== undefined) {
params.lt = `${DbPrefixes.Master}${params.lt}`;
}
}
return params;
}
@ -113,7 +128,13 @@ class MultipartUploads {
this.IsTruncated = this.maxKeys > 0;
return FILTER_END;
}
const key = obj.key;
let key;
if ([BucketVersioningKeyFormat.v1,
BucketVersioningKeyFormat.v1mig].includes(this.vFormat)) {
key = obj.key.slice(DbPrefixes.Master.length);
} else {
key = obj.key;
}
const value = obj.value;
if (this.delimiter) {
const mpuPrefixSlice = `overview${this.splitter}`.length;

View File

@ -16,8 +16,9 @@ class List extends Extension {
* @param {RequestLogger} logger - The logger of the request
* @return {undefined}
*/
constructor(parameters, logger) {
constructor(parameters, vFormat, logger) {
super(parameters, logger);
// vFormat is ignored
this.res = [];
if (parameters) {
this.maxKeys = checkLimit(parameters.maxKeys, DEFAULT_MAX_KEYS);

View File

@ -2,18 +2,10 @@
const Extension = require('./Extension').default;
const { inc, FILTER_END, FILTER_ACCEPT, FILTER_SKIP } = require('./tools');
/**
* Find the next delimiter in the path
*
* @param {string} key - path of the object
* @param {string} delimiter - string to find
* @param {number} index - index to start at
* @return {number} delimiterIndex - returns -1 in case no delimiter is found
*/
function nextDelimiter(key, delimiter, index) {
return key.indexOf(delimiter, index);
}
const Version = require('../../versioning/Version').Version;
const VSConst = require('../../versioning/constants').VersioningConstants;
const { DbPrefixes, BucketVersioningKeyFormat } = VSConst;
const VID_SEP = VSConst.VersionId.Separator;
/**
* Find the common prefix in the path
@ -63,7 +55,7 @@ class Delimiter extends Extension {
* @param {RequestLogger} logger - The logger of the
* request
*/
constructor(parameters, logger) {
constructor(parameters, vFormat, logger) {
super(parameters, logger);
// original listing parameters
this.delimiter = parameters.delimiter;
@ -76,6 +68,8 @@ class Delimiter extends Extension {
typeof parameters.alphabeticalOrder !== 'undefined' ?
parameters.alphabeticalOrder : true;
// versioning key format
this.vFormat = vFormat;
// results
this.CommonPrefixes = [];
this.Contents = [];
@ -102,6 +96,14 @@ class Delimiter extends Extension {
}
genMDParams() {
if ([BucketVersioningKeyFormat.v1,
BucketVersioningKeyFormat.v1mig].includes(this.vFormat)) {
return this.genMDParamsV1();
}
return this.genMDParamsV0();
}
genMDParamsV0() {
const params = {};
if (this.prefix) {
params.gte = this.prefix;
@ -118,6 +120,26 @@ class Delimiter extends Extension {
return params;
}
genMDParamsV1() {
const params = {};
if (this.prefix) {
params.gte = `${DbPrefixes.Master}${this.prefix}`;
params.lt = `${DbPrefixes.Master}${inc(this.prefix)}`;
} else {
params.gte = DbPrefixes.Master;
params.lt = inc(DbPrefixes.Master); // stop after the last master key
}
const startVal = this[this.continueMarker] || this[this.startMarker];
if (startVal) {
if (params.gte > `${DbPrefixes.Master}${startVal}`) {
return params;
}
delete params.gte;
params.gt = `${DbPrefixes.Master}${startVal}`;
}
return params;
}
/**
* check if the max keys count has been reached and set the
* final state of the result if it is the case
@ -162,7 +184,13 @@ class Delimiter extends Extension {
* @return {number} - indicates if iteration should continue
*/
filter(obj) {
const key = obj.key;
let key;
if ([BucketVersioningKeyFormat.v1,
BucketVersioningKeyFormat.v1mig].includes(this.vFormat)) {
key = obj.key.slice(DbPrefixes.Master.length);
} else {
key = obj.key;
}
const value = obj.value;
if ((this.prefix && !key.startsWith(this.prefix))
|| (this.alphabeticalOrder
@ -172,9 +200,7 @@ class Delimiter extends Extension {
}
if (this.delimiter) {
const baseIndex = this.prefix ? this.prefix.length : 0;
const delimiterIndex = nextDelimiter(key,
this.delimiter,
baseIndex);
const delimiterIndex = key.indexOf(this.delimiter, baseIndex);
if (delimiterIndex === -1) {
return this.addContents(key, value);
}
@ -211,8 +237,20 @@ class Delimiter extends Extension {
* that it's enough and should move on
*/
skipping() {
return this[this.nextContinueMarker];
if ([BucketVersioningKeyFormat.v1,
BucketVersioningKeyFormat.v1mig].includes(this.vFormat)) {
return this.skippingV1();
}
return this.skippingV0();
}
skippingV0() {
return this[this.nextContinueMarker];
}
skippingV1() {
return `${DbPrefixes.Master}${this[this.nextContinueMarker]}`;
}
/**
* Return an object containing all mandatory fields to use once the

View File

@ -3,9 +3,11 @@
const Delimiter = require('./delimiter').Delimiter;
const Version = require('../../versioning/Version').Version;
const VSConst = require('../../versioning/constants').VersioningConstants;
const { FILTER_ACCEPT, FILTER_SKIP, SKIP_NONE } = require('./tools');
const { BucketVersioningKeyFormat } = VSConst;
const { inc, FILTER_ACCEPT, FILTER_SKIP, SKIP_NONE } = require('./tools');
const VID_SEP = VSConst.VersionId.Separator;
const { DbPrefixes } = VSConst;
/**
* Handle object listing with parameters. This extends the base class Delimiter
@ -24,25 +26,19 @@ class DelimiterMaster extends Delimiter {
* @param {String} parameters.continuationToken - obfuscated amazon token
* @param {RequestLogger} logger - The logger of the request
*/
constructor(parameters, logger) {
super(parameters, logger);
// non-PHD master version or a version whose master is a PHD version
this.prvKey = undefined;
this.prvPHDKey = undefined;
constructor(parameters, vFormat, logger) {
super(parameters, vFormat, logger);
}
/**
* Filter to apply on each iteration, based on:
* - prefix
* - delimiter
* - maxKeys
* The marker is being handled directly by levelDB
* @param {Object} obj - The key and value of the element
* @param {String} obj.key - The key of the element
* @param {String} obj.value - The value of the element
* @return {number} - indicates if iteration should continue
*/
filter(obj) {
if ([BucketVersioningKeyFormat.v1,
BucketVersioningKeyFormat.v1mig].includes(this.vFormat)) {
return this.filterV1(obj);
}
return this.filterV0(obj);
}
filterV0(obj) {
let key = obj.key;
const value = obj.value;
@ -80,6 +76,7 @@ class DelimiterMaster extends Delimiter {
return FILTER_SKIP;
}
}
// TODO optimize for v1 versioning key format
if (Version.isPHD(value)) {
/* master version is a PHD version, we want to wait for the next
* one:
@ -121,6 +118,10 @@ class DelimiterMaster extends Delimiter {
return this.addContents(key, value);
}
filterV1(obj) {
return super.filter(obj);
}
skipping() {
if (this[this.nextContinueMarker]) {
// next marker or next continuation token:
@ -129,9 +130,9 @@ class DelimiterMaster extends Delimiter {
const index = this[this.nextContinueMarker].
lastIndexOf(this.delimiter);
if (index === this[this.nextContinueMarker].length - 1) {
return this[this.nextContinueMarker];
return `${DbPrefixes.Master}${this[this.nextContinueMarker]}`;
}
return this[this.nextContinueMarker] + VID_SEP;
return `${DbPrefixes.Master}${this[this.nextContinueMarker]}${VID_SEP}`;
}
return SKIP_NONE;
}

View File

@ -7,10 +7,7 @@ const { inc, FILTER_END, FILTER_ACCEPT, FILTER_SKIP, SKIP_NONE } =
require('./tools');
const VID_SEP = VSConst.VersionId.Separator;
function formatVersionKey(key, versionId) {
return `${key}${VID_SEP}${versionId}`;
}
const { DbPrefixes, BucketVersioningKeyFormat } = VSConst;
/**
* Handle object listing with parameters
@ -25,8 +22,8 @@ function formatVersionKey(key, versionId) {
* @prop {Number} maxKeys - number of keys to list
*/
class DelimiterVersions extends Delimiter {
constructor(parameters, logger) {
super(parameters, logger);
constructor(parameters, vFormat, logger) {
super(parameters, vFormat, logger);
// specific to version listing
this.keyMarker = parameters.keyMarker;
this.versionIdMarker = parameters.versionIdMarker;
@ -39,6 +36,14 @@ class DelimiterVersions extends Delimiter {
}
genMDParams() {
if ([BucketVersioningKeyFormat.v1,
BucketVersioningKeyFormat.v1mig].includes(this.vFormat)) {
return this.genMDParamsV1();
}
return this.genMDParamsV0();
}
genMDParamsV0() {
const params = {};
if (this.parameters.prefix) {
params.gte = this.parameters.prefix;
@ -52,8 +57,8 @@ class DelimiterVersions extends Delimiter {
if (this.parameters.versionIdMarker) {
// versionIdMarker should always come with keyMarker
// but may not be the other way around
params.gt = formatVersionKey(this.parameters.keyMarker,
this.parameters.versionIdMarker);
params.gt = `${this.parameters.keyMarker}` +
`${VID_SEP}${this.parameters.versionIdMarker}`;
} else {
params.gt = inc(this.parameters.keyMarker + VID_SEP);
}
@ -61,6 +66,46 @@ class DelimiterVersions extends Delimiter {
return params;
}
genMDParamsV1() {
const params = [{}, {}];
if (this.parameters.prefix) {
params[0].gte = `${DbPrefixes.Master}${this.parameters.prefix}`;
params[0].lt = `${DbPrefixes.Master}${inc(this.parameters.prefix)}`;
params[1].gte = `${DbPrefixes.Version}${this.parameters.prefix}`;
params[1].lt = `${DbPrefixes.Version}${inc(this.parameters.prefix)}`;
} else {
params[0].gte = DbPrefixes.Master;
params[0].lt = inc(DbPrefixes.Master); // stop after the last master key
params[1].gte = DbPrefixes.Version;
params[1].lt = inc(DbPrefixes.Version); // stop after the last version key
}
if (this.parameters.keyMarker) {
if (params[1].gte <= `${DbPrefixes.Version}${this.parameters.keyMarker}`) {
delete params[0].gte;
delete params[1].gte;
params[0].gt = `${DbPrefixes.Master}${inc(this.parameters.keyMarker + VID_SEP)}`;
if (this.parameters.versionIdMarker) {
// versionIdMarker should always come with keyMarker
// but may not be the other way around
params[1].gt = `${DbPrefixes.Version}${this.parameters.keyMarker}` +
`${VID_SEP}${this.parameters.versionIdMarker}`;
} else {
params[1].gt = `${DbPrefixes.Version}${inc(this.parameters.keyMarker + VID_SEP)}`;
}
}
}
return params;
}
compareObjects(obj1, obj2) {
const key1 = obj1.key.slice(DbPrefixes.Master.length);
const key2 = obj2.key.slice(DbPrefixes.Version.length);
if (key1 < key2) {
return -1;
}
return 1;
}
/**
* Add a (key, versionId, value) tuple to the listing.
* Set the NextMarker to the current key
@ -98,26 +143,37 @@ class DelimiterVersions extends Delimiter {
* @return {number} - indicates if iteration should continue
*/
filter(obj) {
if (Version.isPHD(obj.value)) {
return FILTER_ACCEPT; // trick repd to not increase its streak
let key;
if ([BucketVersioningKeyFormat.v1,
BucketVersioningKeyFormat.v1mig].includes(this.vFormat)) {
key = obj.key.slice(DbPrefixes.Version.length);
} else {
key = obj.key;
if (Version.isPHD(obj.value)) {
return FILTER_ACCEPT; // trick repd to not increase its streak
}
}
if (this.prefix && !obj.key.startsWith(this.prefix)) {
return this.filterCommon(key, obj.value);
}
filterCommon(key, value) {
if (this.prefix && !key.startsWith(this.prefix)) {
return FILTER_SKIP;
}
let key = obj.key; // original key
let versionId = undefined; // versionId
const versionIdIndex = obj.key.indexOf(VID_SEP);
let nonversionedKey;
let versionId = undefined;
const versionIdIndex = key.indexOf(VID_SEP);
if (versionIdIndex < 0) {
this.masterKey = obj.key;
nonversionedKey = key;
this.masterKey = key;
this.masterVersionId =
Version.from(obj.value).getVersionId() || 'null';
Version.from(value).getVersionId() || 'null';
versionId = this.masterVersionId;
} else {
// eslint-disable-next-line
key = obj.key.slice(0, versionIdIndex);
// eslint-disable-next-line
versionId = obj.key.slice(versionIdIndex + 1);
if (this.masterKey === key && this.masterVersionId === versionId) {
nonversionedKey = key.slice(0, versionIdIndex);
versionId = key.slice(versionIdIndex + 1);
if (this.masterKey === nonversionedKey &&
this.masterVersionId === versionId) {
return FILTER_ACCEPT; // trick repd to not increase its streak
}
this.masterKey = undefined;
@ -125,15 +181,25 @@ class DelimiterVersions extends Delimiter {
}
if (this.delimiter) {
const baseIndex = this.prefix ? this.prefix.length : 0;
const delimiterIndex = key.indexOf(this.delimiter, baseIndex);
const delimiterIndex =
nonversionedKey.indexOf(this.delimiter, baseIndex);
if (delimiterIndex >= 0) {
return this.addCommonPrefix(key, delimiterIndex);
return this.addCommonPrefix(nonversionedKey, delimiterIndex);
}
}
return this.addContents({ key, value: obj.value, versionId });
return this.addContents({ key: nonversionedKey, value, versionId });
}
skipping() {
if ([BucketVersioningKeyFormat.v1,
BucketVersioningKeyFormat.v1mig].includes(this.vFormat)) {
// TODO may need skipping if a delimiter is present
return SKIP_NONE;
}
return this.skippingV0();
}
skippingV0() {
if (this.NextMarker) {
const index = this.NextMarker.lastIndexOf(this.delimiter);
if (index === this.NextMarker.length - 1) {
@ -141,7 +207,7 @@ class DelimiterVersions extends Delimiter {
}
}
return SKIP_NONE;
}
}
/**
* Return an object containing all mandatory fields to use once the

View File

@ -0,0 +1,101 @@
const stream = require('stream');
class MergeStream extends stream.Readable {
constructor(stream1, stream2, compare) {
super({ objectMode: true });
this._compare = compare;
this._streams = [stream1, stream2];
// peekItems elements represent the latest item consumed from
// the respective input stream but not yet pushed. It can also
// be one of the following special values:
// - undefined: stream hasn't started emitting items
// - null: EOF reached and no more item to peek
this._peekItems = [undefined, undefined];
this._streamEof = [false, false];
this._streamToResume = null;
stream1.on('data', item => this._onItem(stream1, item, 0, 1));
stream1.once('end', () => this._onEnd(stream1, 0, 1));
stream1.once('error', err => this._onError(stream1, err, 0, 1));
stream2.on('data', item => this._onItem(stream2, item, 1, 0));
stream2.once('end', () => this._onEnd(stream2, 1, 0));
stream2.once('error', err => this._onError(stream2, err, 1, 0));
}
_read() {
if (this._streamToResume) {
this._streamToResume.resume();
this._streamToResume = null;
}
}
_onItem(myStream, myItem, myIndex, otherIndex) {
this._peekItems[myIndex] = myItem;
const otherItem = this._peekItems[otherIndex];
if (otherItem === undefined) {
// wait for the other stream to wake up
return myStream.pause();
}
if (otherItem === null || this._compare(myItem, otherItem) <= 0) {
if (!this.push(myItem)) {
myStream.pause();
this._streamToResume = myStream;
}
return undefined;
}
const otherStream = this._streams[otherIndex];
const otherMore = this.push(otherItem);
if (this._streamEof[otherIndex]) {
this._peekItems[otherIndex] = null;
return this.push(myItem);
}
myStream.pause();
if (otherMore) {
return otherStream.resume();
}
this._streamToResume = otherStream;
return undefined;
}
_showPeek() {
return `[${this._peekItems[0]},${this._peekItems[1]}]`;
}
_onEnd(myStream, myIndex, otherIndex) {
this._streamEof[myIndex] = true;
if (this._peekItems[myIndex] === undefined) {
this._peekItems[myIndex] = null;
}
const myItem = this._peekItems[myIndex];
const otherItem = this._peekItems[otherIndex];
if (otherItem === undefined) {
// wait for the other stream to wake up
return undefined;
}
if (otherItem === null) {
return this.push(null);
}
if (myItem === null || this._compare(myItem, otherItem) <= 0) {
this.push(otherItem);
this._peekItems[myIndex] = null;
}
if (this._streamEof[otherIndex]) {
return this.push(null);
}
const otherStream = this._streams[otherIndex];
return otherStream.resume();
}
_onError(myStream, err, myIndex, otherIndex) {
myStream.destroy();
if (this._streams[otherIndex]) {
this._streams[otherIndex].destroy();
}
this.emit('error', err);
}
}
module.exports = MergeStream;

View File

@ -2,4 +2,15 @@ module.exports.VersioningConstants = {
VersionId: {
Separator: '\0',
},
DbPrefixes: {
Master: '\x7fM',
Version: '\x7fV',
},
BucketVersioningKeyFormat: {
current: 'v1',
v0: 'v0',
v0mig: 'v0mig',
v1mig: 'v1mig',
v1: 'v1',
},
};

View File

@ -100,7 +100,7 @@ describe('Delimiter All masters listing algorithm', () => {
assert.deepStrictEqual(delimiter.result(), EmptyResult);
});
it('should skip entries superior to next marker', () => {
it('should skip entries inferior to next marker', () => {
const delimiter = new DelimiterMaster({ marker: 'b' }, fakeLogger);
assert.strictEqual(delimiter.filter({ key: 'a' }), FILTER_SKIP);

View File

@ -0,0 +1,228 @@
const assert = require('assert');
const stream = require('stream');
const MergeStream = require('../../../../lib/algos/stream/MergeStream');
class Streamify extends stream.Readable {
constructor(objectsToSend, errorAtEnd) {
super({ objectMode: true });
this._remaining = Array.from(objectsToSend);
this._remaining.reverse();
this._errorAtEnd = errorAtEnd || false;
}
_read() {
process.nextTick(() => {
while (this._remaining.length > 0) {
const item = this._remaining.pop();
if (!this.push(item)) {
return undefined;
}
}
if (this._errorAtEnd) {
return this.emit('error', new Error('OOPS'));
}
return this.push(null);
});
}
}
function readAll(stream, usePauseResume, cb) {
const result = [];
stream.on('data', item => {
result.push(item);
if (usePauseResume) {
stream.pause();
setTimeout(() => stream.resume(), 1);
}
});
stream.once('end', () => cb(null, result));
stream.once('error', err => cb(err));
}
function testMergeStreamWithIntegers(contents1, contents2,
usePauseResume, errorAtEnd, cb) {
const compareInt = (a, b) => {
if (a < b) {
return -1;
}
if (a > b) {
return 1;
}
return 0;
};
const expectedItems = contents1.concat(contents2).sort(compareInt);
const mergeStream = new MergeStream(
new Streamify(contents1, errorAtEnd)
.on('error', () => {}),
new Streamify(contents2)
.on('error', () => {}),
compareInt);
readAll(mergeStream, usePauseResume, (err, readItems) => {
if (errorAtEnd) {
assert(err);
} else {
assert.ifError(err);
assert.deepStrictEqual(readItems, expectedItems);
}
cb();
});
}
function testCasePretty(testCase, reversed) {
const desc1 = JSON.stringify(
reversed ? testCase.stream2 : testCase.stream1);
const desc2 = JSON.stringify(
reversed ? testCase.stream1 : testCase.stream2);
return `${desc1} merged with ${desc2}`;
}
describe('MergeStream', () => {
[
{
stream1: [],
stream2: [],
},
{
stream1: [0],
stream2: [],
},
{
stream1: [0, 1, 2, 3, 4],
stream2: [],
},
{
stream1: [0],
stream2: [1],
},
{
stream1: [1, 2, 3, 4, 5],
stream2: [0],
},
{
stream1: [0, 1, 2, 3, 4],
stream2: [5],
},
{
stream1: [1, 2],
stream2: [3, 4, 5],
},
{
stream1: [1, 2, 3],
stream2: [4, 5],
},
{
stream1: [1, 3, 5, 7, 9],
stream2: [2, 4, 6, 8, 10],
},
{
stream1: [1, 4, 7],
stream2: [0, 2, 3, 5, 6, 8, 9, 10],
},
{
stream1: [0, 10],
stream2: [1, 2, 3, 4, 5, 6, 7, 8, 9],
},
{
stream1: [4, 5, 6],
stream2: [1, 2, 3, 7, 8, 9],
},
{
stream1: [0],
stream2: [0],
},
{
stream1: [0, 1],
stream2: [0, 1],
},
{
stream1: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
stream2: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
},
{
stream1: [0, 2, 3, 4],
stream2: [0, 1, 2, 4],
},
{
stream1: [0, 1, 2, 3],
stream2: [1, 2, 3, 4],
},
{
stream1: [0, 1, 2, 3],
stream2: [2, 3, 4, 5, 6, 7],
},
{
stream1: [0, 1, 2, 3],
stream2: [3, 4, 5, 6],
},
{
stream1: [0, 1, 2, 3],
stream2: [0, 3],
},
].forEach(testCase => {
[false, true].forEach(usePauseResume => {
[false, true].forEach(errorAtEnd => {
const testDesc =
`${testCasePretty(testCase, false)}` +
`${usePauseResume ? ' with pause/resume' : ''}` +
`${errorAtEnd ? ' with error' : ''}`;
it(`should cover ${testDesc}`, done => {
testMergeStreamWithIntegers(
testCase.stream1, testCase.stream2,
usePauseResume, errorAtEnd, done);
});
it(`should cover ${testDesc}`, done => {
testMergeStreamWithIntegers(
testCase.stream2, testCase.stream1,
usePauseResume, errorAtEnd, done);
});
});
});
});
[100, 1000, 10000, 100000].forEach(nbEntries => {
[false, true].forEach(usePauseResume => {
[false, true].forEach(errorAtEnd => {
if ((!usePauseResume && !errorAtEnd) || nbEntries <= 1000) {
const fixtureDesc =
`${usePauseResume ? ' with pause/resume' : ''}` +
`${errorAtEnd ? ' with error' : ''}`;
it(`${nbEntries} sequential entries${fixtureDesc}`,
function bigMergeSequential(done) {
this.timeout(10000);
const stream1 = [];
const stream2 = [];
for (let i = 0; i < nbEntries; ++i) {
// picked two large arbitrary prime numbers to get a
// deterministic random-looking series
if (Math.floor(i / (nbEntries / 10)) % 2 === 0) {
stream1.push(i);
} else {
stream2.push(i);
}
}
testMergeStreamWithIntegers(
stream1, stream2, usePauseResume, errorAtEnd, done);
});
it(`${nbEntries} randomly mingled entries${fixtureDesc}`,
function bigMergeRandom(done) {
this.timeout(10000);
const stream1 = [];
const stream2 = [];
let accu = nbEntries;
for (let i = 0; i < nbEntries; ++i) {
// picked two large arbitrary prime numbers to get a
// deterministic random-looking series
accu = (accu * 1592760451) % 8448053;
if (accu % 2 === 0) {
stream1.push(i);
} else {
stream2.push(i);
}
}
testMergeStreamWithIntegers(
stream1, stream2, usePauseResume, errorAtEnd, done);
});
}
});
});
});
});