Compare commits
2 Commits
d9bf780296
...
44252379cf
Author | SHA1 | Date |
---|---|---|
JianqinWang | 44252379cf | |
JianqinWang | 0ee2d2a723 |
|
@ -0,0 +1,347 @@
|
|||
const errors = require('../../errors');
|
||||
|
||||
const BucketInfo = require('../../models/BucketInfo');
|
||||
|
||||
const BucketClientInterface = require('./bucketclient/BucketClientInterface');
|
||||
const BucketFileInterface = require('./file/BucketFileInterface');
|
||||
const MongoClientInterface = require('./mongoclient/MongoClientInterface');
|
||||
const metastore = require('./in_memory/metastore');
|
||||
|
||||
let CdmiMetadata;
|
||||
try {
|
||||
CdmiMetadata = require('cdmiclient').CdmiMetadata;
|
||||
} catch (err) {
|
||||
CdmiMetadata = null;
|
||||
}
|
||||
|
||||
let bucketNotificationHook;
|
||||
|
||||
/** _parseListEntries - parse the values returned in a listing by metadata
|
||||
* @param {object[]} entries - Version or Content entries in a metadata listing
|
||||
* @param {string} entries[].key - metadata key
|
||||
* @param {string} entries[].value - stringified object metadata
|
||||
* @return {object} - mapped array with parsed value or JSON parsing err
|
||||
*/
|
||||
function _parseListEntries(entries) {
|
||||
return entries.map(entry => {
|
||||
const tmp = JSON.parse(entry.value);
|
||||
return {
|
||||
key: entry.key,
|
||||
value: {
|
||||
Size: tmp['content-length'],
|
||||
ETag: tmp['content-md5'],
|
||||
VersionId: tmp.versionId,
|
||||
IsNull: tmp.isNull,
|
||||
IsDeleteMarker: tmp.isDeleteMarker,
|
||||
LastModified: tmp['last-modified'],
|
||||
Owner: {
|
||||
DisplayName: tmp['owner-display-name'],
|
||||
ID: tmp['owner-id'],
|
||||
},
|
||||
StorageClass: tmp['x-amz-storage-class'],
|
||||
// MPU listing properties
|
||||
Initiated: tmp.initiated,
|
||||
Initiator: tmp.initiator,
|
||||
EventualStorageBucket: tmp.eventualStorageBucket,
|
||||
partLocations: tmp.partLocations,
|
||||
creationDate: tmp.creationDate,
|
||||
},
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
/** parseListEntries - parse the values returned in a listing by metadata
|
||||
* @param {object[]} entries - Version or Content entries in a metadata listing
|
||||
* @param {string} entries[].key - metadata key
|
||||
* @param {string} entries[].value - stringified object metadata
|
||||
* @return {(object|Error)} - mapped array with parsed value or JSON parsing err
|
||||
*/
|
||||
function parseListEntries(entries) {
|
||||
// wrap private function in a try/catch clause
|
||||
// just in case JSON parsing throws an exception
|
||||
try {
|
||||
return _parseListEntries(entries);
|
||||
} catch (e) {
|
||||
return e;
|
||||
}
|
||||
}
|
||||
|
||||
class MetadataWrapper {
|
||||
constructor(clientName, params, bucketclient, logger) {
|
||||
if (clientName === 'mem') {
|
||||
this.client = metastore;
|
||||
this.implName = 'memorybucket';
|
||||
} else if (clientName === 'file') {
|
||||
this.client = new BucketFileInterface(params, logger);
|
||||
this.implName = 'bucketfile';
|
||||
} else if (clientName === 'scality') {
|
||||
this.client = new BucketClientInterface(params, bucketclient,
|
||||
logger);
|
||||
this.implName = 'bucketclient';
|
||||
} else if (clientName === 'mongodb') {
|
||||
this.client = new MongoClientInterface({
|
||||
replicaSetHosts: params.mongodb.replicaSetHosts,
|
||||
writeConcern: params.mongodb.writeConcern,
|
||||
replicaSet: params.mongodb.replicaSet,
|
||||
readPreference: params.mongodb.readPreference,
|
||||
database: params.mongodb.database,
|
||||
replicationGroupId: params.replicationGroupId,
|
||||
path: params.mongodb.path,
|
||||
logger,
|
||||
});
|
||||
this.implName = 'mongoclient';
|
||||
} else if (clientName === 'cdmi') {
|
||||
if (!CdmiMetadata) {
|
||||
throw new Error('Unauthorized backend');
|
||||
}
|
||||
|
||||
this.client = new CdmiMetadata({
|
||||
path: params.cdmi.path,
|
||||
host: params.cdmi.host,
|
||||
port: params.cdmi.port,
|
||||
readonly: params.cdmi.readonly,
|
||||
});
|
||||
this.implName = 'cdmi';
|
||||
}
|
||||
}
|
||||
|
||||
setup(done) {
|
||||
if (this.client.setup) {
|
||||
return this.client.setup(done);
|
||||
}
|
||||
return process.nextTick(() => done);
|
||||
}
|
||||
|
||||
createBucket(bucketName, bucketMD, log, cb) {
|
||||
log.debug('creating bucket in metadata');
|
||||
this.client.createBucket(bucketName, bucketMD, log, err => {
|
||||
if (err) {
|
||||
log.debug('error from metadata', { implName: this.implName,
|
||||
error: err });
|
||||
return cb(err);
|
||||
}
|
||||
log.trace('bucket created in metadata');
|
||||
if (bucketNotificationHook) {
|
||||
setTimeout(bucketNotificationHook);
|
||||
}
|
||||
return cb(err);
|
||||
});
|
||||
}
|
||||
|
||||
updateBucket(bucketName, bucketMD, log, cb) {
|
||||
log.debug('updating bucket in metadata');
|
||||
this.client.putBucketAttributes(bucketName, bucketMD, log, err => {
|
||||
if (err) {
|
||||
log.debug('error from metadata', { implName: this.implName,
|
||||
error: err });
|
||||
return cb(err);
|
||||
}
|
||||
log.trace('bucket updated in metadata');
|
||||
return cb(err);
|
||||
});
|
||||
}
|
||||
|
||||
getBucket(bucketName, log, cb) {
|
||||
log.debug('getting bucket from metadata');
|
||||
this.client.getBucketAttributes(bucketName, log, (err, data) => {
|
||||
if (err) {
|
||||
log.debug('error from metadata', { implName: this.implName,
|
||||
error: err });
|
||||
return cb(err);
|
||||
}
|
||||
log.trace('bucket retrieved from metadata');
|
||||
return cb(err, BucketInfo.fromObj(data));
|
||||
});
|
||||
}
|
||||
|
||||
deleteBucket(bucketName, log, cb) {
|
||||
log.debug('deleting bucket from metadata');
|
||||
this.client.deleteBucket(bucketName, log, err => {
|
||||
if (err) {
|
||||
log.debug('error from metadata', { implName: this.implName,
|
||||
error: err });
|
||||
return cb(err);
|
||||
}
|
||||
log.debug('Deleted bucket from Metadata');
|
||||
if (bucketNotificationHook) {
|
||||
setTimeout(bucketNotificationHook);
|
||||
}
|
||||
return cb(err);
|
||||
});
|
||||
}
|
||||
|
||||
putObjectMD(bucketName, objName, objVal, params, log, cb) {
|
||||
log.debug('putting object in metadata');
|
||||
const value = typeof objVal.getValue === 'function' ?
|
||||
objVal.getValue() : objVal;
|
||||
this.client.putObject(bucketName, objName, value, params, log,
|
||||
(err, data) => {
|
||||
if (err) {
|
||||
log.debug('error from metadata', { implName: this.implName,
|
||||
error: err });
|
||||
return cb(err);
|
||||
}
|
||||
if (data) {
|
||||
log.debug('object version successfully put in metadata',
|
||||
{ version: data });
|
||||
} else {
|
||||
log.debug('object successfully put in metadata');
|
||||
}
|
||||
return cb(err, data);
|
||||
});
|
||||
}
|
||||
|
||||
getBucketAndObjectMD(bucketName, objName, params, log, cb) {
|
||||
log.debug('getting bucket and object from metadata',
|
||||
{ database: bucketName, object: objName });
|
||||
this.client.getBucketAndObject(bucketName, objName, params, log,
|
||||
(err, data) => {
|
||||
if (err) {
|
||||
log.debug('error from metadata', { implName: this.implName,
|
||||
err });
|
||||
return cb(err);
|
||||
}
|
||||
log.debug('bucket and object retrieved from metadata',
|
||||
{ database: bucketName, object: objName });
|
||||
return cb(err, data);
|
||||
});
|
||||
}
|
||||
|
||||
getObjectMD(bucketName, objName, params, log, cb) {
|
||||
log.debug('getting object from metadata');
|
||||
this.client.getObject(bucketName, objName, params, log, (err, data) => {
|
||||
if (err) {
|
||||
log.debug('error from metadata', { implName: this.implName,
|
||||
err });
|
||||
return cb(err);
|
||||
}
|
||||
log.debug('object retrieved from metadata');
|
||||
return cb(err, data);
|
||||
});
|
||||
}
|
||||
|
||||
deleteObjectMD(bucketName, objName, params, log, cb) {
|
||||
log.debug('deleting object from metadata');
|
||||
this.client.deleteObject(bucketName, objName, params, log, err => {
|
||||
if (err) {
|
||||
log.debug('error from metadata', { implName: this.implName,
|
||||
err });
|
||||
return cb(err);
|
||||
}
|
||||
log.debug('object deleted from metadata');
|
||||
return cb(err);
|
||||
});
|
||||
}
|
||||
|
||||
listObject(bucketName, listingParams, log, cb) {
|
||||
if (listingParams.listingType === undefined) {
|
||||
// eslint-disable-next-line
|
||||
listingParams.listingType = 'Delimiter';
|
||||
}
|
||||
this.client.listObject(bucketName, listingParams, log, (err, data) => {
|
||||
log.debug('getting object listing from metadata');
|
||||
if (err) {
|
||||
log.debug('error from metadata', { implName: this.implName,
|
||||
err });
|
||||
return cb(err);
|
||||
}
|
||||
log.debug('object listing retrieved from metadata');
|
||||
if (listingParams.listingType === 'DelimiterVersions') {
|
||||
// eslint-disable-next-line
|
||||
data.Versions = parseListEntries(data.Versions);
|
||||
if (data.Versions instanceof Error) {
|
||||
log.error('error parsing metadata listing', {
|
||||
error: data.Versions,
|
||||
listingType: listingParams.listingType,
|
||||
method: 'listObject',
|
||||
});
|
||||
return cb(errors.InternalError);
|
||||
}
|
||||
return cb(null, data);
|
||||
}
|
||||
// eslint-disable-next-line
|
||||
data.Contents = parseListEntries(data.Contents);
|
||||
if (data.Contents instanceof Error) {
|
||||
log.error('error parsing metadata listing', {
|
||||
error: data.Contents,
|
||||
listingType: listingParams.listingType,
|
||||
method: 'listObject',
|
||||
});
|
||||
return cb(errors.InternalError);
|
||||
}
|
||||
return cb(null, data);
|
||||
});
|
||||
}
|
||||
|
||||
listMultipartUploads(bucketName, listingParams, log, cb) {
|
||||
this.client.listMultipartUploads(bucketName, listingParams, log,
|
||||
(err, data) => {
|
||||
log.debug('getting mpu listing from metadata');
|
||||
if (err) {
|
||||
log.debug('error from metadata', { implName: this.implName,
|
||||
err });
|
||||
return cb(err);
|
||||
}
|
||||
log.debug('mpu listing retrieved from metadata');
|
||||
return cb(err, data);
|
||||
});
|
||||
}
|
||||
|
||||
switch(newClient, cb) {
|
||||
this.client = newClient;
|
||||
return cb();
|
||||
}
|
||||
|
||||
getRaftBuckets(raftId, log, cb) {
|
||||
if (!this.client.getRaftBuckets) {
|
||||
return cb();
|
||||
}
|
||||
return this.client.getRaftBuckets(raftId, log, cb);
|
||||
}
|
||||
|
||||
checkHealth(log, cb) {
|
||||
if (!this.client.checkHealth) {
|
||||
const defResp = {};
|
||||
defResp[this.implName] = { code: 200, message: 'OK' };
|
||||
return cb(null, defResp);
|
||||
}
|
||||
return this.client.checkHealth(this.implName, log, cb);
|
||||
}
|
||||
|
||||
getUUID(log, cb) {
|
||||
if (!this.client.getUUID) {
|
||||
log.debug('returning empty uuid as fallback', {
|
||||
implName: this.implName });
|
||||
return cb(null, '');
|
||||
}
|
||||
return this.client.getUUID(log, cb);
|
||||
}
|
||||
|
||||
getDiskUsage(log, cb) {
|
||||
if (!this.client.getDiskUsage) {
|
||||
log.debug('returning empty disk usage as fallback', {
|
||||
implName: this.implName });
|
||||
return cb(null, {});
|
||||
}
|
||||
return this.client.getDiskUsage(cb);
|
||||
}
|
||||
|
||||
countItems(log, cb) {
|
||||
if (!this.client.countItems) {
|
||||
log.debug('returning zero item counts as fallback', {
|
||||
implName: this.implName });
|
||||
return cb(null, {
|
||||
buckets: 0,
|
||||
objects: 0,
|
||||
versions: 0,
|
||||
});
|
||||
}
|
||||
return this.client.countItems(log, cb);
|
||||
}
|
||||
|
||||
notifyBucketChange(cb) {
|
||||
bucketNotificationHook = cb;
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = MetadataWrapper;
|
|
@ -0,0 +1,147 @@
|
|||
const assert = require('assert');
|
||||
|
||||
const BucketInfo = require('../../../models/BucketInfo');
|
||||
|
||||
class BucketClientInterface {
|
||||
constructor(params, bucketclient, logger) {
|
||||
assert(params.bucketdBootstrap.length > 0,
|
||||
'bucketd bootstrap list is empty');
|
||||
const bootstrap = params.bucketdBootstrap;
|
||||
const log = params.bucketdLog;
|
||||
if (params.https) {
|
||||
const { key, cert, ca } = params.https;
|
||||
logger.info('bucketclient configuration', {
|
||||
bootstrap,
|
||||
log,
|
||||
https: true,
|
||||
});
|
||||
this.client = new bucketclient.RESTClient(bootstrap, log, true,
|
||||
key, cert, ca);
|
||||
} else {
|
||||
logger.info('bucketclient configuration', {
|
||||
bootstrap,
|
||||
log,
|
||||
https: false,
|
||||
});
|
||||
this.client = new bucketclient.RESTClient(bootstrap, log);
|
||||
}
|
||||
}
|
||||
|
||||
createBucket(bucketName, bucketMD, log, cb) {
|
||||
this.client.createBucket(bucketName, log.getSerializedUids(),
|
||||
bucketMD.serialize(), cb);
|
||||
return null;
|
||||
}
|
||||
|
||||
getBucketAttributes(bucketName, log, cb) {
|
||||
this.client.getBucketAttributes(bucketName, log.getSerializedUids(),
|
||||
(err, data) => {
|
||||
if (err) {
|
||||
return cb(err);
|
||||
}
|
||||
return cb(err, BucketInfo.deSerialize(data));
|
||||
});
|
||||
return null;
|
||||
}
|
||||
|
||||
getBucketAndObject(bucketName, objName, params, log, cb) {
|
||||
this.client.getBucketAndObject(bucketName, objName,
|
||||
log.getSerializedUids(), (err, data) => {
|
||||
if (err && (!err.NoSuchKey && !err.ObjNotFound)) {
|
||||
return cb(err);
|
||||
}
|
||||
return cb(null, JSON.parse(data));
|
||||
}, params);
|
||||
return null;
|
||||
}
|
||||
|
||||
getRaftBuckets(raftId, log, cb) {
|
||||
return this.client.getRaftBuckets(raftId, log.getSerializedUids(),
|
||||
(err, data) => {
|
||||
if (err) {
|
||||
return cb(err);
|
||||
}
|
||||
return cb(null, JSON.parse(data));
|
||||
});
|
||||
}
|
||||
|
||||
putBucketAttributes(bucketName, bucketMD, log, cb) {
|
||||
this.client.putBucketAttributes(bucketName, log.getSerializedUids(),
|
||||
bucketMD.serialize(), cb);
|
||||
return null;
|
||||
}
|
||||
|
||||
deleteBucket(bucketName, log, cb) {
|
||||
this.client.deleteBucket(bucketName, log.getSerializedUids(), cb);
|
||||
return null;
|
||||
}
|
||||
|
||||
putObject(bucketName, objName, objVal, params, log, cb) {
|
||||
this.client.putObject(bucketName, objName, JSON.stringify(objVal),
|
||||
log.getSerializedUids(), cb, params);
|
||||
return null;
|
||||
}
|
||||
|
||||
getObject(bucketName, objName, params, log, cb) {
|
||||
this.client.getObject(bucketName, objName, log.getSerializedUids(),
|
||||
(err, data) => {
|
||||
if (err) {
|
||||
return cb(err);
|
||||
}
|
||||
return cb(err, JSON.parse(data));
|
||||
}, params);
|
||||
return null;
|
||||
}
|
||||
|
||||
deleteObject(bucketName, objName, params, log, cb) {
|
||||
this.client.deleteObject(bucketName, objName, log.getSerializedUids(),
|
||||
cb, params);
|
||||
return null;
|
||||
}
|
||||
|
||||
listObject(bucketName, params, log, cb) {
|
||||
this.client.listObject(bucketName, log.getSerializedUids(), params,
|
||||
(err, data) => {
|
||||
if (err) {
|
||||
return cb(err);
|
||||
}
|
||||
return cb(err, JSON.parse(data));
|
||||
});
|
||||
return null;
|
||||
}
|
||||
|
||||
listMultipartUploads(bucketName, params, log, cb) {
|
||||
this.client.listObject(bucketName, log.getSerializedUids(), params,
|
||||
(err, data) => {
|
||||
if (err) {
|
||||
return cb(err);
|
||||
}
|
||||
return cb(null, JSON.parse(data));
|
||||
});
|
||||
return null;
|
||||
}
|
||||
|
||||
checkHealth(implName, log, cb) {
|
||||
return this.client.healthcheck(log, (err, result) => {
|
||||
const respBody = {};
|
||||
if (err) {
|
||||
log.error(`error from ${implName}`, { error: err });
|
||||
respBody[implName] = {
|
||||
error: err,
|
||||
};
|
||||
// error returned as null so async parallel doesn't return
|
||||
// before all backends are checked
|
||||
return cb(null, respBody);
|
||||
}
|
||||
const parseResult = JSON.parse(result);
|
||||
respBody[implName] = {
|
||||
code: 200,
|
||||
message: 'OK',
|
||||
body: parseResult,
|
||||
};
|
||||
return cb(null, respBody);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = BucketClientInterface;
|
Loading…
Reference in New Issue