diff --git a/src/aiori-S3-libs3.c b/src/aiori-S3-libs3.c index 7d5bfb1..2267c53 100644 --- a/src/aiori-S3-libs3.c +++ b/src/aiori-S3-libs3.c @@ -1,5 +1,7 @@ /* * S3 implementation using the newer libs3 +* https://github.com/bji/libs3 +* Use one object per file chunk */ #ifdef HAVE_CONFIG_H @@ -27,14 +29,16 @@ static void s3_xfer_hints(aiori_xfer_hint_t * params){ /************************** O P T I O N S *****************************/ typedef struct { - int bucket_per_dir; + int bucket_per_file; char * access_key; char * secret_key; char * host; char * bucket_prefix; char * bucket_prefix_cur; char * locationConstraint; + char * authRegion; + int timeout; int dont_suffix; int s3_compatible; int use_ssl; @@ -54,7 +58,7 @@ static option_help * S3_options(aiori_mod_opt_t ** init_backend_options, aiori_m o->bucket_prefix = "ior"; option_help h [] = { - //{0, "S3.bucket-per-directory", "Use one bucket to map one dir, otherwise only one bucket is used.", OPTION_FLAG, 'd', & o->bucket_per_dir}, + {0, "S3.bucket-per-file", "Use one bucket to map one file, otherwise only one bucket is used to store all files.", OPTION_FLAG, 'd', & o->bucket_per_file}, {0, "S3.bucket-name-prefix", "The name of the bucket (when using without -b), otherwise it is used as prefix.", OPTION_OPTIONAL_ARGUMENT, 's', & o->bucket_prefix}, {0, "S3.dont-suffix-bucket", "If not selected, then a hash will be added to the bucket name to increase uniqueness.", OPTION_FLAG, 'd', & o->dont_suffix }, {0, "S3.s3-compatible", "to be selected when using S3 compatible storage", OPTION_FLAG, 'd', & o->s3_compatible }, @@ -84,7 +88,7 @@ static void def_file_name(s3_options_t * o, char * out_name, char const * path){ static void def_bucket_name(s3_options_t * o, char * out_name, char const * path){ // S3_MAX_BUCKET_NAME_SIZE - if (o->bucket_per_dir){ + if (o->bucket_per_file){ out_name += sprintf(out_name, "%s-", o->bucket_prefix_cur); } @@ -103,7 +107,7 @@ static void def_bucket_name(s3_options_t * o, char * out_name, char const * path } struct data_handling{ - char * buf; + IOR_size_t * buf; int64_t size; }; @@ -126,24 +130,10 @@ static void responseCompleteCallback(S3Status status, const S3ErrorDetails *erro } #define CHECK_ERROR if (s3status != S3StatusOK){ \ - EWARNF("S3 \"%s\": %s - %s", S3_get_status_name(s3status), s3error.message, s3error.furtherDetails); \ + EWARNF("S3 %s:%d \"%s\": %s - %s", __FUNCTION__, __LINE__, S3_get_status_name(s3status), s3error.message, s3error.furtherDetails); \ } -static int putObjectDataCallback(int bufferSize, char *buffer, void *callbackData){ - printf("CALLBACK\n"); - struct data_handling * dh = (struct data_handling *) callbackData; - const int64_t size = dh->size > bufferSize ? bufferSize : dh->size; - if(size == 0) return 0; - memcpy(buffer, dh->buf, size); - dh->buf += size; - dh->size -= size; - - return size; -} - -static S3PutObjectHandler putObjectHandler = { { &responsePropertiesCallback, &responseCompleteCallback }, & putObjectDataCallback }; - static S3ResponseHandler responseHandler = { &responsePropertiesCallback, &responseCompleteCallback }; static char * S3_getVersion() @@ -180,79 +170,54 @@ static int S3_statfs (const char * path, ior_aiori_statfs_t * stat, aiori_mod_op // use the number of bucket as files uint64_t buckets = 0; - S3_list_service(o->s3_protocol, o->access_key, o->secret_key, o->host, - NULL, & listhandler, & buckets); + S3_list_service(o->s3_protocol, o->access_key, o->secret_key, NULL, o->host, + o->authRegion, NULL, o->timeout, & listhandler, & buckets); stat->f_files = buckets; CHECK_ERROR return 0; } -static aiori_fd_t *S3_Create(char *testFileName, int iorflags, aiori_mod_opt_t * options) -{ - s3_options_t * o = (s3_options_t*) options; - - return (aiori_fd_t*) 1; +static S3Status S3multipart_handler(const char *upload_id, void *callbackData){ + *((char const**)(callbackData)) = upload_id; + return S3StatusOK; } -static aiori_fd_t *S3_Open(char *testFileName, int flags, aiori_mod_opt_t * options) -{ - s3_options_t * o = (s3_options_t*) options; +static S3MultipartInitialHandler multipart_handler = { {&responsePropertiesCallback, &responseCompleteCallback }, & S3multipart_handler}; - return (aiori_fd_t*) 1; +typedef struct{ + char * object; +} S3_fd_t; + +static int putObjectDataCallback(int bufferSize, char *buffer, void *callbackData){ + struct data_handling * dh = (struct data_handling *) callbackData; + const int64_t size = dh->size > bufferSize ? bufferSize : dh->size; + if(size == 0) return 0; + memcpy(buffer, dh->buf, size); + dh->buf = (IOR_size_t*) ((char*)(dh->buf) + size); + dh->size -= size; + + return size; } -static void S3_Close(aiori_fd_t *fd, aiori_mod_opt_t * options) -{ -} +static S3PutObjectHandler putObjectHandler = { { &responsePropertiesCallback, &responseCompleteCallback }, & putObjectDataCallback }; -static void S3_Delete(char *path, aiori_mod_opt_t * options) +static aiori_fd_t *S3_Create(char *path, int iorflags, aiori_mod_opt_t * options) { + char * upload_id; s3_options_t * o = (s3_options_t*) options; char p[FILENAME_MAX]; def_file_name(o, p, path); - S3_delete_object(& o->bucket_context, p, NULL, & responseHandler, NULL); - CHECK_ERROR -} + S3_fd_t * fd = malloc(sizeof(S3_fd_t)); + fd->object = strdup(p); -static IOR_offset_t S3_Xfer(int access, aiori_fd_t *file, IOR_size_t * buffer, IOR_offset_t length, IOR_offset_t offset, aiori_mod_opt_t * options){ - - return length; -} - -static int S3_mkdir (const char *path, mode_t mode, aiori_mod_opt_t * options){ - s3_options_t * o = (s3_options_t*) options; - char p[FILENAME_MAX]; - - def_bucket_name(o, p, path); - if (o->bucket_per_dir){ - S3_create_bucket(o->s3_protocol, o->access_key, o->secret_key, NULL, p, S3CannedAclPrivate, o->locationConstraint, NULL, & responseHandler, NULL); - CHECK_ERROR - return 0; - }else{ + if(iorflags & IOR_CREAT){ struct data_handling dh = { .buf = NULL, .size = 0 }; - S3_put_object(& o->bucket_context, p, 0, NULL, NULL, & putObjectHandler, & dh); - if (! o->s3_compatible){ - CHECK_ERROR - } - return 0; - } -} - -static int S3_rmdir (const char *path, aiori_mod_opt_t * options){ - s3_options_t * o = (s3_options_t*) options; - char p[FILENAME_MAX]; - - def_bucket_name(o, p, path); - if (o->bucket_per_dir){ - S3_delete_bucket(o->s3_protocol, S3UriStylePath, o->access_key, o->secret_key, NULL, p, NULL, & responseHandler, NULL); + S3_put_object(& o->bucket_context, p, 0, NULL, NULL, o->timeout, &putObjectHandler, & dh); CHECK_ERROR - return 0; - }else{ - S3_delete_object(& o->bucket_context, p, NULL, & responseHandler, NULL); - CHECK_ERROR - return 0; } + + return (aiori_fd_t*) fd; } @@ -269,25 +234,135 @@ static S3Status statResponsePropertiesCallback(const S3ResponseProperties *prope static S3ResponseHandler statResponseHandler = { &statResponsePropertiesCallback, &responseCompleteCallback }; +static aiori_fd_t *S3_Open(char *path, int flags, aiori_mod_opt_t * options) +{ + if(flags & IOR_CREAT){ + return S3_Create(path, flags, options); + } + if(flags & IOR_WRONLY){ + WARN("S3 IOR_WRONLY is not supported"); + } + if(flags & IOR_RDWR){ + WARN("S3 IOR_RDWR is not supported"); + } + + s3_options_t * o = (s3_options_t*) options; + char p[FILENAME_MAX]; + def_file_name(o, p, path); + + struct stat buf; + S3_head_object(& o->bucket_context, p, NULL, o->timeout, & statResponseHandler, & buf); + if (s3status != S3StatusOK){ + return NULL; + } + + S3_fd_t * fd = malloc(sizeof(S3_fd_t)); + fd->object = strdup(p); + + return (aiori_fd_t*) fd; +} + +static S3Status getObjectDataCallback(int bufferSize, const char *buffer, void *callbackData){ + struct data_handling * dh = (struct data_handling *) callbackData; + const int64_t size = dh->size > bufferSize ? bufferSize : dh->size; + memcpy(dh->buf, buffer, size); + dh->buf = (IOR_size_t*) ((char*)(dh->buf) + size); + dh->size -= size; + + return S3StatusOK; +} + +static S3GetObjectHandler getObjectHandler = { { &responsePropertiesCallback, &responseCompleteCallback }, & getObjectDataCallback }; + +static IOR_offset_t S3_Xfer(int access, aiori_fd_t * afd, IOR_size_t * buffer, IOR_offset_t length, IOR_offset_t offset, aiori_mod_opt_t * options){ + S3_fd_t * fd = (S3_fd_t *) afd; + struct data_handling dh = { .buf = buffer, .size = length }; + + s3_options_t * o = (s3_options_t*) options; + char p[FILENAME_MAX]; + sprintf(p, "%s-%ld-%ld", fd->object, (long) offset, (long) length); + if(access == WRITE){ + S3_put_object(& o->bucket_context, p, length, NULL, NULL, o->timeout, &putObjectHandler, & dh); + }else{ + S3_get_object(& o->bucket_context, p, NULL, 0, length, NULL, o->timeout, &getObjectHandler, & dh); + } + if (! o->s3_compatible){ + CHECK_ERROR + } + return length; +} + + +static void S3_Close(aiori_fd_t * afd, aiori_mod_opt_t * options) +{ + S3_fd_t * fd = (S3_fd_t *) afd; + free(fd->object); + free(afd); +} + +static void S3_Delete(char *path, aiori_mod_opt_t * options) +{ + s3_options_t * o = (s3_options_t*) options; + char p[FILENAME_MAX]; + def_file_name(o, p, path); + S3_delete_object(& o->bucket_context, p, NULL, o->timeout, & responseHandler, NULL); + CHECK_ERROR +} + +static int S3_mkdir (const char *path, mode_t mode, aiori_mod_opt_t * options){ + s3_options_t * o = (s3_options_t*) options; + char p[FILENAME_MAX]; + + def_bucket_name(o, p, path); + if (o->bucket_per_file){ + S3_create_bucket(o->s3_protocol, o->access_key, o->secret_key, NULL, o->host, p, o->authRegion, S3CannedAclPrivate, o->locationConstraint, NULL, o->timeout, & responseHandler, NULL); + CHECK_ERROR + return 0; + }else{ + struct data_handling dh = { .buf = NULL, .size = 0 }; + S3_put_object(& o->bucket_context, p, 0, NULL, NULL, o->timeout, & putObjectHandler, & dh); + if (! o->s3_compatible){ + CHECK_ERROR + } + return 0; + } +} + +static int S3_rmdir (const char *path, aiori_mod_opt_t * options){ + s3_options_t * o = (s3_options_t*) options; + char p[FILENAME_MAX]; + + def_bucket_name(o, p, path); + if (o->bucket_per_file){ + S3_delete_bucket(o->s3_protocol, S3UriStylePath, o->access_key, o->secret_key, NULL, o->host, p, o->authRegion, NULL, o->timeout, & responseHandler, NULL); + CHECK_ERROR + return 0; + }else{ + S3_delete_object(& o->bucket_context, p, NULL, o->timeout, & responseHandler, NULL); + CHECK_ERROR + return 0; + } +} + static int S3_access (const char *path, int mode, aiori_mod_opt_t * options){ s3_options_t * o = (s3_options_t*) options; char p[FILENAME_MAX]; def_file_name(o, p, path); - S3_head_object(& o->bucket_context, p, NULL, & statResponseHandler, NULL); + S3_head_object(& o->bucket_context, p, NULL, o->timeout, & statResponseHandler, NULL); if (s3status != S3StatusOK){ return -1; } return 0; } -static int S3_stat (const char *path, struct stat *buf, aiori_mod_opt_t * options){ +static int S3_stat(const char *path, struct stat *buf, aiori_mod_opt_t * options){ s3_options_t * o = (s3_options_t*) options; char p[FILENAME_MAX]; def_file_name(o, p, path); memset(buf, 0, sizeof(struct stat)); - S3_head_object(& o->bucket_context, p, NULL, & statResponseHandler, buf); + S3_head_object(& o->bucket_context, p, NULL, o->timeout, & statResponseHandler, buf); if (s3status != S3StatusOK){ return -1; } @@ -303,12 +378,18 @@ static IOR_offset_t S3_GetFileSize(aiori_mod_opt_t * options, char *testFileName static int S3_check_params(aiori_mod_opt_t * options){ + if(hints->blockSize != hints->transferSize){ + ERR("S3 Blocksize must be transferSize"); + } + return 0; } static void S3_init(aiori_mod_opt_t * options){ s3_options_t * o = (s3_options_t*) options; int ret = S3_initialize(NULL, S3_INIT_ALL, o->host); + if(ret != S3StatusOK) + FAIL("Could not initialize S3 library"); // create a bucket id based on access-key using a trivial checksumming if(! o->dont_suffix){ @@ -329,8 +410,8 @@ static void S3_init(aiori_mod_opt_t * options){ memset(& o->bucket_context, 0, sizeof(o->bucket_context)); o->bucket_context.hostName = o->host; o->bucket_context.bucketName = o->bucket_prefix_cur; - if (! o->bucket_per_dir){ - S3_create_bucket(o->s3_protocol, o->access_key, o->secret_key, NULL, o->bucket_context.bucketName, S3CannedAclPrivate, o->locationConstraint, NULL, & responseHandler, NULL); + if (! o->bucket_per_file){ + S3_create_bucket(o->s3_protocol, o->access_key, o->secret_key, NULL, o->host, o->bucket_context.bucketName, o->authRegion, S3CannedAclPrivate, o->locationConstraint, NULL, o->timeout, & responseHandler, NULL); CHECK_ERROR } diff --git a/testing/s3.sh b/testing/s3.sh index d1bf50b..2e79e29 100755 --- a/testing/s3.sh +++ b/testing/s3.sh @@ -5,20 +5,23 @@ ROOT="$(dirname ${BASH_SOURCE[0]})" TYPE="basic" -cd $ROOT - -if [[ ! -e minio ]] ; then +if [[ ! -e $ROOT/minio ]] ; then wget https://dl.min.io/server/minio/release/linux-amd64/minio - chmod +x minio + mv minio $ROOT + chmod +x $ROOT/minio fi export MINIO_ACCESS_KEY=accesskey export MINIO_SECRET_KEY=secretkey -./minio --quiet server /dev/shm & +$ROOT/minio --quiet server /dev/shm & +export IOR_EXTRA="-o test" +export MDTEST_EXTRA="-d test" source $ROOT/test-lib.sh -IOR 2 -a S3-libs3 --S3.host=localhost:9000 --S3.secret-key=secretkey --S3.access-key=accesskey -MDTEST 2 -a S3-libs3 --S3.host=localhost:9000 --S3.secret-key=secretkey --S3.access-key=accesskey + +I=100 # Start with this ID +IOR 2 -a S3-libs3 --S3.host=localhost:9000 --S3.secret-key=secretkey --S3.access-key=accesskey -b $((10*1024*1024)) -t $((10*1024*1024)) +MDTEST 2 -a S3-libs3 --S3.host=localhost:9000 --S3.secret-key=secretkey --S3.access-key=accesskey kill -9 %1 diff --git a/testing/test-lib.sh b/testing/test-lib.sh index 444873d..e35b245 100644 --- a/testing/test-lib.sh +++ b/testing/test-lib.sh @@ -40,7 +40,7 @@ I=0 function IOR(){ RANKS=$1 shift - WHAT="${IOR_MPIRUN} $RANKS ${IOR_BIN_DIR}/ior ${@} ${IOR_EXTRA} -o ${IOR_TMP}/ior" + WHAT="${IOR_MPIRUN} $RANKS ${IOR_BIN_DIR}/ior ${@} -o ${IOR_TMP}/ior ${IOR_EXTRA}" $WHAT 1>"${IOR_OUT}/test_out.$I" 2>&1 if [[ $? != 0 ]]; then echo -n "ERR" @@ -56,7 +56,7 @@ function MDTEST(){ RANKS=$1 shift rm -rf ${IOR_TMP}/mdest - WHAT="${IOR_MPIRUN} $RANKS ${IOR_BIN_DIR}/mdtest ${@} ${MDTEST_EXTRA} -d ${IOR_TMP}/mdest -V=4" + WHAT="${IOR_MPIRUN} $RANKS ${IOR_BIN_DIR}/mdtest ${@} -d ${IOR_TMP}/mdest ${MDTEST_EXTRA} -V=4" $WHAT 1>"${IOR_OUT}/test_out.$I" 2>&1 if [[ $? != 0 ]]; then echo -n "ERR"