Compare commits
10 Commits
5293d7ef6a
...
1f32cc8e8f
Author | SHA1 | Date |
---|---|---|
williamlardier | 1f32cc8e8f | |
williamlardier | aec5ddeec6 | |
williamlardier | 5d6c1033c6 | |
williamlardier | 6fa9cdbc3f | |
williamlardier | 4eb829c414 | |
williamlardier | ec43ddda93 | |
williamlardier | 7c16611bce | |
williamlardier | c170ca2183 | |
williamlardier | 0af8869522 | |
williamlardier | 251cf29ddc |
|
@ -4,6 +4,7 @@ class MergeStream extends stream.Readable {
|
|||
constructor(stream1, stream2, compare) {
|
||||
super({ objectMode: true });
|
||||
|
||||
console.log('MERGE STREAM');
|
||||
this._compare = compare;
|
||||
this._streams = [stream1, stream2];
|
||||
|
||||
|
@ -18,10 +19,12 @@ class MergeStream extends stream.Readable {
|
|||
|
||||
stream1.on('data', item => this._onItem(stream1, item, 0, 1));
|
||||
stream1.once('end', () => this._onEnd(stream1, 0, 1));
|
||||
stream1.once('close', () => this._onEnd(stream1, 0, 1));
|
||||
stream1.once('error', err => this._onError(stream1, err, 0, 1));
|
||||
|
||||
stream2.on('data', item => this._onItem(stream2, item, 1, 0));
|
||||
stream2.once('end', () => this._onEnd(stream2, 1, 0));
|
||||
stream2.once('close', () => this._onEnd(stream2, 1, 0));
|
||||
stream2.once('error', err => this._onError(stream2, err, 1, 0));
|
||||
}
|
||||
|
||||
|
|
|
@ -1667,9 +1667,13 @@ class MongoClientInterface {
|
|||
const c = this.getCollection(bucketName);
|
||||
const getLatestVersion = this.getLatestVersion;
|
||||
let stream;
|
||||
let baseStream;
|
||||
console.log(' >> internalListObject 1');
|
||||
if (!params.secondaryStreamParams) {
|
||||
// listing masters only (DelimiterMaster)
|
||||
stream = new MongoReadStream(c, params.mainStreamParams, params.mongifiedSearch, params.maxKeys);
|
||||
baseStream = stream;
|
||||
console.log(' >> internalListObject 2', vFormat === BUCKET_VERSIONS.v1);
|
||||
if (vFormat === BUCKET_VERSIONS.v1) {
|
||||
/**
|
||||
* When listing masters only in v1 we can't just skip PHD
|
||||
|
@ -1679,12 +1683,14 @@ class MongoClientInterface {
|
|||
* mongo read steam and that checks and replaces the key
|
||||
* read if it's a PHD
|
||||
* */
|
||||
console.log(' >> internalListObject 3');
|
||||
const resolvePhdKey = new Transform({
|
||||
objectMode: true,
|
||||
transform(obj, encoding, callback) {
|
||||
if (Version.isPHD(obj.value)) {
|
||||
const key = obj.key.slice(DB_PREFIXES.Master.length);
|
||||
getLatestVersion(c, key, BUCKET_VERSIONS.v1, log, (err, version) => {
|
||||
console.log(' >> internalListObject 4');
|
||||
if (err) {
|
||||
// ignoring PHD keys with no versions as all versions
|
||||
// might get deleted before the PHD key gets resolved by the listing
|
||||
|
@ -1710,7 +1716,12 @@ class MongoClientInterface {
|
|||
}
|
||||
},
|
||||
});
|
||||
console.log(' >> internalListObject 5');
|
||||
stream = stream.pipe(resolvePhdKey);
|
||||
// Propagate 'end' event from resolvePhdKey to stream
|
||||
resolvePhdKey.on('end', () => {
|
||||
baseStream.emit('end');
|
||||
});
|
||||
}
|
||||
} else {
|
||||
// listing both master and version keys (delimiterVersion Algo)
|
||||
|
@ -1725,8 +1736,10 @@ class MongoClientInterface {
|
|||
extension,
|
||||
gte: gteParams,
|
||||
});
|
||||
console.log(' >> internalListObject 6');
|
||||
const cbOnce = jsutil.once(cb);
|
||||
skip.setListingEndCb(() => {
|
||||
console.log(' >> internalListObject 7', typeof stream, typeof stream._cleanup);
|
||||
stream.emit('end');
|
||||
stream.destroy();
|
||||
});
|
||||
|
@ -1747,14 +1760,17 @@ class MongoClientInterface {
|
|||
// eslint-disable-next-line no-param-reassign
|
||||
newParams.mainStreamParams.gte = range;
|
||||
}
|
||||
console.log(' >> internalListObject 8');
|
||||
// then continue listing the next key range
|
||||
this.internalListObject(bucketName, newParams, extension, vFormat, log, cb);
|
||||
});
|
||||
stream
|
||||
.on('data', entry => {
|
||||
console.log(' >> internalListObject 9');
|
||||
skip.filter(entry);
|
||||
})
|
||||
.on('error', err => {
|
||||
console.log(' >> internalListObject 10', err);
|
||||
const logObj = {
|
||||
rawError: err,
|
||||
error: err.message,
|
||||
|
@ -1765,7 +1781,10 @@ class MongoClientInterface {
|
|||
cbOnce(err);
|
||||
})
|
||||
.on('end', () => {
|
||||
console.log(' >> internalListObject 11');
|
||||
const data = extension.result();
|
||||
// clean the stream by calling destroy
|
||||
stream.destroy();
|
||||
cbOnce(null, data);
|
||||
});
|
||||
return undefined;
|
||||
|
|
|
@ -1,12 +1,21 @@
|
|||
const Readable = require('stream').Readable;
|
||||
const MongoUtils = require('./utils');
|
||||
|
||||
setInterval(() => {
|
||||
console.log("numberOfReadStreamOpen", MongoReadStream.numberOfReadStreamOpen);
|
||||
console.log("numberOfReadStreamClosed", MongoReadStream.numberOfReadStreamClosed);
|
||||
}, 1000);
|
||||
|
||||
class MongoReadStream extends Readable {
|
||||
static numberOfReadStreamOpen = 0;
|
||||
static numberOfReadStreamClosed = 0;
|
||||
|
||||
constructor(c, options, searchOptions, batchSize) {
|
||||
super({
|
||||
objectMode: true,
|
||||
highWaterMark: 0,
|
||||
});
|
||||
MongoReadStream.numberOfReadStreamOpen++;
|
||||
|
||||
if (options.limit === 0) {
|
||||
return;
|
||||
|
@ -132,27 +141,35 @@ class MongoReadStream extends Readable {
|
|||
if (this._destroyed) {
|
||||
return;
|
||||
}
|
||||
console.error('Error in stream', err);
|
||||
this.emit('error', err);
|
||||
return;
|
||||
});
|
||||
}
|
||||
|
||||
_cleanup() {
|
||||
console.log(' >> _cleanup');
|
||||
if (this._destroyed) {
|
||||
return;
|
||||
}
|
||||
this._destroyed = true;
|
||||
MongoReadStream.numberOfReadStreamClosed++;
|
||||
console.log(' >> _cleanup post inc');
|
||||
|
||||
this._cursor.close().catch(err => {
|
||||
console.log(' >> _cleanup error');
|
||||
if (err) {
|
||||
this.emit('error', err);
|
||||
return;
|
||||
}
|
||||
this.emit('close');
|
||||
}).then(() => {
|
||||
console.log(' >> _cleanup then');
|
||||
});
|
||||
}
|
||||
|
||||
destroy() {
|
||||
console.log(' >> destroy');
|
||||
return this._cleanup();
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue