Compare commits

...

2 Commits

Author SHA1 Message Date
Bennett Buchanan f83ef8f06a [squash] Fix lint errors 2017-12-19 16:23:51 -08:00
Rached Ben Mustapha 8d2337f2b4 FT: Add Azure versioning PUT and DELETE 2017-12-19 16:23:51 -08:00
6 changed files with 501 additions and 54 deletions

View File

@ -116,7 +116,7 @@ const constants = {
// for external backends, don't call unless at least 1 minute
// (60,000 milliseconds) since last call
externalBackendHealthCheckInterval: 60000,
versioningNotImplBackends: { azure: true },
versioningNotImplBackends: {},
mpuMDStoredExternallyBackend: { aws_s3: true },
/* eslint-enable camelcase */
mpuMDStoredOnS3Backend: { azure: true },

View File

@ -1,3 +1,4 @@
const async = require('async');
const { errors, s3middleware } = require('arsenal');
const azure = require('azure-storage');
const createLogger = require('../multipleBackendLogger');
@ -41,6 +42,89 @@ class AzureClient {
}
}
createSnapshot(key, options, log, cb) {
this._errorWrapper('put', 'createBlobSnapshot',
[this._azureContainerName, key, options, (err, snapshot) => {
if (err) {
logHelper(log, 'error', 'could not get blob snapshot', err,
this._dataStoreName);
return cb(errors.ServiceUnavailable
.customizeDescription('error returned from ' +
`Azure: ${err.message}`));
}
return cb(null, key, snapshot);
}], log, cb);
}
deleteBlob(key, options, log, cb) {
return this._errorWrapper('delete', 'deleteBlobIfExists',
[this._azureContainerName, key, options, err => {
if (err) {
logHelper(log, 'error', 'error deleting object from ' +
'Azure datastore', err, this._dataStoreName);
return cb(errors.ServiceUnavailable
.customizeDescription('Error returned from ' +
`Azure: ${err.message}`));
}
return cb(null, key);
}], log, cb);
}
handleVersion(keyContext, key, options, log, cb) {
const { bucketName, isDeleteMarker } = keyContext;
if (isDeleteMarker) {
return cb();
}
return metadata.getBucket(bucketName, log, (err, bucket) => {
if (err) {
return cb(err);
}
if (bucket.isVersioningEnabled()) {
return this.createSnapshot(key, options, log, cb);
}
return cb(null, key);
});
}
isLastVersion(key, params, log, cb) {
const bucketName = this._azureContainerName;
return metadata.listObject(bucketName, params, log, (err, data) => {
if (err) {
return cb(err);
}
if (data.Versions.some(version => version.key === key)) {
return cb(null, false);
}
if (data.IsTruncated) {
const nextParams = Object.assign({}, params, {
keyMarker: data.NextKeyMarker,
versionIdMarker: data.NextVersionIdMarker,
});
return this.isLastVersion(key, nextParams, log, cb);
}
return cb(null, true);
});
}
handleVersionDelete(snapshotId, key, log, cb) {
const params = {
listingType: 'DelimiterVersions',
maxKeys: 1000,
prefix: key,
};
return this.isLastVersion(key, params, log, (err, isLastVersion) => {
if (err) {
return cb(err);
}
if (isLastVersion) {
// In addition to the snapshot, also delete the blob.
return async.each([{ snapshotId }, {}], (options, next) =>
this.deleteBlob(key, options, log, next), cb);
}
return this.deleteBlob(key, { snapshotId }, log, cb);
});
}
_createAzureKey(requestBucketName, requestObjectKey,
bucketMatch) {
if (bucketMatch) {
@ -139,7 +223,8 @@ class AzureClient {
.customizeDescription('Error returned from ' +
`Azure: ${err.message}`));
}
return callback(null, azureKey);
return this.handleVersion(keyContext, azureKey, options,
log, callback);
}], log, callback);
}
return this._errorWrapper('put', 'createBlockBlobFromStream',
@ -152,14 +237,17 @@ class AzureClient {
.customizeDescription('Error returned from ' +
`Azure: ${err.message}`));
}
return callback(null, azureKey);
return this.handleVersion(keyContext, azureKey, options,
log, callback);
}], log, callback);
});
}
head(objectGetInfo, reqUids, callback) {
const log = createLogger(reqUids);
const { key, azureStreamingOptions } = objectGetInfo;
const { key, azureStreamingOptions, dataStoreVersionId } =
objectGetInfo;
azureStreamingOptions.snapshotId = dataStoreVersionId;
return this._errorWrapper('head', 'getBlobProperties',
[this._azureContainerName, key, azureStreamingOptions,
err => {
@ -185,7 +273,9 @@ class AzureClient {
get(objectGetInfo, range, reqUids, callback) {
const log = createLogger(reqUids);
// for backwards compatibility
const { key, response, azureStreamingOptions } = objectGetInfo;
const { key, response, azureStreamingOptions,
dataStoreVersionId } = objectGetInfo;
azureStreamingOptions.snapshotId = dataStoreVersionId;
this._errorWrapper('get', 'getBlobToStream',
[this._azureContainerName, key, response, azureStreamingOptions,
err => {
@ -203,19 +293,15 @@ class AzureClient {
// for backwards compatibility
const key = typeof objectGetInfo === 'string' ? objectGetInfo :
objectGetInfo.key;
return this._errorWrapper('delete', 'deleteBlobIfExists',
[this._azureContainerName, key,
err => {
if (err) {
const log = createLogger(reqUids);
logHelper(log, 'error', 'error deleting object from ' +
'Azure datastore', err, this._dataStoreName);
return callback(errors.ServiceUnavailable
.customizeDescription('Error returned from ' +
`Azure: ${err.message}`));
}
return callback();
}], log, callback);
if (!key) {
return callback();
}
const snapshotId = typeof objectGetInfo === 'string' ? undefined :
objectGetInfo.dataStoreVersionId;
if (snapshotId) {
return this.handleVersionDelete(snapshotId, key, log, callback);
}
return this.deleteBlob(key, {}, log, callback);
}
healthcheck(location, callback, flightCheckOnStartUp) {

View File

@ -250,7 +250,8 @@ const services = {
if (err) {
return cb(err, res);
}
cb(null, res); // this is smart
// Data and metadata are deleted in the background.
cb(null, res);
log.trace('deleteObject: metadata delete OK');
const deleteLog = logger.newRequestLogger();
if (objectMD.location === null) {

View File

@ -1,5 +1,6 @@
const assert = require('assert');
const async = require('async');
const crypto = require('crypto');
const BucketUtility = require('../../../lib/utility/bucket-util');
const withV4 = require('../../support/withV4');
@ -11,6 +12,11 @@ const {
getAzureKeys,
azureLocation,
azureLocationMismatch,
enableVersioning,
assertVersionedObj,
deleteAllSnapShots,
deleteAllBlobs,
expectedETag,
} = require('../utils');
const keyObject = 'deleteazure';
@ -25,6 +31,10 @@ const nonExistingId = process.env.AWS_ON_AIR ?
'MhhyTHhmZ4cxSi4Y9SMe5P7UJAz7HLJ9' :
'3939393939393939393936493939393939393939756e6437';
function getAzureMD5(content) {
return crypto.createHash('md5').update(content).digest('base64');
}
describeSkipIfNotMultiple('Multiple backend delete object from Azure',
function testSuite() {
this.timeout(250000);
@ -247,5 +257,239 @@ function testSuite() {
});
});
});
describe('versioning behavior', () => {
const keyPrefix = crypto.createHash('md5')
.update(Math.random().toString())
.digest('hex');
const body1 = 'a';
let key;
let masterVersionID;
before(done => enableVersioning(s3, azureContainerName, done));
beforeEach(done => {
key = `${keyPrefix}/versioned-blob-${Date.now()}`;
async.waterfall([
next =>
s3.putObject({
Bucket: azureContainerName,
Key: key,
Body: body1,
Metadata: {
'scal-location-constraint': azureLocation,
},
}, next),
(data, next) => {
masterVersionID = data.VersionId;
assertVersionedObj(s3, azureContainerName, key,
data.VersionId, body1, next);
},
], done);
});
afterEach(done =>
deleteAllSnapShots(azureClient, azureContainerName, keyPrefix,
err => {
if (err) {
return done(err);
}
return deleteAllBlobs(azureClient, azureContainerName,
keyPrefix, done);
}));
it('should put delete marker on master version', done =>
async.series([
next =>
s3.deleteObject({
Bucket: azureContainerName,
Key: key,
}, (err, data) => {
assert.strictEqual(err, null);
assert.strictEqual(data.DeleteMarker, 'true');
return next();
}),
next =>
s3.getObject({
Bucket: azureContainerName,
Key: key,
}, err => {
assert.strictEqual(err.code, 'NoSuchKey');
return next();
}),
next => {
const options = { include: 'snapshots' };
azureClient.listBlobsSegmentedWithPrefix(azureContainerName,
keyPrefix, null, options, (err, result) => {
if (err) {
return done(err);
}
// One snapshot of the previous version exists and
// a zero-byte blob denoting the "delete marker".
assert.strictEqual(result.entries.length, 2);
const snapshots = result.entries.filter(entry =>
entry.snapshot !== undefined);
assert.strictEqual(snapshots.length, 1);
const { contentMD5 } = snapshots[0].contentSettings;
assert.strictEqual(contentMD5, getAzureMD5(body1));
return next();
});
},
next =>
azureClient.getBlobProperties(azureContainerName, key,
(err, result) => {
const { contentSettings } = result;
assert.strictEqual(
contentSettings.contentMD5, getAzureMD5(''));
return next();
}),
], done));
it('should restore version if deleting a delete marker', done => {
let deleteMarkerVersionId;
async.series([
next =>
s3.deleteObject({
Bucket: azureContainerName,
Key: key,
}, (err, data) => {
assert.strictEqual(err, null);
assert.strictEqual(data.DeleteMarker, 'true');
deleteMarkerVersionId = data.VersionId;
return next();
}),
next =>
s3.getObject({
Bucket: azureContainerName,
Key: key,
}, err => {
assert.notStrictEqual(err, null);
assert.strictEqual(err.code, 'NoSuchKey');
return next();
}),
next =>
s3.deleteObject({
Bucket: azureContainerName,
Key: key,
VersionId: deleteMarkerVersionId,
}, err => {
assert.strictEqual(err, null);
return next();
}),
next =>
s3.getObject({
Bucket: azureContainerName,
Key: key,
}, (err, data) => {
assert.strictEqual(err, null);
assert.strictEqual(
data.ETag, expectedETag(data.Body));
setTimeout(() => next(), 3000);
}),
next =>
// The blob should be equivalent to the latest snapshot.
azureClient.getBlobProperties(azureContainerName, key,
(err, result) => {
const { contentSettings } = result;
assert.strictEqual(
contentSettings.contentMD5, getAzureMD5(''));
return next();
}),
], done);
});
it('should delete snapshot if deleting a version', done => {
const body2 = 'b';
let secondSnapshot;
async.waterfall([
next =>
s3.putObject({
Bucket: azureContainerName,
Key: key,
Body: body2,
Metadata: {
'scal-location-constraint': azureLocation,
},
}, next),
(data, next) =>
assertVersionedObj(s3, azureContainerName, key,
data.VersionId, body2, next),
next => {
const options = { include: 'snapshots' };
azureClient.listBlobsSegmentedWithPrefix(
azureContainerName, keyPrefix, null, options,
(err, result) => {
if (err) {
return done(err);
}
assert.strictEqual(result.entries.length, 3);
const snapshots = result.entries.filter(
entry => entry.snapshot !== undefined);
assert.strictEqual(snapshots.length, 2);
secondSnapshot = snapshots[1];
assert.strictEqual(
snapshots[0].contentSettings.contentMD5,
getAzureMD5(body1));
assert.strictEqual(
snapshots[1].contentSettings.contentMD5,
getAzureMD5(body2));
return next();
});
},
next =>
s3.deleteObject({
Bucket: azureContainerName,
Key: key,
VersionId: masterVersionID,
}, err => {
assert.strictEqual(err, null);
setTimeout(() => next(), azureTimeout);
}),
next => {
const options = { include: 'snapshots' };
azureClient.listBlobsSegmentedWithPrefix(
azureContainerName, keyPrefix, null, options,
(err, result) => {
if (err) {
return done(err);
}
assert.strictEqual(result.entries.length, 2);
const snapshots = result.entries.filter(
entry => entry.snapshot !== undefined);
assert.strictEqual(snapshots.length, 1);
assert.deepStrictEqual(
secondSnapshot, snapshots[0]);
return next();
});
},
], done);
});
it('should delete snapshot and blob if deleting last version',
done =>
async.waterfall([
next =>
s3.deleteObject({
Bucket: azureContainerName,
Key: key,
VersionId: masterVersionID,
}, err => {
assert.strictEqual(err, null);
setTimeout(() => next(), azureTimeout);
}),
next => {
const options = { include: 'snapshots' };
azureClient.listBlobsSegmentedWithPrefix(
azureContainerName, keyPrefix, null, options,
(err, result) => {
if (err) {
return done(err);
}
assert.strictEqual(result.entries.length, 0);
return next();
});
},
], done));
});
});
});

View File

@ -1,5 +1,6 @@
const assert = require('assert');
const async = require('async');
const crypto = require('crypto');
const withV4 = require('../../support/withV4');
const BucketUtility = require('../../../lib/utility/bucket-util');
@ -13,12 +14,17 @@ const {
fileLocation,
azureLocation,
azureLocationMismatch,
enableVersioning,
putObject,
assertVersionedObj,
deleteAllSnapShots,
deleteAllBlobs,
assertNoSnapshots,
} = require('../utils');
const keyObject = 'putazure';
const azureClient = getAzureClient();
const azureContainerName = getAzureContainerName(azureLocation);
const { versioningEnabled } = require('../../../lib/utility/versioning-util');
const normalBody = Buffer.from('I am a body', 'utf8');
const normalMD5 = 'be747eb4b75517bf6b3cf7c5fbb62f3a';
@ -76,20 +82,6 @@ describeF() {
},
}, done));
it('should return a NotImplemented error if try to put ' +
'versioning to bucket with Azure location', done => {
const params = {
Bucket: azureContainerName,
VersioningConfiguration: {
Status: 'Enabled',
},
};
s3.putBucketVersioning(params, err => {
assert.strictEqual(err.code, 'NotImplemented');
done();
});
});
it('should put an object to Azure, with no object location ' +
'header, based on bucket location', function it(done) {
const params = {
@ -173,26 +165,6 @@ describeF() {
});
});
it('should return error NotImplemented putting a ' +
'version to Azure', function itF(done) {
s3.putBucketVersioning({
Bucket: azureContainerName,
VersioningConfiguration: versioningEnabled,
}, err => {
assert.equal(err, null, 'Expected success, ' +
`got error ${err}`);
const params = { Bucket: azureContainerName,
Key: this.test.keyName,
Body: normalBody,
Metadata: { 'scal-location-constraint':
azureLocation } };
s3.putObject(params, err => {
assert.strictEqual(err.code, 'NotImplemented');
done();
});
});
});
it('should put two objects to Azure with same ' +
'key, and newest object should be returned', function itF(done) {
const params = {
@ -308,5 +280,82 @@ describeF() {
});
});
});
describe('versioning behavior', () => {
const keyPrefix = crypto.createHash('md5')
.update(Math.random().toString())
.digest('hex');
const key = `${keyPrefix}/versioned-blob-${Date.now()}`;
beforeEach(done =>
s3.createBucket({
Bucket: azureContainerName,
CreateBucketConfiguration: {
LocationConstraint: azureLocation,
},
}, done));
it('should allow putting bucket versioning', done =>
enableVersioning(s3, azureContainerName, done));
it('should not create a snapshot of a zero-byte object when ' +
'versioning is not enabled', done =>
async.series([
next =>
putObject(s3, azureContainerName, key, undefined, next),
next =>
assertNoSnapshots(azureClient, azureContainerName,
keyPrefix, next),
], done));
it('should not create a snapshot when versioning is not enabled',
done =>
async.series([
next =>
putObject(s3, azureContainerName, key, 'a', next),
next =>
assertNoSnapshots(azureClient, azureContainerName,
keyPrefix, next),
], done));
describe('put object', () => {
beforeEach(done =>
enableVersioning(s3, azureContainerName, done));
afterEach(done =>
deleteAllSnapShots(azureClient, azureContainerName,
keyPrefix, err => {
if (err) {
return done(err);
}
return deleteAllBlobs(azureClient, azureContainerName,
keyPrefix, done);
}));
it('should create a snapshot for a zero-byte object', done =>
async.waterfall([
next => putObject(s3, azureContainerName, key,
undefined, next),
(data, next) =>
assertVersionedObj(s3, azureContainerName, key,
data.VersionId, '', next),
], done));
it('should create snapshots for the associated version IDs',
done => async.waterfall([
next =>
putObject(s3, azureContainerName, key, 'a', next),
(data1, next) =>
putObject(s3, azureContainerName, key, 'b',
(err, data2) => next(err, data1, data2)),
(data1, data2, next) =>
assertVersionedObj(s3, azureContainerName, key,
data1.VersionId, 'a', err => next(err, data2)),
(data2, next) =>
assertVersionedObj(s3, azureContainerName, key,
data2.VersionId, 'b', next),
], done));
});
});
});
});

