LibS3 version that stores fragments as one object each.

master
Julian M. Kunkel 2020-06-30 18:36:49 +01:00
parent 2a3838c360
commit 154cf2cde7
3 changed files with 170 additions and 86 deletions

View File

@ -1,5 +1,7 @@
/*
* S3 implementation using the newer libs3
* https://github.com/bji/libs3
* Use one object per file chunk
*/
#ifdef HAVE_CONFIG_H
@ -27,14 +29,16 @@ static void s3_xfer_hints(aiori_xfer_hint_t * params){
/************************** O P T I O N S *****************************/
typedef struct {
int bucket_per_dir;
int bucket_per_file;
char * access_key;
char * secret_key;
char * host;
char * bucket_prefix;
char * bucket_prefix_cur;
char * locationConstraint;
char * authRegion;
int timeout;
int dont_suffix;
int s3_compatible;
int use_ssl;
@ -54,7 +58,7 @@ static option_help * S3_options(aiori_mod_opt_t ** init_backend_options, aiori_m
o->bucket_prefix = "ior";
option_help h [] = {
//{0, "S3.bucket-per-directory", "Use one bucket to map one dir, otherwise only one bucket is used.", OPTION_FLAG, 'd', & o->bucket_per_dir},
{0, "S3.bucket-per-file", "Use one bucket to map one file, otherwise only one bucket is used to store all files.", OPTION_FLAG, 'd', & o->bucket_per_file},
{0, "S3.bucket-name-prefix", "The name of the bucket (when using without -b), otherwise it is used as prefix.", OPTION_OPTIONAL_ARGUMENT, 's', & o->bucket_prefix},
{0, "S3.dont-suffix-bucket", "If not selected, then a hash will be added to the bucket name to increase uniqueness.", OPTION_FLAG, 'd', & o->dont_suffix },
{0, "S3.s3-compatible", "to be selected when using S3 compatible storage", OPTION_FLAG, 'd', & o->s3_compatible },
@ -84,7 +88,7 @@ static void def_file_name(s3_options_t * o, char * out_name, char const * path){
static void def_bucket_name(s3_options_t * o, char * out_name, char const * path){
// S3_MAX_BUCKET_NAME_SIZE
if (o->bucket_per_dir){
if (o->bucket_per_file){
out_name += sprintf(out_name, "%s-", o->bucket_prefix_cur);
}
@ -103,7 +107,7 @@ static void def_bucket_name(s3_options_t * o, char * out_name, char const * path
}
struct data_handling{
char * buf;
IOR_size_t * buf;
int64_t size;
};
@ -126,24 +130,10 @@ static void responseCompleteCallback(S3Status status, const S3ErrorDetails *erro
}
#define CHECK_ERROR if (s3status != S3StatusOK){ \
EWARNF("S3 \"%s\": %s - %s", S3_get_status_name(s3status), s3error.message, s3error.furtherDetails); \
EWARNF("S3 %s:%d \"%s\": %s - %s", __FUNCTION__, __LINE__, S3_get_status_name(s3status), s3error.message, s3error.furtherDetails); \
}
static int putObjectDataCallback(int bufferSize, char *buffer, void *callbackData){
printf("CALLBACK\n");
struct data_handling * dh = (struct data_handling *) callbackData;
const int64_t size = dh->size > bufferSize ? bufferSize : dh->size;
if(size == 0) return 0;
memcpy(buffer, dh->buf, size);
dh->buf += size;
dh->size -= size;
return size;
}
static S3PutObjectHandler putObjectHandler = { { &responsePropertiesCallback, &responseCompleteCallback }, & putObjectDataCallback };
static S3ResponseHandler responseHandler = { &responsePropertiesCallback, &responseCompleteCallback };
static char * S3_getVersion()
@ -180,79 +170,54 @@ static int S3_statfs (const char * path, ior_aiori_statfs_t * stat, aiori_mod_op
// use the number of bucket as files
uint64_t buckets = 0;
S3_list_service(o->s3_protocol, o->access_key, o->secret_key, o->host,
NULL, & listhandler, & buckets);
S3_list_service(o->s3_protocol, o->access_key, o->secret_key, NULL, o->host,
o->authRegion, NULL, o->timeout, & listhandler, & buckets);
stat->f_files = buckets;
CHECK_ERROR
return 0;
}
static aiori_fd_t *S3_Create(char *testFileName, int iorflags, aiori_mod_opt_t * options)
{
s3_options_t * o = (s3_options_t*) options;
return (aiori_fd_t*) 1;
static S3Status S3multipart_handler(const char *upload_id, void *callbackData){
*((char const**)(callbackData)) = upload_id;
return S3StatusOK;
}
static aiori_fd_t *S3_Open(char *testFileName, int flags, aiori_mod_opt_t * options)
{
s3_options_t * o = (s3_options_t*) options;
static S3MultipartInitialHandler multipart_handler = { {&responsePropertiesCallback, &responseCompleteCallback }, & S3multipart_handler};
return (aiori_fd_t*) 1;
typedef struct{
char * object;
} S3_fd_t;
static int putObjectDataCallback(int bufferSize, char *buffer, void *callbackData){
struct data_handling * dh = (struct data_handling *) callbackData;
const int64_t size = dh->size > bufferSize ? bufferSize : dh->size;
if(size == 0) return 0;
memcpy(buffer, dh->buf, size);
dh->buf = (IOR_size_t*) ((char*)(dh->buf) + size);
dh->size -= size;
return size;
}
static void S3_Close(aiori_fd_t *fd, aiori_mod_opt_t * options)
{
}
static S3PutObjectHandler putObjectHandler = { { &responsePropertiesCallback, &responseCompleteCallback }, & putObjectDataCallback };
static void S3_Delete(char *path, aiori_mod_opt_t * options)
static aiori_fd_t *S3_Create(char *path, int iorflags, aiori_mod_opt_t * options)
{
char * upload_id;
s3_options_t * o = (s3_options_t*) options;
char p[FILENAME_MAX];
def_file_name(o, p, path);
S3_delete_object(& o->bucket_context, p, NULL, & responseHandler, NULL);
CHECK_ERROR
}
S3_fd_t * fd = malloc(sizeof(S3_fd_t));
fd->object = strdup(p);
static IOR_offset_t S3_Xfer(int access, aiori_fd_t *file, IOR_size_t * buffer, IOR_offset_t length, IOR_offset_t offset, aiori_mod_opt_t * options){
return length;
}
static int S3_mkdir (const char *path, mode_t mode, aiori_mod_opt_t * options){
s3_options_t * o = (s3_options_t*) options;
char p[FILENAME_MAX];
def_bucket_name(o, p, path);
if (o->bucket_per_dir){
S3_create_bucket(o->s3_protocol, o->access_key, o->secret_key, NULL, p, S3CannedAclPrivate, o->locationConstraint, NULL, & responseHandler, NULL);
CHECK_ERROR
return 0;
}else{
if(iorflags & IOR_CREAT){
struct data_handling dh = { .buf = NULL, .size = 0 };
S3_put_object(& o->bucket_context, p, 0, NULL, NULL, & putObjectHandler, & dh);
if (! o->s3_compatible){
CHECK_ERROR
}
return 0;
}
}
static int S3_rmdir (const char *path, aiori_mod_opt_t * options){
s3_options_t * o = (s3_options_t*) options;
char p[FILENAME_MAX];
def_bucket_name(o, p, path);
if (o->bucket_per_dir){
S3_delete_bucket(o->s3_protocol, S3UriStylePath, o->access_key, o->secret_key, NULL, p, NULL, & responseHandler, NULL);
S3_put_object(& o->bucket_context, p, 0, NULL, NULL, o->timeout, &putObjectHandler, & dh);
CHECK_ERROR
return 0;
}else{
S3_delete_object(& o->bucket_context, p, NULL, & responseHandler, NULL);
CHECK_ERROR
return 0;
}
return (aiori_fd_t*) fd;
}
@ -269,25 +234,135 @@ static S3Status statResponsePropertiesCallback(const S3ResponseProperties *prope
static S3ResponseHandler statResponseHandler = { &statResponsePropertiesCallback, &responseCompleteCallback };
static aiori_fd_t *S3_Open(char *path, int flags, aiori_mod_opt_t * options)
{
if(flags & IOR_CREAT){
return S3_Create(path, flags, options);
}
if(flags & IOR_WRONLY){
WARN("S3 IOR_WRONLY is not supported");
}
if(flags & IOR_RDWR){
WARN("S3 IOR_RDWR is not supported");
}
s3_options_t * o = (s3_options_t*) options;
char p[FILENAME_MAX];
def_file_name(o, p, path);
struct stat buf;
S3_head_object(& o->bucket_context, p, NULL, o->timeout, & statResponseHandler, & buf);
if (s3status != S3StatusOK){
return NULL;
}
S3_fd_t * fd = malloc(sizeof(S3_fd_t));
fd->object = strdup(p);
return (aiori_fd_t*) fd;
}
static S3Status getObjectDataCallback(int bufferSize, const char *buffer, void *callbackData){
struct data_handling * dh = (struct data_handling *) callbackData;
const int64_t size = dh->size > bufferSize ? bufferSize : dh->size;
memcpy(dh->buf, buffer, size);
dh->buf = (IOR_size_t*) ((char*)(dh->buf) + size);
dh->size -= size;
return S3StatusOK;
}
static S3GetObjectHandler getObjectHandler = { { &responsePropertiesCallback, &responseCompleteCallback }, & getObjectDataCallback };
static IOR_offset_t S3_Xfer(int access, aiori_fd_t * afd, IOR_size_t * buffer, IOR_offset_t length, IOR_offset_t offset, aiori_mod_opt_t * options){
S3_fd_t * fd = (S3_fd_t *) afd;
struct data_handling dh = { .buf = buffer, .size = length };
s3_options_t * o = (s3_options_t*) options;
char p[FILENAME_MAX];
sprintf(p, "%s-%ld-%ld", fd->object, (long) offset, (long) length);
if(access == WRITE){
S3_put_object(& o->bucket_context, p, length, NULL, NULL, o->timeout, &putObjectHandler, & dh);
}else{
S3_get_object(& o->bucket_context, p, NULL, 0, length, NULL, o->timeout, &getObjectHandler, & dh);
}
if (! o->s3_compatible){
CHECK_ERROR
}
return length;
}
static void S3_Close(aiori_fd_t * afd, aiori_mod_opt_t * options)
{
S3_fd_t * fd = (S3_fd_t *) afd;
free(fd->object);
free(afd);
}
static void S3_Delete(char *path, aiori_mod_opt_t * options)
{
s3_options_t * o = (s3_options_t*) options;
char p[FILENAME_MAX];
def_file_name(o, p, path);
S3_delete_object(& o->bucket_context, p, NULL, o->timeout, & responseHandler, NULL);
CHECK_ERROR
}
static int S3_mkdir (const char *path, mode_t mode, aiori_mod_opt_t * options){
s3_options_t * o = (s3_options_t*) options;
char p[FILENAME_MAX];
def_bucket_name(o, p, path);
if (o->bucket_per_file){
S3_create_bucket(o->s3_protocol, o->access_key, o->secret_key, NULL, o->host, p, o->authRegion, S3CannedAclPrivate, o->locationConstraint, NULL, o->timeout, & responseHandler, NULL);
CHECK_ERROR
return 0;
}else{
struct data_handling dh = { .buf = NULL, .size = 0 };
S3_put_object(& o->bucket_context, p, 0, NULL, NULL, o->timeout, & putObjectHandler, & dh);
if (! o->s3_compatible){
CHECK_ERROR
}
return 0;
}
}
static int S3_rmdir (const char *path, aiori_mod_opt_t * options){
s3_options_t * o = (s3_options_t*) options;
char p[FILENAME_MAX];
def_bucket_name(o, p, path);
if (o->bucket_per_file){
S3_delete_bucket(o->s3_protocol, S3UriStylePath, o->access_key, o->secret_key, NULL, o->host, p, o->authRegion, NULL, o->timeout, & responseHandler, NULL);
CHECK_ERROR
return 0;
}else{
S3_delete_object(& o->bucket_context, p, NULL, o->timeout, & responseHandler, NULL);
CHECK_ERROR
return 0;
}
}
static int S3_access (const char *path, int mode, aiori_mod_opt_t * options){
s3_options_t * o = (s3_options_t*) options;
char p[FILENAME_MAX];
def_file_name(o, p, path);
S3_head_object(& o->bucket_context, p, NULL, & statResponseHandler, NULL);
S3_head_object(& o->bucket_context, p, NULL, o->timeout, & statResponseHandler, NULL);
if (s3status != S3StatusOK){
return -1;
}
return 0;
}
static int S3_stat (const char *path, struct stat *buf, aiori_mod_opt_t * options){
static int S3_stat(const char *path, struct stat *buf, aiori_mod_opt_t * options){
s3_options_t * o = (s3_options_t*) options;
char p[FILENAME_MAX];
def_file_name(o, p, path);
memset(buf, 0, sizeof(struct stat));
S3_head_object(& o->bucket_context, p, NULL, & statResponseHandler, buf);
S3_head_object(& o->bucket_context, p, NULL, o->timeout, & statResponseHandler, buf);
if (s3status != S3StatusOK){
return -1;
}
@ -303,12 +378,18 @@ static IOR_offset_t S3_GetFileSize(aiori_mod_opt_t * options, char *testFileName
static int S3_check_params(aiori_mod_opt_t * options){
if(hints->blockSize != hints->transferSize){
ERR("S3 Blocksize must be transferSize");
}
return 0;
}
static void S3_init(aiori_mod_opt_t * options){
s3_options_t * o = (s3_options_t*) options;
int ret = S3_initialize(NULL, S3_INIT_ALL, o->host);
if(ret != S3StatusOK)
FAIL("Could not initialize S3 library");
// create a bucket id based on access-key using a trivial checksumming
if(! o->dont_suffix){
@ -329,8 +410,8 @@ static void S3_init(aiori_mod_opt_t * options){
memset(& o->bucket_context, 0, sizeof(o->bucket_context));
o->bucket_context.hostName = o->host;
o->bucket_context.bucketName = o->bucket_prefix_cur;
if (! o->bucket_per_dir){
S3_create_bucket(o->s3_protocol, o->access_key, o->secret_key, NULL, o->bucket_context.bucketName, S3CannedAclPrivate, o->locationConstraint, NULL, & responseHandler, NULL);
if (! o->bucket_per_file){
S3_create_bucket(o->s3_protocol, o->access_key, o->secret_key, NULL, o->host, o->bucket_context.bucketName, o->authRegion, S3CannedAclPrivate, o->locationConstraint, NULL, o->timeout, & responseHandler, NULL);
CHECK_ERROR
}

View File

@ -5,20 +5,23 @@
ROOT="$(dirname ${BASH_SOURCE[0]})"
TYPE="basic"
cd $ROOT
if [[ ! -e minio ]] ; then
if [[ ! -e $ROOT/minio ]] ; then
wget https://dl.min.io/server/minio/release/linux-amd64/minio
chmod +x minio
mv minio $ROOT
chmod +x $ROOT/minio
fi
export MINIO_ACCESS_KEY=accesskey
export MINIO_SECRET_KEY=secretkey
./minio --quiet server /dev/shm &
$ROOT/minio --quiet server /dev/shm &
export IOR_EXTRA="-o test"
export MDTEST_EXTRA="-d test"
source $ROOT/test-lib.sh
IOR 2 -a S3-libs3 --S3.host=localhost:9000 --S3.secret-key=secretkey --S3.access-key=accesskey
MDTEST 2 -a S3-libs3 --S3.host=localhost:9000 --S3.secret-key=secretkey --S3.access-key=accesskey
I=100 # Start with this ID
IOR 2 -a S3-libs3 --S3.host=localhost:9000 --S3.secret-key=secretkey --S3.access-key=accesskey -b $((10*1024*1024)) -t $((10*1024*1024))
MDTEST 2 -a S3-libs3 --S3.host=localhost:9000 --S3.secret-key=secretkey --S3.access-key=accesskey
kill -9 %1

View File

@ -40,7 +40,7 @@ I=0
function IOR(){
RANKS=$1
shift
WHAT="${IOR_MPIRUN} $RANKS ${IOR_BIN_DIR}/ior ${@} ${IOR_EXTRA} -o ${IOR_TMP}/ior"
WHAT="${IOR_MPIRUN} $RANKS ${IOR_BIN_DIR}/ior ${@} -o ${IOR_TMP}/ior ${IOR_EXTRA}"
$WHAT 1>"${IOR_OUT}/test_out.$I" 2>&1
if [[ $? != 0 ]]; then
echo -n "ERR"
@ -56,7 +56,7 @@ function MDTEST(){
RANKS=$1
shift
rm -rf ${IOR_TMP}/mdest
WHAT="${IOR_MPIRUN} $RANKS ${IOR_BIN_DIR}/mdtest ${@} ${MDTEST_EXTRA} -d ${IOR_TMP}/mdest -V=4"
WHAT="${IOR_MPIRUN} $RANKS ${IOR_BIN_DIR}/mdtest ${@} -d ${IOR_TMP}/mdest ${MDTEST_EXTRA} -V=4"
$WHAT 1>"${IOR_OUT}/test_out.$I" 2>&1
if [[ $? != 0 ]]; then
echo -n "ERR"