Compare commits

...

6 Commits

Author SHA1 Message Date
philipyoo aa7abc0cae fix and test 2019-04-10 14:06:57 -07:00
philipyoo 52fbdadf5a ft: ZENKO-1585 check replication group id
Add helper method to check if a decoded version id contains
a given replication group id.
2019-04-10 14:06:44 -07:00
philipyoo 9998d8d332 update fix 2019-04-10 14:06:23 -07:00
philipyoo 5f5cbb90f9 test 2019-04-10 14:06:13 -07:00
philipyoo 9b652bc2eb ft: ZENKO-1585 editable replication group id 2019-04-10 14:06:03 -07:00
philipyoo 807349ad4b bf: ZENKO-1718 ingestion mongo putObjectVerCase4
Add a putObjectVerCase to MongoClientInterface for ingestion
use-cases. Remove management of master versions in the
ingestion process and rely on the natural ordering of
objects stored in mongo. Get and Delete operations will
rely on internal MongoClientInterface methods for performing
relevant operations. To do this, we set PHD on master for
each object version ingested.
2019-04-10 13:14:52 -07:00
4 changed files with 187 additions and 29 deletions

View File

@ -319,6 +319,14 @@ class MetadataWrapper {
return this.client.getUUID(log, cb); return this.client.getUUID(log, cb);
} }
getReplicationGroupId(log, cb) {
if (!this.client.getReplicationGroupID) {
log.debug('returning default replication group id');
return cb(null, '');
}
return this.client.getReplicationGroupID(log, cb);
}
getDiskUsage(log, cb) { getDiskUsage(log, cb) {
if (!this.client.getDiskUsage) { if (!this.client.getDiskUsage) {
log.debug('returning empty disk usage as fallback', { log.debug('returning empty disk usage as fallback', {

View File

@ -10,6 +10,7 @@
* We use proper atomic operations when needed. * We use proper atomic operations when needed.
*/ */
const async = require('async'); const async = require('async');
const crypto = require('crypto');
const { EventEmitter } = require('events'); const { EventEmitter } = require('events');
const constants = require('../../../constants'); const constants = require('../../../constants');
@ -33,6 +34,7 @@ const USERSBUCKET = '__usersbucket';
const METASTORE = '__metastore'; const METASTORE = '__metastore';
const INFOSTORE = '__infostore'; const INFOSTORE = '__infostore';
const __UUID = 'uuid'; const __UUID = 'uuid';
const __REP_GROUP_ID = 'repGroupID';
const PENSIEVE = 'PENSIEVE'; const PENSIEVE = 'PENSIEVE';
const ASYNC_REPAIR_TIMEOUT = 15000; const ASYNC_REPAIR_TIMEOUT = 15000;
const itemScanRefreshDelay = 1000 * 60 * 60; // 1 hour const itemScanRefreshDelay = 1000 * 60 * 60; // 1 hour
@ -129,7 +131,16 @@ class MongoClientInterface {
this.db = client.db(this.database, { this.db = client.db(this.database, {
ignoreUndefined: true, ignoreUndefined: true,
}); });
return this.usersBucketHack(cb); return async.series([
next => this.usersBucketHack(next),
next => this._setupReplicationGroupID((err, repGroupId) => {
if (err) {
return next(err);
}
this.replicationGroupId = repGroupId;
return next();
}),
], cb);
}); });
} }
usersBucketHack(cb) { usersBucketHack(cb) {
@ -508,6 +519,53 @@ class MongoClientInterface {
}); });
} }
/*
* In this case the caller provides a versionId. This function will
* save the versionId object as is and will set PHD on master. We rely
* on the natural ordering of versions.
* This is specifically used by backbeat Ingestion process.
*/
putObjectVerCase4(c, bucketName, objName, objVal, params, log, cb) {
// eslint-disable-next-line
objVal.versionId = params.versionId;
const vObjName = formatVersionKey(objName, params.versionId);
const mst = generatePHDVersion(params.versionId);
return c.bulkWrite([{
updateOne: {
filter: {
_id: vObjName,
},
update: {
$set: {
_id: vObjName,
value: objVal,
},
},
upsert: true,
},
}, {
updateOne: {
filter: {
_id: objName,
},
update: {
_id: objName, value: mst,
},
upsert: true,
},
}], {
ordered: 1,
}, err => {
if (err) {
log.error('putObjectVerCase4: error putting object version', {
error: err.message,
});
return cb(errors.InternalError);
}
return cb(null, `{"versionId": "${objVal.versionId}"}`);
});
}
/* /*
* Put object when versioning is not enabled * Put object when versioning is not enabled
*/ */
@ -539,9 +597,12 @@ class MongoClientInterface {
} else if (params && params.versionId === '') { } else if (params && params.versionId === '') {
return this.putObjectVerCase2(c, bucketName, objName, objVal, return this.putObjectVerCase2(c, bucketName, objName, objVal,
params, log, cb); params, log, cb);
} else if (params && params.versionId) { } else if (params && params.versionId && !params.usePHD) {
return this.putObjectVerCase3(c, bucketName, objName, objVal, return this.putObjectVerCase3(c, bucketName, objName, objVal,
params, log, cb); params, log, cb);
} else if (params && params.versionId && params.usePHD) {
return this.putObjectVerCase4(c, bucketName, objName, objVal,
params, log, cb);
} }
return this.putObjectNoVer(c, bucketName, objName, objVal, return this.putObjectNoVer(c, bucketName, objName, objVal,
params, log, cb); params, log, cb);
@ -937,18 +998,18 @@ class MongoClientInterface {
}); });
} }
writeUUIDIfNotExists(uuid, log, cb) { writeToInfostoreIfNotExists(id, value, log, cb) {
const i = this.getCollection(INFOSTORE); const i = this.getCollection(INFOSTORE);
i.insert({ i.insert({
_id: __UUID, _id: id,
value: uuid, value,
}, {}, err => { }, {}, err => {
if (err) { if (err) {
if (err.code === 11000) { if (err.code === 11000) {
// duplicate key error // duplicate key error
return cb(errors.KeyAlreadyExists); return cb(errors.KeyAlreadyExists);
} }
log.error('writeUUIDIfNotExists: error writing UUID', log.error(`writeToInfostoreIfNotExists: error writing to ${id}`,
{ error: err.message }); { error: err.message });
return cb(errors.InternalError); return cb(errors.InternalError);
} }
@ -963,7 +1024,7 @@ class MongoClientInterface {
*/ */
getUUID(log, cb) { getUUID(log, cb) {
const uuid = initialInstanceID || Uuid.v4(); const uuid = initialInstanceID || Uuid.v4();
this.writeUUIDIfNotExists(uuid, log, err => { this.writeToInfostoreIfNotExists(__UUID, uuid, log, err => {
if (err) { if (err) {
if (err === errors.InternalError) { if (err === errors.InternalError) {
log.error('getUUID: error getting UUID', log.error('getUUID: error getting UUID',
@ -976,6 +1037,47 @@ class MongoClientInterface {
}); });
} }
_setupReplicationGroupID(cb) {
// random 7 characters
const repGroupId = crypto.randomBytes(4).toString('hex').slice(1);
this.writeToInfostoreIfNotExists(__REP_GROUP_ID, repGroupId,
this.logger, err => {
if (err) {
if (err === errors.InternalError) {
this.logger.error('_setupReplicationGroupID: error ' +
'getting replication group id', {
error: err.message,
});
return cb(err);
}
return this.getReplicationGroupID(this.logger, cb);
}
// need to save to instance
this.replicationGroupId = repGroupId;
return cb(null, repGroupId);
});
}
getReplicationGroupID(log, cb) {
const i = this.getCollection(INFOSTORE);
i.findOne({
_id: __REP_GROUP_ID,
}, {}, (err, doc) => {
if (err) {
log.error('getReplicationGroupID: error reading ' +
'replication group id', {
error: err.message,
});
return cb(errors.InternalError);
}
if (!doc) {
return cb(errors.NoSuchKey);
// return cb(null, this.replicationGroupId);
}
return cb(null, doc.value);
});
}
getDiskUsage(cb) { getDiskUsage(cb) {
// FIXME: for basic one server deployment the infrastructure // FIXME: for basic one server deployment the infrastructure
// configurator shall set a path to the actual MongoDB volume. // configurator shall set a path to the actual MongoDB volume.

View File

@ -56,6 +56,19 @@ function getInfVid(replicationGroupId) {
padLeft(MAX_SEQ, TEMPLATE_SEQ) + repGroupId); padLeft(MAX_SEQ, TEMPLATE_SEQ) + repGroupId);
} }
/**
* Identify if decoded version id contains given replication group id
* @param {string} version - decoded version id
* @param {string} replicationGroupId - replication group id to check
* @return {boolean} true if version id contains replication group id
*/
function hasRepGroupId(version, replicationGroupId) {
const rgStartIndex = LENGTH_TS + LENGTH_SEQ;
const rgEndIndex = rgStartIndex + LENGTH_RG;
return version.substring(rgStartIndex, rgEndIndex) === replicationGroupId;
}
// internal state of the module // internal state of the module
let lastTimestamp = 0; // epoch of the last versionId let lastTimestamp = 0; // epoch of the last versionId
let lastSeq = 0; // sequential number of the last versionId let lastSeq = 0; // sequential number of the last versionId
@ -152,4 +165,10 @@ function decode(str) {
} }
} }
module.exports = { generateVersionId, getInfVid, encode, decode }; module.exports = {
generateVersionId,
getInfVid,
encode,
decode,
hasRepGroupId,
};

View File

@ -1,5 +1,6 @@
const VID = require('../../../lib/versioning/VersionID.js'); const VID = require('../../../lib/versioning/VersionID.js');
const assert = require('assert'); const assert = require('assert');
const crypto = require('crypto');
function randkey(length) { function randkey(length) {
let key = ''; let key = '';
@ -9,31 +10,59 @@ function randkey(length) {
return key; return key;
} }
describe('test generating versionIds', () => { describe('VersionID', () => {
const count = 1000; describe('test generating versionIds', () => {
const vids = Array(count).fill(null); const count = 1000;
for (let i = 0; i < count; i++) { const vids = Array(count).fill(null);
vids[i] = VID.generateVersionId(randkey(15), 'PARIS');
}
process.env.VID_CRYPTO_PASSWORD = randkey(64);
it('sorted in reversed chronological and alphabetical order', () => {
for (let i = 0; i < count; i++) { for (let i = 0; i < count; i++) {
if (i !== 0) { vids[i] = VID.generateVersionId(randkey(15), 'PARIS');
assert(vids[i - 1] > vids[i],
'previous VersionID is higher than its next');
}
} }
process.env.VID_CRYPTO_PASSWORD = randkey(64);
it('sorted in reversed chronological and alphabetical order', () => {
for (let i = 0; i < count; i++) {
if (i !== 0) {
assert(vids[i - 1] > vids[i],
'previous VersionID is higher than its next');
}
}
});
it('should return error decoding non-hex string versionIds', () => {
assert(VID.decode('foo') instanceof Error);
});
it('should encode and decode versionIds', () => {
const encoded = vids.map(vid => VID.encode(vid));
const decoded = encoded.map(vid => VID.decode(vid));
assert.strictEqual(vids.length, count);
assert.deepStrictEqual(vids, decoded);
});
}); });
it('should return error decoding non-hex string versionIds', () => { describe('::hasRepGroupId', () => {
assert(VID.decode('foo') instanceof Error); it('should find if version id has replication group id', () => {
}); const repGroupId1 = 'ZENKO ';
const vid1 = VID.generateVersionId(randkey(15), repGroupId1);
// generate random 7 characters
const repGroupId2 = crypto.randomBytes(4).toString('hex').slice(1);
const vid2 = VID.generateVersionId(randkey(15), repGroupId2);
assert.strictEqual(VID.hasRepGroupId(vid1, 'fAlSe'), false);
assert.strictEqual(VID.hasRepGroupId(vid2, 'nope012'), false);
assert.strictEqual(VID.hasRepGroupId(vid1, repGroupId1), true);
assert.strictEqual(VID.hasRepGroupId(vid2, repGroupId2), true);
it('should encode and decode versionIds', () => { // to compare against production version ids with existing default
const encoded = vids.map(vid => VID.encode(vid)); // replication group id
const decoded = encoded.map(vid => VID.decode(vid)); const repGroupIdDefault = 'RG001 ';
assert.strictEqual(vids.length, count); [
assert.deepStrictEqual(vids, decoded); '98445675956517999999RG001 14.124.3',
'99999999999999999999RG001 ',
'98445608101957999999RG001 24.14',
].forEach(v => {
assert.strictEqual(
VID.hasRepGroupId(v, repGroupIdDefault), true);
});
});
}); });
}); });