Compare commits
1 Commits
developmen
...
feature/ZE
Author | SHA1 | Date |
---|---|---|
vrancurel | 4bba3b817c |
|
@ -85,6 +85,7 @@ class MetadataWrapper {
|
||||||
replicaSetHosts: params.mongodb.replicaSetHosts,
|
replicaSetHosts: params.mongodb.replicaSetHosts,
|
||||||
writeConcern: params.mongodb.writeConcern,
|
writeConcern: params.mongodb.writeConcern,
|
||||||
replicaSet: params.mongodb.replicaSet,
|
replicaSet: params.mongodb.replicaSet,
|
||||||
|
enableSharding: params.mongodb.enableSharding,
|
||||||
readPreference: params.mongodb.readPreference,
|
readPreference: params.mongodb.readPreference,
|
||||||
database: params.mongodb.database,
|
database: params.mongodb.database,
|
||||||
replicationGroupId: params.replicationGroupId,
|
replicationGroupId: params.replicationGroupId,
|
||||||
|
|
|
@ -17,10 +17,13 @@ class LogConsumer {
|
||||||
* @param {string} logger - logger
|
* @param {string} logger - logger
|
||||||
*/
|
*/
|
||||||
constructor(mongoConfig, logger) {
|
constructor(mongoConfig, logger) {
|
||||||
const { authCredentials, replicaSetHosts, replicaSet, database } = mongoConfig;
|
const { authCredentials, replicaSetHosts, replicaSet, database,
|
||||||
|
enableSharding } = mongoConfig;
|
||||||
const cred = MongoUtils.credPrefix(authCredentials);
|
const cred = MongoUtils.credPrefix(authCredentials);
|
||||||
this._mongoUrl = `mongodb://${cred}${replicaSetHosts}/`;
|
this._mongoUrl = `mongodb://${cred}${replicaSetHosts}/`;
|
||||||
this._replicaSet = replicaSet;
|
this._replicaSet = replicaSet;
|
||||||
|
this._enableSharding =
|
||||||
|
enableSharding !== undefined ? enableSharding : false;
|
||||||
this._logger = logger;
|
this._logger = logger;
|
||||||
this._oplogNsRegExp = new RegExp(`^${database}\\.`);
|
this._oplogNsRegExp = new RegExp(`^${database}\\.`);
|
||||||
// oplog collection
|
// oplog collection
|
||||||
|
@ -36,10 +39,15 @@ class LogConsumer {
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
connectMongo(done) {
|
connectMongo(done) {
|
||||||
MongoClient.connect(this._mongoUrl, {
|
const options = {
|
||||||
replicaSet: this._replicaSet,
|
|
||||||
useNewUrlParser: true,
|
useNewUrlParser: true,
|
||||||
},
|
};
|
||||||
|
if (!this._enableSharding) {
|
||||||
|
// XXX real fix is with change streams
|
||||||
|
options.replicaSet = this._replicaSet;
|
||||||
|
}
|
||||||
|
MongoClient.connect(this._mongoUrl,
|
||||||
|
options,
|
||||||
(err, client) => {
|
(err, client) => {
|
||||||
if (err) {
|
if (err) {
|
||||||
this._logger.error('Unable to connect to MongoDB',
|
this._logger.error('Unable to connect to MongoDB',
|
||||||
|
|
|
@ -91,11 +91,15 @@ function generatePHDVersion(versionId) {
|
||||||
class MongoClientInterface {
|
class MongoClientInterface {
|
||||||
constructor(params) {
|
constructor(params) {
|
||||||
const { replicaSetHosts, writeConcern, replicaSet, readPreference, path,
|
const { replicaSetHosts, writeConcern, replicaSet, readPreference, path,
|
||||||
database, logger, replicationGroupId, authCredentials,
|
database, enableSharding, logger,
|
||||||
|
replicationGroupId, authCredentials,
|
||||||
isLocationTransient } = params;
|
isLocationTransient } = params;
|
||||||
const cred = MongoUtils.credPrefix(authCredentials);
|
const cred = MongoUtils.credPrefix(authCredentials);
|
||||||
|
this._enableSharding =
|
||||||
|
enableSharding !== undefined ? enableSharding : false;
|
||||||
this.mongoUrl = `mongodb://${cred}${replicaSetHosts}/` +
|
this.mongoUrl = `mongodb://${cred}${replicaSetHosts}/` +
|
||||||
`?w=${writeConcern}&replicaSet=${replicaSet}` +
|
`?w=${writeConcern}` +
|
||||||
|
`${!this._enableSharding && !!replicaSet ? `&replicaSet=${replicaSet}` : ''}` +
|
||||||
`&readPreference=${readPreference}`;
|
`&readPreference=${readPreference}`;
|
||||||
this.logger = logger;
|
this.logger = logger;
|
||||||
this.client = null;
|
this.client = null;
|
||||||
|
@ -142,6 +146,7 @@ class MongoClientInterface {
|
||||||
this.db = client.db(this.database, {
|
this.db = client.db(this.database, {
|
||||||
ignoreUndefined: true,
|
ignoreUndefined: true,
|
||||||
});
|
});
|
||||||
|
this.adminDb = client.db('admin');
|
||||||
return this.usersBucketHack(cb);
|
return this.usersBucketHack(cb);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
@ -219,6 +224,21 @@ class MongoClientInterface {
|
||||||
{ error: err.message });
|
{ error: err.message });
|
||||||
return cb(errors.InternalError);
|
return cb(errors.InternalError);
|
||||||
}
|
}
|
||||||
|
if (this._enableSharding) {
|
||||||
|
const cmd = {
|
||||||
|
shardCollection: `${this.database}.${bucketName}`,
|
||||||
|
key: { _id: 1 },
|
||||||
|
};
|
||||||
|
return this.adminDb.command(cmd, {}, err => {
|
||||||
|
if (err) {
|
||||||
|
log.error(
|
||||||
|
'createBucket: enabling sharding',
|
||||||
|
{ error: err.message });
|
||||||
|
return cb(errors.InternalError);
|
||||||
|
}
|
||||||
|
return cb();
|
||||||
|
});
|
||||||
|
}
|
||||||
return cb();
|
return cb();
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue