Compare commits

...

5 Commits

2 changed files with 55 additions and 2 deletions

View File

@ -1,6 +1,6 @@
import errors, { ArsenalError } from '../errors'; import errors, { ArsenalError } from '../errors';
import { Version } from './Version'; import { Version } from './Version';
import { generateVersionId as genVID } from './VersionID'; import { generateVersionId as genVID, getInfVid } from './VersionID';
import WriteCache from './WriteCache'; import WriteCache from './WriteCache';
import WriteGatheringManager from './WriteGatheringManager'; import WriteGatheringManager from './WriteGatheringManager';
@ -485,10 +485,13 @@ export default class VersioningRequestProcessor {
ops.push({ key: versionKey, value: request.value }); ops.push({ key: versionKey, value: request.value });
} }
if (data === undefined || if (data === undefined ||
(Version.from(data).getVersionId() ?? '') >= versionId) { (Version.from(data).getVersionId() ?? '') >= versionId ||
(versionId === getInfVid(this.replicationGroupId) && !Version.from(data).getVersionId())) {
// master does not exist or is not newer than put // master does not exist or is not newer than put
// version and needs to be updated as well. // version and needs to be updated as well.
// Note that older versions have a greater version ID. // Note that older versions have a greater version ID.
// If an infinite version ID is used and the master does not have a versionId,
// then the master is updated.
ops.push({ key: request.key, value: request.value }); ops.push({ key: request.key, value: request.value });
} else if (request.options.isNull) { } else if (request.options.isNull) {
logger.debug('create or update null key'); logger.debug('create or update null key');

View File

@ -1,5 +1,6 @@
const assert = require('assert'); const assert = require('assert');
const async = require('async'); const async = require('async');
const { getInfVid } = require('../../../lib/versioning/VersionID');
const Version = require('../../../lib/versioning/Version').Version; const Version = require('../../../lib/versioning/Version').Version;
@ -254,6 +255,55 @@ describe('test VRP', () => {
}], }],
done); done);
}); });
it('should update master if a infinite versionId is passed and the master does not have a versionId', done => {
async.waterfall([next => {
const request = {
db: 'foo',
key: 'bar',
value: '{"qux":"quz"}',
options: { versioning: false },
};
vsp.put(request, logger, next);
},
(res, next) => {
// should overwrite the master
const request = {
db: 'foo',
key: 'bar',
value: '{"qux":"quz2"}',
options: { versioning: true, versionId: getInfVid('PARIS') },
};
vsp.put(request, logger, next);
},
(_, next) => {
// fetch master
const request = {
db: 'foo',
key: 'bar',
};
vsp.get(request, logger, next);
},
(res, next) => {
// check that master has been updated
assert.strictEqual(JSON.parse(res).qux, 'quz2');
next();
},
(next) => {
// check that the version is accessible
const request = {
db: 'foo',
key: 'bar',
versionId: getInfVid('PARIS'),
};
vsp.get(request, logger, next);
},
(res, next) => {
assert.strictEqual(JSON.parse(res).qux, 'quz2');
next();
}],
done);
});
}); });