Compare commits
1 Commits
developmen
...
feature/ZE
Author | SHA1 | Date |
---|---|---|
vrancurel | 4bba3b817c |
|
@ -85,6 +85,7 @@ class MetadataWrapper {
|
|||
replicaSetHosts: params.mongodb.replicaSetHosts,
|
||||
writeConcern: params.mongodb.writeConcern,
|
||||
replicaSet: params.mongodb.replicaSet,
|
||||
enableSharding: params.mongodb.enableSharding,
|
||||
readPreference: params.mongodb.readPreference,
|
||||
database: params.mongodb.database,
|
||||
replicationGroupId: params.replicationGroupId,
|
||||
|
|
|
@ -17,10 +17,13 @@ class LogConsumer {
|
|||
* @param {string} logger - logger
|
||||
*/
|
||||
constructor(mongoConfig, logger) {
|
||||
const { authCredentials, replicaSetHosts, replicaSet, database } = mongoConfig;
|
||||
const { authCredentials, replicaSetHosts, replicaSet, database,
|
||||
enableSharding } = mongoConfig;
|
||||
const cred = MongoUtils.credPrefix(authCredentials);
|
||||
this._mongoUrl = `mongodb://${cred}${replicaSetHosts}/`;
|
||||
this._replicaSet = replicaSet;
|
||||
this._enableSharding =
|
||||
enableSharding !== undefined ? enableSharding : false;
|
||||
this._logger = logger;
|
||||
this._oplogNsRegExp = new RegExp(`^${database}\\.`);
|
||||
// oplog collection
|
||||
|
@ -36,10 +39,15 @@ class LogConsumer {
|
|||
* @return {undefined}
|
||||
*/
|
||||
connectMongo(done) {
|
||||
MongoClient.connect(this._mongoUrl, {
|
||||
replicaSet: this._replicaSet,
|
||||
const options = {
|
||||
useNewUrlParser: true,
|
||||
},
|
||||
};
|
||||
if (!this._enableSharding) {
|
||||
// XXX real fix is with change streams
|
||||
options.replicaSet = this._replicaSet;
|
||||
}
|
||||
MongoClient.connect(this._mongoUrl,
|
||||
options,
|
||||
(err, client) => {
|
||||
if (err) {
|
||||
this._logger.error('Unable to connect to MongoDB',
|
||||
|
|
|
@ -91,11 +91,15 @@ function generatePHDVersion(versionId) {
|
|||
class MongoClientInterface {
|
||||
constructor(params) {
|
||||
const { replicaSetHosts, writeConcern, replicaSet, readPreference, path,
|
||||
database, logger, replicationGroupId, authCredentials,
|
||||
database, enableSharding, logger,
|
||||
replicationGroupId, authCredentials,
|
||||
isLocationTransient } = params;
|
||||
const cred = MongoUtils.credPrefix(authCredentials);
|
||||
this._enableSharding =
|
||||
enableSharding !== undefined ? enableSharding : false;
|
||||
this.mongoUrl = `mongodb://${cred}${replicaSetHosts}/` +
|
||||
`?w=${writeConcern}&replicaSet=${replicaSet}` +
|
||||
`?w=${writeConcern}` +
|
||||
`${!this._enableSharding && !!replicaSet ? `&replicaSet=${replicaSet}` : ''}` +
|
||||
`&readPreference=${readPreference}`;
|
||||
this.logger = logger;
|
||||
this.client = null;
|
||||
|
@ -142,6 +146,7 @@ class MongoClientInterface {
|
|||
this.db = client.db(this.database, {
|
||||
ignoreUndefined: true,
|
||||
});
|
||||
this.adminDb = client.db('admin');
|
||||
return this.usersBucketHack(cb);
|
||||
});
|
||||
}
|
||||
|
@ -219,6 +224,21 @@ class MongoClientInterface {
|
|||
{ error: err.message });
|
||||
return cb(errors.InternalError);
|
||||
}
|
||||
if (this._enableSharding) {
|
||||
const cmd = {
|
||||
shardCollection: `${this.database}.${bucketName}`,
|
||||
key: { _id: 1 },
|
||||
};
|
||||
return this.adminDb.command(cmd, {}, err => {
|
||||
if (err) {
|
||||
log.error(
|
||||
'createBucket: enabling sharding',
|
||||
{ error: err.message });
|
||||
return cb(errors.InternalError);
|
||||
}
|
||||
return cb();
|
||||
});
|
||||
}
|
||||
return cb();
|
||||
});
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue