Compare commits
18 Commits
developmen
...
temp/test-
Author | SHA1 | Date |
---|---|---|
williamlardier | 1f32cc8e8f | |
williamlardier | aec5ddeec6 | |
williamlardier | 5d6c1033c6 | |
williamlardier | 6fa9cdbc3f | |
williamlardier | 4eb829c414 | |
williamlardier | ec43ddda93 | |
williamlardier | 7c16611bce | |
williamlardier | c170ca2183 | |
williamlardier | 0af8869522 | |
williamlardier | 251cf29ddc | |
williamlardier | 5293d7ef6a | |
williamlardier | 0c262f7000 | |
williamlardier | c31969140d | |
williamlardier | 4138547cf1 | |
williamlardier | dc5a276b92 | |
williamlardier | 7894734382 | |
williamlardier | 926da3da1a | |
williamlardier | 65c9f80e2c |
|
@ -4,6 +4,7 @@ class MergeStream extends stream.Readable {
|
||||||
constructor(stream1, stream2, compare) {
|
constructor(stream1, stream2, compare) {
|
||||||
super({ objectMode: true });
|
super({ objectMode: true });
|
||||||
|
|
||||||
|
console.log('MERGE STREAM');
|
||||||
this._compare = compare;
|
this._compare = compare;
|
||||||
this._streams = [stream1, stream2];
|
this._streams = [stream1, stream2];
|
||||||
|
|
||||||
|
@ -18,10 +19,12 @@ class MergeStream extends stream.Readable {
|
||||||
|
|
||||||
stream1.on('data', item => this._onItem(stream1, item, 0, 1));
|
stream1.on('data', item => this._onItem(stream1, item, 0, 1));
|
||||||
stream1.once('end', () => this._onEnd(stream1, 0, 1));
|
stream1.once('end', () => this._onEnd(stream1, 0, 1));
|
||||||
|
stream1.once('close', () => this._onEnd(stream1, 0, 1));
|
||||||
stream1.once('error', err => this._onError(stream1, err, 0, 1));
|
stream1.once('error', err => this._onError(stream1, err, 0, 1));
|
||||||
|
|
||||||
stream2.on('data', item => this._onItem(stream2, item, 1, 0));
|
stream2.on('data', item => this._onItem(stream2, item, 1, 0));
|
||||||
stream2.once('end', () => this._onEnd(stream2, 1, 0));
|
stream2.once('end', () => this._onEnd(stream2, 1, 0));
|
||||||
|
stream2.once('close', () => this._onEnd(stream2, 1, 0));
|
||||||
stream2.once('error', err => this._onError(stream2, err, 1, 0));
|
stream2.once('error', err => this._onError(stream2, err, 1, 0));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -53,6 +53,9 @@ const CONCURRENT_CURSORS = 10;
|
||||||
const initialInstanceID = process.env.INITIAL_INSTANCE_ID;
|
const initialInstanceID = process.env.INITIAL_INSTANCE_ID;
|
||||||
|
|
||||||
let uidCounter = 0;
|
let uidCounter = 0;
|
||||||
|
let ___cache = null; // Global cache
|
||||||
|
const isCached = process.env.MONGO_DEBUG === 'true';
|
||||||
|
console.log('!!!!!', isCached);
|
||||||
|
|
||||||
const BUCKET_VERSIONS = require('../../../versioning/constants')
|
const BUCKET_VERSIONS = require('../../../versioning/constants')
|
||||||
.VersioningConstants.BucketVersioningKeyFormat;
|
.VersioningConstants.BucketVersioningKeyFormat;
|
||||||
|
@ -1050,14 +1053,19 @@ class MongoClientInterface {
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
getObjects(bucketName, objects, log, callback) {
|
getObjects(bucketName, objects, log, callback) {
|
||||||
|
console.log(___cache)
|
||||||
|
if (___cache && isCached) {
|
||||||
|
console.log('returning cached')
|
||||||
|
return callback(null, ___cache);
|
||||||
|
}
|
||||||
const c = this.getCollection(bucketName);
|
const c = this.getCollection(bucketName);
|
||||||
let vFormat = null;
|
let vFormat = null;
|
||||||
if (!Array.isArray(objects)) {
|
if (!Array.isArray(objects)) {
|
||||||
return callback(errors.InternalError.customizeDescription('objects must be an array'));
|
return callback(errors.InternalError.customizeDescription('objects must be an array'));
|
||||||
}
|
}
|
||||||
// We do not accept more than 1000 keys in a single request
|
// We do not accept more than listingParams.maxKeys keys in a single request
|
||||||
if (objects.length > 1000) {
|
if (objects.length > listingParams.maxKeys) {
|
||||||
return callback(errors.InternalError.customizeDescription('cannot get more than 1000 objects'));
|
return callback(errors.InternalError.customizeDescription(`cannot get more than ${listingParams.maxKeys} objects`));
|
||||||
}
|
}
|
||||||
// Function to process each document
|
// Function to process each document
|
||||||
const processDoc = (doc, objName, params, key, cb) => {
|
const processDoc = (doc, objName, params, key, cb) => {
|
||||||
|
@ -1094,29 +1102,46 @@ class MongoClientInterface {
|
||||||
return callback(err);
|
return callback(err);
|
||||||
}
|
}
|
||||||
vFormat = _vFormat;
|
vFormat = _vFormat;
|
||||||
const keys = objects.map(({ key: objName, params }) => (params && params.versionId
|
|
||||||
? formatVersionKey(objName, params.versionId, vFormat)
|
const keys = objects.map(({ key: objName, params }) => {
|
||||||
: formatMasterKey(objName, vFormat)));
|
return params && params.versionId
|
||||||
return c.find({
|
? formatVersionKey(objName, params.versionId, vFormat)
|
||||||
|
: formatMasterKey(objName, vFormat);
|
||||||
|
});
|
||||||
|
|
||||||
|
const cursor = c.find({
|
||||||
_id: { $in: keys },
|
_id: { $in: keys },
|
||||||
$or: [
|
$or: [
|
||||||
{ 'value.deleted': { $exists: false } },
|
{ 'value.deleted': { $exists: false } },
|
||||||
{ 'value.deleted': { $eq: false } },
|
{ 'value.deleted': { $eq: false } },
|
||||||
],
|
],
|
||||||
}).toArray().then(docs => {
|
}).batchSize(listingParams.maxKeys).noCursorTimeout(false);
|
||||||
// Create a Map to quickly find docs by their keys
|
|
||||||
const docByKey = new Map(docs.map(doc => [doc._id, doc]));
|
const docByKey = new Map();
|
||||||
// Process each document using associated context (objName, params)
|
|
||||||
async.mapLimit(objects, constants.maxBatchingConcurrentOperations,
|
cursor.forEach(doc => {
|
||||||
({ key: objName, params }, cb) => {
|
docByKey.set(doc._id, doc);
|
||||||
const key = params && params.versionId
|
}, err => {
|
||||||
? formatVersionKey(objName, params.versionId, vFormat)
|
if (err) {
|
||||||
: formatMasterKey(objName, vFormat);
|
return callback(err);
|
||||||
const doc = docByKey.get(key);
|
}
|
||||||
processDoc(doc, objName, params, key, cb);
|
|
||||||
}, callback);
|
async.mapLimit(objects, constants.maxBatchingConcurrentOperations, ({ key: objName, params }, cb) => {
|
||||||
}).catch(err => {
|
const key = params && params.versionId
|
||||||
callback(err);
|
? formatVersionKey(objName, params.versionId, vFormat)
|
||||||
|
: formatMasterKey(objName, vFormat);
|
||||||
|
const doc = docByKey.get(key);
|
||||||
|
processDoc(doc, objName, params, key, cb);
|
||||||
|
}, (err, results) => {
|
||||||
|
if (err) {
|
||||||
|
callback(err);
|
||||||
|
} else {
|
||||||
|
___cache = results;
|
||||||
|
callback(null, results);
|
||||||
|
}
|
||||||
|
docByKey.clear();
|
||||||
|
cursor.close();
|
||||||
|
});
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
@ -1631,6 +1656,7 @@ class MongoClientInterface {
|
||||||
* @param {Object} params.secondaryStreamParams internal listing param applied
|
* @param {Object} params.secondaryStreamParams internal listing param applied
|
||||||
* to the secondary stream (versionStream when having two streams) (is optional)
|
* to the secondary stream (versionStream when having two streams) (is optional)
|
||||||
* @param {Object} params.mongifiedSearch search options
|
* @param {Object} params.mongifiedSearch search options
|
||||||
|
* @param {Number} params.maxKeys max number of keys to list
|
||||||
* @param {Object} extension listing extention
|
* @param {Object} extension listing extention
|
||||||
* @param {String} vFormat bucket format version
|
* @param {String} vFormat bucket format version
|
||||||
* @param {Object} log logger
|
* @param {Object} log logger
|
||||||
|
@ -1641,9 +1667,13 @@ class MongoClientInterface {
|
||||||
const c = this.getCollection(bucketName);
|
const c = this.getCollection(bucketName);
|
||||||
const getLatestVersion = this.getLatestVersion;
|
const getLatestVersion = this.getLatestVersion;
|
||||||
let stream;
|
let stream;
|
||||||
|
let baseStream;
|
||||||
|
console.log(' >> internalListObject 1');
|
||||||
if (!params.secondaryStreamParams) {
|
if (!params.secondaryStreamParams) {
|
||||||
// listing masters only (DelimiterMaster)
|
// listing masters only (DelimiterMaster)
|
||||||
stream = new MongoReadStream(c, params.mainStreamParams, params.mongifiedSearch);
|
stream = new MongoReadStream(c, params.mainStreamParams, params.mongifiedSearch, params.maxKeys);
|
||||||
|
baseStream = stream;
|
||||||
|
console.log(' >> internalListObject 2', vFormat === BUCKET_VERSIONS.v1);
|
||||||
if (vFormat === BUCKET_VERSIONS.v1) {
|
if (vFormat === BUCKET_VERSIONS.v1) {
|
||||||
/**
|
/**
|
||||||
* When listing masters only in v1 we can't just skip PHD
|
* When listing masters only in v1 we can't just skip PHD
|
||||||
|
@ -1653,12 +1683,14 @@ class MongoClientInterface {
|
||||||
* mongo read steam and that checks and replaces the key
|
* mongo read steam and that checks and replaces the key
|
||||||
* read if it's a PHD
|
* read if it's a PHD
|
||||||
* */
|
* */
|
||||||
|
console.log(' >> internalListObject 3');
|
||||||
const resolvePhdKey = new Transform({
|
const resolvePhdKey = new Transform({
|
||||||
objectMode: true,
|
objectMode: true,
|
||||||
transform(obj, encoding, callback) {
|
transform(obj, encoding, callback) {
|
||||||
if (Version.isPHD(obj.value)) {
|
if (Version.isPHD(obj.value)) {
|
||||||
const key = obj.key.slice(DB_PREFIXES.Master.length);
|
const key = obj.key.slice(DB_PREFIXES.Master.length);
|
||||||
getLatestVersion(c, key, BUCKET_VERSIONS.v1, log, (err, version) => {
|
getLatestVersion(c, key, BUCKET_VERSIONS.v1, log, (err, version) => {
|
||||||
|
console.log(' >> internalListObject 4');
|
||||||
if (err) {
|
if (err) {
|
||||||
// ignoring PHD keys with no versions as all versions
|
// ignoring PHD keys with no versions as all versions
|
||||||
// might get deleted before the PHD key gets resolved by the listing
|
// might get deleted before the PHD key gets resolved by the listing
|
||||||
|
@ -1684,12 +1716,17 @@ class MongoClientInterface {
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
|
console.log(' >> internalListObject 5');
|
||||||
stream = stream.pipe(resolvePhdKey);
|
stream = stream.pipe(resolvePhdKey);
|
||||||
|
// Propagate 'end' event from resolvePhdKey to stream
|
||||||
|
resolvePhdKey.on('end', () => {
|
||||||
|
baseStream.emit('end');
|
||||||
|
});
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// listing both master and version keys (delimiterVersion Algo)
|
// listing both master and version keys (delimiterVersion Algo)
|
||||||
const masterStream = new MongoReadStream(c, params.mainStreamParams, params.mongifiedSearch);
|
const masterStream = new MongoReadStream(c, params.mainStreamParams, params.mongifiedSearch, params.maxKeys);
|
||||||
const versionStream = new MongoReadStream(c, params.secondaryStreamParams, params.mongifiedSearch);
|
const versionStream = new MongoReadStream(c, params.secondaryStreamParams, params.mongifiedSearch, params.maxKeys);
|
||||||
stream = new MergeStream(
|
stream = new MergeStream(
|
||||||
versionStream, masterStream, extension.compareObjects.bind(extension));
|
versionStream, masterStream, extension.compareObjects.bind(extension));
|
||||||
}
|
}
|
||||||
|
@ -1699,8 +1736,10 @@ class MongoClientInterface {
|
||||||
extension,
|
extension,
|
||||||
gte: gteParams,
|
gte: gteParams,
|
||||||
});
|
});
|
||||||
|
console.log(' >> internalListObject 6');
|
||||||
const cbOnce = jsutil.once(cb);
|
const cbOnce = jsutil.once(cb);
|
||||||
skip.setListingEndCb(() => {
|
skip.setListingEndCb(() => {
|
||||||
|
console.log(' >> internalListObject 7', typeof stream, typeof stream._cleanup);
|
||||||
stream.emit('end');
|
stream.emit('end');
|
||||||
stream.destroy();
|
stream.destroy();
|
||||||
});
|
});
|
||||||
|
@ -1721,14 +1760,17 @@ class MongoClientInterface {
|
||||||
// eslint-disable-next-line no-param-reassign
|
// eslint-disable-next-line no-param-reassign
|
||||||
newParams.mainStreamParams.gte = range;
|
newParams.mainStreamParams.gte = range;
|
||||||
}
|
}
|
||||||
|
console.log(' >> internalListObject 8');
|
||||||
// then continue listing the next key range
|
// then continue listing the next key range
|
||||||
this.internalListObject(bucketName, newParams, extension, vFormat, log, cb);
|
this.internalListObject(bucketName, newParams, extension, vFormat, log, cb);
|
||||||
});
|
});
|
||||||
stream
|
stream
|
||||||
.on('data', entry => {
|
.on('data', entry => {
|
||||||
|
console.log(' >> internalListObject 9');
|
||||||
skip.filter(entry);
|
skip.filter(entry);
|
||||||
})
|
})
|
||||||
.on('error', err => {
|
.on('error', err => {
|
||||||
|
console.log(' >> internalListObject 10', err);
|
||||||
const logObj = {
|
const logObj = {
|
||||||
rawError: err,
|
rawError: err,
|
||||||
error: err.message,
|
error: err.message,
|
||||||
|
@ -1739,7 +1781,10 @@ class MongoClientInterface {
|
||||||
cbOnce(err);
|
cbOnce(err);
|
||||||
})
|
})
|
||||||
.on('end', () => {
|
.on('end', () => {
|
||||||
|
console.log(' >> internalListObject 11');
|
||||||
const data = extension.result();
|
const data = extension.result();
|
||||||
|
// clean the stream by calling destroy
|
||||||
|
stream.destroy();
|
||||||
cbOnce(null, data);
|
cbOnce(null, data);
|
||||||
});
|
});
|
||||||
return undefined;
|
return undefined;
|
||||||
|
@ -1775,6 +1820,7 @@ class MongoClientInterface {
|
||||||
const internalParams = {
|
const internalParams = {
|
||||||
mainStreamParams: Array.isArray(extensionParams) ? extensionParams[0] : extensionParams,
|
mainStreamParams: Array.isArray(extensionParams) ? extensionParams[0] : extensionParams,
|
||||||
secondaryStreamParams: Array.isArray(extensionParams) ? extensionParams[1] : null,
|
secondaryStreamParams: Array.isArray(extensionParams) ? extensionParams[1] : null,
|
||||||
|
maxKeys: params.maxKeys,
|
||||||
};
|
};
|
||||||
internalParams.mongifiedSearch = params.mongifiedSearch;
|
internalParams.mongifiedSearch = params.mongifiedSearch;
|
||||||
return this.internalListObject(bucketName, internalParams, extension,
|
return this.internalListObject(bucketName, internalParams, extension,
|
||||||
|
|
|
@ -1,12 +1,21 @@
|
||||||
const Readable = require('stream').Readable;
|
const Readable = require('stream').Readable;
|
||||||
const MongoUtils = require('./utils');
|
const MongoUtils = require('./utils');
|
||||||
|
|
||||||
|
setInterval(() => {
|
||||||
|
console.log("numberOfReadStreamOpen", MongoReadStream.numberOfReadStreamOpen);
|
||||||
|
console.log("numberOfReadStreamClosed", MongoReadStream.numberOfReadStreamClosed);
|
||||||
|
}, 1000);
|
||||||
|
|
||||||
class MongoReadStream extends Readable {
|
class MongoReadStream extends Readable {
|
||||||
constructor(c, options, searchOptions) {
|
static numberOfReadStreamOpen = 0;
|
||||||
|
static numberOfReadStreamClosed = 0;
|
||||||
|
|
||||||
|
constructor(c, options, searchOptions, batchSize) {
|
||||||
super({
|
super({
|
||||||
objectMode: true,
|
objectMode: true,
|
||||||
highWaterMark: 0,
|
highWaterMark: 0,
|
||||||
});
|
});
|
||||||
|
MongoReadStream.numberOfReadStreamOpen++;
|
||||||
|
|
||||||
if (options.limit === 0) {
|
if (options.limit === 0) {
|
||||||
return;
|
return;
|
||||||
|
@ -87,7 +96,7 @@ class MongoReadStream extends Readable {
|
||||||
|
|
||||||
this._cursor = c.find(query).sort({
|
this._cursor = c.find(query).sort({
|
||||||
_id: options.reverse ? -1 : 1,
|
_id: options.reverse ? -1 : 1,
|
||||||
});
|
}).batchSize(batchSize);
|
||||||
if (options.limit && options.limit !== -1) {
|
if (options.limit && options.limit !== -1) {
|
||||||
this._cursor = this._cursor.limit(options.limit);
|
this._cursor = this._cursor.limit(options.limit);
|
||||||
}
|
}
|
||||||
|
@ -132,27 +141,35 @@ class MongoReadStream extends Readable {
|
||||||
if (this._destroyed) {
|
if (this._destroyed) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
console.error('Error in stream', err);
|
||||||
this.emit('error', err);
|
this.emit('error', err);
|
||||||
return;
|
return;
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
_cleanup() {
|
_cleanup() {
|
||||||
|
console.log(' >> _cleanup');
|
||||||
if (this._destroyed) {
|
if (this._destroyed) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
this._destroyed = true;
|
this._destroyed = true;
|
||||||
|
MongoReadStream.numberOfReadStreamClosed++;
|
||||||
|
console.log(' >> _cleanup post inc');
|
||||||
|
|
||||||
this._cursor.close().catch(err => {
|
this._cursor.close().catch(err => {
|
||||||
|
console.log(' >> _cleanup error');
|
||||||
if (err) {
|
if (err) {
|
||||||
this.emit('error', err);
|
this.emit('error', err);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
this.emit('close');
|
this.emit('close');
|
||||||
|
}).then(() => {
|
||||||
|
console.log(' >> _cleanup then');
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
destroy() {
|
destroy() {
|
||||||
|
console.log(' >> destroy');
|
||||||
return this._cleanup();
|
return this._cleanup();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue