Compare commits
No commits in common. "5f2a4d8e3b0e23ae39f276070f419defabf615f4" and "14b994b6be7e13e9565903103ab264a003a62236" have entirely different histories.
5f2a4d8e3b
...
14b994b6be
3
index.js
3
index.js
|
@ -27,9 +27,6 @@ module.exports = {
|
||||||
cache: {
|
cache: {
|
||||||
LRUCache: require('./lib/algos/cache/LRUCache'),
|
LRUCache: require('./lib/algos/cache/LRUCache'),
|
||||||
},
|
},
|
||||||
stream: {
|
|
||||||
MergeStream: require('./lib/algos/stream/MergeStream'),
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
policies: {
|
policies: {
|
||||||
evaluators: require('./lib/policyEvaluator/evaluator.js'),
|
evaluators: require('./lib/policyEvaluator/evaluator.js'),
|
||||||
|
|
|
@ -2,8 +2,6 @@
|
||||||
|
|
||||||
const { inc, checkLimit, FILTER_END, FILTER_ACCEPT } = require('./tools');
|
const { inc, checkLimit, FILTER_END, FILTER_ACCEPT } = require('./tools');
|
||||||
const DEFAULT_MAX_KEYS = 1000;
|
const DEFAULT_MAX_KEYS = 1000;
|
||||||
const VSConst = require('../../versioning/constants').VersioningConstants;
|
|
||||||
const { DbPrefixes, BucketVersioningKeyFormat } = VSConst;
|
|
||||||
|
|
||||||
function numberDefault(num, defaultNum) {
|
function numberDefault(num, defaultNum) {
|
||||||
const parsedNum = Number.parseInt(num, 10);
|
const parsedNum = Number.parseInt(num, 10);
|
||||||
|
@ -21,9 +19,8 @@ class MultipartUploads {
|
||||||
* @param {RequestLogger} logger - The logger of the request
|
* @param {RequestLogger} logger - The logger of the request
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
constructor(params, vFormat, logger) {
|
constructor(params, logger) {
|
||||||
this.params = params;
|
this.params = params;
|
||||||
this.vFormat = vFormat;
|
|
||||||
this.CommonPrefixes = [];
|
this.CommonPrefixes = [];
|
||||||
this.Uploads = [];
|
this.Uploads = [];
|
||||||
this.IsTruncated = false;
|
this.IsTruncated = false;
|
||||||
|
@ -57,18 +54,6 @@ class MultipartUploads {
|
||||||
}
|
}
|
||||||
params.lt = inc(this.params.prefix);
|
params.lt = inc(this.params.prefix);
|
||||||
}
|
}
|
||||||
if ([BucketVersioningKeyFormat.v1,
|
|
||||||
BucketVersioningKeyFormat.v1mig].includes(this.vFormat)) {
|
|
||||||
if (params.gt !== undefined) {
|
|
||||||
params.gt = `${DbPrefixes.Master}${params.gt}`;
|
|
||||||
}
|
|
||||||
if (params.gte !== undefined) {
|
|
||||||
params.gte = `${DbPrefixes.Master}${params.gte}`;
|
|
||||||
}
|
|
||||||
if (params.lt !== undefined) {
|
|
||||||
params.lt = `${DbPrefixes.Master}${params.lt}`;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return params;
|
return params;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -128,13 +113,7 @@ class MultipartUploads {
|
||||||
this.IsTruncated = this.maxKeys > 0;
|
this.IsTruncated = this.maxKeys > 0;
|
||||||
return FILTER_END;
|
return FILTER_END;
|
||||||
}
|
}
|
||||||
let key;
|
const key = obj.key;
|
||||||
if ([BucketVersioningKeyFormat.v1,
|
|
||||||
BucketVersioningKeyFormat.v1mig].includes(this.vFormat)) {
|
|
||||||
key = obj.key.slice(DbPrefixes.Master.length);
|
|
||||||
} else {
|
|
||||||
key = obj.key;
|
|
||||||
}
|
|
||||||
const value = obj.value;
|
const value = obj.value;
|
||||||
if (this.delimiter) {
|
if (this.delimiter) {
|
||||||
const mpuPrefixSlice = `overview${this.splitter}`.length;
|
const mpuPrefixSlice = `overview${this.splitter}`.length;
|
||||||
|
|
|
@ -16,9 +16,8 @@ class List extends Extension {
|
||||||
* @param {RequestLogger} logger - The logger of the request
|
* @param {RequestLogger} logger - The logger of the request
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
constructor(parameters, vFormat, logger) {
|
constructor(parameters, logger) {
|
||||||
super(parameters, logger);
|
super(parameters, logger);
|
||||||
// vFormat is ignored
|
|
||||||
this.res = [];
|
this.res = [];
|
||||||
if (parameters) {
|
if (parameters) {
|
||||||
this.maxKeys = checkLimit(parameters.maxKeys, DEFAULT_MAX_KEYS);
|
this.maxKeys = checkLimit(parameters.maxKeys, DEFAULT_MAX_KEYS);
|
||||||
|
|
|
@ -1,11 +1,22 @@
|
||||||
'use strict'; // eslint-disable-line strict
|
'use strict'; // eslint-disable-line strict
|
||||||
|
|
||||||
const Extension = require('./Extension').default;
|
const Extension = require('./Extension').default;
|
||||||
const { inc, FILTER_END, FILTER_ACCEPT, FILTER_SKIP } = require('./tools');
|
const { inc, FILTER_END, FILTER_ACCEPT, FILTER_SKIP, SKIP_NONE } = require('./tools');
|
||||||
const Version = require('../../versioning/Version').Version;
|
|
||||||
const VSConst = require('../../versioning/constants').VersioningConstants;
|
const VSConst = require('../../versioning/constants').VersioningConstants;
|
||||||
const { DbPrefixes, BucketVersioningKeyFormat } = VSConst;
|
const { DbPrefixes } = VSConst;
|
||||||
const VID_SEP = VSConst.VersionId.Separator;
|
|
||||||
|
/**
|
||||||
|
* Find the next delimiter in the path
|
||||||
|
*
|
||||||
|
* @param {string} key - path of the object
|
||||||
|
* @param {string} delimiter - string to find
|
||||||
|
* @param {number} index - index to start at
|
||||||
|
* @return {number} delimiterIndex - returns -1 in case no delimiter is found
|
||||||
|
*/
|
||||||
|
function nextDelimiter(key, delimiter, index) {
|
||||||
|
return key.indexOf(delimiter, index);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Find the common prefix in the path
|
* Find the common prefix in the path
|
||||||
|
@ -55,7 +66,7 @@ class Delimiter extends Extension {
|
||||||
* @param {RequestLogger} logger - The logger of the
|
* @param {RequestLogger} logger - The logger of the
|
||||||
* request
|
* request
|
||||||
*/
|
*/
|
||||||
constructor(parameters, vFormat, logger) {
|
constructor(parameters, logger) {
|
||||||
super(parameters, logger);
|
super(parameters, logger);
|
||||||
// original listing parameters
|
// original listing parameters
|
||||||
this.delimiter = parameters.delimiter;
|
this.delimiter = parameters.delimiter;
|
||||||
|
@ -68,8 +79,6 @@ class Delimiter extends Extension {
|
||||||
typeof parameters.alphabeticalOrder !== 'undefined' ?
|
typeof parameters.alphabeticalOrder !== 'undefined' ?
|
||||||
parameters.alphabeticalOrder : true;
|
parameters.alphabeticalOrder : true;
|
||||||
|
|
||||||
// versioning key format
|
|
||||||
this.vFormat = vFormat;
|
|
||||||
// results
|
// results
|
||||||
this.CommonPrefixes = [];
|
this.CommonPrefixes = [];
|
||||||
this.Contents = [];
|
this.Contents = [];
|
||||||
|
@ -96,31 +105,6 @@ class Delimiter extends Extension {
|
||||||
}
|
}
|
||||||
|
|
||||||
genMDParams() {
|
genMDParams() {
|
||||||
if ([BucketVersioningKeyFormat.v1,
|
|
||||||
BucketVersioningKeyFormat.v1mig].includes(this.vFormat)) {
|
|
||||||
return this.genMDParamsV1();
|
|
||||||
}
|
|
||||||
return this.genMDParamsV0();
|
|
||||||
}
|
|
||||||
|
|
||||||
genMDParamsV0() {
|
|
||||||
const params = {};
|
|
||||||
if (this.prefix) {
|
|
||||||
params.gte = this.prefix;
|
|
||||||
params.lt = inc(this.prefix);
|
|
||||||
}
|
|
||||||
const startVal = this[this.continueMarker] || this[this.startMarker];
|
|
||||||
if (startVal) {
|
|
||||||
if (params.gte && params.gte > startVal) {
|
|
||||||
return params;
|
|
||||||
}
|
|
||||||
delete params.gte;
|
|
||||||
params.gt = startVal;
|
|
||||||
}
|
|
||||||
return params;
|
|
||||||
}
|
|
||||||
|
|
||||||
genMDParamsV1() {
|
|
||||||
const params = {};
|
const params = {};
|
||||||
if (this.prefix) {
|
if (this.prefix) {
|
||||||
params.gte = `${DbPrefixes.Master}${this.prefix}`;
|
params.gte = `${DbPrefixes.Master}${this.prefix}`;
|
||||||
|
@ -184,13 +168,7 @@ class Delimiter extends Extension {
|
||||||
* @return {number} - indicates if iteration should continue
|
* @return {number} - indicates if iteration should continue
|
||||||
*/
|
*/
|
||||||
filter(obj) {
|
filter(obj) {
|
||||||
let key;
|
const key = obj.key;
|
||||||
if ([BucketVersioningKeyFormat.v1,
|
|
||||||
BucketVersioningKeyFormat.v1mig].includes(this.vFormat)) {
|
|
||||||
key = obj.key.slice(DbPrefixes.Master.length);
|
|
||||||
} else {
|
|
||||||
key = obj.key;
|
|
||||||
}
|
|
||||||
const value = obj.value;
|
const value = obj.value;
|
||||||
if ((this.prefix && !key.startsWith(this.prefix))
|
if ((this.prefix && !key.startsWith(this.prefix))
|
||||||
|| (this.alphabeticalOrder
|
|| (this.alphabeticalOrder
|
||||||
|
@ -200,7 +178,9 @@ class Delimiter extends Extension {
|
||||||
}
|
}
|
||||||
if (this.delimiter) {
|
if (this.delimiter) {
|
||||||
const baseIndex = this.prefix ? this.prefix.length : 0;
|
const baseIndex = this.prefix ? this.prefix.length : 0;
|
||||||
const delimiterIndex = key.indexOf(this.delimiter, baseIndex);
|
const delimiterIndex = nextDelimiter(key,
|
||||||
|
this.delimiter,
|
||||||
|
baseIndex);
|
||||||
if (delimiterIndex === -1) {
|
if (delimiterIndex === -1) {
|
||||||
return this.addContents(key, value);
|
return this.addContents(key, value);
|
||||||
}
|
}
|
||||||
|
@ -237,20 +217,11 @@ class Delimiter extends Extension {
|
||||||
* that it's enough and should move on
|
* that it's enough and should move on
|
||||||
*/
|
*/
|
||||||
skipping() {
|
skipping() {
|
||||||
if ([BucketVersioningKeyFormat.v1,
|
if (this[this.nextContinueMarker]) {
|
||||||
BucketVersioningKeyFormat.v1mig].includes(this.vFormat)) {
|
|
||||||
return this.skippingV1();
|
|
||||||
}
|
|
||||||
return this.skippingV0();
|
|
||||||
}
|
|
||||||
|
|
||||||
skippingV0() {
|
|
||||||
return this[this.nextContinueMarker];
|
|
||||||
}
|
|
||||||
|
|
||||||
skippingV1() {
|
|
||||||
return `${DbPrefixes.Master}${this[this.nextContinueMarker]}`;
|
return `${DbPrefixes.Master}${this[this.nextContinueMarker]}`;
|
||||||
}
|
}
|
||||||
|
return SKIP_NONE;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return an object containing all mandatory fields to use once the
|
* Return an object containing all mandatory fields to use once the
|
||||||
|
|
|
@ -3,7 +3,6 @@
|
||||||
const Delimiter = require('./delimiter').Delimiter;
|
const Delimiter = require('./delimiter').Delimiter;
|
||||||
const Version = require('../../versioning/Version').Version;
|
const Version = require('../../versioning/Version').Version;
|
||||||
const VSConst = require('../../versioning/constants').VersioningConstants;
|
const VSConst = require('../../versioning/constants').VersioningConstants;
|
||||||
const { BucketVersioningKeyFormat } = VSConst;
|
|
||||||
const { inc, FILTER_ACCEPT, FILTER_SKIP, SKIP_NONE } = require('./tools');
|
const { inc, FILTER_ACCEPT, FILTER_SKIP, SKIP_NONE } = require('./tools');
|
||||||
|
|
||||||
const VID_SEP = VSConst.VersionId.Separator;
|
const VID_SEP = VSConst.VersionId.Separator;
|
||||||
|
@ -26,100 +25,39 @@ class DelimiterMaster extends Delimiter {
|
||||||
* @param {String} parameters.continuationToken - obfuscated amazon token
|
* @param {String} parameters.continuationToken - obfuscated amazon token
|
||||||
* @param {RequestLogger} logger - The logger of the request
|
* @param {RequestLogger} logger - The logger of the request
|
||||||
*/
|
*/
|
||||||
constructor(parameters, vFormat, logger) {
|
constructor(parameters, logger) {
|
||||||
super(parameters, vFormat, logger);
|
super(parameters, logger);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Filter to apply on each iteration, based on:
|
||||||
|
* - prefix
|
||||||
|
* - delimiter
|
||||||
|
* - maxKeys
|
||||||
|
* The marker is being handled directly by levelDB
|
||||||
|
* @param {Object} obj - The key and value of the element
|
||||||
|
* @param {String} obj.key - The key of the element
|
||||||
|
* @param {String} obj.value - The value of the element
|
||||||
|
* @return {number} - indicates if iteration should continue
|
||||||
|
*/
|
||||||
filter(obj) {
|
filter(obj) {
|
||||||
if ([BucketVersioningKeyFormat.v1,
|
const key = obj.key;
|
||||||
BucketVersioningKeyFormat.v1mig].includes(this.vFormat)) {
|
|
||||||
return this.filterV1(obj);
|
|
||||||
}
|
|
||||||
return this.filterV0(obj);
|
|
||||||
}
|
|
||||||
|
|
||||||
filterV0(obj) {
|
|
||||||
let key = obj.key;
|
|
||||||
const value = obj.value;
|
const value = obj.value;
|
||||||
|
|
||||||
/* Skip keys not starting with the prefix or not alphabetically
|
|
||||||
* ordered. */
|
|
||||||
if ((this.prefix && !key.startsWith(this.prefix))
|
if ((this.prefix && !key.startsWith(this.prefix))
|
||||||
|| (typeof this[this.nextContinueMarker] === 'string' &&
|
|| (this.alphabeticalOrder
|
||||||
key <= this[this.nextContinueMarker])) {
|
&& typeof this[this.nextContinueMarker] === 'string'
|
||||||
|
&& key <= this[this.nextContinueMarker])) {
|
||||||
return FILTER_SKIP;
|
return FILTER_SKIP;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Skip version keys (<key><versionIdSeparator><version>) if we already
|
|
||||||
* have a master version. */
|
|
||||||
const versionIdIndex = key.indexOf(VID_SEP);
|
|
||||||
if (versionIdIndex >= 0) {
|
|
||||||
key = key.slice(0, versionIdIndex);
|
|
||||||
/* - key === this.prvKey is triggered when a master version has
|
|
||||||
* been accepted for this key,
|
|
||||||
* - key === this.NextMarker or this.NextContinueToken is triggered
|
|
||||||
* when a listing page ends on an accepted obj and the next page
|
|
||||||
* starts with a version of this object.
|
|
||||||
* In that case prvKey is default set to undefined
|
|
||||||
* in the constructor) and comparing to NextMarker is the only
|
|
||||||
* way to know we should not accept this version. This test is
|
|
||||||
* not redundant with the one at the beginning of this function,
|
|
||||||
* we are comparing here the key without the version suffix,
|
|
||||||
* - key startsWith the previous NextMarker happens because we set
|
|
||||||
* NextMarker to the common prefix instead of the whole key
|
|
||||||
* value. (TODO: remove this test once ZENKO-1048 is fixed. ).
|
|
||||||
* */
|
|
||||||
if (key === this.prvKey || key === this[this.nextContinueMarker] ||
|
|
||||||
(this.delimiter &&
|
|
||||||
key.startsWith(this[this.nextContinueMarker]))) {
|
|
||||||
/* master version already filtered */
|
|
||||||
return FILTER_SKIP;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// TODO optimize for v1 versioning key format
|
|
||||||
if (Version.isPHD(value)) {
|
|
||||||
/* master version is a PHD version, we want to wait for the next
|
|
||||||
* one:
|
|
||||||
* - Set the prvKey to undefined to not skip the next version,
|
|
||||||
* - return accept to avoid users to skip the next values in range
|
|
||||||
* (skip scan mechanism in metadata backend like Metadata or
|
|
||||||
* MongoClient). */
|
|
||||||
this.prvKey = undefined;
|
|
||||||
this.prvPHDKey = key;
|
|
||||||
return FILTER_ACCEPT;
|
|
||||||
}
|
|
||||||
if (Version.isDeleteMarker(value)) {
|
|
||||||
/* This entry is a deleteMarker which has not been filtered by the
|
|
||||||
* version test. Either :
|
|
||||||
* - it is a deleteMarker on the master version, we want to SKIP
|
|
||||||
* all the following entries with this key (no master version),
|
|
||||||
* - or a deleteMarker following a PHD (setting prvKey to undefined
|
|
||||||
* when an entry is a PHD avoids the skip on version for the
|
|
||||||
* next entry). In that case we expect the master version to
|
|
||||||
* follow. */
|
|
||||||
if (key === this.prvPHDKey) {
|
|
||||||
this.prvKey = undefined;
|
|
||||||
return FILTER_ACCEPT;
|
|
||||||
}
|
|
||||||
this.prvKey = key;
|
|
||||||
return FILTER_SKIP;
|
|
||||||
}
|
|
||||||
|
|
||||||
this.prvKey = key;
|
|
||||||
if (this.delimiter) {
|
if (this.delimiter) {
|
||||||
// check if the key has the delimiter
|
|
||||||
const baseIndex = this.prefix ? this.prefix.length : 0;
|
const baseIndex = this.prefix ? this.prefix.length : 0;
|
||||||
const delimiterIndex = key.indexOf(this.delimiter, baseIndex);
|
const delimiterIndex = key.indexOf(this.delimiter, baseIndex);
|
||||||
if (delimiterIndex >= 0) {
|
if (delimiterIndex === -1) {
|
||||||
// try to add the prefix to the list
|
|
||||||
return this.addCommonPrefix(key, delimiterIndex);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return this.addContents(key, value);
|
return this.addContents(key, value);
|
||||||
}
|
}
|
||||||
|
return this.addCommonPrefix(key, delimiterIndex);
|
||||||
filterV1(obj) {
|
}
|
||||||
return super.filter(obj);
|
return this.addContents(key, value);
|
||||||
}
|
}
|
||||||
|
|
||||||
skipping() {
|
skipping() {
|
||||||
|
|
|
@ -7,7 +7,11 @@ const { inc, FILTER_END, FILTER_ACCEPT, FILTER_SKIP, SKIP_NONE } =
|
||||||
require('./tools');
|
require('./tools');
|
||||||
|
|
||||||
const VID_SEP = VSConst.VersionId.Separator;
|
const VID_SEP = VSConst.VersionId.Separator;
|
||||||
const { DbPrefixes, BucketVersioningKeyFormat } = VSConst;
|
const { DbPrefixes } = VSConst;
|
||||||
|
|
||||||
|
function formatVersionKey(key, versionId) {
|
||||||
|
return `${DbPrefixes.Version}${key}${VID_SEP}${versionId}`;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Handle object listing with parameters
|
* Handle object listing with parameters
|
||||||
|
@ -22,8 +26,8 @@ const { DbPrefixes, BucketVersioningKeyFormat } = VSConst;
|
||||||
* @prop {Number} maxKeys - number of keys to list
|
* @prop {Number} maxKeys - number of keys to list
|
||||||
*/
|
*/
|
||||||
class DelimiterVersions extends Delimiter {
|
class DelimiterVersions extends Delimiter {
|
||||||
constructor(parameters, vFormat, logger) {
|
constructor(parameters, logger) {
|
||||||
super(parameters, vFormat, logger);
|
super(parameters, logger);
|
||||||
// specific to version listing
|
// specific to version listing
|
||||||
this.keyMarker = parameters.keyMarker;
|
this.keyMarker = parameters.keyMarker;
|
||||||
this.versionIdMarker = parameters.versionIdMarker;
|
this.versionIdMarker = parameters.versionIdMarker;
|
||||||
|
@ -36,76 +40,31 @@ class DelimiterVersions extends Delimiter {
|
||||||
}
|
}
|
||||||
|
|
||||||
genMDParams() {
|
genMDParams() {
|
||||||
if ([BucketVersioningKeyFormat.v1,
|
|
||||||
BucketVersioningKeyFormat.v1mig].includes(this.vFormat)) {
|
|
||||||
return this.genMDParamsV1();
|
|
||||||
}
|
|
||||||
return this.genMDParamsV0();
|
|
||||||
}
|
|
||||||
|
|
||||||
genMDParamsV0() {
|
|
||||||
const params = {};
|
const params = {};
|
||||||
if (this.parameters.prefix) {
|
if (this.parameters.prefix) {
|
||||||
params.gte = this.parameters.prefix;
|
params.gte = `${DbPrefixes.Version}${this.parameters.prefix}`;
|
||||||
params.lt = inc(this.parameters.prefix);
|
params.lt = `${DbPrefixes.Version}${inc(this.parameters.prefix)}`;
|
||||||
|
} else {
|
||||||
|
params.gte = DbPrefixes.Version;
|
||||||
|
params.lt = inc(DbPrefixes.Version); // stop after the last version key
|
||||||
}
|
}
|
||||||
if (this.parameters.keyMarker) {
|
if (this.parameters.keyMarker) {
|
||||||
if (params.gte && params.gte > this.parameters.keyMarker) {
|
if (params.gte > `${DbPrefixes.Version}${this.parameters.keyMarker}`) {
|
||||||
return params;
|
return params;
|
||||||
}
|
}
|
||||||
delete params.gte;
|
delete params.gte;
|
||||||
if (this.parameters.versionIdMarker) {
|
if (this.parameters.versionIdMarker) {
|
||||||
// versionIdMarker should always come with keyMarker
|
// versionIdMarker should always come with keyMarker
|
||||||
// but may not be the other way around
|
// but may not be the other way around
|
||||||
params.gt = `${this.parameters.keyMarker}` +
|
params.gt = formatVersionKey(this.parameters.keyMarker,
|
||||||
`${VID_SEP}${this.parameters.versionIdMarker}`;
|
this.parameters.versionIdMarker);
|
||||||
} else {
|
} else {
|
||||||
params.gt = inc(this.parameters.keyMarker + VID_SEP);
|
params.gt = `${DbPrefixes.Version}${inc(this.parameters.keyMarker + VID_SEP)}`;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return params;
|
return params;
|
||||||
}
|
}
|
||||||
|
|
||||||
genMDParamsV1() {
|
|
||||||
const params = [{}, {}];
|
|
||||||
if (this.parameters.prefix) {
|
|
||||||
params[0].gte = `${DbPrefixes.Master}${this.parameters.prefix}`;
|
|
||||||
params[0].lt = `${DbPrefixes.Master}${inc(this.parameters.prefix)}`;
|
|
||||||
params[1].gte = `${DbPrefixes.Version}${this.parameters.prefix}`;
|
|
||||||
params[1].lt = `${DbPrefixes.Version}${inc(this.parameters.prefix)}`;
|
|
||||||
} else {
|
|
||||||
params[0].gte = DbPrefixes.Master;
|
|
||||||
params[0].lt = inc(DbPrefixes.Master); // stop after the last master key
|
|
||||||
params[1].gte = DbPrefixes.Version;
|
|
||||||
params[1].lt = inc(DbPrefixes.Version); // stop after the last version key
|
|
||||||
}
|
|
||||||
if (this.parameters.keyMarker) {
|
|
||||||
if (params[1].gte <= `${DbPrefixes.Version}${this.parameters.keyMarker}`) {
|
|
||||||
delete params[0].gte;
|
|
||||||
delete params[1].gte;
|
|
||||||
params[0].gt = `${DbPrefixes.Master}${inc(this.parameters.keyMarker + VID_SEP)}`;
|
|
||||||
if (this.parameters.versionIdMarker) {
|
|
||||||
// versionIdMarker should always come with keyMarker
|
|
||||||
// but may not be the other way around
|
|
||||||
params[1].gt = `${DbPrefixes.Version}${this.parameters.keyMarker}` +
|
|
||||||
`${VID_SEP}${this.parameters.versionIdMarker}`;
|
|
||||||
} else {
|
|
||||||
params[1].gt = `${DbPrefixes.Version}${inc(this.parameters.keyMarker + VID_SEP)}`;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return params;
|
|
||||||
}
|
|
||||||
|
|
||||||
compareObjects(obj1, obj2) {
|
|
||||||
const key1 = obj1.key.slice(DbPrefixes.Master.length);
|
|
||||||
const key2 = obj2.key.slice(DbPrefixes.Version.length);
|
|
||||||
if (key1 < key2) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Add a (key, versionId, value) tuple to the listing.
|
* Add a (key, versionId, value) tuple to the listing.
|
||||||
* Set the NextMarker to the current key
|
* Set the NextMarker to the current key
|
||||||
|
@ -143,37 +102,26 @@ class DelimiterVersions extends Delimiter {
|
||||||
* @return {number} - indicates if iteration should continue
|
* @return {number} - indicates if iteration should continue
|
||||||
*/
|
*/
|
||||||
filter(obj) {
|
filter(obj) {
|
||||||
let key;
|
|
||||||
if ([BucketVersioningKeyFormat.v1,
|
|
||||||
BucketVersioningKeyFormat.v1mig].includes(this.vFormat)) {
|
|
||||||
key = obj.key.slice(DbPrefixes.Version.length);
|
|
||||||
} else {
|
|
||||||
key = obj.key;
|
|
||||||
if (Version.isPHD(obj.value)) {
|
if (Version.isPHD(obj.value)) {
|
||||||
return FILTER_ACCEPT; // trick repd to not increase its streak
|
return FILTER_ACCEPT; // trick repd to not increase its streak
|
||||||
}
|
}
|
||||||
}
|
if (this.prefix && !obj.key.startsWith(this.prefix)) {
|
||||||
return this.filterCommon(key, obj.value);
|
|
||||||
}
|
|
||||||
|
|
||||||
filterCommon(key, value) {
|
|
||||||
if (this.prefix && !key.startsWith(this.prefix)) {
|
|
||||||
return FILTER_SKIP;
|
return FILTER_SKIP;
|
||||||
}
|
}
|
||||||
let nonversionedKey;
|
let key = obj.key; // original key
|
||||||
let versionId = undefined;
|
let versionId = undefined; // versionId
|
||||||
const versionIdIndex = key.indexOf(VID_SEP);
|
const versionIdIndex = obj.key.indexOf(VID_SEP);
|
||||||
if (versionIdIndex < 0) {
|
if (versionIdIndex < 0) {
|
||||||
nonversionedKey = key;
|
this.masterKey = obj.key;
|
||||||
this.masterKey = key;
|
|
||||||
this.masterVersionId =
|
this.masterVersionId =
|
||||||
Version.from(value).getVersionId() || 'null';
|
Version.from(obj.value).getVersionId() || 'null';
|
||||||
versionId = this.masterVersionId;
|
versionId = this.masterVersionId;
|
||||||
} else {
|
} else {
|
||||||
nonversionedKey = key.slice(0, versionIdIndex);
|
// eslint-disable-next-line
|
||||||
versionId = key.slice(versionIdIndex + 1);
|
key = obj.key.slice(0, versionIdIndex);
|
||||||
if (this.masterKey === nonversionedKey &&
|
// eslint-disable-next-line
|
||||||
this.masterVersionId === versionId) {
|
versionId = obj.key.slice(versionIdIndex + 1);
|
||||||
|
if (this.masterKey === key && this.masterVersionId === versionId) {
|
||||||
return FILTER_ACCEPT; // trick repd to not increase its streak
|
return FILTER_ACCEPT; // trick repd to not increase its streak
|
||||||
}
|
}
|
||||||
this.masterKey = undefined;
|
this.masterKey = undefined;
|
||||||
|
@ -181,29 +129,19 @@ class DelimiterVersions extends Delimiter {
|
||||||
}
|
}
|
||||||
if (this.delimiter) {
|
if (this.delimiter) {
|
||||||
const baseIndex = this.prefix ? this.prefix.length : 0;
|
const baseIndex = this.prefix ? this.prefix.length : 0;
|
||||||
const delimiterIndex =
|
const delimiterIndex = key.indexOf(this.delimiter, baseIndex);
|
||||||
nonversionedKey.indexOf(this.delimiter, baseIndex);
|
|
||||||
if (delimiterIndex >= 0) {
|
if (delimiterIndex >= 0) {
|
||||||
return this.addCommonPrefix(nonversionedKey, delimiterIndex);
|
return this.addCommonPrefix(key, delimiterIndex);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return this.addContents({ key: nonversionedKey, value, versionId });
|
return this.addContents({ key, value: obj.value, versionId });
|
||||||
}
|
}
|
||||||
|
|
||||||
skipping() {
|
skipping() {
|
||||||
if ([BucketVersioningKeyFormat.v1,
|
|
||||||
BucketVersioningKeyFormat.v1mig].includes(this.vFormat)) {
|
|
||||||
// TODO may need skipping if a delimiter is present
|
|
||||||
return SKIP_NONE;
|
|
||||||
}
|
|
||||||
return this.skippingV0();
|
|
||||||
}
|
|
||||||
|
|
||||||
skippingV0() {
|
|
||||||
if (this.NextMarker) {
|
if (this.NextMarker) {
|
||||||
const index = this.NextMarker.lastIndexOf(this.delimiter);
|
const index = this.NextMarker.lastIndexOf(this.delimiter);
|
||||||
if (index === this.NextMarker.length - 1) {
|
if (index === this.NextMarker.length - 1) {
|
||||||
return this.NextMarker;
|
return `${DbPrefixes.Version}${this.NextMarker}`;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return SKIP_NONE;
|
return SKIP_NONE;
|
||||||
|
|
|
@ -1,101 +0,0 @@
|
||||||
const stream = require('stream');
|
|
||||||
|
|
||||||
class MergeStream extends stream.Readable {
|
|
||||||
constructor(stream1, stream2, compare) {
|
|
||||||
super({ objectMode: true });
|
|
||||||
|
|
||||||
this._compare = compare;
|
|
||||||
this._streams = [stream1, stream2];
|
|
||||||
|
|
||||||
// peekItems elements represent the latest item consumed from
|
|
||||||
// the respective input stream but not yet pushed. It can also
|
|
||||||
// be one of the following special values:
|
|
||||||
// - undefined: stream hasn't started emitting items
|
|
||||||
// - null: EOF reached and no more item to peek
|
|
||||||
this._peekItems = [undefined, undefined];
|
|
||||||
this._streamEof = [false, false];
|
|
||||||
this._streamToResume = null;
|
|
||||||
|
|
||||||
stream1.on('data', item => this._onItem(stream1, item, 0, 1));
|
|
||||||
stream1.once('end', () => this._onEnd(stream1, 0, 1));
|
|
||||||
stream1.once('error', err => this._onError(stream1, err, 0, 1));
|
|
||||||
|
|
||||||
stream2.on('data', item => this._onItem(stream2, item, 1, 0));
|
|
||||||
stream2.once('end', () => this._onEnd(stream2, 1, 0));
|
|
||||||
stream2.once('error', err => this._onError(stream2, err, 1, 0));
|
|
||||||
}
|
|
||||||
|
|
||||||
_read() {
|
|
||||||
if (this._streamToResume) {
|
|
||||||
this._streamToResume.resume();
|
|
||||||
this._streamToResume = null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
_onItem(myStream, myItem, myIndex, otherIndex) {
|
|
||||||
this._peekItems[myIndex] = myItem;
|
|
||||||
const otherItem = this._peekItems[otherIndex];
|
|
||||||
if (otherItem === undefined) {
|
|
||||||
// wait for the other stream to wake up
|
|
||||||
return myStream.pause();
|
|
||||||
}
|
|
||||||
if (otherItem === null || this._compare(myItem, otherItem) <= 0) {
|
|
||||||
if (!this.push(myItem)) {
|
|
||||||
myStream.pause();
|
|
||||||
this._streamToResume = myStream;
|
|
||||||
}
|
|
||||||
return undefined;
|
|
||||||
}
|
|
||||||
const otherStream = this._streams[otherIndex];
|
|
||||||
const otherMore = this.push(otherItem);
|
|
||||||
if (this._streamEof[otherIndex]) {
|
|
||||||
this._peekItems[otherIndex] = null;
|
|
||||||
return this.push(myItem);
|
|
||||||
}
|
|
||||||
myStream.pause();
|
|
||||||
if (otherMore) {
|
|
||||||
return otherStream.resume();
|
|
||||||
}
|
|
||||||
this._streamToResume = otherStream;
|
|
||||||
return undefined;
|
|
||||||
}
|
|
||||||
|
|
||||||
_showPeek() {
|
|
||||||
return `[${this._peekItems[0]},${this._peekItems[1]}]`;
|
|
||||||
}
|
|
||||||
|
|
||||||
_onEnd(myStream, myIndex, otherIndex) {
|
|
||||||
this._streamEof[myIndex] = true;
|
|
||||||
if (this._peekItems[myIndex] === undefined) {
|
|
||||||
this._peekItems[myIndex] = null;
|
|
||||||
}
|
|
||||||
const myItem = this._peekItems[myIndex];
|
|
||||||
const otherItem = this._peekItems[otherIndex];
|
|
||||||
if (otherItem === undefined) {
|
|
||||||
// wait for the other stream to wake up
|
|
||||||
return undefined;
|
|
||||||
}
|
|
||||||
if (otherItem === null) {
|
|
||||||
return this.push(null);
|
|
||||||
}
|
|
||||||
if (myItem === null || this._compare(myItem, otherItem) <= 0) {
|
|
||||||
this.push(otherItem);
|
|
||||||
this._peekItems[myIndex] = null;
|
|
||||||
}
|
|
||||||
if (this._streamEof[otherIndex]) {
|
|
||||||
return this.push(null);
|
|
||||||
}
|
|
||||||
const otherStream = this._streams[otherIndex];
|
|
||||||
return otherStream.resume();
|
|
||||||
}
|
|
||||||
|
|
||||||
_onError(myStream, err, myIndex, otherIndex) {
|
|
||||||
myStream.destroy();
|
|
||||||
if (this._streams[otherIndex]) {
|
|
||||||
this._streams[otherIndex].destroy();
|
|
||||||
}
|
|
||||||
this.emit('error', err);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
module.exports = MergeStream;
|
|
|
@ -6,11 +6,4 @@ module.exports.VersioningConstants = {
|
||||||
Master: '\x7fM',
|
Master: '\x7fM',
|
||||||
Version: '\x7fV',
|
Version: '\x7fV',
|
||||||
},
|
},
|
||||||
BucketVersioningKeyFormat: {
|
|
||||||
current: 'v1',
|
|
||||||
v0: 'v0',
|
|
||||||
v0mig: 'v0mig',
|
|
||||||
v1mig: 'v1mig',
|
|
||||||
v1: 'v1',
|
|
||||||
},
|
|
||||||
};
|
};
|
||||||
|
|
|
@ -1,228 +0,0 @@
|
||||||
const assert = require('assert');
|
|
||||||
const stream = require('stream');
|
|
||||||
const MergeStream = require('../../../../lib/algos/stream/MergeStream');
|
|
||||||
|
|
||||||
class Streamify extends stream.Readable {
|
|
||||||
constructor(objectsToSend, errorAtEnd) {
|
|
||||||
super({ objectMode: true });
|
|
||||||
this._remaining = Array.from(objectsToSend);
|
|
||||||
this._remaining.reverse();
|
|
||||||
this._errorAtEnd = errorAtEnd || false;
|
|
||||||
}
|
|
||||||
|
|
||||||
_read() {
|
|
||||||
process.nextTick(() => {
|
|
||||||
while (this._remaining.length > 0) {
|
|
||||||
const item = this._remaining.pop();
|
|
||||||
if (!this.push(item)) {
|
|
||||||
return undefined;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (this._errorAtEnd) {
|
|
||||||
return this.emit('error', new Error('OOPS'));
|
|
||||||
}
|
|
||||||
return this.push(null);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
function readAll(stream, usePauseResume, cb) {
|
|
||||||
const result = [];
|
|
||||||
stream.on('data', item => {
|
|
||||||
result.push(item);
|
|
||||||
if (usePauseResume) {
|
|
||||||
stream.pause();
|
|
||||||
setTimeout(() => stream.resume(), 1);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
stream.once('end', () => cb(null, result));
|
|
||||||
stream.once('error', err => cb(err));
|
|
||||||
}
|
|
||||||
|
|
||||||
function testMergeStreamWithIntegers(contents1, contents2,
|
|
||||||
usePauseResume, errorAtEnd, cb) {
|
|
||||||
const compareInt = (a, b) => {
|
|
||||||
if (a < b) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
if (a > b) {
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
};
|
|
||||||
const expectedItems = contents1.concat(contents2).sort(compareInt);
|
|
||||||
const mergeStream = new MergeStream(
|
|
||||||
new Streamify(contents1, errorAtEnd)
|
|
||||||
.on('error', () => {}),
|
|
||||||
new Streamify(contents2)
|
|
||||||
.on('error', () => {}),
|
|
||||||
compareInt);
|
|
||||||
readAll(mergeStream, usePauseResume, (err, readItems) => {
|
|
||||||
if (errorAtEnd) {
|
|
||||||
assert(err);
|
|
||||||
} else {
|
|
||||||
assert.ifError(err);
|
|
||||||
assert.deepStrictEqual(readItems, expectedItems);
|
|
||||||
}
|
|
||||||
cb();
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
function testCasePretty(testCase, reversed) {
|
|
||||||
const desc1 = JSON.stringify(
|
|
||||||
reversed ? testCase.stream2 : testCase.stream1);
|
|
||||||
const desc2 = JSON.stringify(
|
|
||||||
reversed ? testCase.stream1 : testCase.stream2);
|
|
||||||
return `${desc1} merged with ${desc2}`;
|
|
||||||
}
|
|
||||||
|
|
||||||
describe('MergeStream', () => {
|
|
||||||
[
|
|
||||||
{
|
|
||||||
stream1: [],
|
|
||||||
stream2: [],
|
|
||||||
},
|
|
||||||
{
|
|
||||||
stream1: [0],
|
|
||||||
stream2: [],
|
|
||||||
},
|
|
||||||
{
|
|
||||||
stream1: [0, 1, 2, 3, 4],
|
|
||||||
stream2: [],
|
|
||||||
},
|
|
||||||
{
|
|
||||||
stream1: [0],
|
|
||||||
stream2: [1],
|
|
||||||
},
|
|
||||||
{
|
|
||||||
stream1: [1, 2, 3, 4, 5],
|
|
||||||
stream2: [0],
|
|
||||||
},
|
|
||||||
{
|
|
||||||
stream1: [0, 1, 2, 3, 4],
|
|
||||||
stream2: [5],
|
|
||||||
},
|
|
||||||
{
|
|
||||||
stream1: [1, 2],
|
|
||||||
stream2: [3, 4, 5],
|
|
||||||
},
|
|
||||||
{
|
|
||||||
stream1: [1, 2, 3],
|
|
||||||
stream2: [4, 5],
|
|
||||||
},
|
|
||||||
{
|
|
||||||
stream1: [1, 3, 5, 7, 9],
|
|
||||||
stream2: [2, 4, 6, 8, 10],
|
|
||||||
},
|
|
||||||
{
|
|
||||||
stream1: [1, 4, 7],
|
|
||||||
stream2: [0, 2, 3, 5, 6, 8, 9, 10],
|
|
||||||
},
|
|
||||||
{
|
|
||||||
stream1: [0, 10],
|
|
||||||
stream2: [1, 2, 3, 4, 5, 6, 7, 8, 9],
|
|
||||||
},
|
|
||||||
{
|
|
||||||
stream1: [4, 5, 6],
|
|
||||||
stream2: [1, 2, 3, 7, 8, 9],
|
|
||||||
},
|
|
||||||
{
|
|
||||||
stream1: [0],
|
|
||||||
stream2: [0],
|
|
||||||
},
|
|
||||||
{
|
|
||||||
stream1: [0, 1],
|
|
||||||
stream2: [0, 1],
|
|
||||||
},
|
|
||||||
{
|
|
||||||
stream1: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
|
|
||||||
stream2: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
|
|
||||||
},
|
|
||||||
{
|
|
||||||
stream1: [0, 2, 3, 4],
|
|
||||||
stream2: [0, 1, 2, 4],
|
|
||||||
},
|
|
||||||
{
|
|
||||||
stream1: [0, 1, 2, 3],
|
|
||||||
stream2: [1, 2, 3, 4],
|
|
||||||
},
|
|
||||||
{
|
|
||||||
stream1: [0, 1, 2, 3],
|
|
||||||
stream2: [2, 3, 4, 5, 6, 7],
|
|
||||||
},
|
|
||||||
{
|
|
||||||
stream1: [0, 1, 2, 3],
|
|
||||||
stream2: [3, 4, 5, 6],
|
|
||||||
},
|
|
||||||
{
|
|
||||||
stream1: [0, 1, 2, 3],
|
|
||||||
stream2: [0, 3],
|
|
||||||
},
|
|
||||||
].forEach(testCase => {
|
|
||||||
[false, true].forEach(usePauseResume => {
|
|
||||||
[false, true].forEach(errorAtEnd => {
|
|
||||||
const testDesc =
|
|
||||||
`${testCasePretty(testCase, false)}` +
|
|
||||||
`${usePauseResume ? ' with pause/resume' : ''}` +
|
|
||||||
`${errorAtEnd ? ' with error' : ''}`;
|
|
||||||
it(`should cover ${testDesc}`, done => {
|
|
||||||
testMergeStreamWithIntegers(
|
|
||||||
testCase.stream1, testCase.stream2,
|
|
||||||
usePauseResume, errorAtEnd, done);
|
|
||||||
});
|
|
||||||
it(`should cover ${testDesc}`, done => {
|
|
||||||
testMergeStreamWithIntegers(
|
|
||||||
testCase.stream2, testCase.stream1,
|
|
||||||
usePauseResume, errorAtEnd, done);
|
|
||||||
});
|
|
||||||
});
|
|
||||||
});
|
|
||||||
});
|
|
||||||
[100, 1000, 10000, 100000].forEach(nbEntries => {
|
|
||||||
[false, true].forEach(usePauseResume => {
|
|
||||||
[false, true].forEach(errorAtEnd => {
|
|
||||||
if ((!usePauseResume && !errorAtEnd) || nbEntries <= 1000) {
|
|
||||||
const fixtureDesc =
|
|
||||||
`${usePauseResume ? ' with pause/resume' : ''}` +
|
|
||||||
`${errorAtEnd ? ' with error' : ''}`;
|
|
||||||
it(`${nbEntries} sequential entries${fixtureDesc}`,
|
|
||||||
function bigMergeSequential(done) {
|
|
||||||
this.timeout(10000);
|
|
||||||
const stream1 = [];
|
|
||||||
const stream2 = [];
|
|
||||||
for (let i = 0; i < nbEntries; ++i) {
|
|
||||||
// picked two large arbitrary prime numbers to get a
|
|
||||||
// deterministic random-looking series
|
|
||||||
if (Math.floor(i / (nbEntries / 10)) % 2 === 0) {
|
|
||||||
stream1.push(i);
|
|
||||||
} else {
|
|
||||||
stream2.push(i);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
testMergeStreamWithIntegers(
|
|
||||||
stream1, stream2, usePauseResume, errorAtEnd, done);
|
|
||||||
});
|
|
||||||
it(`${nbEntries} randomly mingled entries${fixtureDesc}`,
|
|
||||||
function bigMergeRandom(done) {
|
|
||||||
this.timeout(10000);
|
|
||||||
const stream1 = [];
|
|
||||||
const stream2 = [];
|
|
||||||
let accu = nbEntries;
|
|
||||||
for (let i = 0; i < nbEntries; ++i) {
|
|
||||||
// picked two large arbitrary prime numbers to get a
|
|
||||||
// deterministic random-looking series
|
|
||||||
accu = (accu * 1592760451) % 8448053;
|
|
||||||
if (accu % 2 === 0) {
|
|
||||||
stream1.push(i);
|
|
||||||
} else {
|
|
||||||
stream2.push(i);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
testMergeStreamWithIntegers(
|
|
||||||
stream1, stream2, usePauseResume, errorAtEnd, done);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
});
|
|
||||||
});
|
|
||||||
});
|
|
||||||
});
|
|
Loading…
Reference in New Issue