Compare commits

...

18 Commits

Author SHA1 Message Date
williamlardier 1f32cc8e8f wip 2023-10-04 21:50:22 +02:00
williamlardier aec5ddeec6 wip 2023-10-04 21:16:06 +02:00
williamlardier 5d6c1033c6 wip 2023-10-04 19:43:42 +02:00
williamlardier 6fa9cdbc3f wip 2023-10-04 19:23:35 +02:00
williamlardier 4eb829c414 wip 2023-10-04 19:08:48 +02:00
williamlardier ec43ddda93 wip 2023-10-04 19:01:23 +02:00
williamlardier 7c16611bce wip 2023-10-04 17:59:28 +02:00
williamlardier c170ca2183 wip 2023-10-04 17:47:46 +02:00
williamlardier 0af8869522 wip 2023-10-04 17:39:54 +02:00
williamlardier 251cf29ddc s 2023-10-04 17:16:20 +02:00
williamlardier 5293d7ef6a wip 8 2023-10-03 16:45:59 +02:00
williamlardier 0c262f7000 wip 7 2023-10-03 15:40:25 +02:00
williamlardier c31969140d wip 6 2023-10-03 15:22:47 +02:00
williamlardier 4138547cf1 wip 5 2023-10-03 14:44:39 +02:00
williamlardier dc5a276b92 wip 4 2023-10-03 11:29:18 +02:00
williamlardier 7894734382 wip 3 2023-10-03 10:21:49 +02:00
williamlardier 926da3da1a wip 2 2023-10-03 09:05:24 +02:00
williamlardier 65c9f80e2c wip 2023-10-03 09:02:57 +02:00
3 changed files with 93 additions and 27 deletions

View File

