S3: Partial implementation.
parent
81a7a3ab81
commit
2a3838c360
|
@ -11,166 +11,346 @@
|
|||
#include <unistd.h>
|
||||
#include <time.h>
|
||||
|
||||
#include <libs3.h>
|
||||
|
||||
#include "ior.h"
|
||||
#include "aiori.h"
|
||||
#include "aiori-debug.h"
|
||||
#include "utilities.h"
|
||||
|
||||
|
||||
static aiori_xfer_hint_t * hints = NULL;
|
||||
|
||||
static void s3_xfer_hints(aiori_xfer_hint_t * params){
|
||||
hints = params;
|
||||
}
|
||||
|
||||
/************************** O P T I O N S *****************************/
|
||||
typedef struct {
|
||||
uint64_t delay_creates;
|
||||
uint64_t delay_xfer;
|
||||
int delay_rank_0_only;
|
||||
} dummy_options_t;
|
||||
int bucket_per_dir;
|
||||
char * access_key;
|
||||
char * secret_key;
|
||||
char * host;
|
||||
char * bucket_prefix;
|
||||
char * bucket_prefix_cur;
|
||||
char * locationConstraint;
|
||||
|
||||
static char * current = (char*) 1;
|
||||
int dont_suffix;
|
||||
int s3_compatible;
|
||||
int use_ssl;
|
||||
S3BucketContext bucket_context;
|
||||
S3Protocol s3_protocol;
|
||||
} s3_options_t;
|
||||
|
||||
static option_help * S3_options(aiori_mod_opt_t ** init_backend_options, aiori_mod_opt_t * init_values){
|
||||
dummy_options_t * o = malloc(sizeof(dummy_options_t));
|
||||
s3_options_t * o = malloc(sizeof(s3_options_t));
|
||||
if (init_values != NULL){
|
||||
memcpy(o, init_values, sizeof(dummy_options_t));
|
||||
memcpy(o, init_values, sizeof(s3_options_t));
|
||||
}else{
|
||||
memset(o, 0, sizeof(dummy_options_t));
|
||||
memset(o, 0, sizeof(s3_options_t));
|
||||
}
|
||||
|
||||
*init_backend_options = (aiori_mod_opt_t*) o;
|
||||
o->bucket_prefix = "ior";
|
||||
|
||||
option_help h [] = {
|
||||
{0, "dummy.delay-create", "Delay per create in usec", OPTION_OPTIONAL_ARGUMENT, 'l', & o->delay_creates},
|
||||
{0, "dummy.delay-xfer", "Delay per xfer in usec", OPTION_OPTIONAL_ARGUMENT, 'l', & o->delay_xfer},
|
||||
{0, "dummy.delay-only-rank0", "Delay only Rank0", OPTION_FLAG, 'd', & o->delay_rank_0_only},
|
||||
LAST_OPTION
|
||||
//{0, "S3.bucket-per-directory", "Use one bucket to map one dir, otherwise only one bucket is used.", OPTION_FLAG, 'd', & o->bucket_per_dir},
|
||||
{0, "S3.bucket-name-prefix", "The name of the bucket (when using without -b), otherwise it is used as prefix.", OPTION_OPTIONAL_ARGUMENT, 's', & o->bucket_prefix},
|
||||
{0, "S3.dont-suffix-bucket", "If not selected, then a hash will be added to the bucket name to increase uniqueness.", OPTION_FLAG, 'd', & o->dont_suffix },
|
||||
{0, "S3.s3-compatible", "to be selected when using S3 compatible storage", OPTION_FLAG, 'd', & o->s3_compatible },
|
||||
{0, "S3.use-ssl", "used to specify that SSL is needed for the connection", OPTION_FLAG, 'd', & o->use_ssl },
|
||||
{0, "S3.host", "The host optionally followed by:port.", OPTION_OPTIONAL_ARGUMENT, 's', & o->host},
|
||||
{0, "S3.secret-key", "The secret key.", OPTION_OPTIONAL_ARGUMENT, 's', & o->secret_key},
|
||||
{0, "S3.access-key", "The access key.", OPTION_OPTIONAL_ARGUMENT, 's', & o->access_key},
|
||||
LAST_OPTION
|
||||
};
|
||||
option_help * help = malloc(sizeof(h));
|
||||
memcpy(help, h, sizeof(h));
|
||||
return help;
|
||||
}
|
||||
|
||||
static int count_init = 0;
|
||||
|
||||
static aiori_fd_t *S3_Create(char *testFileName, int iorflags, aiori_mod_opt_t * options)
|
||||
{
|
||||
if(count_init <= 0){
|
||||
ERR("S3 missing initialization in create\n");
|
||||
}
|
||||
if(verbose > 4){
|
||||
fprintf(out_logfile, "S3 create: %s = %p\n", testFileName, current);
|
||||
}
|
||||
dummy_options_t * o = (dummy_options_t*) options;
|
||||
if (o->delay_creates){
|
||||
if (! o->delay_rank_0_only || (o->delay_rank_0_only && rank == 0)){
|
||||
struct timespec wait = { o->delay_creates / 1000 / 1000, 1000l * (o->delay_creates % 1000000)};
|
||||
nanosleep( & wait, NULL);
|
||||
static void def_file_name(s3_options_t * o, char * out_name, char const * path){
|
||||
// duplicate path except "/"
|
||||
while(*path != 0){
|
||||
char c = *path;
|
||||
if(((c >= '0' && c <= '9') || (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') )){
|
||||
*out_name = *path;
|
||||
out_name++;
|
||||
}
|
||||
path++;
|
||||
}
|
||||
return (aiori_fd_t*) current++;
|
||||
*out_name = '\0';
|
||||
}
|
||||
|
||||
static aiori_fd_t *S3_Open(char *testFileName, int flags, aiori_mod_opt_t * options)
|
||||
{
|
||||
if(count_init <= 0){
|
||||
ERR("S3 missing initialization in open\n");
|
||||
static void def_bucket_name(s3_options_t * o, char * out_name, char const * path){
|
||||
// S3_MAX_BUCKET_NAME_SIZE
|
||||
if (o->bucket_per_dir){
|
||||
out_name += sprintf(out_name, "%s-", o->bucket_prefix_cur);
|
||||
}
|
||||
if(verbose > 4){
|
||||
fprintf(out_logfile, "S3 open: %s = %p\n", testFileName, current);
|
||||
}
|
||||
return (aiori_fd_t*) current++;
|
||||
}
|
||||
|
||||
static void S3_Fsync(aiori_fd_t *fd, aiori_mod_opt_t * options)
|
||||
{
|
||||
if(verbose > 4){
|
||||
fprintf(out_logfile, "S3 fsync %p\n", fd);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static void S3_Sync(aiori_mod_opt_t * options)
|
||||
{
|
||||
}
|
||||
|
||||
static void S3_Close(aiori_fd_t *fd, aiori_mod_opt_t * options)
|
||||
{
|
||||
if(verbose > 4){
|
||||
fprintf(out_logfile, "S3 close %p\n", fd);
|
||||
}
|
||||
}
|
||||
|
||||
static void S3_Delete(char *testFileName, aiori_mod_opt_t * options)
|
||||
{
|
||||
if(verbose > 4){
|
||||
fprintf(out_logfile, "S3 delete: %s\n", testFileName);
|
||||
// duplicate path except "/"
|
||||
while(*path != 0){
|
||||
char c = *path;
|
||||
if(((c >= '0' && c <= '9') || (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') )){
|
||||
*out_name = *path;
|
||||
out_name++;
|
||||
}
|
||||
path++;
|
||||
}
|
||||
*out_name = '\0';
|
||||
|
||||
// S3Status S3_validate_bucket_name(const char *bucketName, S3UriStyle uriStyle);
|
||||
}
|
||||
|
||||
struct data_handling{
|
||||
char * buf;
|
||||
int64_t size;
|
||||
};
|
||||
|
||||
static S3Status s3status = S3StatusInterrupted;
|
||||
static S3ErrorDetails s3error = {NULL};
|
||||
|
||||
static S3Status responsePropertiesCallback(const S3ResponseProperties *properties, void *callbackData){
|
||||
s3status = S3StatusOK;
|
||||
return s3status;
|
||||
}
|
||||
|
||||
static void responseCompleteCallback(S3Status status, const S3ErrorDetails *error, void *callbackData) {
|
||||
s3status = status;
|
||||
if (error == NULL){
|
||||
s3error.message = NULL;
|
||||
}else{
|
||||
s3error = *error;
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
#define CHECK_ERROR if (s3status != S3StatusOK){ \
|
||||
EWARNF("S3 \"%s\": %s - %s", S3_get_status_name(s3status), s3error.message, s3error.furtherDetails); \
|
||||
}
|
||||
|
||||
|
||||
static int putObjectDataCallback(int bufferSize, char *buffer, void *callbackData){
|
||||
printf("CALLBACK\n");
|
||||
struct data_handling * dh = (struct data_handling *) callbackData;
|
||||
const int64_t size = dh->size > bufferSize ? bufferSize : dh->size;
|
||||
if(size == 0) return 0;
|
||||
memcpy(buffer, dh->buf, size);
|
||||
dh->buf += size;
|
||||
dh->size -= size;
|
||||
|
||||
return size;
|
||||
}
|
||||
|
||||
static S3PutObjectHandler putObjectHandler = { { &responsePropertiesCallback, &responseCompleteCallback }, & putObjectDataCallback };
|
||||
|
||||
static S3ResponseHandler responseHandler = { &responsePropertiesCallback, &responseCompleteCallback };
|
||||
|
||||
static char * S3_getVersion()
|
||||
{
|
||||
return "0.5";
|
||||
}
|
||||
|
||||
static IOR_offset_t S3_GetFileSize(aiori_mod_opt_t * options, char *testFileName)
|
||||
static void S3_Fsync(aiori_fd_t *fd, aiori_mod_opt_t * options)
|
||||
{
|
||||
if(verbose > 4){
|
||||
fprintf(out_logfile, "S3 getFileSize: %s\n", testFileName);
|
||||
}
|
||||
return 0;
|
||||
// Not needed
|
||||
}
|
||||
|
||||
static IOR_offset_t S3_Xfer(int access, aiori_fd_t *file, IOR_size_t * buffer, IOR_offset_t length, IOR_offset_t offset, aiori_mod_opt_t * options){
|
||||
if(verbose > 4){
|
||||
fprintf(out_logfile, "S3 xfer: %p\n", file);
|
||||
}
|
||||
dummy_options_t * o = (dummy_options_t*) options;
|
||||
if (o->delay_xfer){
|
||||
if (! o->delay_rank_0_only || (o->delay_rank_0_only && rank == 0)){
|
||||
struct timespec wait = {o->delay_xfer / 1000 / 1000, 1000l * (o->delay_xfer % 1000000)};
|
||||
nanosleep( & wait, NULL);
|
||||
}
|
||||
}
|
||||
return length;
|
||||
|
||||
static void S3_Sync(aiori_mod_opt_t * options)
|
||||
{
|
||||
// Not needed
|
||||
}
|
||||
|
||||
static S3Status S3ListResponseCallback(const char *ownerId, const char *ownerDisplayName, const char *bucketName, int64_t creationDateSeconds, void *callbackData){
|
||||
uint64_t * count = (uint64_t*) callbackData;
|
||||
*count++;
|
||||
return S3StatusOK;
|
||||
}
|
||||
|
||||
static S3ListServiceHandler listhandler = { { &responsePropertiesCallback, &responseCompleteCallback }, & S3ListResponseCallback};
|
||||
|
||||
static int S3_statfs (const char * path, ior_aiori_statfs_t * stat, aiori_mod_opt_t * options){
|
||||
stat->f_bsize = 1;
|
||||
stat->f_blocks = 1;
|
||||
stat->f_bfree = 1;
|
||||
stat->f_bavail = 1;
|
||||
stat->f_files = 1;
|
||||
stat->f_ffree = 1;
|
||||
s3_options_t * o = (s3_options_t*) options;
|
||||
|
||||
// use the number of bucket as files
|
||||
uint64_t buckets = 0;
|
||||
S3_list_service(o->s3_protocol, o->access_key, o->secret_key, o->host,
|
||||
NULL, & listhandler, & buckets);
|
||||
stat->f_files = buckets;
|
||||
CHECK_ERROR
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static aiori_fd_t *S3_Create(char *testFileName, int iorflags, aiori_mod_opt_t * options)
|
||||
{
|
||||
s3_options_t * o = (s3_options_t*) options;
|
||||
|
||||
return (aiori_fd_t*) 1;
|
||||
}
|
||||
|
||||
static aiori_fd_t *S3_Open(char *testFileName, int flags, aiori_mod_opt_t * options)
|
||||
{
|
||||
s3_options_t * o = (s3_options_t*) options;
|
||||
|
||||
return (aiori_fd_t*) 1;
|
||||
}
|
||||
|
||||
static void S3_Close(aiori_fd_t *fd, aiori_mod_opt_t * options)
|
||||
{
|
||||
}
|
||||
|
||||
static void S3_Delete(char *path, aiori_mod_opt_t * options)
|
||||
{
|
||||
s3_options_t * o = (s3_options_t*) options;
|
||||
char p[FILENAME_MAX];
|
||||
def_file_name(o, p, path);
|
||||
S3_delete_object(& o->bucket_context, p, NULL, & responseHandler, NULL);
|
||||
CHECK_ERROR
|
||||
}
|
||||
|
||||
static IOR_offset_t S3_Xfer(int access, aiori_fd_t *file, IOR_size_t * buffer, IOR_offset_t length, IOR_offset_t offset, aiori_mod_opt_t * options){
|
||||
|
||||
return length;
|
||||
}
|
||||
|
||||
static int S3_mkdir (const char *path, mode_t mode, aiori_mod_opt_t * options){
|
||||
return 0;
|
||||
s3_options_t * o = (s3_options_t*) options;
|
||||
char p[FILENAME_MAX];
|
||||
|
||||
def_bucket_name(o, p, path);
|
||||
if (o->bucket_per_dir){
|
||||
S3_create_bucket(o->s3_protocol, o->access_key, o->secret_key, NULL, p, S3CannedAclPrivate, o->locationConstraint, NULL, & responseHandler, NULL);
|
||||
CHECK_ERROR
|
||||
return 0;
|
||||
}else{
|
||||
struct data_handling dh = { .buf = NULL, .size = 0 };
|
||||
S3_put_object(& o->bucket_context, p, 0, NULL, NULL, & putObjectHandler, & dh);
|
||||
if (! o->s3_compatible){
|
||||
CHECK_ERROR
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
static int S3_rmdir (const char *path, aiori_mod_opt_t * options){
|
||||
return 0;
|
||||
s3_options_t * o = (s3_options_t*) options;
|
||||
char p[FILENAME_MAX];
|
||||
|
||||
def_bucket_name(o, p, path);
|
||||
if (o->bucket_per_dir){
|
||||
S3_delete_bucket(o->s3_protocol, S3UriStylePath, o->access_key, o->secret_key, NULL, p, NULL, & responseHandler, NULL);
|
||||
CHECK_ERROR
|
||||
return 0;
|
||||
}else{
|
||||
S3_delete_object(& o->bucket_context, p, NULL, & responseHandler, NULL);
|
||||
CHECK_ERROR
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static S3Status statResponsePropertiesCallback(const S3ResponseProperties *properties, void *callbackData){
|
||||
// check the size
|
||||
struct stat *buf = (struct stat*) callbackData;
|
||||
if(buf != NULL){
|
||||
buf->st_size = properties->contentLength;
|
||||
buf->st_mtime = properties->lastModified;
|
||||
}
|
||||
s3status = S3StatusOK;
|
||||
return s3status;
|
||||
}
|
||||
|
||||
static S3ResponseHandler statResponseHandler = { &statResponsePropertiesCallback, &responseCompleteCallback };
|
||||
|
||||
static int S3_access (const char *path, int mode, aiori_mod_opt_t * options){
|
||||
s3_options_t * o = (s3_options_t*) options;
|
||||
char p[FILENAME_MAX];
|
||||
def_file_name(o, p, path);
|
||||
|
||||
S3_head_object(& o->bucket_context, p, NULL, & statResponseHandler, NULL);
|
||||
if (s3status != S3StatusOK){
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int S3_stat (const char *path, struct stat *buf, aiori_mod_opt_t * options){
|
||||
s3_options_t * o = (s3_options_t*) options;
|
||||
char p[FILENAME_MAX];
|
||||
def_file_name(o, p, path);
|
||||
|
||||
memset(buf, 0, sizeof(struct stat));
|
||||
S3_head_object(& o->bucket_context, p, NULL, & statResponseHandler, buf);
|
||||
if (s3status != S3StatusOK){
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static IOR_offset_t S3_GetFileSize(aiori_mod_opt_t * options, char *testFileName)
|
||||
{
|
||||
struct stat buf;
|
||||
if(S3_stat(testFileName, & buf, options) != 0) return -1;
|
||||
return buf.st_size;
|
||||
}
|
||||
|
||||
|
||||
static int S3_check_params(aiori_mod_opt_t * options){
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void S3_init(aiori_mod_opt_t * options){
|
||||
WARN("S3 initialized");
|
||||
count_init++;
|
||||
s3_options_t * o = (s3_options_t*) options;
|
||||
int ret = S3_initialize(NULL, S3_INIT_ALL, o->host);
|
||||
|
||||
// create a bucket id based on access-key using a trivial checksumming
|
||||
if(! o->dont_suffix){
|
||||
uint64_t c = 0;
|
||||
char * r = o->access_key;
|
||||
for(uint64_t pos = 1; (*r) != '\0' ; r++, pos*=10) {
|
||||
c += (*r) * pos;
|
||||
}
|
||||
int count = snprintf(NULL, 0, "%s%lu", o->bucket_prefix, c % 1000);
|
||||
char * old_prefix = o->bucket_prefix;
|
||||
o->bucket_prefix_cur = malloc(count + 1);
|
||||
sprintf(o->bucket_prefix_cur, "%s%lu", old_prefix, c % 1000);
|
||||
}else{
|
||||
o->bucket_prefix_cur = o->bucket_prefix;
|
||||
}
|
||||
|
||||
// init bucket context
|
||||
memset(& o->bucket_context, 0, sizeof(o->bucket_context));
|
||||
o->bucket_context.hostName = o->host;
|
||||
o->bucket_context.bucketName = o->bucket_prefix_cur;
|
||||
if (! o->bucket_per_dir){
|
||||
S3_create_bucket(o->s3_protocol, o->access_key, o->secret_key, NULL, o->bucket_context.bucketName, S3CannedAclPrivate, o->locationConstraint, NULL, & responseHandler, NULL);
|
||||
CHECK_ERROR
|
||||
}
|
||||
|
||||
if (o->use_ssl){
|
||||
o->s3_protocol = S3ProtocolHTTPS;
|
||||
}else{
|
||||
o->s3_protocol = S3ProtocolHTTP;
|
||||
}
|
||||
o->bucket_context.protocol = o->s3_protocol;
|
||||
o->bucket_context.uriStyle = S3UriStylePath;
|
||||
o->bucket_context.accessKeyId = o->access_key;
|
||||
o->bucket_context.secretAccessKey = o->secret_key;
|
||||
|
||||
if ( ret != S3StatusOK ){
|
||||
FAIL("S3 error %s", S3_get_status_name(ret));
|
||||
}
|
||||
}
|
||||
|
||||
static void S3_final(aiori_mod_opt_t * options){
|
||||
WARN("S3 finalized");
|
||||
if(count_init <= 0){
|
||||
ERR("S3 invalid finalization\n");
|
||||
}
|
||||
count_init--;
|
||||
S3_deinitialize();
|
||||
}
|
||||
|
||||
|
||||
|
@ -184,6 +364,7 @@ ior_aiori_t S3_libS3_aiori = {
|
|||
.delete = S3_Delete,
|
||||
.get_version = S3_getVersion,
|
||||
.fsync = S3_Fsync,
|
||||
.xfer_hints = s3_xfer_hints,
|
||||
.get_file_size = S3_GetFileSize,
|
||||
.statfs = S3_statfs,
|
||||
.mkdir = S3_mkdir,
|
||||
|
|
|
@ -0,0 +1,24 @@
|
|||
#!/bin/bash
|
||||
|
||||
# Test basic S3 behavior using minio.
|
||||
|
||||
ROOT="$(dirname ${BASH_SOURCE[0]})"
|
||||
TYPE="basic"
|
||||
|
||||
cd $ROOT
|
||||
|
||||
if [[ ! -e minio ]] ; then
|
||||
wget https://dl.min.io/server/minio/release/linux-amd64/minio
|
||||
chmod +x minio
|
||||
fi
|
||||
|
||||
export MINIO_ACCESS_KEY=accesskey
|
||||
export MINIO_SECRET_KEY=secretkey
|
||||
|
||||
./minio --quiet server /dev/shm &
|
||||
|
||||
source $ROOT/test-lib.sh
|
||||
IOR 2 -a S3-libs3 --S3.host=localhost:9000 --S3.secret-key=secretkey --S3.access-key=accesskey
|
||||
MDTEST 2 -a S3-libs3 --S3.host=localhost:9000 --S3.secret-key=secretkey --S3.access-key=accesskey
|
||||
|
||||
kill -9 %1
|
Loading…
Reference in New Issue