Compare commits
No commits in common. "b7b4f1828d5a81c9085e460070ae214265d380cd" and "f11ccbfefac5efe99370f13417152338de0844cc" have entirely different histories.
b7b4f1828d
...
f11ccbfefa
|
@ -172,37 +172,6 @@ class MetadataWrapper {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
_handleJobFuncResult(bucket, key, params, res, log, cb) {
|
|
||||||
const { retry, objVal } = res;
|
|
||||||
if (retry) {
|
|
||||||
// Apply exponential timeout.
|
|
||||||
return this.safePutObjectMD(bucket, key, params, log, cb);
|
|
||||||
}
|
|
||||||
if (objVal === undefined) {
|
|
||||||
return cb();
|
|
||||||
}
|
|
||||||
// If the given tag is not matched, then use exponential backoff to retry.
|
|
||||||
return this.putObjectMD(bucket, key, objVal, params, log, cb);
|
|
||||||
}
|
|
||||||
|
|
||||||
safePutObjectMD(bucket, key, params, log, cb) {
|
|
||||||
// Read the MD of the given bucket and key.
|
|
||||||
this.client.getObject(bucket, key, params, log, (err, objMD, tag) => {
|
|
||||||
if (err) {
|
|
||||||
log.debug('error from metadata', {
|
|
||||||
implName: this.implName,
|
|
||||||
error: err,
|
|
||||||
});
|
|
||||||
// TODO: Are there special cases we need to handle?
|
|
||||||
return cb(err);
|
|
||||||
}
|
|
||||||
// TODO: Add validation of the jobFunc.
|
|
||||||
// Perform the jobFunc action on the newly fetched MD.
|
|
||||||
const res = params.jobFunc(bucket, key, tag, objMD, log);
|
|
||||||
return this._handleJobFuncResult(bucket, key, params, res, log, cb);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
putObjectMD(bucketName, objName, objVal, params, log, cb) {
|
putObjectMD(bucketName, objName, objVal, params, log, cb) {
|
||||||
log.debug('putting object in metadata');
|
log.debug('putting object in metadata');
|
||||||
const value = typeof objVal.getValue === 'function' ?
|
const value = typeof objVal.getValue === 'function' ?
|
||||||
|
|
|
@ -356,9 +356,7 @@ class MongoClientInterface {
|
||||||
_id: vObjName,
|
_id: vObjName,
|
||||||
},
|
},
|
||||||
update: {
|
update: {
|
||||||
_id: vObjName,
|
_id: vObjName, value: objVal,
|
||||||
tag: MongoUtils.getTag(),
|
|
||||||
value: objVal,
|
|
||||||
},
|
},
|
||||||
upsert: true,
|
upsert: true,
|
||||||
},
|
},
|
||||||
|
@ -380,9 +378,7 @@ class MongoClientInterface {
|
||||||
],
|
],
|
||||||
},
|
},
|
||||||
update: {
|
update: {
|
||||||
_id: objName,
|
_id: objName, value: objVal,
|
||||||
tag: MongoUtils.getTag(),
|
|
||||||
value: objVal,
|
|
||||||
},
|
},
|
||||||
upsert: true,
|
upsert: true,
|
||||||
},
|
},
|
||||||
|
@ -423,18 +419,14 @@ class MongoClientInterface {
|
||||||
const versionId = generateVersionId(this.replicationGroupId);
|
const versionId = generateVersionId(this.replicationGroupId);
|
||||||
// eslint-disable-next-line
|
// eslint-disable-next-line
|
||||||
objVal.versionId = versionId;
|
objVal.versionId = versionId;
|
||||||
const query = { _id: objName };
|
c.update({
|
||||||
const update = {
|
_id: objName,
|
||||||
|
}, {
|
||||||
_id: objName,
|
_id: objName,
|
||||||
tag: MongoUtils.getTag(),
|
|
||||||
value: objVal,
|
value: objVal,
|
||||||
};
|
}, {
|
||||||
const options = { upsert: true };
|
upsert: true,
|
||||||
if (params.condPut && params.condPut.tag) {
|
}, err => {
|
||||||
query.tag = params.condPut.tag;
|
|
||||||
options.upsert = false;
|
|
||||||
}
|
|
||||||
c.update(query, update, options, err => {
|
|
||||||
if (err) {
|
if (err) {
|
||||||
log.error(
|
log.error(
|
||||||
'putObjectVerCase2: error putting object version',
|
'putObjectVerCase2: error putting object version',
|
||||||
|
@ -459,19 +451,12 @@ class MongoClientInterface {
|
||||||
// eslint-disable-next-line
|
// eslint-disable-next-line
|
||||||
objVal.versionId = params.versionId;
|
objVal.versionId = params.versionId;
|
||||||
const vObjName = formatVersionKey(objName, params.versionId);
|
const vObjName = formatVersionKey(objName, params.versionId);
|
||||||
const query = { _id: objName };
|
c.findOne({ _id: objName }, (err, checkObj) => {
|
||||||
if (params.condPut && params.condPut.tag) {
|
|
||||||
query.tag = params.condPut.tag;
|
|
||||||
}
|
|
||||||
c.findOne(query, (err, checkObj) => {
|
|
||||||
if (err) {
|
if (err) {
|
||||||
log.error('putObjectVerCase3: mongoDB error finding object');
|
log.error('putObjectVerCase3: mongoDB error finding object');
|
||||||
return cb(errors.InternalError);
|
return cb(errors.InternalError);
|
||||||
}
|
}
|
||||||
const objUpsert = !checkObj;
|
const objUpsert = !checkObj;
|
||||||
if (objUpsert && params.condPut.tag) {
|
|
||||||
return cb();
|
|
||||||
}
|
|
||||||
c.bulkWrite([{
|
c.bulkWrite([{
|
||||||
updateOne: {
|
updateOne: {
|
||||||
filter: {
|
filter: {
|
||||||
|
@ -480,7 +465,6 @@ class MongoClientInterface {
|
||||||
update: {
|
update: {
|
||||||
$set: {
|
$set: {
|
||||||
_id: vObjName,
|
_id: vObjName,
|
||||||
tag: MongoUtils.getTag(),
|
|
||||||
value: objVal,
|
value: objVal,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
@ -496,7 +480,6 @@ class MongoClientInterface {
|
||||||
update: {
|
update: {
|
||||||
$set: {
|
$set: {
|
||||||
_id: objName,
|
_id: objName,
|
||||||
tag: MongoUtils.getTag(),
|
|
||||||
value: objVal,
|
value: objVal,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
@ -529,18 +512,14 @@ class MongoClientInterface {
|
||||||
* Put object when versioning is not enabled
|
* Put object when versioning is not enabled
|
||||||
*/
|
*/
|
||||||
putObjectNoVer(c, bucketName, objName, objVal, params, log, cb) {
|
putObjectNoVer(c, bucketName, objName, objVal, params, log, cb) {
|
||||||
const query = { _id: objName };
|
c.update({
|
||||||
const update = {
|
_id: objName,
|
||||||
|
}, {
|
||||||
_id: objName,
|
_id: objName,
|
||||||
tag: MongoUtils.getTag(),
|
|
||||||
value: objVal,
|
value: objVal,
|
||||||
};
|
}, {
|
||||||
const options = { upsert: true };
|
upsert: true,
|
||||||
if (params.condPut && params.condPut.tag) {
|
}, err => {
|
||||||
query.tag = params.condPut.tag;
|
|
||||||
options.upsert = false;
|
|
||||||
}
|
|
||||||
c.update(query, update, options, err => {
|
|
||||||
if (err) {
|
if (err) {
|
||||||
log.error(
|
log.error(
|
||||||
'putObjectNoVer: error putting obect with no versioning',
|
'putObjectNoVer: error putting obect with no versioning',
|
||||||
|
@ -597,7 +576,7 @@ class MongoClientInterface {
|
||||||
return undefined;
|
return undefined;
|
||||||
}
|
}
|
||||||
MongoUtils.unserialize(doc.value);
|
MongoUtils.unserialize(doc.value);
|
||||||
return cb(null, doc.value, doc.tag);
|
return cb(null, doc.value);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,4 +1,3 @@
|
||||||
const uuidv4 = require('uuid/v4');
|
|
||||||
|
|
||||||
function escape(obj) {
|
function escape(obj) {
|
||||||
return JSON.parse(JSON.stringify(obj).
|
return JSON.parse(JSON.stringify(obj).
|
||||||
|
@ -28,8 +27,4 @@ function unserialize(objMD) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
function getTag() {
|
module.exports = { escape, unescape, serialize, unserialize };
|
||||||
return uuidv4();
|
|
||||||
}
|
|
||||||
|
|
||||||
module.exports = { escape, unescape, serialize, unserialize, getTag };
|
|
||||||
|
|
Loading…
Reference in New Issue