Compare commits
1 Commits
developmen
...
bugfix/ZEN
Author | SHA1 | Date |
---|---|---|
vrancurel | d8e51a66c1 |
|
@ -10,6 +10,7 @@
|
|||
* We use proper atomic operations when needed.
|
||||
*/
|
||||
const async = require('async');
|
||||
const crypto = require('crypto');
|
||||
|
||||
const constants = require('../../../constants');
|
||||
|
||||
|
@ -39,6 +40,10 @@ const CONNECT_TIMEOUT_MS = 5000;
|
|||
// MongoDB default
|
||||
const SOCKET_TIMEOUT_MS = 360000;
|
||||
const CONCURRENT_CURSORS = 10;
|
||||
// Search
|
||||
const MAX_TIME_MS = 300000;
|
||||
const TEMP_SEARCH_PREFIX = '__temp_search';
|
||||
const SEARCH_PREFIX = '__search';
|
||||
|
||||
const initialInstanceID = process.env.INITIAL_INSTANCE_ID;
|
||||
|
||||
|
@ -109,6 +114,11 @@ class MongoClientInterface {
|
|||
!Number.isNaN(process.env.CONCURRENT_CURSORS))
|
||||
? Number.parseInt(process.env.CONCURRENT_CURSORS, 10)
|
||||
: CONCURRENT_CURSORS;
|
||||
|
||||
this.maxTimeMs = (process.env.MAX_TIME_MS &&
|
||||
!Number.isNaN(process.env.MAX_TIME_MS))
|
||||
? Number.parseInt(process.env.MAX_TIME_MS, 10)
|
||||
: MAX_TIME_MS;
|
||||
}
|
||||
|
||||
setup(cb) {
|
||||
|
@ -939,9 +949,8 @@ class MongoClientInterface {
|
|||
params, log, cb);
|
||||
}
|
||||
|
||||
internalListObject(bucketName, params, extension, log, cb) {
|
||||
const c = this.getCollection(bucketName);
|
||||
const stream = new MongoReadStream(c, params, params.mongifiedSearch);
|
||||
internalListObject(c, params, extension, log, cb) {
|
||||
const stream = new MongoReadStream(c, params);
|
||||
const skip = new Skip({
|
||||
extension,
|
||||
gte: params.gte,
|
||||
|
@ -963,7 +972,7 @@ class MongoClientInterface {
|
|||
newParams.gte = range;
|
||||
|
||||
// then continue listing the next key range
|
||||
this.internalListObject(bucketName, newParams, extension, log, cb);
|
||||
this.internalListObject(c, newParams, extension, log, cb);
|
||||
});
|
||||
|
||||
stream
|
||||
|
@ -993,22 +1002,127 @@ class MongoClientInterface {
|
|||
return undefined;
|
||||
}
|
||||
|
||||
/*
|
||||
* Execute the user-defined query in a stage then sort it for
|
||||
* stateless paging. The output is stored in a temporary
|
||||
* collection in a special namespace that will be periodically
|
||||
* erased (e.g. once a day).
|
||||
*
|
||||
* All search queries are bounded by MAX_TIME_MS env (default is
|
||||
* 5mn).
|
||||
*/
|
||||
doSearch(c, searchCollection, params, extension, searchOptions, log, cb) {
|
||||
// use temp name to avoid races
|
||||
const tempCollection =
|
||||
TEMP_SEARCH_PREFIX +
|
||||
crypto.randomBytes(16).toString('hex');
|
||||
log.info('doSearch: launching aggregate', {
|
||||
searchCollection,
|
||||
});
|
||||
const _cursor = c.aggregate([
|
||||
{ $match: searchOptions }, // user query
|
||||
{ $out: tempCollection }, // a job will clean it up
|
||||
],
|
||||
{
|
||||
maxTimeMs: this.maxTimeMs, // always limit
|
||||
allowDiskUse: true, // stage large queries on disk
|
||||
},
|
||||
null);
|
||||
_cursor.toArray(err => {
|
||||
if (err) {
|
||||
log.error('doSearch: error', {
|
||||
error: err.message,
|
||||
});
|
||||
return cb(err);
|
||||
}
|
||||
// final rename
|
||||
this.db.renameCollection(
|
||||
tempCollection,
|
||||
searchCollection,
|
||||
{
|
||||
dropTarget: true,
|
||||
},
|
||||
err => {
|
||||
if (err) {
|
||||
log.error('doSearch: renaming', {
|
||||
error: err.message,
|
||||
tempCollection,
|
||||
searchCollection,
|
||||
});
|
||||
return cb(err);
|
||||
}
|
||||
log.info('doSearch: aggregate done', {
|
||||
searchCollection,
|
||||
});
|
||||
return this.internalListObject(
|
||||
this.db.collection(searchCollection),
|
||||
params, extension,
|
||||
log, cb);
|
||||
});
|
||||
return undefined;
|
||||
});
|
||||
}
|
||||
|
||||
/*
|
||||
* Check if the used defined query has been cached otherwise
|
||||
* launch the search
|
||||
*/
|
||||
prepareSearch(bucketName, params, extension, searchOptions, log, cb) {
|
||||
const c = this.getCollection(bucketName);
|
||||
// generate the search collection name
|
||||
const searchCollection =
|
||||
SEARCH_PREFIX +
|
||||
crypto.createHash('md5').
|
||||
update(JSON.stringify(searchOptions)).
|
||||
digest('hex');
|
||||
this.db.listCollections({
|
||||
name: searchCollection,
|
||||
}).toArray((err, items) => {
|
||||
if (err) {
|
||||
log.error('prepareSearch: listing collection', {
|
||||
error: err.message,
|
||||
});
|
||||
return cb(err);
|
||||
}
|
||||
if (items.length > 0) {
|
||||
log.info('prepareSearch: using cache', {
|
||||
searchCollection,
|
||||
});
|
||||
return this.internalListObject(
|
||||
this.db.collection(searchCollection),
|
||||
params, extension,
|
||||
log, cb);
|
||||
}
|
||||
return this.doSearch(
|
||||
c, searchCollection,
|
||||
params, extension, searchOptions,
|
||||
log, cb);
|
||||
});
|
||||
}
|
||||
|
||||
listObject(bucketName, params, log, cb) {
|
||||
const extName = params.listingType;
|
||||
const extension = new listAlgos[extName](params, log);
|
||||
const internalParams = extension.genMDParams();
|
||||
internalParams.mongifiedSearch = params.mongifiedSearch;
|
||||
return this.internalListObject(bucketName, internalParams, extension,
|
||||
log, cb);
|
||||
if (params.mongifiedSearch) {
|
||||
return this.prepareSearch(
|
||||
bucketName, internalParams, extension,
|
||||
params.mongifiedSearch, log, cb);
|
||||
}
|
||||
return this.internalListObject(
|
||||
this.getCollection(bucketName),
|
||||
internalParams, extension,
|
||||
log, cb);
|
||||
}
|
||||
|
||||
listMultipartUploads(bucketName, params, log, cb) {
|
||||
const extName = params.listingType;
|
||||
const extension = new listAlgos[extName](params, log);
|
||||
const internalParams = extension.genMDParams();
|
||||
internalParams.mongifiedSearch = params.mongifiedSearch;
|
||||
return this.internalListObject(bucketName, internalParams, extension,
|
||||
log, cb);
|
||||
return this.internalListObject(
|
||||
this.getCollection(bucketName),
|
||||
internalParams, extension,
|
||||
log, cb);
|
||||
}
|
||||
|
||||
checkHealth(implName, log, cb) {
|
||||
|
|
|
@ -2,7 +2,7 @@ const Readable = require('stream').Readable;
|
|||
const MongoUtils = require('./utils');
|
||||
|
||||
class MongoReadStream extends Readable {
|
||||
constructor(c, options, searchOptions) {
|
||||
constructor(c, options) {
|
||||
super({
|
||||
objectMode: true,
|
||||
highWaterMark: 0,
|
||||
|
@ -59,10 +59,6 @@ class MongoReadStream extends Readable {
|
|||
delete query._id;
|
||||
}
|
||||
|
||||
if (searchOptions) {
|
||||
Object.assign(query, searchOptions);
|
||||
}
|
||||
|
||||
this._cursor = c.find(query).sort({
|
||||
_id: options.reverse ? -1 : 1,
|
||||
});
|
||||
|
|
Loading…
Reference in New Issue