Compare commits

..

No commits in common. "aa7abc0caeb23c28505514df3ae5ed16f46fb9ce" and "160fe96b187b00830d2bbb80ef719c8990971f98" have entirely different histories.

4 changed files with 29 additions and 187 deletions

View File

@ -319,14 +319,6 @@ class MetadataWrapper {
return this.client.getUUID(log, cb);
}
getReplicationGroupId(log, cb) {
if (!this.client.getReplicationGroupID) {
log.debug('returning default replication group id');
return cb(null, '');
}
return this.client.getReplicationGroupID(log, cb);
}
getDiskUsage(log, cb) {
if (!this.client.getDiskUsage) {
log.debug('returning empty disk usage as fallback', {

View File

@ -10,7 +10,6 @@
* We use proper atomic operations when needed.
*/
const async = require('async');
const crypto = require('crypto');
const { EventEmitter } = require('events');
const constants = require('../../../constants');
@ -34,7 +33,6 @@ const USERSBUCKET = '__usersbucket';
const METASTORE = '__metastore';
const INFOSTORE = '__infostore';
const __UUID = 'uuid';
const __REP_GROUP_ID = 'repGroupID';
const PENSIEVE = 'PENSIEVE';
const ASYNC_REPAIR_TIMEOUT = 15000;
const itemScanRefreshDelay = 1000 * 60 * 60; // 1 hour
@ -131,16 +129,7 @@ class MongoClientInterface {
this.db = client.db(this.database, {
ignoreUndefined: true,
});
return async.series([
next => this.usersBucketHack(next),
next => this._setupReplicationGroupID((err, repGroupId) => {
if (err) {
return next(err);
}
this.replicationGroupId = repGroupId;
return next();
}),
], cb);
return this.usersBucketHack(cb);
});
}
usersBucketHack(cb) {
@ -519,53 +508,6 @@ class MongoClientInterface {
});
}
/*
* In this case the caller provides a versionId. This function will
* save the versionId object as is and will set PHD on master. We rely
* on the natural ordering of versions.
* This is specifically used by backbeat Ingestion process.
*/
putObjectVerCase4(c, bucketName, objName, objVal, params, log, cb) {
// eslint-disable-next-line
objVal.versionId = params.versionId;
const vObjName = formatVersionKey(objName, params.versionId);
const mst = generatePHDVersion(params.versionId);
return c.bulkWrite([{
updateOne: {
filter: {
_id: vObjName,
},
update: {
$set: {
_id: vObjName,
value: objVal,
},
},
upsert: true,
},
}, {
updateOne: {
filter: {
_id: objName,
},
update: {
_id: objName, value: mst,
},
upsert: true,
},
}], {
ordered: 1,
}, err => {
if (err) {
log.error('putObjectVerCase4: error putting object version', {
error: err.message,
});
return cb(errors.InternalError);
}
return cb(null, `{"versionId": "${objVal.versionId}"}`);
});
}
/*
* Put object when versioning is not enabled
*/
@ -597,12 +539,9 @@ class MongoClientInterface {
} else if (params && params.versionId === '') {
return this.putObjectVerCase2(c, bucketName, objName, objVal,
params, log, cb);
} else if (params && params.versionId && !params.usePHD) {
} else if (params && params.versionId) {
return this.putObjectVerCase3(c, bucketName, objName, objVal,
params, log, cb);
} else if (params && params.versionId && params.usePHD) {
return this.putObjectVerCase4(c, bucketName, objName, objVal,
params, log, cb);
}
return this.putObjectNoVer(c, bucketName, objName, objVal,
params, log, cb);
@ -998,18 +937,18 @@ class MongoClientInterface {
});
}
writeToInfostoreIfNotExists(id, value, log, cb) {
writeUUIDIfNotExists(uuid, log, cb) {
const i = this.getCollection(INFOSTORE);
i.insert({
_id: id,
value,
_id: __UUID,
value: uuid,
}, {}, err => {
if (err) {
if (err.code === 11000) {
// duplicate key error
return cb(errors.KeyAlreadyExists);
}
log.error(`writeToInfostoreIfNotExists: error writing to ${id}`,
log.error('writeUUIDIfNotExists: error writing UUID',
{ error: err.message });
return cb(errors.InternalError);
}
@ -1024,7 +963,7 @@ class MongoClientInterface {
*/
getUUID(log, cb) {
const uuid = initialInstanceID || Uuid.v4();
this.writeToInfostoreIfNotExists(__UUID, uuid, log, err => {
this.writeUUIDIfNotExists(uuid, log, err => {
if (err) {
if (err === errors.InternalError) {
log.error('getUUID: error getting UUID',
@ -1037,47 +976,6 @@ class MongoClientInterface {
});
}
_setupReplicationGroupID(cb) {
// random 7 characters
const repGroupId = crypto.randomBytes(4).toString('hex').slice(1);
this.writeToInfostoreIfNotExists(__REP_GROUP_ID, repGroupId,
this.logger, err => {
if (err) {
if (err === errors.InternalError) {
this.logger.error('_setupReplicationGroupID: error ' +
'getting replication group id', {
error: err.message,
});
return cb(err);
}
return this.getReplicationGroupID(this.logger, cb);
}
// need to save to instance
this.replicationGroupId = repGroupId;
return cb(null, repGroupId);
});
}
getReplicationGroupID(log, cb) {
const i = this.getCollection(INFOSTORE);
i.findOne({
_id: __REP_GROUP_ID,
}, {}, (err, doc) => {
if (err) {
log.error('getReplicationGroupID: error reading ' +
'replication group id', {
error: err.message,
});
return cb(errors.InternalError);
}
if (!doc) {
return cb(errors.NoSuchKey);
// return cb(null, this.replicationGroupId);
}
return cb(null, doc.value);
});
}
getDiskUsage(cb) {
// FIXME: for basic one server deployment the infrastructure
// configurator shall set a path to the actual MongoDB volume.

View File

@ -56,19 +56,6 @@ function getInfVid(replicationGroupId) {
padLeft(MAX_SEQ, TEMPLATE_SEQ) + repGroupId);
}
/**
* Identify if decoded version id contains given replication group id
* @param {string} version - decoded version id
* @param {string} replicationGroupId - replication group id to check
* @return {boolean} true if version id contains replication group id
*/
function hasRepGroupId(version, replicationGroupId) {
const rgStartIndex = LENGTH_TS + LENGTH_SEQ;
const rgEndIndex = rgStartIndex + LENGTH_RG;
return version.substring(rgStartIndex, rgEndIndex) === replicationGroupId;
}
// internal state of the module
let lastTimestamp = 0; // epoch of the last versionId
let lastSeq = 0; // sequential number of the last versionId
@ -165,10 +152,4 @@ function decode(str) {
}
}
module.exports = {
generateVersionId,
getInfVid,
encode,
decode,
hasRepGroupId,
};
module.exports = { generateVersionId, getInfVid, encode, decode };

View File

@ -1,6 +1,5 @@
const VID = require('../../../lib/versioning/VersionID.js');
const assert = require('assert');
const crypto = require('crypto');
function randkey(length) {
let key = '';
@ -10,8 +9,7 @@ function randkey(length) {
return key;
}
describe('VersionID', () => {
describe('test generating versionIds', () => {
describe('test generating versionIds', () => {
const count = 1000;
const vids = Array(count).fill(null);
for (let i = 0; i < count; i++) {
@ -38,31 +36,4 @@ describe('VersionID', () => {
assert.strictEqual(vids.length, count);
assert.deepStrictEqual(vids, decoded);
});
});
describe('::hasRepGroupId', () => {
it('should find if version id has replication group id', () => {
const repGroupId1 = 'ZENKO ';
const vid1 = VID.generateVersionId(randkey(15), repGroupId1);
// generate random 7 characters
const repGroupId2 = crypto.randomBytes(4).toString('hex').slice(1);
const vid2 = VID.generateVersionId(randkey(15), repGroupId2);
assert.strictEqual(VID.hasRepGroupId(vid1, 'fAlSe'), false);
assert.strictEqual(VID.hasRepGroupId(vid2, 'nope012'), false);
assert.strictEqual(VID.hasRepGroupId(vid1, repGroupId1), true);
assert.strictEqual(VID.hasRepGroupId(vid2, repGroupId2), true);
// to compare against production version ids with existing default
// replication group id
const repGroupIdDefault = 'RG001 ';
[
'98445675956517999999RG001 14.124.3',
'99999999999999999999RG001 ',
'98445608101957999999RG001 24.14',
].forEach(v => {
assert.strictEqual(
VID.hasRepGroupId(v, repGroupIdDefault), true);
});
});
});
});