Compare commits
1 Commits
developmen
...
bugfix/ZEN
Author | SHA1 | Date |
---|---|---|
vrancurel | d8e51a66c1 |
|
@ -10,6 +10,7 @@
|
||||||
* We use proper atomic operations when needed.
|
* We use proper atomic operations when needed.
|
||||||
*/
|
*/
|
||||||
const async = require('async');
|
const async = require('async');
|
||||||
|
const crypto = require('crypto');
|
||||||
|
|
||||||
const constants = require('../../../constants');
|
const constants = require('../../../constants');
|
||||||
|
|
||||||
|
@ -39,6 +40,10 @@ const CONNECT_TIMEOUT_MS = 5000;
|
||||||
// MongoDB default
|
// MongoDB default
|
||||||
const SOCKET_TIMEOUT_MS = 360000;
|
const SOCKET_TIMEOUT_MS = 360000;
|
||||||
const CONCURRENT_CURSORS = 10;
|
const CONCURRENT_CURSORS = 10;
|
||||||
|
// Search
|
||||||
|
const MAX_TIME_MS = 300000;
|
||||||
|
const TEMP_SEARCH_PREFIX = '__temp_search';
|
||||||
|
const SEARCH_PREFIX = '__search';
|
||||||
|
|
||||||
const initialInstanceID = process.env.INITIAL_INSTANCE_ID;
|
const initialInstanceID = process.env.INITIAL_INSTANCE_ID;
|
||||||
|
|
||||||
|
@ -109,6 +114,11 @@ class MongoClientInterface {
|
||||||
!Number.isNaN(process.env.CONCURRENT_CURSORS))
|
!Number.isNaN(process.env.CONCURRENT_CURSORS))
|
||||||
? Number.parseInt(process.env.CONCURRENT_CURSORS, 10)
|
? Number.parseInt(process.env.CONCURRENT_CURSORS, 10)
|
||||||
: CONCURRENT_CURSORS;
|
: CONCURRENT_CURSORS;
|
||||||
|
|
||||||
|
this.maxTimeMs = (process.env.MAX_TIME_MS &&
|
||||||
|
!Number.isNaN(process.env.MAX_TIME_MS))
|
||||||
|
? Number.parseInt(process.env.MAX_TIME_MS, 10)
|
||||||
|
: MAX_TIME_MS;
|
||||||
}
|
}
|
||||||
|
|
||||||
setup(cb) {
|
setup(cb) {
|
||||||
|
@ -939,9 +949,8 @@ class MongoClientInterface {
|
||||||
params, log, cb);
|
params, log, cb);
|
||||||
}
|
}
|
||||||
|
|
||||||
internalListObject(bucketName, params, extension, log, cb) {
|
internalListObject(c, params, extension, log, cb) {
|
||||||
const c = this.getCollection(bucketName);
|
const stream = new MongoReadStream(c, params);
|
||||||
const stream = new MongoReadStream(c, params, params.mongifiedSearch);
|
|
||||||
const skip = new Skip({
|
const skip = new Skip({
|
||||||
extension,
|
extension,
|
||||||
gte: params.gte,
|
gte: params.gte,
|
||||||
|
@ -963,7 +972,7 @@ class MongoClientInterface {
|
||||||
newParams.gte = range;
|
newParams.gte = range;
|
||||||
|
|
||||||
// then continue listing the next key range
|
// then continue listing the next key range
|
||||||
this.internalListObject(bucketName, newParams, extension, log, cb);
|
this.internalListObject(c, newParams, extension, log, cb);
|
||||||
});
|
});
|
||||||
|
|
||||||
stream
|
stream
|
||||||
|
@ -993,22 +1002,127 @@ class MongoClientInterface {
|
||||||
return undefined;
|
return undefined;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Execute the user-defined query in a stage then sort it for
|
||||||
|
* stateless paging. The output is stored in a temporary
|
||||||
|
* collection in a special namespace that will be periodically
|
||||||
|
* erased (e.g. once a day).
|
||||||
|
*
|
||||||
|
* All search queries are bounded by MAX_TIME_MS env (default is
|
||||||
|
* 5mn).
|
||||||
|
*/
|
||||||
|
doSearch(c, searchCollection, params, extension, searchOptions, log, cb) {
|
||||||
|
// use temp name to avoid races
|
||||||
|
const tempCollection =
|
||||||
|
TEMP_SEARCH_PREFIX +
|
||||||
|
crypto.randomBytes(16).toString('hex');
|
||||||
|
log.info('doSearch: launching aggregate', {
|
||||||
|
searchCollection,
|
||||||
|
});
|
||||||
|
const _cursor = c.aggregate([
|
||||||
|
{ $match: searchOptions }, // user query
|
||||||
|
{ $out: tempCollection }, // a job will clean it up
|
||||||
|
],
|
||||||
|
{
|
||||||
|
maxTimeMs: this.maxTimeMs, // always limit
|
||||||
|
allowDiskUse: true, // stage large queries on disk
|
||||||
|
},
|
||||||
|
null);
|
||||||
|
_cursor.toArray(err => {
|
||||||
|
if (err) {
|
||||||
|
log.error('doSearch: error', {
|
||||||
|
error: err.message,
|
||||||
|
});
|
||||||
|
return cb(err);
|
||||||
|
}
|
||||||
|
// final rename
|
||||||
|
this.db.renameCollection(
|
||||||
|
tempCollection,
|
||||||
|
searchCollection,
|
||||||
|
{
|
||||||
|
dropTarget: true,
|
||||||
|
},
|
||||||
|
err => {
|
||||||
|
if (err) {
|
||||||
|
log.error('doSearch: renaming', {
|
||||||
|
error: err.message,
|
||||||
|
tempCollection,
|
||||||
|
searchCollection,
|
||||||
|
});
|
||||||
|
return cb(err);
|
||||||
|
}
|
||||||
|
log.info('doSearch: aggregate done', {
|
||||||
|
searchCollection,
|
||||||
|
});
|
||||||
|
return this.internalListObject(
|
||||||
|
this.db.collection(searchCollection),
|
||||||
|
params, extension,
|
||||||
|
log, cb);
|
||||||
|
});
|
||||||
|
return undefined;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Check if the used defined query has been cached otherwise
|
||||||
|
* launch the search
|
||||||
|
*/
|
||||||
|
prepareSearch(bucketName, params, extension, searchOptions, log, cb) {
|
||||||
|
const c = this.getCollection(bucketName);
|
||||||
|
// generate the search collection name
|
||||||
|
const searchCollection =
|
||||||
|
SEARCH_PREFIX +
|
||||||
|
crypto.createHash('md5').
|
||||||
|
update(JSON.stringify(searchOptions)).
|
||||||
|
digest('hex');
|
||||||
|
this.db.listCollections({
|
||||||
|
name: searchCollection,
|
||||||
|
}).toArray((err, items) => {
|
||||||
|
if (err) {
|
||||||
|
log.error('prepareSearch: listing collection', {
|
||||||
|
error: err.message,
|
||||||
|
});
|
||||||
|
return cb(err);
|
||||||
|
}
|
||||||
|
if (items.length > 0) {
|
||||||
|
log.info('prepareSearch: using cache', {
|
||||||
|
searchCollection,
|
||||||
|
});
|
||||||
|
return this.internalListObject(
|
||||||
|
this.db.collection(searchCollection),
|
||||||
|
params, extension,
|
||||||
|
log, cb);
|
||||||
|
}
|
||||||
|
return this.doSearch(
|
||||||
|
c, searchCollection,
|
||||||
|
params, extension, searchOptions,
|
||||||
|
log, cb);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
listObject(bucketName, params, log, cb) {
|
listObject(bucketName, params, log, cb) {
|
||||||
const extName = params.listingType;
|
const extName = params.listingType;
|
||||||
const extension = new listAlgos[extName](params, log);
|
const extension = new listAlgos[extName](params, log);
|
||||||
const internalParams = extension.genMDParams();
|
const internalParams = extension.genMDParams();
|
||||||
internalParams.mongifiedSearch = params.mongifiedSearch;
|
if (params.mongifiedSearch) {
|
||||||
return this.internalListObject(bucketName, internalParams, extension,
|
return this.prepareSearch(
|
||||||
log, cb);
|
bucketName, internalParams, extension,
|
||||||
|
params.mongifiedSearch, log, cb);
|
||||||
|
}
|
||||||
|
return this.internalListObject(
|
||||||
|
this.getCollection(bucketName),
|
||||||
|
internalParams, extension,
|
||||||
|
log, cb);
|
||||||
}
|
}
|
||||||
|
|
||||||
listMultipartUploads(bucketName, params, log, cb) {
|
listMultipartUploads(bucketName, params, log, cb) {
|
||||||
const extName = params.listingType;
|
const extName = params.listingType;
|
||||||
const extension = new listAlgos[extName](params, log);
|
const extension = new listAlgos[extName](params, log);
|
||||||
const internalParams = extension.genMDParams();
|
const internalParams = extension.genMDParams();
|
||||||
internalParams.mongifiedSearch = params.mongifiedSearch;
|
return this.internalListObject(
|
||||||
return this.internalListObject(bucketName, internalParams, extension,
|
this.getCollection(bucketName),
|
||||||
log, cb);
|
internalParams, extension,
|
||||||
|
log, cb);
|
||||||
}
|
}
|
||||||
|
|
||||||
checkHealth(implName, log, cb) {
|
checkHealth(implName, log, cb) {
|
||||||
|
|
|
@ -2,7 +2,7 @@ const Readable = require('stream').Readable;
|
||||||
const MongoUtils = require('./utils');
|
const MongoUtils = require('./utils');
|
||||||
|
|
||||||
class MongoReadStream extends Readable {
|
class MongoReadStream extends Readable {
|
||||||
constructor(c, options, searchOptions) {
|
constructor(c, options) {
|
||||||
super({
|
super({
|
||||||
objectMode: true,
|
objectMode: true,
|
||||||
highWaterMark: 0,
|
highWaterMark: 0,
|
||||||
|
@ -59,10 +59,6 @@ class MongoReadStream extends Readable {
|
||||||
delete query._id;
|
delete query._id;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (searchOptions) {
|
|
||||||
Object.assign(query, searchOptions);
|
|
||||||
}
|
|
||||||
|
|
||||||
this._cursor = c.find(query).sort({
|
this._cursor = c.find(query).sort({
|
||||||
_id: options.reverse ? -1 : 1,
|
_id: options.reverse ? -1 : 1,
|
||||||
});
|
});
|
||||||
|
|
Loading…
Reference in New Issue