Compare commits

...

1 Commits

Author SHA1 Message Date
vrancurel d8e51a66c1 bf: search with aggregate instead of find
The search was using find().sort() and was disrupting user defined
search queries and custom indexes. The sort() is needed to implement
a stateless paging system. The combo of user defined query and sort is
now implemented with a 2 stage aggregate on server side.
We always limit the execution time maxTimeMs to 5mn (tunable by an
environment variable).
The result is staged in a temporary bucket and cached for paging.
We rely on an external job to cleanup the searches (e.g. daily).
2020-07-20 14:37:57 -07:00
2 changed files with 125 additions and 15 deletions

View File

@ -10,6 +10,7 @@
* We use proper atomic operations when needed. * We use proper atomic operations when needed.
*/ */
const async = require('async'); const async = require('async');
const crypto = require('crypto');
const constants = require('../../../constants'); const constants = require('../../../constants');
@ -39,6 +40,10 @@ const CONNECT_TIMEOUT_MS = 5000;
// MongoDB default // MongoDB default
const SOCKET_TIMEOUT_MS = 360000; const SOCKET_TIMEOUT_MS = 360000;
const CONCURRENT_CURSORS = 10; const CONCURRENT_CURSORS = 10;
// Search
const MAX_TIME_MS = 300000;
const TEMP_SEARCH_PREFIX = '__temp_search';
const SEARCH_PREFIX = '__search';
const initialInstanceID = process.env.INITIAL_INSTANCE_ID; const initialInstanceID = process.env.INITIAL_INSTANCE_ID;
@ -109,6 +114,11 @@ class MongoClientInterface {
!Number.isNaN(process.env.CONCURRENT_CURSORS)) !Number.isNaN(process.env.CONCURRENT_CURSORS))
? Number.parseInt(process.env.CONCURRENT_CURSORS, 10) ? Number.parseInt(process.env.CONCURRENT_CURSORS, 10)
: CONCURRENT_CURSORS; : CONCURRENT_CURSORS;
this.maxTimeMs = (process.env.MAX_TIME_MS &&
!Number.isNaN(process.env.MAX_TIME_MS))
? Number.parseInt(process.env.MAX_TIME_MS, 10)
: MAX_TIME_MS;
} }
setup(cb) { setup(cb) {
@ -939,9 +949,8 @@ class MongoClientInterface {
params, log, cb); params, log, cb);
} }
internalListObject(bucketName, params, extension, log, cb) { internalListObject(c, params, extension, log, cb) {
const c = this.getCollection(bucketName); const stream = new MongoReadStream(c, params);
const stream = new MongoReadStream(c, params, params.mongifiedSearch);
const skip = new Skip({ const skip = new Skip({
extension, extension,
gte: params.gte, gte: params.gte,
@ -963,7 +972,7 @@ class MongoClientInterface {
newParams.gte = range; newParams.gte = range;
// then continue listing the next key range // then continue listing the next key range
this.internalListObject(bucketName, newParams, extension, log, cb); this.internalListObject(c, newParams, extension, log, cb);
}); });
stream stream
@ -993,22 +1002,127 @@ class MongoClientInterface {
return undefined; return undefined;
} }
/*
* Execute the user-defined query in a stage then sort it for
* stateless paging. The output is stored in a temporary
* collection in a special namespace that will be periodically
* erased (e.g. once a day).
*
* All search queries are bounded by MAX_TIME_MS env (default is
* 5mn).
*/
doSearch(c, searchCollection, params, extension, searchOptions, log, cb) {
// use temp name to avoid races
const tempCollection =
TEMP_SEARCH_PREFIX +
crypto.randomBytes(16).toString('hex');
log.info('doSearch: launching aggregate', {
searchCollection,
});
const _cursor = c.aggregate([
{ $match: searchOptions }, // user query
{ $out: tempCollection }, // a job will clean it up
],
{
maxTimeMs: this.maxTimeMs, // always limit
allowDiskUse: true, // stage large queries on disk
},
null);
_cursor.toArray(err => {
if (err) {
log.error('doSearch: error', {
error: err.message,
});
return cb(err);
}
// final rename
this.db.renameCollection(
tempCollection,
searchCollection,
{
dropTarget: true,
},
err => {
if (err) {
log.error('doSearch: renaming', {
error: err.message,
tempCollection,
searchCollection,
});
return cb(err);
}
log.info('doSearch: aggregate done', {
searchCollection,
});
return this.internalListObject(
this.db.collection(searchCollection),
params, extension,
log, cb);
});
return undefined;
});
}
/*
* Check if the used defined query has been cached otherwise
* launch the search
*/
prepareSearch(bucketName, params, extension, searchOptions, log, cb) {
const c = this.getCollection(bucketName);
// generate the search collection name
const searchCollection =
SEARCH_PREFIX +
crypto.createHash('md5').
update(JSON.stringify(searchOptions)).
digest('hex');
this.db.listCollections({
name: searchCollection,
}).toArray((err, items) => {
if (err) {
log.error('prepareSearch: listing collection', {
error: err.message,
});
return cb(err);
}
if (items.length > 0) {
log.info('prepareSearch: using cache', {
searchCollection,
});
return this.internalListObject(
this.db.collection(searchCollection),
params, extension,
log, cb);
}
return this.doSearch(
c, searchCollection,
params, extension, searchOptions,
log, cb);
});
}
listObject(bucketName, params, log, cb) { listObject(bucketName, params, log, cb) {
const extName = params.listingType; const extName = params.listingType;
const extension = new listAlgos[extName](params, log); const extension = new listAlgos[extName](params, log);
const internalParams = extension.genMDParams(); const internalParams = extension.genMDParams();
internalParams.mongifiedSearch = params.mongifiedSearch; if (params.mongifiedSearch) {
return this.internalListObject(bucketName, internalParams, extension, return this.prepareSearch(
log, cb); bucketName, internalParams, extension,
params.mongifiedSearch, log, cb);
}
return this.internalListObject(
this.getCollection(bucketName),
internalParams, extension,
log, cb);
} }
listMultipartUploads(bucketName, params, log, cb) { listMultipartUploads(bucketName, params, log, cb) {
const extName = params.listingType; const extName = params.listingType;
const extension = new listAlgos[extName](params, log); const extension = new listAlgos[extName](params, log);
const internalParams = extension.genMDParams(); const internalParams = extension.genMDParams();
internalParams.mongifiedSearch = params.mongifiedSearch; return this.internalListObject(
return this.internalListObject(bucketName, internalParams, extension, this.getCollection(bucketName),
log, cb); internalParams, extension,
log, cb);
} }
checkHealth(implName, log, cb) { checkHealth(implName, log, cb) {

View File

@ -2,7 +2,7 @@ const Readable = require('stream').Readable;
const MongoUtils = require('./utils'); const MongoUtils = require('./utils');
class MongoReadStream extends Readable { class MongoReadStream extends Readable {
constructor(c, options, searchOptions) { constructor(c, options) {
super({ super({
objectMode: true, objectMode: true,
highWaterMark: 0, highWaterMark: 0,
@ -59,10 +59,6 @@ class MongoReadStream extends Readable {
delete query._id; delete query._id;
} }
if (searchOptions) {
Object.assign(query, searchOptions);
}
this._cursor = c.find(query).sort({ this._cursor = c.find(query).sort({
_id: options.reverse ? -1 : 1, _id: options.reverse ? -1 : 1,
}); });