Compare commits
6 Commits
160fe96b18
...
aa7abc0cae
Author | SHA1 | Date |
---|---|---|
philipyoo | aa7abc0cae | |
philipyoo | 52fbdadf5a | |
philipyoo | 9998d8d332 | |
philipyoo | 5f5cbb90f9 | |
philipyoo | 9b652bc2eb | |
philipyoo | 807349ad4b |
|
@ -319,6 +319,14 @@ class MetadataWrapper {
|
|||
return this.client.getUUID(log, cb);
|
||||
}
|
||||
|
||||
getReplicationGroupId(log, cb) {
|
||||
if (!this.client.getReplicationGroupID) {
|
||||
log.debug('returning default replication group id');
|
||||
return cb(null, '');
|
||||
}
|
||||
return this.client.getReplicationGroupID(log, cb);
|
||||
}
|
||||
|
||||
getDiskUsage(log, cb) {
|
||||
if (!this.client.getDiskUsage) {
|
||||
log.debug('returning empty disk usage as fallback', {
|
||||
|
|
|
@ -10,6 +10,7 @@
|
|||
* We use proper atomic operations when needed.
|
||||
*/
|
||||
const async = require('async');
|
||||
const crypto = require('crypto');
|
||||
|
||||
const { EventEmitter } = require('events');
|
||||
const constants = require('../../../constants');
|
||||
|
@ -33,6 +34,7 @@ const USERSBUCKET = '__usersbucket';
|
|||
const METASTORE = '__metastore';
|
||||
const INFOSTORE = '__infostore';
|
||||
const __UUID = 'uuid';
|
||||
const __REP_GROUP_ID = 'repGroupID';
|
||||
const PENSIEVE = 'PENSIEVE';
|
||||
const ASYNC_REPAIR_TIMEOUT = 15000;
|
||||
const itemScanRefreshDelay = 1000 * 60 * 60; // 1 hour
|
||||
|
@ -129,7 +131,16 @@ class MongoClientInterface {
|
|||
this.db = client.db(this.database, {
|
||||
ignoreUndefined: true,
|
||||
});
|
||||
return this.usersBucketHack(cb);
|
||||
return async.series([
|
||||
next => this.usersBucketHack(next),
|
||||
next => this._setupReplicationGroupID((err, repGroupId) => {
|
||||
if (err) {
|
||||
return next(err);
|
||||
}
|
||||
this.replicationGroupId = repGroupId;
|
||||
return next();
|
||||
}),
|
||||
], cb);
|
||||
});
|
||||
}
|
||||
usersBucketHack(cb) {
|
||||
|
@ -508,6 +519,53 @@ class MongoClientInterface {
|
|||
});
|
||||
}
|
||||
|
||||
/*
|
||||
* In this case the caller provides a versionId. This function will
|
||||
* save the versionId object as is and will set PHD on master. We rely
|
||||
* on the natural ordering of versions.
|
||||
* This is specifically used by backbeat Ingestion process.
|
||||
*/
|
||||
putObjectVerCase4(c, bucketName, objName, objVal, params, log, cb) {
|
||||
// eslint-disable-next-line
|
||||
objVal.versionId = params.versionId;
|
||||
const vObjName = formatVersionKey(objName, params.versionId);
|
||||
const mst = generatePHDVersion(params.versionId);
|
||||
return c.bulkWrite([{
|
||||
updateOne: {
|
||||
filter: {
|
||||
_id: vObjName,
|
||||
},
|
||||
update: {
|
||||
$set: {
|
||||
_id: vObjName,
|
||||
value: objVal,
|
||||
},
|
||||
},
|
||||
upsert: true,
|
||||
},
|
||||
}, {
|
||||
updateOne: {
|
||||
filter: {
|
||||
_id: objName,
|
||||
},
|
||||
update: {
|
||||
_id: objName, value: mst,
|
||||
},
|
||||
upsert: true,
|
||||
},
|
||||
}], {
|
||||
ordered: 1,
|
||||
}, err => {
|
||||
if (err) {
|
||||
log.error('putObjectVerCase4: error putting object version', {
|
||||
error: err.message,
|
||||
});
|
||||
return cb(errors.InternalError);
|
||||
}
|
||||
return cb(null, `{"versionId": "${objVal.versionId}"}`);
|
||||
});
|
||||
}
|
||||
|
||||
/*
|
||||
* Put object when versioning is not enabled
|
||||
*/
|
||||
|
@ -539,9 +597,12 @@ class MongoClientInterface {
|
|||
} else if (params && params.versionId === '') {
|
||||
return this.putObjectVerCase2(c, bucketName, objName, objVal,
|
||||
params, log, cb);
|
||||
} else if (params && params.versionId) {
|
||||
} else if (params && params.versionId && !params.usePHD) {
|
||||
return this.putObjectVerCase3(c, bucketName, objName, objVal,
|
||||
params, log, cb);
|
||||
} else if (params && params.versionId && params.usePHD) {
|
||||
return this.putObjectVerCase4(c, bucketName, objName, objVal,
|
||||
params, log, cb);
|
||||
}
|
||||
return this.putObjectNoVer(c, bucketName, objName, objVal,
|
||||
params, log, cb);
|
||||
|
@ -937,18 +998,18 @@ class MongoClientInterface {
|
|||
});
|
||||
}
|
||||
|
||||
writeUUIDIfNotExists(uuid, log, cb) {
|
||||
writeToInfostoreIfNotExists(id, value, log, cb) {
|
||||
const i = this.getCollection(INFOSTORE);
|
||||
i.insert({
|
||||
_id: __UUID,
|
||||
value: uuid,
|
||||
_id: id,
|
||||
value,
|
||||
}, {}, err => {
|
||||
if (err) {
|
||||
if (err.code === 11000) {
|
||||
// duplicate key error
|
||||
return cb(errors.KeyAlreadyExists);
|
||||
}
|
||||
log.error('writeUUIDIfNotExists: error writing UUID',
|
||||
log.error(`writeToInfostoreIfNotExists: error writing to ${id}`,
|
||||
{ error: err.message });
|
||||
return cb(errors.InternalError);
|
||||
}
|
||||
|
@ -963,7 +1024,7 @@ class MongoClientInterface {
|
|||
*/
|
||||
getUUID(log, cb) {
|
||||
const uuid = initialInstanceID || Uuid.v4();
|
||||
this.writeUUIDIfNotExists(uuid, log, err => {
|
||||
this.writeToInfostoreIfNotExists(__UUID, uuid, log, err => {
|
||||
if (err) {
|
||||
if (err === errors.InternalError) {
|
||||
log.error('getUUID: error getting UUID',
|
||||
|
@ -976,6 +1037,47 @@ class MongoClientInterface {
|
|||
});
|
||||
}
|
||||
|
||||
_setupReplicationGroupID(cb) {
|
||||
// random 7 characters
|
||||
const repGroupId = crypto.randomBytes(4).toString('hex').slice(1);
|
||||
this.writeToInfostoreIfNotExists(__REP_GROUP_ID, repGroupId,
|
||||
this.logger, err => {
|
||||
if (err) {
|
||||
if (err === errors.InternalError) {
|
||||
this.logger.error('_setupReplicationGroupID: error ' +
|
||||
'getting replication group id', {
|
||||
error: err.message,
|
||||
});
|
||||
return cb(err);
|
||||
}
|
||||
return this.getReplicationGroupID(this.logger, cb);
|
||||
}
|
||||
// need to save to instance
|
||||
this.replicationGroupId = repGroupId;
|
||||
return cb(null, repGroupId);
|
||||
});
|
||||
}
|
||||
|
||||
getReplicationGroupID(log, cb) {
|
||||
const i = this.getCollection(INFOSTORE);
|
||||
i.findOne({
|
||||
_id: __REP_GROUP_ID,
|
||||
}, {}, (err, doc) => {
|
||||
if (err) {
|
||||
log.error('getReplicationGroupID: error reading ' +
|
||||
'replication group id', {
|
||||
error: err.message,
|
||||
});
|
||||
return cb(errors.InternalError);
|
||||
}
|
||||
if (!doc) {
|
||||
return cb(errors.NoSuchKey);
|
||||
// return cb(null, this.replicationGroupId);
|
||||
}
|
||||
return cb(null, doc.value);
|
||||
});
|
||||
}
|
||||
|
||||
getDiskUsage(cb) {
|
||||
// FIXME: for basic one server deployment the infrastructure
|
||||
// configurator shall set a path to the actual MongoDB volume.
|
||||
|
|
|
@ -56,6 +56,19 @@ function getInfVid(replicationGroupId) {
|
|||
padLeft(MAX_SEQ, TEMPLATE_SEQ) + repGroupId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Identify if decoded version id contains given replication group id
|
||||
* @param {string} version - decoded version id
|
||||
* @param {string} replicationGroupId - replication group id to check
|
||||
* @return {boolean} true if version id contains replication group id
|
||||
*/
|
||||
function hasRepGroupId(version, replicationGroupId) {
|
||||
const rgStartIndex = LENGTH_TS + LENGTH_SEQ;
|
||||
const rgEndIndex = rgStartIndex + LENGTH_RG;
|
||||
return version.substring(rgStartIndex, rgEndIndex) === replicationGroupId;
|
||||
}
|
||||
|
||||
|
||||
// internal state of the module
|
||||
let lastTimestamp = 0; // epoch of the last versionId
|
||||
let lastSeq = 0; // sequential number of the last versionId
|
||||
|
@ -152,4 +165,10 @@ function decode(str) {
|
|||
}
|
||||
}
|
||||
|
||||
module.exports = { generateVersionId, getInfVid, encode, decode };
|
||||
module.exports = {
|
||||
generateVersionId,
|
||||
getInfVid,
|
||||
encode,
|
||||
decode,
|
||||
hasRepGroupId,
|
||||
};
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
const VID = require('../../../lib/versioning/VersionID.js');
|
||||
const assert = require('assert');
|
||||
const crypto = require('crypto');
|
||||
|
||||
function randkey(length) {
|
||||
let key = '';
|
||||
|
@ -9,6 +10,7 @@ function randkey(length) {
|
|||
return key;
|
||||
}
|
||||
|
||||
describe('VersionID', () => {
|
||||
describe('test generating versionIds', () => {
|
||||
const count = 1000;
|
||||
const vids = Array(count).fill(null);
|
||||
|
@ -37,3 +39,30 @@ describe('test generating versionIds', () => {
|
|||
assert.deepStrictEqual(vids, decoded);
|
||||
});
|
||||
});
|
||||
|
||||
describe('::hasRepGroupId', () => {
|
||||
it('should find if version id has replication group id', () => {
|
||||
const repGroupId1 = 'ZENKO ';
|
||||
const vid1 = VID.generateVersionId(randkey(15), repGroupId1);
|
||||
// generate random 7 characters
|
||||
const repGroupId2 = crypto.randomBytes(4).toString('hex').slice(1);
|
||||
const vid2 = VID.generateVersionId(randkey(15), repGroupId2);
|
||||
assert.strictEqual(VID.hasRepGroupId(vid1, 'fAlSe'), false);
|
||||
assert.strictEqual(VID.hasRepGroupId(vid2, 'nope012'), false);
|
||||
assert.strictEqual(VID.hasRepGroupId(vid1, repGroupId1), true);
|
||||
assert.strictEqual(VID.hasRepGroupId(vid2, repGroupId2), true);
|
||||
|
||||
// to compare against production version ids with existing default
|
||||
// replication group id
|
||||
const repGroupIdDefault = 'RG001 ';
|
||||
[
|
||||
'98445675956517999999RG001 14.124.3',
|
||||
'99999999999999999999RG001 ',
|
||||
'98445608101957999999RG001 24.14',
|
||||
].forEach(v => {
|
||||
assert.strictEqual(
|
||||
VID.hasRepGroupId(v, repGroupIdDefault), true);
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
Loading…
Reference in New Issue