Compare commits

...

2 Commits

Author SHA1 Message Date
bbuchanan9 b7b4f1828d feature: ZENKO-1439 Conditional PUT logic 2019-02-20 11:12:15 -08:00
bbuchanan9 6fb650aa78 feature: ZENKO-1438 Add conditional MD puts 2019-02-20 11:12:15 -08:00
3 changed files with 74 additions and 17 deletions

View File

@ -172,6 +172,37 @@ class MetadataWrapper {
}); });
} }
_handleJobFuncResult(bucket, key, params, res, log, cb) {
const { retry, objVal } = res;
if (retry) {
// Apply exponential timeout.
return this.safePutObjectMD(bucket, key, params, log, cb);
}
if (objVal === undefined) {
return cb();
}
// If the given tag is not matched, then use exponential backoff to retry.
return this.putObjectMD(bucket, key, objVal, params, log, cb);
}
safePutObjectMD(bucket, key, params, log, cb) {
// Read the MD of the given bucket and key.
this.client.getObject(bucket, key, params, log, (err, objMD, tag) => {
if (err) {
log.debug('error from metadata', {
implName: this.implName,
error: err,
});
// TODO: Are there special cases we need to handle?
return cb(err);
}
// TODO: Add validation of the jobFunc.
// Perform the jobFunc action on the newly fetched MD.
const res = params.jobFunc(bucket, key, tag, objMD, log);
return this._handleJobFuncResult(bucket, key, params, res, log, cb);
});
}
putObjectMD(bucketName, objName, objVal, params, log, cb) { putObjectMD(bucketName, objName, objVal, params, log, cb) {
log.debug('putting object in metadata'); log.debug('putting object in metadata');
const value = typeof objVal.getValue === 'function' ? const value = typeof objVal.getValue === 'function' ?

View File

@ -356,7 +356,9 @@ class MongoClientInterface {
_id: vObjName, _id: vObjName,
}, },
update: { update: {
_id: vObjName, value: objVal, _id: vObjName,
tag: MongoUtils.getTag(),
value: objVal,
}, },
upsert: true, upsert: true,
}, },
@ -378,7 +380,9 @@ class MongoClientInterface {
], ],
}, },
update: { update: {
_id: objName, value: objVal, _id: objName,
tag: MongoUtils.getTag(),
value: objVal,
}, },
upsert: true, upsert: true,
}, },
@ -419,14 +423,18 @@ class MongoClientInterface {
const versionId = generateVersionId(this.replicationGroupId); const versionId = generateVersionId(this.replicationGroupId);
// eslint-disable-next-line // eslint-disable-next-line
objVal.versionId = versionId; objVal.versionId = versionId;
c.update({ const query = { _id: objName };
_id: objName, const update = {
}, {
_id: objName, _id: objName,
tag: MongoUtils.getTag(),
value: objVal, value: objVal,
}, { };
upsert: true, const options = { upsert: true };
}, err => { if (params.condPut && params.condPut.tag) {
query.tag = params.condPut.tag;
options.upsert = false;
}
c.update(query, update, options, err => {
if (err) { if (err) {
log.error( log.error(
'putObjectVerCase2: error putting object version', 'putObjectVerCase2: error putting object version',
@ -451,12 +459,19 @@ class MongoClientInterface {
// eslint-disable-next-line // eslint-disable-next-line
objVal.versionId = params.versionId; objVal.versionId = params.versionId;
const vObjName = formatVersionKey(objName, params.versionId); const vObjName = formatVersionKey(objName, params.versionId);
c.findOne({ _id: objName }, (err, checkObj) => { const query = { _id: objName };
if (params.condPut && params.condPut.tag) {
query.tag = params.condPut.tag;
}
c.findOne(query, (err, checkObj) => {
if (err) { if (err) {
log.error('putObjectVerCase3: mongoDB error finding object'); log.error('putObjectVerCase3: mongoDB error finding object');
return cb(errors.InternalError); return cb(errors.InternalError);
} }
const objUpsert = !checkObj; const objUpsert = !checkObj;
if (objUpsert && params.condPut.tag) {
return cb();
}
c.bulkWrite([{ c.bulkWrite([{
updateOne: { updateOne: {
filter: { filter: {
@ -465,6 +480,7 @@ class MongoClientInterface {
update: { update: {
$set: { $set: {
_id: vObjName, _id: vObjName,
tag: MongoUtils.getTag(),
value: objVal, value: objVal,
}, },
}, },
@ -480,6 +496,7 @@ class MongoClientInterface {
update: { update: {
$set: { $set: {
_id: objName, _id: objName,
tag: MongoUtils.getTag(),
value: objVal, value: objVal,
}, },
}, },
@ -512,14 +529,18 @@ class MongoClientInterface {
* Put object when versioning is not enabled * Put object when versioning is not enabled
*/ */
putObjectNoVer(c, bucketName, objName, objVal, params, log, cb) { putObjectNoVer(c, bucketName, objName, objVal, params, log, cb) {
c.update({ const query = { _id: objName };
_id: objName, const update = {
}, {
_id: objName, _id: objName,
tag: MongoUtils.getTag(),
value: objVal, value: objVal,
}, { };
upsert: true, const options = { upsert: true };
}, err => { if (params.condPut && params.condPut.tag) {
query.tag = params.condPut.tag;
options.upsert = false;
}
c.update(query, update, options, err => {
if (err) { if (err) {
log.error( log.error(
'putObjectNoVer: error putting obect with no versioning', 'putObjectNoVer: error putting obect with no versioning',
@ -576,7 +597,7 @@ class MongoClientInterface {
return undefined; return undefined;
} }
MongoUtils.unserialize(doc.value); MongoUtils.unserialize(doc.value);
return cb(null, doc.value); return cb(null, doc.value, doc.tag);
}); });
} }

View File

@ -1,3 +1,4 @@
const uuidv4 = require('uuid/v4');
function escape(obj) { function escape(obj) {
return JSON.parse(JSON.stringify(obj). return JSON.parse(JSON.stringify(obj).
@ -27,4 +28,8 @@ function unserialize(objMD) {
} }
} }
module.exports = { escape, unescape, serialize, unserialize }; function getTag() {
return uuidv4();
}
module.exports = { escape, unescape, serialize, unserialize, getTag };