Compare commits

...

1 Commits

Author SHA1 Message Date
vrancurel 4bba3b817c ft: ZENKO-2968 enable sharding on bucket creation 2020-11-18 15:03:44 -08:00
3 changed files with 35 additions and 6 deletions

View File

@ -85,6 +85,7 @@ class MetadataWrapper {
replicaSetHosts: params.mongodb.replicaSetHosts, replicaSetHosts: params.mongodb.replicaSetHosts,
writeConcern: params.mongodb.writeConcern, writeConcern: params.mongodb.writeConcern,
replicaSet: params.mongodb.replicaSet, replicaSet: params.mongodb.replicaSet,
enableSharding: params.mongodb.enableSharding,
readPreference: params.mongodb.readPreference, readPreference: params.mongodb.readPreference,
database: params.mongodb.database, database: params.mongodb.database,
replicationGroupId: params.replicationGroupId, replicationGroupId: params.replicationGroupId,

View File

@ -17,10 +17,13 @@ class LogConsumer {
* @param {string} logger - logger * @param {string} logger - logger
*/ */
constructor(mongoConfig, logger) { constructor(mongoConfig, logger) {
const { authCredentials, replicaSetHosts, replicaSet, database } = mongoConfig; const { authCredentials, replicaSetHosts, replicaSet, database,
enableSharding } = mongoConfig;
const cred = MongoUtils.credPrefix(authCredentials); const cred = MongoUtils.credPrefix(authCredentials);
this._mongoUrl = `mongodb://${cred}${replicaSetHosts}/`; this._mongoUrl = `mongodb://${cred}${replicaSetHosts}/`;
this._replicaSet = replicaSet; this._replicaSet = replicaSet;
this._enableSharding =
enableSharding !== undefined ? enableSharding : false;
this._logger = logger; this._logger = logger;
this._oplogNsRegExp = new RegExp(`^${database}\\.`); this._oplogNsRegExp = new RegExp(`^${database}\\.`);
// oplog collection // oplog collection
@ -36,10 +39,15 @@ class LogConsumer {
* @return {undefined} * @return {undefined}
*/ */
connectMongo(done) { connectMongo(done) {
MongoClient.connect(this._mongoUrl, { const options = {
replicaSet: this._replicaSet,
useNewUrlParser: true, useNewUrlParser: true,
}, };
if (!this._enableSharding) {
// XXX real fix is with change streams
options.replicaSet = this._replicaSet;
}
MongoClient.connect(this._mongoUrl,
options,
(err, client) => { (err, client) => {
if (err) { if (err) {
this._logger.error('Unable to connect to MongoDB', this._logger.error('Unable to connect to MongoDB',

View File

@ -91,11 +91,15 @@ function generatePHDVersion(versionId) {
class MongoClientInterface { class MongoClientInterface {
constructor(params) { constructor(params) {
const { replicaSetHosts, writeConcern, replicaSet, readPreference, path, const { replicaSetHosts, writeConcern, replicaSet, readPreference, path,
database, logger, replicationGroupId, authCredentials, database, enableSharding, logger,
replicationGroupId, authCredentials,
isLocationTransient } = params; isLocationTransient } = params;
const cred = MongoUtils.credPrefix(authCredentials); const cred = MongoUtils.credPrefix(authCredentials);
this._enableSharding =
enableSharding !== undefined ? enableSharding : false;
this.mongoUrl = `mongodb://${cred}${replicaSetHosts}/` + this.mongoUrl = `mongodb://${cred}${replicaSetHosts}/` +
`?w=${writeConcern}&replicaSet=${replicaSet}` + `?w=${writeConcern}` +
`${!this._enableSharding && !!replicaSet ? `&replicaSet=${replicaSet}` : ''}` +
`&readPreference=${readPreference}`; `&readPreference=${readPreference}`;
this.logger = logger; this.logger = logger;
this.client = null; this.client = null;
@ -142,6 +146,7 @@ class MongoClientInterface {
this.db = client.db(this.database, { this.db = client.db(this.database, {
ignoreUndefined: true, ignoreUndefined: true,
}); });
this.adminDb = client.db('admin');
return this.usersBucketHack(cb); return this.usersBucketHack(cb);
}); });
} }
@ -219,6 +224,21 @@ class MongoClientInterface {
{ error: err.message }); { error: err.message });
return cb(errors.InternalError); return cb(errors.InternalError);
} }
if (this._enableSharding) {
const cmd = {
shardCollection: `${this.database}.${bucketName}`,
key: { _id: 1 },
};
return this.adminDb.command(cmd, {}, err => {
if (err) {
log.error(
'createBucket: enabling sharding',
{ error: err.message });
return cb(errors.InternalError);
}
return cb();
});
}
return cb(); return cb();
}); });
} }