View File

@ -212,6 +212,73 @@ utils.putNullVersionsToAws = (s3, bucket, key, versions, cb) => {
});
};
utils.putObject = (s3, bucket, key, body, cb) => {
s3.putObject({
Bucket: bucket,
Key: key,
Body: body,
}, cb);
};
utils.assertVersionedObj = (s3, bucket, key, versionId, expectedBody, cb) => {
s3.getObject({
Bucket: bucket,
Key: key,
VersionId: versionId,
}, (err, data) => {
_assertErrorResult(err, null, 'getting version from azure');
assert.strictEqual(data.Body.toString(), expectedBody);
cb();
});
};
utils.listSnapshotsWithPrefix = (azure, container, prefix, cb) => {
const options = { include: 'snapshots' };
azure.listBlobsSegmentedWithPrefix(container, prefix, null, options,
(err, result) => {
if (err) {
return cb(err);
}
const snapshotBlobs =
result.entries.filter(entry => entry.snapshot !== undefined);
return cb(err, snapshotBlobs);
});
};
utils.deleteAllSnapShots = (azure, container, prefix, cb) => {
utils.listSnapshotsWithPrefix(azure, container, prefix, (err, blobs) => {
if (err) {
return cb(err);
}
return async.each(blobs, (blob, next) => {
const options = { snapshotId: blob.snapshot };
azure.deleteBlob(container, blob.name, options, next);
}, cb);
});
};
utils.deleteAllBlobs = (azure, container, prefix, cb) => {
azure.listBlobsSegmentedWithPrefix(container, prefix, null, {},
(err, result) => {
if (err) {
return cb(err);
}
return async.each(result.entries, (entry, next) =>
azure.deleteBlob(container, entry.name, {}, next),
cb);
});
};
utils.assertNoSnapshots = (azure, container, prefix, cb) => {
utils.listSnapshotsWithPrefix(azure, container, prefix, (err, blobs) => {
if (err) {
return cb(err);
}
assert.strictEqual(blobs.length, 0);
return cb();
});
};
utils.getAndAssertResult = (s3, params, cb) => {
const { bucket, key, body, versionId, expectedVersionId, expectedTagCount,
expectedError } = params;