Compare commits
6 Commits
160fe96b18
...
aa7abc0cae
Author | SHA1 | Date |
---|---|---|
philipyoo | aa7abc0cae | |
philipyoo | 52fbdadf5a | |
philipyoo | 9998d8d332 | |
philipyoo | 5f5cbb90f9 | |
philipyoo | 9b652bc2eb | |
philipyoo | 807349ad4b |
|
@ -319,6 +319,14 @@ class MetadataWrapper {
|
||||||
return this.client.getUUID(log, cb);
|
return this.client.getUUID(log, cb);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
getReplicationGroupId(log, cb) {
|
||||||
|
if (!this.client.getReplicationGroupID) {
|
||||||
|
log.debug('returning default replication group id');
|
||||||
|
return cb(null, '');
|
||||||
|
}
|
||||||
|
return this.client.getReplicationGroupID(log, cb);
|
||||||
|
}
|
||||||
|
|
||||||
getDiskUsage(log, cb) {
|
getDiskUsage(log, cb) {
|
||||||
if (!this.client.getDiskUsage) {
|
if (!this.client.getDiskUsage) {
|
||||||
log.debug('returning empty disk usage as fallback', {
|
log.debug('returning empty disk usage as fallback', {
|
||||||
|
|
|
@ -10,6 +10,7 @@
|
||||||
* We use proper atomic operations when needed.
|
* We use proper atomic operations when needed.
|
||||||
*/
|
*/
|
||||||
const async = require('async');
|
const async = require('async');
|
||||||
|
const crypto = require('crypto');
|
||||||
|
|
||||||
const { EventEmitter } = require('events');
|
const { EventEmitter } = require('events');
|
||||||
const constants = require('../../../constants');
|
const constants = require('../../../constants');
|
||||||
|
@ -33,6 +34,7 @@ const USERSBUCKET = '__usersbucket';
|
||||||
const METASTORE = '__metastore';
|
const METASTORE = '__metastore';
|
||||||
const INFOSTORE = '__infostore';
|
const INFOSTORE = '__infostore';
|
||||||
const __UUID = 'uuid';
|
const __UUID = 'uuid';
|
||||||
|
const __REP_GROUP_ID = 'repGroupID';
|
||||||
const PENSIEVE = 'PENSIEVE';
|
const PENSIEVE = 'PENSIEVE';
|
||||||
const ASYNC_REPAIR_TIMEOUT = 15000;
|
const ASYNC_REPAIR_TIMEOUT = 15000;
|
||||||
const itemScanRefreshDelay = 1000 * 60 * 60; // 1 hour
|
const itemScanRefreshDelay = 1000 * 60 * 60; // 1 hour
|
||||||
|
@ -129,7 +131,16 @@ class MongoClientInterface {
|
||||||
this.db = client.db(this.database, {
|
this.db = client.db(this.database, {
|
||||||
ignoreUndefined: true,
|
ignoreUndefined: true,
|
||||||
});
|
});
|
||||||
return this.usersBucketHack(cb);
|
return async.series([
|
||||||
|
next => this.usersBucketHack(next),
|
||||||
|
next => this._setupReplicationGroupID((err, repGroupId) => {
|
||||||
|
if (err) {
|
||||||
|
return next(err);
|
||||||
|
}
|
||||||
|
this.replicationGroupId = repGroupId;
|
||||||
|
return next();
|
||||||
|
}),
|
||||||
|
], cb);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
usersBucketHack(cb) {
|
usersBucketHack(cb) {
|
||||||
|
@ -508,6 +519,53 @@ class MongoClientInterface {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* In this case the caller provides a versionId. This function will
|
||||||
|
* save the versionId object as is and will set PHD on master. We rely
|
||||||
|
* on the natural ordering of versions.
|
||||||
|
* This is specifically used by backbeat Ingestion process.
|
||||||
|
*/
|
||||||
|
putObjectVerCase4(c, bucketName, objName, objVal, params, log, cb) {
|
||||||
|
// eslint-disable-next-line
|
||||||
|
objVal.versionId = params.versionId;
|
||||||
|
const vObjName = formatVersionKey(objName, params.versionId);
|
||||||
|
const mst = generatePHDVersion(params.versionId);
|
||||||
|
return c.bulkWrite([{
|
||||||
|
updateOne: {
|
||||||
|
filter: {
|
||||||
|
_id: vObjName,
|
||||||
|
},
|
||||||
|
update: {
|
||||||
|
$set: {
|
||||||
|
_id: vObjName,
|
||||||
|
value: objVal,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
upsert: true,
|
||||||
|
},
|
||||||
|
}, {
|
||||||
|
updateOne: {
|
||||||
|
filter: {
|
||||||
|
_id: objName,
|
||||||
|
},
|
||||||
|
update: {
|
||||||
|
_id: objName, value: mst,
|
||||||
|
},
|
||||||
|
upsert: true,
|
||||||
|
},
|
||||||
|
}], {
|
||||||
|
ordered: 1,
|
||||||
|
}, err => {
|
||||||
|
if (err) {
|
||||||
|
log.error('putObjectVerCase4: error putting object version', {
|
||||||
|
error: err.message,
|
||||||
|
});
|
||||||
|
return cb(errors.InternalError);
|
||||||
|
}
|
||||||
|
return cb(null, `{"versionId": "${objVal.versionId}"}`);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Put object when versioning is not enabled
|
* Put object when versioning is not enabled
|
||||||
*/
|
*/
|
||||||
|
@ -539,9 +597,12 @@ class MongoClientInterface {
|
||||||
} else if (params && params.versionId === '') {
|
} else if (params && params.versionId === '') {
|
||||||
return this.putObjectVerCase2(c, bucketName, objName, objVal,
|
return this.putObjectVerCase2(c, bucketName, objName, objVal,
|
||||||
params, log, cb);
|
params, log, cb);
|
||||||
} else if (params && params.versionId) {
|
} else if (params && params.versionId && !params.usePHD) {
|
||||||
return this.putObjectVerCase3(c, bucketName, objName, objVal,
|
return this.putObjectVerCase3(c, bucketName, objName, objVal,
|
||||||
params, log, cb);
|
params, log, cb);
|
||||||
|
} else if (params && params.versionId && params.usePHD) {
|
||||||
|
return this.putObjectVerCase4(c, bucketName, objName, objVal,
|
||||||
|
params, log, cb);
|
||||||
}
|
}
|
||||||
return this.putObjectNoVer(c, bucketName, objName, objVal,
|
return this.putObjectNoVer(c, bucketName, objName, objVal,
|
||||||
params, log, cb);
|
params, log, cb);
|
||||||
|
@ -937,18 +998,18 @@ class MongoClientInterface {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
writeUUIDIfNotExists(uuid, log, cb) {
|
writeToInfostoreIfNotExists(id, value, log, cb) {
|
||||||
const i = this.getCollection(INFOSTORE);
|
const i = this.getCollection(INFOSTORE);
|
||||||
i.insert({
|
i.insert({
|
||||||
_id: __UUID,
|
_id: id,
|
||||||
value: uuid,
|
value,
|
||||||
}, {}, err => {
|
}, {}, err => {
|
||||||
if (err) {
|
if (err) {
|
||||||
if (err.code === 11000) {
|
if (err.code === 11000) {
|
||||||
// duplicate key error
|
// duplicate key error
|
||||||
return cb(errors.KeyAlreadyExists);
|
return cb(errors.KeyAlreadyExists);
|
||||||
}
|
}
|
||||||
log.error('writeUUIDIfNotExists: error writing UUID',
|
log.error(`writeToInfostoreIfNotExists: error writing to ${id}`,
|
||||||
{ error: err.message });
|
{ error: err.message });
|
||||||
return cb(errors.InternalError);
|
return cb(errors.InternalError);
|
||||||
}
|
}
|
||||||
|
@ -963,7 +1024,7 @@ class MongoClientInterface {
|
||||||
*/
|
*/
|
||||||
getUUID(log, cb) {
|
getUUID(log, cb) {
|
||||||
const uuid = initialInstanceID || Uuid.v4();
|
const uuid = initialInstanceID || Uuid.v4();
|
||||||
this.writeUUIDIfNotExists(uuid, log, err => {
|
this.writeToInfostoreIfNotExists(__UUID, uuid, log, err => {
|
||||||
if (err) {
|
if (err) {
|
||||||
if (err === errors.InternalError) {
|
if (err === errors.InternalError) {
|
||||||
log.error('getUUID: error getting UUID',
|
log.error('getUUID: error getting UUID',
|
||||||
|
@ -976,6 +1037,47 @@ class MongoClientInterface {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_setupReplicationGroupID(cb) {
|
||||||
|
// random 7 characters
|
||||||
|
const repGroupId = crypto.randomBytes(4).toString('hex').slice(1);
|
||||||
|
this.writeToInfostoreIfNotExists(__REP_GROUP_ID, repGroupId,
|
||||||
|
this.logger, err => {
|
||||||
|
if (err) {
|
||||||
|
if (err === errors.InternalError) {
|
||||||
|
this.logger.error('_setupReplicationGroupID: error ' +
|
||||||
|
'getting replication group id', {
|
||||||
|
error: err.message,
|
||||||
|
});
|
||||||
|
return cb(err);
|
||||||
|
}
|
||||||
|
return this.getReplicationGroupID(this.logger, cb);
|
||||||
|
}
|
||||||
|
// need to save to instance
|
||||||
|
this.replicationGroupId = repGroupId;
|
||||||
|
return cb(null, repGroupId);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
getReplicationGroupID(log, cb) {
|
||||||
|
const i = this.getCollection(INFOSTORE);
|
||||||
|
i.findOne({
|
||||||
|
_id: __REP_GROUP_ID,
|
||||||
|
}, {}, (err, doc) => {
|
||||||
|
if (err) {
|
||||||
|
log.error('getReplicationGroupID: error reading ' +
|
||||||
|
'replication group id', {
|
||||||
|
error: err.message,
|
||||||
|
});
|
||||||
|
return cb(errors.InternalError);
|
||||||
|
}
|
||||||
|
if (!doc) {
|
||||||
|
return cb(errors.NoSuchKey);
|
||||||
|
// return cb(null, this.replicationGroupId);
|
||||||
|
}
|
||||||
|
return cb(null, doc.value);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
getDiskUsage(cb) {
|
getDiskUsage(cb) {
|
||||||
// FIXME: for basic one server deployment the infrastructure
|
// FIXME: for basic one server deployment the infrastructure
|
||||||
// configurator shall set a path to the actual MongoDB volume.
|
// configurator shall set a path to the actual MongoDB volume.
|
||||||
|
|
|
@ -56,6 +56,19 @@ function getInfVid(replicationGroupId) {
|
||||||
padLeft(MAX_SEQ, TEMPLATE_SEQ) + repGroupId);
|
padLeft(MAX_SEQ, TEMPLATE_SEQ) + repGroupId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Identify if decoded version id contains given replication group id
|
||||||
|
* @param {string} version - decoded version id
|
||||||
|
* @param {string} replicationGroupId - replication group id to check
|
||||||
|
* @return {boolean} true if version id contains replication group id
|
||||||
|
*/
|
||||||
|
function hasRepGroupId(version, replicationGroupId) {
|
||||||
|
const rgStartIndex = LENGTH_TS + LENGTH_SEQ;
|
||||||
|
const rgEndIndex = rgStartIndex + LENGTH_RG;
|
||||||
|
return version.substring(rgStartIndex, rgEndIndex) === replicationGroupId;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
// internal state of the module
|
// internal state of the module
|
||||||
let lastTimestamp = 0; // epoch of the last versionId
|
let lastTimestamp = 0; // epoch of the last versionId
|
||||||
let lastSeq = 0; // sequential number of the last versionId
|
let lastSeq = 0; // sequential number of the last versionId
|
||||||
|
@ -152,4 +165,10 @@ function decode(str) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
module.exports = { generateVersionId, getInfVid, encode, decode };
|
module.exports = {
|
||||||
|
generateVersionId,
|
||||||
|
getInfVid,
|
||||||
|
encode,
|
||||||
|
decode,
|
||||||
|
hasRepGroupId,
|
||||||
|
};
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
const VID = require('../../../lib/versioning/VersionID.js');
|
const VID = require('../../../lib/versioning/VersionID.js');
|
||||||
const assert = require('assert');
|
const assert = require('assert');
|
||||||
|
const crypto = require('crypto');
|
||||||
|
|
||||||
function randkey(length) {
|
function randkey(length) {
|
||||||
let key = '';
|
let key = '';
|
||||||
|
@ -9,7 +10,8 @@ function randkey(length) {
|
||||||
return key;
|
return key;
|
||||||
}
|
}
|
||||||
|
|
||||||
describe('test generating versionIds', () => {
|
describe('VersionID', () => {
|
||||||
|
describe('test generating versionIds', () => {
|
||||||
const count = 1000;
|
const count = 1000;
|
||||||
const vids = Array(count).fill(null);
|
const vids = Array(count).fill(null);
|
||||||
for (let i = 0; i < count; i++) {
|
for (let i = 0; i < count; i++) {
|
||||||
|
@ -36,4 +38,31 @@ describe('test generating versionIds', () => {
|
||||||
assert.strictEqual(vids.length, count);
|
assert.strictEqual(vids.length, count);
|
||||||
assert.deepStrictEqual(vids, decoded);
|
assert.deepStrictEqual(vids, decoded);
|
||||||
});
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('::hasRepGroupId', () => {
|
||||||
|
it('should find if version id has replication group id', () => {
|
||||||
|
const repGroupId1 = 'ZENKO ';
|
||||||
|
const vid1 = VID.generateVersionId(randkey(15), repGroupId1);
|
||||||
|
// generate random 7 characters
|
||||||
|
const repGroupId2 = crypto.randomBytes(4).toString('hex').slice(1);
|
||||||
|
const vid2 = VID.generateVersionId(randkey(15), repGroupId2);
|
||||||
|
assert.strictEqual(VID.hasRepGroupId(vid1, 'fAlSe'), false);
|
||||||
|
assert.strictEqual(VID.hasRepGroupId(vid2, 'nope012'), false);
|
||||||
|
assert.strictEqual(VID.hasRepGroupId(vid1, repGroupId1), true);
|
||||||
|
assert.strictEqual(VID.hasRepGroupId(vid2, repGroupId2), true);
|
||||||
|
|
||||||
|
// to compare against production version ids with existing default
|
||||||
|
// replication group id
|
||||||
|
const repGroupIdDefault = 'RG001 ';
|
||||||
|
[
|
||||||
|
'98445675956517999999RG001 14.124.3',
|
||||||
|
'99999999999999999999RG001 ',
|
||||||
|
'98445608101957999999RG001 24.14',
|
||||||
|
].forEach(v => {
|
||||||
|
assert.strictEqual(
|
||||||
|
VID.hasRepGroupId(v, repGroupIdDefault), true);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
Loading…
Reference in New Issue