Compare commits

..

No commits in common. "1f32cc8e8f5c93b82022dd5a04b5a9be9f253c30" and "5293d7ef6a9a4d2b8b33c5769185dda854dc3fcb" have entirely different histories.

3 changed files with 1 additions and 40 deletions

View File

@ -4,7 +4,6 @@ class MergeStream extends stream.Readable {
constructor(stream1, stream2, compare) {
super({ objectMode: true });
console.log('MERGE STREAM');
this._compare = compare;
this._streams = [stream1, stream2];
@ -19,12 +18,10 @@ class MergeStream extends stream.Readable {
stream1.on('data', item => this._onItem(stream1, item, 0, 1));
stream1.once('end', () => this._onEnd(stream1, 0, 1));
stream1.once('close', () => this._onEnd(stream1, 0, 1));
stream1.once('error', err => this._onError(stream1, err, 0, 1));
stream2.on('data', item => this._onItem(stream2, item, 1, 0));
stream2.once('end', () => this._onEnd(stream2, 1, 0));
stream2.once('close', () => this._onEnd(stream2, 1, 0));
stream2.once('error', err => this._onError(stream2, err, 1, 0));
}

View File

@ -1667,13 +1667,9 @@ class MongoClientInterface {
const c = this.getCollection(bucketName);
const getLatestVersion = this.getLatestVersion;
let stream;
let baseStream;
console.log(' >> internalListObject 1');
if (!params.secondaryStreamParams) {
// listing masters only (DelimiterMaster)
stream = new MongoReadStream(c, params.mainStreamParams, params.mongifiedSearch, params.maxKeys);
baseStream = stream;
console.log(' >> internalListObject 2', vFormat === BUCKET_VERSIONS.v1);
if (vFormat === BUCKET_VERSIONS.v1) {
/**
* When listing masters only in v1 we can't just skip PHD
@ -1683,14 +1679,12 @@ class MongoClientInterface {
* mongo read steam and that checks and replaces the key
* read if it's a PHD
* */
console.log(' >> internalListObject 3');
const resolvePhdKey = new Transform({
objectMode: true,
transform(obj, encoding, callback) {
if (Version.isPHD(obj.value)) {
const key = obj.key.slice(DB_PREFIXES.Master.length);
getLatestVersion(c, key, BUCKET_VERSIONS.v1, log, (err, version) => {
console.log(' >> internalListObject 4');
if (err) {
// ignoring PHD keys with no versions as all versions
// might get deleted before the PHD key gets resolved by the listing
@ -1716,12 +1710,7 @@ class MongoClientInterface {
}
},
});
console.log(' >> internalListObject 5');
stream = stream.pipe(resolvePhdKey);
// Propagate 'end' event from resolvePhdKey to stream
resolvePhdKey.on('end', () => {
baseStream.emit('end');
});
}
} else {
// listing both master and version keys (delimiterVersion Algo)
@ -1736,10 +1725,8 @@ class MongoClientInterface {
extension,
gte: gteParams,
});
console.log(' >> internalListObject 6');
const cbOnce = jsutil.once(cb);
skip.setListingEndCb(() => {
console.log(' >> internalListObject 7', typeof stream, typeof stream._cleanup);
stream.emit('end');
stream.destroy();
});
@ -1760,17 +1747,14 @@ class MongoClientInterface {
// eslint-disable-next-line no-param-reassign
newParams.mainStreamParams.gte = range;
}
console.log(' >> internalListObject 8');
// then continue listing the next key range
this.internalListObject(bucketName, newParams, extension, vFormat, log, cb);
});
stream
.on('data', entry => {
console.log(' >> internalListObject 9');
skip.filter(entry);
})
.on('error', err => {
console.log(' >> internalListObject 10', err);
const logObj = {
rawError: err,
error: err.message,
@ -1781,10 +1765,7 @@ class MongoClientInterface {
cbOnce(err);
})
.on('end', () => {
console.log(' >> internalListObject 11');
const data = extension.result();
// clean the stream by calling destroy
stream.destroy();
cbOnce(null, data);
});
return undefined;

View File

@ -1,21 +1,12 @@
const Readable = require('stream').Readable;
const MongoUtils = require('./utils');
setInterval(() => {
console.log("numberOfReadStreamOpen", MongoReadStream.numberOfReadStreamOpen);
console.log("numberOfReadStreamClosed", MongoReadStream.numberOfReadStreamClosed);
}, 1000);
class MongoReadStream extends Readable {
static numberOfReadStreamOpen = 0;
static numberOfReadStreamClosed = 0;
constructor(c, options, searchOptions, batchSize) {
super({
objectMode: true,
highWaterMark: 0,
});
MongoReadStream.numberOfReadStreamOpen++;
if (options.limit === 0) {
return;
@ -141,35 +132,27 @@ class MongoReadStream extends Readable {
if (this._destroyed) {
return;
}
console.error('Error in stream', err);
this.emit('error', err);
return;
});
}
_cleanup() {
console.log(' >> _cleanup');
if (this._destroyed) {
return;
}
this._destroyed = true;
MongoReadStream.numberOfReadStreamClosed++;
console.log(' >> _cleanup post inc');
this._cursor.close().catch(err => {
console.log(' >> _cleanup error');
if (err) {
this.emit('error', err);
return;
}
this.emit('close');
}).then(() => {
console.log(' >> _cleanup then');
});
}
destroy() {
console.log(' >> destroy');
return this._cleanup();
}
}