Compare commits
No commits in common. "aa7abc0caeb23c28505514df3ae5ed16f46fb9ce" and "160fe96b187b00830d2bbb80ef719c8990971f98" have entirely different histories.
aa7abc0cae
...
160fe96b18
|
@ -319,14 +319,6 @@ class MetadataWrapper {
|
||||||
return this.client.getUUID(log, cb);
|
return this.client.getUUID(log, cb);
|
||||||
}
|
}
|
||||||
|
|
||||||
getReplicationGroupId(log, cb) {
|
|
||||||
if (!this.client.getReplicationGroupID) {
|
|
||||||
log.debug('returning default replication group id');
|
|
||||||
return cb(null, '');
|
|
||||||
}
|
|
||||||
return this.client.getReplicationGroupID(log, cb);
|
|
||||||
}
|
|
||||||
|
|
||||||
getDiskUsage(log, cb) {
|
getDiskUsage(log, cb) {
|
||||||
if (!this.client.getDiskUsage) {
|
if (!this.client.getDiskUsage) {
|
||||||
log.debug('returning empty disk usage as fallback', {
|
log.debug('returning empty disk usage as fallback', {
|
||||||
|
|
|
@ -10,7 +10,6 @@
|
||||||
* We use proper atomic operations when needed.
|
* We use proper atomic operations when needed.
|
||||||
*/
|
*/
|
||||||
const async = require('async');
|
const async = require('async');
|
||||||
const crypto = require('crypto');
|
|
||||||
|
|
||||||
const { EventEmitter } = require('events');
|
const { EventEmitter } = require('events');
|
||||||
const constants = require('../../../constants');
|
const constants = require('../../../constants');
|
||||||
|
@ -34,7 +33,6 @@ const USERSBUCKET = '__usersbucket';
|
||||||
const METASTORE = '__metastore';
|
const METASTORE = '__metastore';
|
||||||
const INFOSTORE = '__infostore';
|
const INFOSTORE = '__infostore';
|
||||||
const __UUID = 'uuid';
|
const __UUID = 'uuid';
|
||||||
const __REP_GROUP_ID = 'repGroupID';
|
|
||||||
const PENSIEVE = 'PENSIEVE';
|
const PENSIEVE = 'PENSIEVE';
|
||||||
const ASYNC_REPAIR_TIMEOUT = 15000;
|
const ASYNC_REPAIR_TIMEOUT = 15000;
|
||||||
const itemScanRefreshDelay = 1000 * 60 * 60; // 1 hour
|
const itemScanRefreshDelay = 1000 * 60 * 60; // 1 hour
|
||||||
|
@ -131,16 +129,7 @@ class MongoClientInterface {
|
||||||
this.db = client.db(this.database, {
|
this.db = client.db(this.database, {
|
||||||
ignoreUndefined: true,
|
ignoreUndefined: true,
|
||||||
});
|
});
|
||||||
return async.series([
|
return this.usersBucketHack(cb);
|
||||||
next => this.usersBucketHack(next),
|
|
||||||
next => this._setupReplicationGroupID((err, repGroupId) => {
|
|
||||||
if (err) {
|
|
||||||
return next(err);
|
|
||||||
}
|
|
||||||
this.replicationGroupId = repGroupId;
|
|
||||||
return next();
|
|
||||||
}),
|
|
||||||
], cb);
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
usersBucketHack(cb) {
|
usersBucketHack(cb) {
|
||||||
|
@ -519,53 +508,6 @@ class MongoClientInterface {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
* In this case the caller provides a versionId. This function will
|
|
||||||
* save the versionId object as is and will set PHD on master. We rely
|
|
||||||
* on the natural ordering of versions.
|
|
||||||
* This is specifically used by backbeat Ingestion process.
|
|
||||||
*/
|
|
||||||
putObjectVerCase4(c, bucketName, objName, objVal, params, log, cb) {
|
|
||||||
// eslint-disable-next-line
|
|
||||||
objVal.versionId = params.versionId;
|
|
||||||
const vObjName = formatVersionKey(objName, params.versionId);
|
|
||||||
const mst = generatePHDVersion(params.versionId);
|
|
||||||
return c.bulkWrite([{
|
|
||||||
updateOne: {
|
|
||||||
filter: {
|
|
||||||
_id: vObjName,
|
|
||||||
},
|
|
||||||
update: {
|
|
||||||
$set: {
|
|
||||||
_id: vObjName,
|
|
||||||
value: objVal,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
upsert: true,
|
|
||||||
},
|
|
||||||
}, {
|
|
||||||
updateOne: {
|
|
||||||
filter: {
|
|
||||||
_id: objName,
|
|
||||||
},
|
|
||||||
update: {
|
|
||||||
_id: objName, value: mst,
|
|
||||||
},
|
|
||||||
upsert: true,
|
|
||||||
},
|
|
||||||
}], {
|
|
||||||
ordered: 1,
|
|
||||||
}, err => {
|
|
||||||
if (err) {
|
|
||||||
log.error('putObjectVerCase4: error putting object version', {
|
|
||||||
error: err.message,
|
|
||||||
});
|
|
||||||
return cb(errors.InternalError);
|
|
||||||
}
|
|
||||||
return cb(null, `{"versionId": "${objVal.versionId}"}`);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Put object when versioning is not enabled
|
* Put object when versioning is not enabled
|
||||||
*/
|
*/
|
||||||
|
@ -597,12 +539,9 @@ class MongoClientInterface {
|
||||||
} else if (params && params.versionId === '') {
|
} else if (params && params.versionId === '') {
|
||||||
return this.putObjectVerCase2(c, bucketName, objName, objVal,
|
return this.putObjectVerCase2(c, bucketName, objName, objVal,
|
||||||
params, log, cb);
|
params, log, cb);
|
||||||
} else if (params && params.versionId && !params.usePHD) {
|
} else if (params && params.versionId) {
|
||||||
return this.putObjectVerCase3(c, bucketName, objName, objVal,
|
return this.putObjectVerCase3(c, bucketName, objName, objVal,
|
||||||
params, log, cb);
|
params, log, cb);
|
||||||
} else if (params && params.versionId && params.usePHD) {
|
|
||||||
return this.putObjectVerCase4(c, bucketName, objName, objVal,
|
|
||||||
params, log, cb);
|
|
||||||
}
|
}
|
||||||
return this.putObjectNoVer(c, bucketName, objName, objVal,
|
return this.putObjectNoVer(c, bucketName, objName, objVal,
|
||||||
params, log, cb);
|
params, log, cb);
|
||||||
|
@ -998,18 +937,18 @@ class MongoClientInterface {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
writeToInfostoreIfNotExists(id, value, log, cb) {
|
writeUUIDIfNotExists(uuid, log, cb) {
|
||||||
const i = this.getCollection(INFOSTORE);
|
const i = this.getCollection(INFOSTORE);
|
||||||
i.insert({
|
i.insert({
|
||||||
_id: id,
|
_id: __UUID,
|
||||||
value,
|
value: uuid,
|
||||||
}, {}, err => {
|
}, {}, err => {
|
||||||
if (err) {
|
if (err) {
|
||||||
if (err.code === 11000) {
|
if (err.code === 11000) {
|
||||||
// duplicate key error
|
// duplicate key error
|
||||||
return cb(errors.KeyAlreadyExists);
|
return cb(errors.KeyAlreadyExists);
|
||||||
}
|
}
|
||||||
log.error(`writeToInfostoreIfNotExists: error writing to ${id}`,
|
log.error('writeUUIDIfNotExists: error writing UUID',
|
||||||
{ error: err.message });
|
{ error: err.message });
|
||||||
return cb(errors.InternalError);
|
return cb(errors.InternalError);
|
||||||
}
|
}
|
||||||
|
@ -1024,7 +963,7 @@ class MongoClientInterface {
|
||||||
*/
|
*/
|
||||||
getUUID(log, cb) {
|
getUUID(log, cb) {
|
||||||
const uuid = initialInstanceID || Uuid.v4();
|
const uuid = initialInstanceID || Uuid.v4();
|
||||||
this.writeToInfostoreIfNotExists(__UUID, uuid, log, err => {
|
this.writeUUIDIfNotExists(uuid, log, err => {
|
||||||
if (err) {
|
if (err) {
|
||||||
if (err === errors.InternalError) {
|
if (err === errors.InternalError) {
|
||||||
log.error('getUUID: error getting UUID',
|
log.error('getUUID: error getting UUID',
|
||||||
|
@ -1037,47 +976,6 @@ class MongoClientInterface {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
_setupReplicationGroupID(cb) {
|
|
||||||
// random 7 characters
|
|
||||||
const repGroupId = crypto.randomBytes(4).toString('hex').slice(1);
|
|
||||||
this.writeToInfostoreIfNotExists(__REP_GROUP_ID, repGroupId,
|
|
||||||
this.logger, err => {
|
|
||||||
if (err) {
|
|
||||||
if (err === errors.InternalError) {
|
|
||||||
this.logger.error('_setupReplicationGroupID: error ' +
|
|
||||||
'getting replication group id', {
|
|
||||||
error: err.message,
|
|
||||||
});
|
|
||||||
return cb(err);
|
|
||||||
}
|
|
||||||
return this.getReplicationGroupID(this.logger, cb);
|
|
||||||
}
|
|
||||||
// need to save to instance
|
|
||||||
this.replicationGroupId = repGroupId;
|
|
||||||
return cb(null, repGroupId);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
getReplicationGroupID(log, cb) {
|
|
||||||
const i = this.getCollection(INFOSTORE);
|
|
||||||
i.findOne({
|
|
||||||
_id: __REP_GROUP_ID,
|
|
||||||
}, {}, (err, doc) => {
|
|
||||||
if (err) {
|
|
||||||
log.error('getReplicationGroupID: error reading ' +
|
|
||||||
'replication group id', {
|
|
||||||
error: err.message,
|
|
||||||
});
|
|
||||||
return cb(errors.InternalError);
|
|
||||||
}
|
|
||||||
if (!doc) {
|
|
||||||
return cb(errors.NoSuchKey);
|
|
||||||
// return cb(null, this.replicationGroupId);
|
|
||||||
}
|
|
||||||
return cb(null, doc.value);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
getDiskUsage(cb) {
|
getDiskUsage(cb) {
|
||||||
// FIXME: for basic one server deployment the infrastructure
|
// FIXME: for basic one server deployment the infrastructure
|
||||||
// configurator shall set a path to the actual MongoDB volume.
|
// configurator shall set a path to the actual MongoDB volume.
|
||||||
|
|
|
@ -56,19 +56,6 @@ function getInfVid(replicationGroupId) {
|
||||||
padLeft(MAX_SEQ, TEMPLATE_SEQ) + repGroupId);
|
padLeft(MAX_SEQ, TEMPLATE_SEQ) + repGroupId);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Identify if decoded version id contains given replication group id
|
|
||||||
* @param {string} version - decoded version id
|
|
||||||
* @param {string} replicationGroupId - replication group id to check
|
|
||||||
* @return {boolean} true if version id contains replication group id
|
|
||||||
*/
|
|
||||||
function hasRepGroupId(version, replicationGroupId) {
|
|
||||||
const rgStartIndex = LENGTH_TS + LENGTH_SEQ;
|
|
||||||
const rgEndIndex = rgStartIndex + LENGTH_RG;
|
|
||||||
return version.substring(rgStartIndex, rgEndIndex) === replicationGroupId;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
// internal state of the module
|
// internal state of the module
|
||||||
let lastTimestamp = 0; // epoch of the last versionId
|
let lastTimestamp = 0; // epoch of the last versionId
|
||||||
let lastSeq = 0; // sequential number of the last versionId
|
let lastSeq = 0; // sequential number of the last versionId
|
||||||
|
@ -165,10 +152,4 @@ function decode(str) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
module.exports = {
|
module.exports = { generateVersionId, getInfVid, encode, decode };
|
||||||
generateVersionId,
|
|
||||||
getInfVid,
|
|
||||||
encode,
|
|
||||||
decode,
|
|
||||||
hasRepGroupId,
|
|
||||||
};
|
|
||||||
|
|
|
@ -1,6 +1,5 @@
|
||||||
const VID = require('../../../lib/versioning/VersionID.js');
|
const VID = require('../../../lib/versioning/VersionID.js');
|
||||||
const assert = require('assert');
|
const assert = require('assert');
|
||||||
const crypto = require('crypto');
|
|
||||||
|
|
||||||
function randkey(length) {
|
function randkey(length) {
|
||||||
let key = '';
|
let key = '';
|
||||||
|
@ -10,7 +9,6 @@ function randkey(length) {
|
||||||
return key;
|
return key;
|
||||||
}
|
}
|
||||||
|
|
||||||
describe('VersionID', () => {
|
|
||||||
describe('test generating versionIds', () => {
|
describe('test generating versionIds', () => {
|
||||||
const count = 1000;
|
const count = 1000;
|
||||||
const vids = Array(count).fill(null);
|
const vids = Array(count).fill(null);
|
||||||
|
@ -39,30 +37,3 @@ describe('VersionID', () => {
|
||||||
assert.deepStrictEqual(vids, decoded);
|
assert.deepStrictEqual(vids, decoded);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
describe('::hasRepGroupId', () => {
|
|
||||||
it('should find if version id has replication group id', () => {
|
|
||||||
const repGroupId1 = 'ZENKO ';
|
|
||||||
const vid1 = VID.generateVersionId(randkey(15), repGroupId1);
|
|
||||||
// generate random 7 characters
|
|
||||||
const repGroupId2 = crypto.randomBytes(4).toString('hex').slice(1);
|
|
||||||
const vid2 = VID.generateVersionId(randkey(15), repGroupId2);
|
|
||||||
assert.strictEqual(VID.hasRepGroupId(vid1, 'fAlSe'), false);
|
|
||||||
assert.strictEqual(VID.hasRepGroupId(vid2, 'nope012'), false);
|
|
||||||
assert.strictEqual(VID.hasRepGroupId(vid1, repGroupId1), true);
|
|
||||||
assert.strictEqual(VID.hasRepGroupId(vid2, repGroupId2), true);
|
|
||||||
|
|
||||||
// to compare against production version ids with existing default
|
|
||||||
// replication group id
|
|
||||||
const repGroupIdDefault = 'RG001 ';
|
|
||||||
[
|
|
||||||
'98445675956517999999RG001 14.124.3',
|
|
||||||
'99999999999999999999RG001 ',
|
|
||||||
'98445608101957999999RG001 24.14',
|
|
||||||
].forEach(v => {
|
|
||||||
assert.strictEqual(
|
|
||||||
VID.hasRepGroupId(v, repGroupIdDefault), true);
|
|
||||||
});
|
|
||||||
});
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
Loading…
Reference in New Issue