@ -4,6 +4,7 @@ class MergeStream extends stream.Readable {
constructor(stream1, stream2, compare) { constructor(stream1, stream2, compare) {
super({ objectMode: true }); super({ objectMode: true });
console.log('MERGE STREAM');
this._compare = compare; this._compare = compare;
this._streams = [stream1, stream2]; this._streams = [stream1, stream2];
@ -18,10 +19,12 @@ class MergeStream extends stream.Readable {
stream1.on('data', item => this._onItem(stream1, item, 0, 1)); stream1.on('data', item => this._onItem(stream1, item, 0, 1));
stream1.once('end', () => this._onEnd(stream1, 0, 1)); stream1.once('end', () => this._onEnd(stream1, 0, 1));
stream1.once('close', () => this._onEnd(stream1, 0, 1));
stream1.once('error', err => this._onError(stream1, err, 0, 1)); stream1.once('error', err => this._onError(stream1, err, 0, 1));
stream2.on('data', item => this._onItem(stream2, item, 1, 0)); stream2.on('data', item => this._onItem(stream2, item, 1, 0));
stream2.once('end', () => this._onEnd(stream2, 1, 0)); stream2.once('end', () => this._onEnd(stream2, 1, 0));
stream2.once('close', () => this._onEnd(stream2, 1, 0));
stream2.once('error', err => this._onError(stream2, err, 1, 0)); stream2.once('error', err => this._onError(stream2, err, 1, 0));
} }

View File

@ -53,6 +53,9 @@ const CONCURRENT_CURSORS = 10;
const initialInstanceID = process.env.INITIAL_INSTANCE_ID; const initialInstanceID = process.env.INITIAL_INSTANCE_ID;
let uidCounter = 0; let uidCounter = 0;
let ___cache = null; // Global cache
const isCached = process.env.MONGO_DEBUG === 'true';
console.log('!!!!!', isCached);
const BUCKET_VERSIONS = require('../../../versioning/constants') const BUCKET_VERSIONS = require('../../../versioning/constants')
.VersioningConstants.BucketVersioningKeyFormat; .VersioningConstants.BucketVersioningKeyFormat;
@ -1050,14 +1053,19 @@ class MongoClientInterface {
* @return {undefined} * @return {undefined}
*/ */
getObjects(bucketName, objects, log, callback) { getObjects(bucketName, objects, log, callback) {
console.log(___cache)
if (___cache && isCached) {
console.log('returning cached')
return callback(null, ___cache);
}
const c = this.getCollection(bucketName); const c = this.getCollection(bucketName);
let vFormat = null; let vFormat = null;
if (!Array.isArray(objects)) { if (!Array.isArray(objects)) {
return callback(errors.InternalError.customizeDescription('objects must be an array')); return callback(errors.InternalError.customizeDescription('objects must be an array'));
} }
// We do not accept more than 1000 keys in a single request // We do not accept more than listingParams.maxKeys keys in a single request
if (objects.length > 1000) { if (objects.length > listingParams.maxKeys) {
return callback(errors.InternalError.customizeDescription('cannot get more than 1000 objects')); return callback(errors.InternalError.customizeDescription(`cannot get more than ${listingParams.maxKeys} objects`));
} }
// Function to process each document // Function to process each document
const processDoc = (doc, objName, params, key, cb) => { const processDoc = (doc, objName, params, key, cb) => {
@ -1094,29 +1102,46 @@ class MongoClientInterface {
return callback(err); return callback(err);
} }
vFormat = _vFormat; vFormat = _vFormat;
const keys = objects.map(({ key: objName, params }) => (params && params.versionId
const keys = objects.map(({ key: objName, params }) => {
return params && params.versionId
? formatVersionKey(objName, params.versionId, vFormat) ? formatVersionKey(objName, params.versionId, vFormat)
: formatMasterKey(objName, vFormat))); : formatMasterKey(objName, vFormat);
return c.find({ });
const cursor = c.find({
_id: { $in: keys }, _id: { $in: keys },
$or: [ $or: [
{ 'value.deleted': { $exists: false } }, { 'value.deleted': { $exists: false } },
{ 'value.deleted': { $eq: false } }, { 'value.deleted': { $eq: false } },
], ],
}).toArray().then(docs => { }).batchSize(listingParams.maxKeys).noCursorTimeout(false);
// Create a Map to quickly find docs by their keys
const docByKey = new Map(docs.map(doc => [doc._id, doc])); const docByKey = new Map();
// Process each document using associated context (objName, params)
async.mapLimit(objects, constants.maxBatchingConcurrentOperations, cursor.forEach(doc => {
({ key: objName, params }, cb) => { docByKey.set(doc._id, doc);
}, err => {
if (err) {
return callback(err);
}
async.mapLimit(objects, constants.maxBatchingConcurrentOperations, ({ key: objName, params }, cb) => {
const key = params && params.versionId const key = params && params.versionId
? formatVersionKey(objName, params.versionId, vFormat) ? formatVersionKey(objName, params.versionId, vFormat)
: formatMasterKey(objName, vFormat); : formatMasterKey(objName, vFormat);
const doc = docByKey.get(key); const doc = docByKey.get(key);
processDoc(doc, objName, params, key, cb); processDoc(doc, objName, params, key, cb);
}, callback); }, (err, results) => {
}).catch(err => { if (err) {
callback(err); callback(err);
} else {
___cache = results;
callback(null, results);
}
docByKey.clear();
cursor.close();
});
}); });
}); });
} }
@ -1631,6 +1656,7 @@ class MongoClientInterface {
* @param {Object} params.secondaryStreamParams internal listing param applied * @param {Object} params.secondaryStreamParams internal listing param applied
* to the secondary stream (versionStream when having two streams) (is optional) * to the secondary stream (versionStream when having two streams) (is optional)
* @param {Object} params.mongifiedSearch search options * @param {Object} params.mongifiedSearch search options
* @param {Number} params.maxKeys max number of keys to list
* @param {Object} extension listing extention * @param {Object} extension listing extention
* @param {String} vFormat bucket format version * @param {String} vFormat bucket format version
* @param {Object} log logger * @param {Object} log logger
@ -1641,9 +1667,13 @@ class MongoClientInterface {
const c = this.getCollection(bucketName); const c = this.getCollection(bucketName);
const getLatestVersion = this.getLatestVersion; const getLatestVersion = this.getLatestVersion;
let stream; let stream;
let baseStream;
console.log(' >> internalListObject 1');
if (!params.secondaryStreamParams) { if (!params.secondaryStreamParams) {
// listing masters only (DelimiterMaster) // listing masters only (DelimiterMaster)
stream = new MongoReadStream(c, params.mainStreamParams, params.mongifiedSearch); stream = new MongoReadStream(c, params.mainStreamParams, params.mongifiedSearch, params.maxKeys);
baseStream = stream;
console.log(' >> internalListObject 2', vFormat === BUCKET_VERSIONS.v1);
if (vFormat === BUCKET_VERSIONS.v1) { if (vFormat === BUCKET_VERSIONS.v1) {
/** /**
* When listing masters only in v1 we can't just skip PHD * When listing masters only in v1 we can't just skip PHD
@ -1653,12 +1683,14 @@ class MongoClientInterface {
* mongo read steam and that checks and replaces the key * mongo read steam and that checks and replaces the key
* read if it's a PHD * read if it's a PHD
* */ * */
console.log(' >> internalListObject 3');
const resolvePhdKey = new Transform({ const resolvePhdKey = new Transform({
objectMode: true, objectMode: true,
transform(obj, encoding, callback) { transform(obj, encoding, callback) {
if (Version.isPHD(obj.value)) { if (Version.isPHD(obj.value)) {
const key = obj.key.slice(DB_PREFIXES.Master.length); const key = obj.key.slice(DB_PREFIXES.Master.length);
getLatestVersion(c, key, BUCKET_VERSIONS.v1, log, (err, version) => { getLatestVersion(c, key, BUCKET_VERSIONS.v1, log, (err, version) => {
console.log(' >> internalListObject 4');
if (err) { if (err) {
// ignoring PHD keys with no versions as all versions // ignoring PHD keys with no versions as all versions
// might get deleted before the PHD key gets resolved by the listing // might get deleted before the PHD key gets resolved by the listing
@ -1684,12 +1716,17 @@ class MongoClientInterface {
} }
}, },
}); });
console.log(' >> internalListObject 5');
stream = stream.pipe(resolvePhdKey); stream = stream.pipe(resolvePhdKey);
// Propagate 'end' event from resolvePhdKey to stream
resolvePhdKey.on('end', () => {
baseStream.emit('end');
});
} }
} else { } else {
// listing both master and version keys (delimiterVersion Algo) // listing both master and version keys (delimiterVersion Algo)
const masterStream = new MongoReadStream(c, params.mainStreamParams, params.mongifiedSearch); const masterStream = new MongoReadStream(c, params.mainStreamParams, params.mongifiedSearch, params.maxKeys);
const versionStream = new MongoReadStream(c, params.secondaryStreamParams, params.mongifiedSearch); const versionStream = new MongoReadStream(c, params.secondaryStreamParams, params.mongifiedSearch, params.maxKeys);
stream = new MergeStream( stream = new MergeStream(
versionStream, masterStream, extension.compareObjects.bind(extension)); versionStream, masterStream, extension.compareObjects.bind(extension));
} }
@ -1699,8 +1736,10 @@ class MongoClientInterface {
extension, extension,
gte: gteParams, gte: gteParams,
}); });
console.log(' >> internalListObject 6');
const cbOnce = jsutil.once(cb); const cbOnce = jsutil.once(cb);
skip.setListingEndCb(() => { skip.setListingEndCb(() => {
console.log(' >> internalListObject 7', typeof stream, typeof stream._cleanup);
stream.emit('end'); stream.emit('end');
stream.destroy(); stream.destroy();
}); });
@ -1721,14 +1760,17 @@ class MongoClientInterface {
// eslint-disable-next-line no-param-reassign // eslint-disable-next-line no-param-reassign
newParams.mainStreamParams.gte = range; newParams.mainStreamParams.gte = range;
} }
console.log(' >> internalListObject 8');
// then continue listing the next key range // then continue listing the next key range
this.internalListObject(bucketName, newParams, extension, vFormat, log, cb); this.internalListObject(bucketName, newParams, extension, vFormat, log, cb);
}); });
stream stream
.on('data', entry => { .on('data', entry => {
console.log(' >> internalListObject 9');
skip.filter(entry); skip.filter(entry);
}) })
.on('error', err => { .on('error', err => {
console.log(' >> internalListObject 10', err);
const logObj = { const logObj = {
rawError: err, rawError: err,
error: err.message, error: err.message,
@ -1739,7 +1781,10 @@ class MongoClientInterface {
cbOnce(err); cbOnce(err);
}) })
.on('end', () => { .on('end', () => {
console.log(' >> internalListObject 11');
const data = extension.result(); const data = extension.result();
// clean the stream by calling destroy
stream.destroy();
cbOnce(null, data); cbOnce(null, data);
}); });
return undefined; return undefined;
@ -1775,6 +1820,7 @@ class MongoClientInterface {
const internalParams = { const internalParams = {
mainStreamParams: Array.isArray(extensionParams) ? extensionParams[0] : extensionParams, mainStreamParams: Array.isArray(extensionParams) ? extensionParams[0] : extensionParams,
secondaryStreamParams: Array.isArray(extensionParams) ? extensionParams[1] : null, secondaryStreamParams: Array.isArray(extensionParams) ? extensionParams[1] : null,
maxKeys: params.maxKeys,
}; };
internalParams.mongifiedSearch = params.mongifiedSearch; internalParams.mongifiedSearch = params.mongifiedSearch;
return this.internalListObject(bucketName, internalParams, extension, return this.internalListObject(bucketName, internalParams, extension,

View File

@ -1,12 +1,21 @@
const Readable = require('stream').Readable; const Readable = require('stream').Readable;
const MongoUtils = require('./utils'); const MongoUtils = require('./utils');
setInterval(() => {
console.log("numberOfReadStreamOpen", MongoReadStream.numberOfReadStreamOpen);
console.log("numberOfReadStreamClosed", MongoReadStream.numberOfReadStreamClosed);
}, 1000);
class MongoReadStream extends Readable { class MongoReadStream extends Readable {
constructor(c, options, searchOptions) { static numberOfReadStreamOpen = 0;
static numberOfReadStreamClosed = 0;
constructor(c, options, searchOptions, batchSize) {
super({ super({
objectMode: true, objectMode: true,
highWaterMark: 0, highWaterMark: 0,
}); });
MongoReadStream.numberOfReadStreamOpen++;
if (options.limit === 0) { if (options.limit === 0) {
return; return;
@ -87,7 +96,7 @@ class MongoReadStream extends Readable {
this._cursor = c.find(query).sort({ this._cursor = c.find(query).sort({
_id: options.reverse ? -1 : 1, _id: options.reverse ? -1 : 1,
}); }).batchSize(batchSize);
if (options.limit && options.limit !== -1) { if (options.limit && options.limit !== -1) {
this._cursor = this._cursor.limit(options.limit); this._cursor = this._cursor.limit(options.limit);
} }
@ -132,27 +141,35 @@ class MongoReadStream extends Readable {
if (this._destroyed) { if (this._destroyed) {
return; return;
} }
console.error('Error in stream', err);
this.emit('error', err); this.emit('error', err);
return; return;
}); });
} }
_cleanup() { _cleanup() {
console.log(' >> _cleanup');
if (this._destroyed) { if (this._destroyed) {
return; return;
} }
this._destroyed = true; this._destroyed = true;
MongoReadStream.numberOfReadStreamClosed++;
console.log(' >> _cleanup post inc');
this._cursor.close().catch(err => { this._cursor.close().catch(err => {
console.log(' >> _cleanup error');
if (err) { if (err) {
this.emit('error', err); this.emit('error', err);
return; return;
} }
this.emit('close'); this.emit('close');
}).then(() => {
console.log(' >> _cleanup then');
}); });
} }
destroy() { destroy() {
console.log(' >> destroy');
return this._cleanup(); return this._cleanup();
} }
} }