From af2429b47e9bdc8ba9dd9992083d03e94f09a9d1 Mon Sep 17 00:00:00 2001 From: Julian Kunkel Date: Thu, 13 Aug 2020 16:25:36 +0100 Subject: [PATCH] Fix s3-4c implementation (#247) * Ported S3-4c version to current AIORI interface. * S3-4c: add crypto dependency * S3: Store username/host in options. --- src/Makefile.am | 1 + src/aiori-S3-4c.c | 724 ++++++++++++++++++++-------------------------- src/ior.c | 1 - src/ior.h | 4 +- 4 files changed, 310 insertions(+), 420 deletions(-) diff --git a/src/Makefile.am b/src/Makefile.am index 9531850..0adbf32 100755 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -105,6 +105,7 @@ extraLDADD += -lcurl extraLDADD += -lxml2 extraLDADD += -laws4c extraLDADD += -laws4c_extra +extraLDADD += -lcrypto endif if USE_S3_LIBS3_AIORI diff --git a/src/aiori-S3-4c.c b/src/aiori-S3-4c.c index 11cc549..f34fadb 100755 --- a/src/aiori-S3-4c.c +++ b/src/aiori-S3-4c.c @@ -91,16 +91,6 @@ #include #include -/* -#ifdef HAVE_LUSTRE_USER -#include -#endif -*/ - -#include "ior.h" -#include "aiori.h" -#include "iordef.h" - #include #include // from libxml2 @@ -109,20 +99,17 @@ #include "aws4c.h" // extended vers of "aws4c" lib for S3 via libcurl #include "aws4c_extra.h" // utilities, e.g. for parsing XML in responses +#include "ior.h" +#include "aiori.h" +#include "aiori-debug.h" +extern int rank; +extern MPI_Comm testComm; - -/* buffer is used to generate URLs, err_msgs, etc */ #define BUFF_SIZE 1024 -static char buff[BUFF_SIZE]; - const int ETAG_SIZE = 32; - CURLcode rc; -/* Any objects we create or delete will be under this bucket */ -const char* bucket_name = "ior"; - /* TODO: The following stuff goes into options! */ /* REST/S3 variables */ // CURL* curl; /* for libcurl "easy" fns (now managed by aws4c) */ @@ -130,6 +117,9 @@ const char* bucket_name = "ior"; # define IOR_CURL_NOCONTINUE 0x02 # define IOR_CURL_S3_EMC_EXT 0x04 /* allow EMC extensions to S3? */ +#define MAX_UPLOAD_ID_SIZE 256 /* TODO don't know the actual value */ + + #ifdef USE_S3_4C_AIORI # include # include "aws4c.h" @@ -138,29 +128,47 @@ const char* bucket_name = "ior"; typedef void IOBuf; /* unused, but needs a type */ #endif - IOBuf* io_buf; /* aws4c places parsed header values here */ - IOBuf* etags; /* accumulate ETags for N:1 parts */ + +typedef struct { + /* Any objects we create or delete will be under this bucket */ + char* bucket_name; + char* user; + char* host; + /* Runtime data, this data isn't yet safe to allow concurrent access to multiple files, only open one file at a time */ + int curl_flags; + IOBuf* io_buf; /* aws4c places parsed header values here */ + IOBuf* etags; /* accumulate ETags for N:1 parts */ + size_t part_number; + char UploadId[MAX_UPLOAD_ID_SIZE]; /* key for multi-part-uploads */ + int written; /* did we write to the file */ +} s3_options_t; /////////////////////////////////////////////// +static aiori_xfer_hint_t * hints = NULL; + +static void S3_xfer_hints(aiori_xfer_hint_t * params){ + hints = params; +} + /**************************** P R O T O T Y P E S *****************************/ -static void* S3_Create(char*, IOR_param_t*); -static void* S3_Open(char*, IOR_param_t*); -static IOR_offset_t S3_Xfer(int, void*, IOR_size_t*, IOR_offset_t, IOR_param_t*); -static void S3_Close(void*, IOR_param_t*); +static aiori_fd_t* S3_Create(char *path, int iorflags, aiori_mod_opt_t * options); +static aiori_fd_t* S3_Open(char *path, int flags, aiori_mod_opt_t * options); +static IOR_offset_t S3_Xfer(int access, aiori_fd_t * afd, IOR_size_t * buffer, IOR_offset_t length, IOR_offset_t offset, aiori_mod_opt_t * options); +static void S3_Close(aiori_fd_t * afd, aiori_mod_opt_t * options); -static void* EMC_Create(char*, IOR_param_t*); -static void* EMC_Open(char*, IOR_param_t*); -static IOR_offset_t EMC_Xfer(int, void*, IOR_size_t*, IOR_offset_t, IOR_param_t*); -static void EMC_Close(void*, IOR_param_t*); - -static void S3_Delete(char*, IOR_param_t*); -static void S3_Fsync(void*, IOR_param_t*); -static IOR_offset_t S3_GetFileSize(IOR_param_t*, MPI_Comm, char*); -static void S3_init(void * options); -static void S3_finalize(void * options); -static int S3_check_params(IOR_param_t *); +static aiori_fd_t* EMC_Create(char *path, int iorflags, aiori_mod_opt_t * options); +static aiori_fd_t* EMC_Open(char *path, int flags, aiori_mod_opt_t * options); +static IOR_offset_t EMC_Xfer(int access, aiori_fd_t * afd, IOR_size_t * buffer, IOR_offset_t length, IOR_offset_t offset, aiori_mod_opt_t * options); +static void EMC_Close(aiori_fd_t * afd, aiori_mod_opt_t * options); +static void S3_Delete(char *path, aiori_mod_opt_t * options); +static void S3_Fsync(aiori_fd_t *fd, aiori_mod_opt_t * options); +static IOR_offset_t S3_GetFileSize(aiori_mod_opt_t * options, char *testFileName); +static void S3_init(aiori_mod_opt_t * options); +static void S3_finalize(aiori_mod_opt_t * options); +static int S3_check_params(aiori_mod_opt_t * options); +static option_help * S3_options(aiori_mod_opt_t ** init_backend_options, aiori_mod_opt_t * init_values); /************************** D E C L A R A T I O N S ***************************/ @@ -173,6 +181,7 @@ ior_aiori_t s3_4c_aiori = { .create = S3_Create, .open = S3_Open, .xfer = S3_Xfer, + .xfer_hints = S3_xfer_hints, .close = S3_Close, .delete = S3_Delete, .get_version = aiori_get_version, @@ -180,7 +189,9 @@ ior_aiori_t s3_4c_aiori = { .get_file_size = S3_GetFileSize, .initialize = S3_init, .finalize = S3_finalize, - .check_params = S3_check_params + .check_params = S3_check_params, + .get_options = S3_options, + .enable_mdtest = true }; // "S3", plus EMC-extensions enabled @@ -193,7 +204,7 @@ ior_aiori_t s3_plus_aiori = { .xfer = S3_Xfer, .close = S3_Close, .delete = S3_Delete, - .set_version = S3_SetVersion, + .get_version = aiori_get_version, .fsync = S3_Fsync, .get_file_size = S3_GetFileSize, .initialize = S3_init, @@ -210,7 +221,7 @@ ior_aiori_t s3_emc_aiori = { .xfer = EMC_Xfer, .close = EMC_Close, .delete = S3_Delete, - .set_version = S3_SetVersion, + .get_version = aiori_get_version, .fsync = S3_Fsync, .get_file_size = S3_GetFileSize, .initialize = S3_init, @@ -218,26 +229,50 @@ ior_aiori_t s3_emc_aiori = { }; -static void S3_init(void * options){ +static option_help * S3_options(aiori_mod_opt_t ** init_backend_options, aiori_mod_opt_t * init_values){ + s3_options_t * o = malloc(sizeof(s3_options_t)); + if (init_values != NULL){ + memcpy(o, init_values, sizeof(s3_options_t)); + }else{ + memset(o, 0, sizeof(s3_options_t)); + } + + *init_backend_options = (aiori_mod_opt_t*) o; + o->bucket_name = "ior"; + + option_help h [] = { + {0, "S3-4c.user", "The username (in ~/.awsAuth).", OPTION_OPTIONAL_ARGUMENT, 's', & o->user}, + {0, "S3-4C.host", "The host optionally followed by:port.", OPTION_OPTIONAL_ARGUMENT, 's', & o->host}, + {0, "S3-4c.bucket-name", "The name of the bucket.", OPTION_OPTIONAL_ARGUMENT, 's', & o->bucket_name}, + LAST_OPTION + }; + option_help * help = malloc(sizeof(h)); + memcpy(help, h, sizeof(h)); + return help; +} + + +static void S3_init(aiori_mod_opt_t * options){ /* This is supposed to be done before *any* threads are created. * Could MPI_Init() create threads (or call multi-threaded * libraries)? We'll assume so. */ AWS4C_CHECK( aws_init() ); } -static void S3_finalize(void * options){ +static void S3_finalize(aiori_mod_opt_t * options){ /* done once per program, after exiting all threads. * NOTE: This fn doesn't return a value that can be checked for success. */ aws_cleanup(); } -static int S3_check_params(IOR_param_t * test){ +static int S3_check_params(aiori_mod_opt_t * test){ + if(! hints) return 0; /* N:1 and N:N */ - IOR_offset_t NtoN = test->filePerProc; + IOR_offset_t NtoN = hints->filePerProc; IOR_offset_t Nto1 = ! NtoN; - IOR_offset_t s = test->segmentCount; - IOR_offset_t t = test->transferSize; - IOR_offset_t b = test->blockSize; + IOR_offset_t s = hints->segmentCount; + IOR_offset_t t = hints->transferSize; + IOR_offset_t b = hints->blockSize; if (Nto1 && (s != 1) && (b != t)) { ERR("N:1 (strided) requires xfer-size == block-size"); @@ -292,15 +327,15 @@ static int S3_check_params(IOR_param_t * test){ */ -static void s3_connect( IOR_param_t* param ) { - if (param->verbose >= VERBOSE_2) { - printf("-> s3_connect\n"); /* DEBUGGING */ - } +static void s3_connect( s3_options_t* param ) { + //if (param->verbose >= VERBOSE_2) { + // printf("-> s3_connect\n"); /* DEBUGGING */ + //} if ( param->curl_flags & IOR_CURL_INIT ) { - if (param->verbose >= VERBOSE_2) { - printf("<- s3_connect [nothing to do]\n"); /* DEBUGGING */ - } + //if (param->verbose >= VERBOSE_2) { + // printf("<- s3_connect [nothing to do]\n"); /* DEBUGGING */ + //} return; } @@ -318,8 +353,8 @@ static void s3_connect( IOR_param_t* param ) { // NOTE: These inits could be done in init_IORParam_t(), in ior.c, but // would require conditional compilation, there. - aws_set_debug(param->verbose >= 4); - aws_read_config(getenv("USER")); // requires ~/.awsAuth + aws_set_debug(0); // param->verbose >= 4 + aws_read_config(param->user); // requires ~/.awsAuth aws_reuse_connections(1); // initialize IOBufs. These are basically dynamically-extensible @@ -346,8 +381,8 @@ static void s3_connect( IOR_param_t* param ) { // snprintf(buff, BUFF_SIZE, "10.140.0.%d", 15 + (rank % 4)); // s3_set_host(buff); - snprintf(buff, BUFF_SIZE, "10.140.0.%d:9020", 15 + (rank % 4)); - s3_set_host(buff); + //snprintf(options->buff, BUFF_SIZE, "10.140.0.%d:9020", 15 + (rank % 4)); + //s3_set_host(options->buff); #else /* @@ -366,23 +401,25 @@ static void s3_connect( IOR_param_t* param ) { // s3_set_host( "10.143.0.1:80"); #endif + s3_set_host(param->host); + // make sure test-bucket exists - s3_set_bucket((char*)bucket_name); + s3_set_bucket((char*) param->bucket_name); if (rank == 0) { AWS4C_CHECK( s3_head(param->io_buf, "") ); if ( param->io_buf->code == 404 ) { // "404 Not Found" - printf(" bucket '%s' doesn't exist\n", bucket_name); + printf(" bucket '%s' doesn't exist\n", param->bucket_name); AWS4C_CHECK( s3_put(param->io_buf, "") ); /* creates URL as bucket + obj */ AWS4C_CHECK_OK( param->io_buf ); // assure "200 OK" - printf("created bucket '%s'\n", bucket_name); + printf("created bucket '%s'\n", param->bucket_name); } else { // assure "200 OK" AWS4C_CHECK_OK( param->io_buf ); } } - MPI_CHECK(MPI_Barrier(param->testComm), "barrier error"); + MPI_CHECK(MPI_Barrier(testComm), "barrier error"); // Maybe allow EMC extensions to S3 @@ -391,24 +428,22 @@ static void s3_connect( IOR_param_t* param ) { // don't perform these inits more than once param->curl_flags |= IOR_CURL_INIT; - - if (param->verbose >= VERBOSE_2) { - printf("<- s3_connect [success]\n"); - } + //if (param->verbose >= VERBOSE_2) { + // printf("<- s3_connect [success]\n"); + //} } static void -s3_disconnect( IOR_param_t* param ) { - if (param->verbose >= VERBOSE_2) { - printf("-> s3_disconnect\n"); - } - +s3_disconnect( s3_options_t* param ) { + //if (param->verbose >= VERBOSE_2) { + // printf("-> s3_disconnect\n"); + //} // nothing to do here, if using new aws4c ... - if (param->verbose >= VERBOSE_2) { - printf("<- s3_disconnect\n"); - } + //if (param->verbose >= VERBOSE_2) { + // printf("<- s3_disconnect\n"); + //} } @@ -416,8 +451,7 @@ s3_disconnect( IOR_param_t* param ) { // After finalizing an S3 multi-part-upload, you must reset some things // before you can use multi-part-upload again. This will also avoid (one // particular set of) memory-leaks. -void -s3_MPU_reset(IOR_param_t* param) { +void s3_MPU_reset(s3_options_t* param) { aws_iobuf_reset(param->io_buf); aws_iobuf_reset(param->etags); param->part_number = 0; @@ -453,46 +487,44 @@ s3_MPU_reset(IOR_param_t* param) { * */ -static -void * -S3_Create_Or_Open_internal(char* testFileName, - IOR_param_t* param, - unsigned char createFile, - int multi_part_upload_p ) { +static aiori_fd_t * S3_Create_Or_Open_internal(char* testFileName, int openFlags, s3_options_t* param, int multi_part_upload_p ) { + unsigned char createFile = openFlags & IOR_CREAT; - if (param->verbose >= VERBOSE_2) { - printf("-> S3_Create_Or_Open('%s', ,%d, %d)\n", - testFileName, createFile, multi_part_upload_p); - } + //if (param->verbose >= VERBOSE_2) { + // printf("-> S3_Create_Or_Open('%s', ,%d, %d)\n", + // testFileName, createFile, multi_part_upload_p); + //} /* initialize curl, if needed */ s3_connect( param ); /* Check for unsupported flags */ - if ( param->openFlags & IOR_EXCL ) { - fprintf( stdout, "Opening in Exclusive mode is not implemented in S3\n" ); - } - if ( param->useO_DIRECT == TRUE ) { - fprintf( stdout, "Direct I/O mode is not implemented in S3\n" ); - } + //if ( param->openFlags & IOR_EXCL ) { + // fprintf( stdout, "Opening in Exclusive mode is not implemented in S3\n" ); + //} + //if ( param->useO_DIRECT == TRUE ) { + // fprintf( stdout, "Direct I/O mode is not implemented in S3\n" ); + //} // easier to think - int n_to_n = param->filePerProc; + int n_to_n = hints->filePerProc; int n_to_1 = ! n_to_n; /* check whether object needs reset to zero-length */ int needs_reset = 0; if (! multi_part_upload_p) needs_reset = 1; /* so "append" can work */ - else if ( param->openFlags & IOR_TRUNC ) + else if ( openFlags & IOR_TRUNC ) needs_reset = 1; /* so "append" can work */ else if (createFile) { // AWS4C_CHECK( s3_head(param->io_buf, testFileName) ); // if ( ! AWS4C_OK(param->io_buf) ) needs_reset = 1; } - - if ( param->open == WRITE ) { + char buff[BUFF_SIZE]; /* buffer is used to generate URLs, err_msgs, etc */ + param->written = 0; + if ( openFlags & IOR_WRONLY || openFlags & IOR_RDWR ) { + param->written = 1; /* initializations for N:1 or N:N writes using multi-part upload */ if (multi_part_upload_p) { @@ -522,23 +554,21 @@ S3_Create_Or_Open_internal(char* testFileName, response->first->len, NULL, NULL, 0); if (doc == NULL) - ERR_SIMPLE("Rank0 Failed to find POST response\n"); + ERR("Rank0 Failed to find POST response\n"); // navigate parsed XML-tree to find UploadId xmlNode* root_element = xmlDocGetRootElement(doc); const char* upload_id = find_element_named(root_element, (char*)"UploadId"); if (! upload_id) - ERR_SIMPLE("couldn't find 'UploadId' in returned XML\n"); + ERR("couldn't find 'UploadId' in returned XML\n"); - if (param->verbose >= VERBOSE_3) - printf("got UploadId = '%s'\n", upload_id); + //if (param->verbose >= VERBOSE_3) + // printf("got UploadId = '%s'\n", upload_id); const size_t upload_id_len = strlen(upload_id); if (upload_id_len > MAX_UPLOAD_ID_SIZE) { - snprintf(buff, BUFF_SIZE, - "UploadId length %d exceeds expected max (%d)", - upload_id_len, MAX_UPLOAD_ID_SIZE); - ERR_SIMPLE(buff); + snprintf(buff, BUFF_SIZE, "UploadId length %zd exceeds expected max (%d)", upload_id_len, MAX_UPLOAD_ID_SIZE); + ERR(buff); } // save the UploadId we found @@ -551,16 +581,15 @@ S3_Create_Or_Open_internal(char* testFileName, // For N:1, share UploadId across all ranks if (n_to_1) - MPI_Bcast(param->UploadId, MAX_UPLOAD_ID_SIZE, MPI_BYTE, 0, param->testComm); + MPI_Bcast(param->UploadId, MAX_UPLOAD_ID_SIZE, MPI_BYTE, 0, testComm); } else // N:1, and we're not rank0. recv UploadID from Rank 0 - MPI_Bcast(param->UploadId, MAX_UPLOAD_ID_SIZE, MPI_BYTE, 0, param->testComm); + MPI_Bcast(param->UploadId, MAX_UPLOAD_ID_SIZE, MPI_BYTE, 0, testComm); } /* initializations for N:N or N:1 writes using EMC byte-range extensions */ else { - /* maybe reset to zero-length, so "append" can work */ if (needs_reset) { @@ -576,84 +605,48 @@ S3_Create_Or_Open_internal(char* testFileName, } } - - if (param->verbose >= VERBOSE_2) { - printf("<- S3_Create_Or_Open\n"); - } - return ((void *) testFileName ); + //if (param->verbose >= VERBOSE_2) { + // printf("<- S3_Create_Or_Open\n"); + //} + return ((aiori_fd_t *) testFileName ); } +static aiori_fd_t * S3_Create( char *testFileName, int iorflags, aiori_mod_opt_t * param ) { + //if (param->verbose >= VERBOSE_2) { + // printf("-> S3_Create\n"); + //} - -static -void * -S3_Create( char *testFileName, IOR_param_t * param ) { - if (param->verbose >= VERBOSE_2) { - printf("-> S3_Create\n"); - } - - if (param->verbose >= VERBOSE_2) { - printf("<- S3_Create\n"); - } - return S3_Create_Or_Open_internal( testFileName, param, TRUE, TRUE ); -} -static -void * -EMC_Create( char *testFileName, IOR_param_t * param ) { - if (param->verbose >= VERBOSE_2) { - printf("-> EMC_Create\n"); - } - - if (param->verbose >= VERBOSE_2) { - printf("<- EMC_Create\n"); - } - return S3_Create_Or_Open_internal( testFileName, param, TRUE, FALSE ); + //if (param->verbose >= VERBOSE_2) { + // printf("<- S3_Create\n"); + //} + return S3_Create_Or_Open_internal( testFileName, iorflags, (s3_options_t*) param, TRUE ); } +static aiori_fd_t * EMC_Create( char *testFileName, int iorflags, aiori_mod_opt_t * param ) { + //if (param->verbose >= VERBOSE_2) { + // printf("-> EMC_Create\n"); + //} - - - - -static -void * -S3_Open( char *testFileName, IOR_param_t * param ) { - if (param->verbose >= VERBOSE_2) { - printf("-> S3_Open\n"); - } - - if ( param->openFlags & IOR_CREAT ) { - if (param->verbose >= VERBOSE_2) { - printf("<- S3_Open( ... TRUE)\n"); - } - return S3_Create_Or_Open_internal( testFileName, param, TRUE, TRUE ); - } - else { - if (param->verbose >= VERBOSE_2) { - printf("<- S3_Open( ... FALSE)\n"); - } - return S3_Create_Or_Open_internal( testFileName, param, FALSE, TRUE ); - } + //if (param->verbose >= VERBOSE_2) { + // printf("<- EMC_Create\n"); + //} + return S3_Create_Or_Open_internal( testFileName, iorflags, (s3_options_t*) param, FALSE ); } -static -void * -EMC_Open( char *testFileName, IOR_param_t * param ) { - if (param->verbose >= VERBOSE_2) { - printf("-> S3_Open\n"); - } - if ( param->openFlags & IOR_CREAT ) { - if (param->verbose >= VERBOSE_2) { - printf("<- EMC_Open( ... TRUE)\n"); - } - return S3_Create_Or_Open_internal( testFileName, param, TRUE, FALSE ); - } - else { - if (param->verbose >= VERBOSE_2) { - printf("<- EMC_Open( ... FALSE)\n"); - } - return S3_Create_Or_Open_internal( testFileName, param, FALSE, FALSE ); - } +static aiori_fd_t * S3_Open( char *testFileName, int flags, aiori_mod_opt_t * param ) { + //if (param->verbose >= VERBOSE_2) { + // printf("-> S3_Open\n"); + //} + + return S3_Create_Or_Open_internal( testFileName, flags, (s3_options_t*) param, TRUE ); +} + +static aiori_fd_t * EMC_Open( char *testFileName, int flags, aiori_mod_opt_t * param ) { + //if (param->verbose >= VERBOSE_2) { + // printf("-> S3_Open\n"); + //} + + return S3_Create_Or_Open_internal( testFileName, flags, (s3_options_t*) param, FALSE ); } @@ -730,39 +723,35 @@ EMC_Open( char *testFileName, IOR_param_t * param ) { */ -static -IOR_offset_t -S3_Xfer_internal(int access, - void* file, +static IOR_offset_t S3_Xfer_internal(int access, + aiori_fd_t* file, IOR_size_t* buffer, IOR_offset_t length, - IOR_param_t* param, + IOR_offset_t offset, + s3_options_t* param, int multi_part_upload_p ) { - - if (param->verbose >= VERBOSE_2) { - printf("-> S3_Xfer(acc:%d, target:%s, buf:0x%llx, len:%llu, 0x%llx)\n", - access, (char*)file, buffer, length, param); - } + //if (param->verbose >= VERBOSE_2) { + // printf("-> S3_Xfer(acc:%d, target:%s, buf:0x%llx, len:%llu, 0x%llx)\n", + // access, (char*)file, buffer, length, param); + //} char* fname = (char*)file; /* see NOTE above S3_Create_Or_Open() */ size_t remaining = (size_t)length; char* data_ptr = (char *)buffer; - off_t offset = param->offset; // easier to think - int n_to_n = param->filePerProc; + int n_to_n = hints->filePerProc; int n_to_1 = (! n_to_n); - int segmented = (param->segmentCount == 1); + int segmented = (hints->segmentCount == 1); if (access == WRITE) { /* WRITE */ - - if (verbose >= VERBOSE_3) { - fprintf( stdout, "rank %d writing length=%lld to offset %lld\n", - rank, - remaining, - param->offset + length - remaining); - } + //if (verbose >= VERBOSE_3) { + // fprintf( stdout, "rank %d writing length=%lld to offset %lld\n", + // rank, + // remaining, + // param->offset + length - remaining); + //} if (multi_part_upload_p) { @@ -790,11 +779,11 @@ S3_Xfer_internal(int access, size_t part_number; if (n_to_1) { if (segmented) { // segmented - size_t parts_per_rank = param->blockSize / param->transferSize; + size_t parts_per_rank = hints->blockSize / hints->transferSize; part_number = (rank * parts_per_rank) + param->part_number; } else // strided - part_number = (param->part_number * param->numTasks) + rank; + part_number = (param->part_number * hints->numTasks) + rank; } else part_number = param->part_number; @@ -804,14 +793,15 @@ S3_Xfer_internal(int access, // if (verbose >= VERBOSE_3) { // fprintf( stdout, "rank %d of %d writing (%s,%s) part_number %lld\n", // rank, - // param->numTasks, + // hints->numTasks, // (n_to_1 ? "N:1" : "N:N"), // (segmented ? "segmented" : "strided"), // part_number); // } + char buff[BUFF_SIZE]; /* buffer is used to generate URLs, err_msgs, etc */ snprintf(buff, BUFF_SIZE, - "%s?partNumber=%d&uploadId=%s", + "%s?partNumber=%zd&uploadId=%s", fname, part_number, param->UploadId); // For performance, we append directly into the linked list @@ -838,16 +828,16 @@ S3_Xfer_internal(int access, // } // } - if (verbose >= VERBOSE_3) { - fprintf( stdout, "rank %d of %d (%s,%s) offset %lld, part# %lld --> ETag %s\n", - rank, - param->numTasks, - (n_to_1 ? "N:1" : "N:N"), - (segmented ? "segmented" : "strided"), - offset, - part_number, - param->io_buf->eTag); // incl quote-marks at [0] and [len-1] - } + //if (verbose >= VERBOSE_3) { + // fprintf( stdout, "rank %d of %d (%s,%s) offset %lld, part# %lld --> ETag %s\n", + // rank, + // hints->numTasks, + // (n_to_1 ? "N:1" : "N:N"), + // (segmented ? "segmented" : "strided"), + // offset, + // part_number, + // param->io_buf->eTag); // incl quote-marks at [0] and [len-1] + //} if (strlen(param->io_buf->eTag) != ETAG_SIZE+2) { /* quotes at both ends */ fprintf(stderr, "Rank %d: ERROR: expected ETag to be %d hex digits\n", rank, ETAG_SIZE); @@ -862,9 +852,9 @@ S3_Xfer_internal(int access, param->io_buf->eTag +1, strlen(param->io_buf->eTag) -2); // DEBUGGING - if (verbose >= VERBOSE_4) { - printf("rank %d: part %d = ETag %s\n", rank, part_number, param->io_buf->eTag); - } + //if (verbose >= VERBOSE_4) { + // printf("rank %d: part %d = ETag %s\n", rank, part_number, param->io_buf->eTag); + //} // drop ptrs to , in param->io_buf aws_iobuf_reset(param->io_buf); @@ -885,7 +875,7 @@ S3_Xfer_internal(int access, // than empty storage. aws_iobuf_reset(param->io_buf); aws_iobuf_append_static(param->io_buf, data_ptr, remaining); - AWS4C_CHECK ( s3_put(param->io_buf, file) ); + AWS4C_CHECK ( s3_put(param->io_buf, (char*) file) ); AWS4C_CHECK_OK( param->io_buf ); // drop ptrs to , in param->io_buf @@ -893,18 +883,18 @@ S3_Xfer_internal(int access, } - if ( param->fsyncPerWrite == TRUE ) { + if ( hints->fsyncPerWrite == TRUE ) { WARN("S3 doesn't support 'fsync'" ); /* does it? */ } } else { /* READ or CHECK */ - if (verbose >= VERBOSE_3) { - fprintf( stdout, "rank %d reading from offset %lld\n", - rank, - param->offset + length - remaining ); - } + //if (verbose >= VERBOSE_3) { + // fprintf( stdout, "rank %d reading from offset %lld\n", + // rank, + // hints->offset + length - remaining ); + //} // read specific byte-range from the object // [This is included in the "pure" S3 spec.] @@ -917,43 +907,45 @@ S3_Xfer_internal(int access, // libcurl writefunction, invoked via aws4c. aws_iobuf_reset(param->io_buf); aws_iobuf_extend_static(param->io_buf, data_ptr, remaining); - AWS4C_CHECK( s3_get(param->io_buf, file) ); + AWS4C_CHECK( s3_get(param->io_buf, (char*) file) ); if (param->io_buf->code != 206) { /* '206 Partial Content' */ + char buff[BUFF_SIZE]; /* buffer is used to generate URLs, err_msgs, etc */ snprintf(buff, BUFF_SIZE, "Unexpected result (%d, '%s')", param->io_buf->code, param->io_buf->result); - ERR_SIMPLE(buff); + ERR(buff); } // drop refs to , in param->io_buf aws_iobuf_reset(param->io_buf); } - - if (param->verbose >= VERBOSE_2) { - printf("<- S3_Xfer\n"); - } + //if (verbose >= VERBOSE_2) { + // printf("<- S3_Xfer\n"); + //} return ( length ); } -static -IOR_offset_t -S3_Xfer(int access, - void* file, +static IOR_offset_t S3_Xfer(int access, + aiori_fd_t* file, IOR_size_t* buffer, IOR_offset_t length, - IOR_param_t* param ) { - S3_Xfer_internal(access, file, buffer, length, param, TRUE); + IOR_offset_t offset, + aiori_mod_opt_t* param ) { + S3_Xfer_internal(access, file, buffer, length, offset, (s3_options_t*) param, TRUE); } + + static IOR_offset_t EMC_Xfer(int access, - void* file, + aiori_fd_t* file, IOR_size_t* buffer, IOR_offset_t length, - IOR_param_t* param ) { - S3_Xfer_internal(access, file, buffer, length, param, FALSE); + IOR_offset_t offset, + aiori_mod_opt_t* param ) { + S3_Xfer_internal(access, file, buffer, length, offset, (s3_options_t*) param, FALSE); } @@ -992,16 +984,10 @@ EMC_Xfer(int access, * MPI_COMM_WORLD. */ -static -void -S3_Fsync( void *fd, IOR_param_t * param ) { - if (param->verbose >= VERBOSE_2) { - printf("-> S3_Fsync [no-op]\n"); - } - - if (param->verbose >= VERBOSE_2) { - printf("<- S3_Fsync\n"); - } +static void S3_Fsync( aiori_fd_t *fd, aiori_mod_opt_t * param ) { + //if (param->verbose >= VERBOSE_2) { + // printf("-> S3_Fsync [no-op]\n"); + //} } @@ -1030,29 +1016,17 @@ S3_Fsync( void *fd, IOR_param_t * param ) { * See S3_Fsync() for some possible considerations. */ -static -void -S3_Close_internal( void* fd, - IOR_param_t* param, - int multi_part_upload_p ) { +static void S3_Close_internal(aiori_fd_t* fd, s3_options_t* param, int multi_part_upload_p) { char* fname = (char*)fd; /* see NOTE above S3_Create_Or_Open() */ // easier to think - int n_to_n = param->filePerProc; + int n_to_n = hints->filePerProc; int n_to_1 = (! n_to_n); - int segmented = (param->segmentCount == 1); - - if (param->verbose >= VERBOSE_2) { - printf("-> S3_Close('%s', ,%d) %s\n", - fname, - multi_part_upload_p, - ((n_to_n) ? "N:N" : ((segmented) ? "N:1(seg)" : "N:1(str)"))); - } - - if (param->open == WRITE) { + int segmented = (hints->segmentCount == 1); + if (param->written) { // finalizing Multi-Part Upload (for N:1 or N:N) if (multi_part_upload_p) { @@ -1078,11 +1052,11 @@ S3_Close_internal( void* fd, // Everybody should have the same number of ETags (?) size_t etag_count_max = 0; /* highest number on any proc */ MPI_Allreduce(&etags_per_rank, &etag_count_max, - 1, mpi_size_t, MPI_MAX, param->testComm); + 1, mpi_size_t, MPI_MAX, testComm); if (etags_per_rank != etag_count_max) { - printf("Rank %d: etag count mismatch: max:%d, mine:%d\n", + printf("Rank %d: etag count mismatch: max:%zd, mine:%zd\n", rank, etag_count_max, etags_per_rank); - MPI_Abort(param->testComm, 1); + MPI_Abort(testComm, 1); } // collect ETag data at Rank0 @@ -1095,26 +1069,25 @@ S3_Close_internal( void* fd, int j; int rnk; - char* etag_vec = (char*)malloc((param->numTasks * etag_data_size) +1); + char* etag_vec = (char*)malloc((hints->numTasks * etag_data_size) +1); if (! etag_vec) { - fprintf(stderr, "rank 0 failed to malloc %d bytes\n", - param->numTasks * etag_data_size); - MPI_Abort(param->testComm, 1); + fprintf(stderr, "rank 0 failed to malloc %zd bytes\n", + hints->numTasks * etag_data_size); + MPI_Abort(testComm, 1); } MPI_Gather(etag_data, etag_data_size, MPI_BYTE, etag_vec, etag_data_size, MPI_BYTE, 0, MPI_COMM_WORLD); // --- debugging: show the gathered etag data // (This shows the raw concatenated etag-data from each node.) - if (param->verbose >= VERBOSE_4) { - - printf("rank 0: gathered %d etags from all ranks:\n", etags_per_rank); + if (verbose >= VERBOSE_4) { + printf("rank 0: gathered %zd etags from all ranks:\n", etags_per_rank); etag_ptr=etag_vec; - for (rnk=0; rnknumTasks; ++rnk) { + for (rnk=0; rnk < hints->numTasks; ++rnk) { printf("\t[%d]: '", rnk); int ii; - for (ii=0; iinumTasks; + i_max = hints->numTasks; j_max = etags_per_rank; start_multiplier = etag_data_size; /* one rank's-worth of Etag data */ stride = ETAG_SIZE; /* one ETag */ } else { // strided i_max = etags_per_rank; - j_max = param->numTasks; + j_max = hints->numTasks; start_multiplier = ETAG_SIZE; /* one ETag */ stride = etag_data_size; /* one rank's-worth of Etag data */ } @@ -1203,7 +1176,7 @@ S3_Close_internal( void* fd, char etag[ETAG_SIZE +1]; memcpy(etag, etag_ptr, ETAG_SIZE); etag[ETAG_SIZE] = 0; - + char buff[BUFF_SIZE]; /* buffer is used to generate URLs, err_msgs, etc */ // write XML for next part, with Etag ... snprintf(buff, BUFF_SIZE, " \n" @@ -1221,15 +1194,11 @@ S3_Close_internal( void* fd, // write XML tail ... aws_iobuf_append_str(xml, "\n"); - } - - else { + } else { MPI_Gather(etag_data, etag_data_size, MPI_BYTE, NULL, etag_data_size, MPI_BYTE, 0, MPI_COMM_WORLD); } - } - - else { /* N:N */ + } else { /* N:N */ xml = aws_iobuf_new(); aws_iobuf_growth_size(xml, 1024 * 8); @@ -1241,6 +1210,7 @@ S3_Close_internal( void* fd, char etag[ETAG_SIZE +1]; int part = 0; int i; + char buff[BUFF_SIZE]; /* buffer is used to generate URLs, err_msgs, etc */ for (i=0; i\n"); } - - // send request to finalize MPU if (n_to_n || (rank == 0)) { // DEBUGGING: show the XML we constructed - if (param->verbose >= VERBOSE_3) + if (verbose >= VERBOSE_3) debug_iobuf(xml, 1, 1); - + char buff[BUFF_SIZE]; /* buffer is used to generate URLs, err_msgs, etc */ // --- POST our XML to the server. snprintf(buff, BUFF_SIZE, "%s?uploadId=%s", @@ -1300,42 +1268,36 @@ S3_Close_internal( void* fd, // N:1 file until rank0 has finished the S3 multi-part finalize. // The object will not appear to exist, until then. if (n_to_1) - MPI_CHECK(MPI_Barrier(param->testComm), "barrier error"); - } - else { + MPI_CHECK(MPI_Barrier(testComm), "barrier error"); + } else { // No finalization is needed, when using EMC's byte-range writing // support. However, we do need to make sure everyone has // finished writing, before anyone starts reading. if (n_to_1) { - MPI_CHECK(MPI_Barrier(param->testComm), "barrier error"); - if (param->verbose >= VERBOSE_2) - printf("rank %d: passed barrier\n", rank); - } - } + MPI_CHECK(MPI_Barrier(testComm), "barrier error"); + //if (verbose >= VERBOSE_2) + // printf("rank %d: passed barrier\n", rank); + //} + } + } // After writing, reset the CURL connection, so that caches won't be // used for reads. aws_reset_connection(); } - - if (param->verbose >= VERBOSE_2) { - printf("<- S3_Close\n"); - } + //if (param->verbose >= VERBOSE_2) { + // printf("<- S3_Close\n"); + //} } -static -void -S3_Close( void* fd, - IOR_param_t* param ) { - S3_Close_internal(fd, param, TRUE); +static void S3_Close( aiori_fd_t* fd, aiori_mod_opt_t* param ) { + S3_Close_internal(fd, (s3_options_t*) param, TRUE); } -static -void -EMC_Close( void* fd, - IOR_param_t* param ) { - S3_Close_internal(fd, param, FALSE); + +static void EMC_Close( aiori_fd_t* fd, aiori_mod_opt_t* param ) { + S3_Close_internal(fd, (s3_options_t*) param, FALSE); } @@ -1349,13 +1311,36 @@ EMC_Close( void* fd, * successfully read. */ -static -void -S3_Delete( char *testFileName, IOR_param_t * param ) { +static void S3_Delete( char *testFileName, aiori_mod_opt_t * options ) { + //if (param->verbose >= VERBOSE_2) { + // printf("-> S3_Delete(%s)\n", testFileName); + //} + /* maybe initialize curl */ + s3_options_t * param = (s3_options_t*) options; + s3_connect(param ); - if (param->verbose >= VERBOSE_2) { - printf("-> S3_Delete(%s)\n", testFileName); - } +#if 0 + // EMC BUG: If file was written with appends, and is deleted, + // Then any future recreation will result in an object that can't be read. + // this + AWS4C_CHECK( s3_delete(param->io_buf, testFileName) ); +#else + // just replace with a zero-length object for now + aws_iobuf_reset(param->io_buf); + AWS4C_CHECK ( s3_put(param->io_buf, testFileName) ); +#endif + + AWS4C_CHECK_OK( param->io_buf ); + //if (verbose >= VERBOSE_2) + // printf("<- S3_Delete\n"); +} + + +static void EMC_Delete( char *testFileName, aiori_mod_opt_t * options ) { + s3_options_t * param = (s3_options_t*) options; + //if (param->verbose >= VERBOSE_2) { + // printf("-> EMC_Delete(%s)\n", testFileName); + //} /* maybe initialize curl */ s3_connect( param ); @@ -1372,45 +1357,10 @@ S3_Delete( char *testFileName, IOR_param_t * param ) { #endif AWS4C_CHECK_OK( param->io_buf ); - - if (param->verbose >= VERBOSE_2) - printf("<- S3_Delete\n"); + //if (param->verbose >= VERBOSE_2) + // printf("<- EMC_Delete\n"); } - -static -void -EMC_Delete( char *testFileName, IOR_param_t * param ) { - - if (param->verbose >= VERBOSE_2) { - printf("-> EMC_Delete(%s)\n", testFileName); - } - - /* maybe initialize curl */ - s3_connect( param ); - -#if 0 - // EMC BUG: If file was written with appends, and is deleted, - // Then any future recreation will result in an object that can't be read. - // this - AWS4C_CHECK( s3_delete(param->io_buf, testFileName) ); -#else - // just replace with a zero-length object for now - aws_iobuf_reset(param->io_buf); - AWS4C_CHECK ( s3_put(param->io_buf, testFileName) ); -#endif - - AWS4C_CHECK_OK( param->io_buf ); - - if (param->verbose >= VERBOSE_2) - printf("<- EMC_Delete\n"); -} - - - - - - /* * HTTP HEAD returns meta-data for a "file". * @@ -1420,15 +1370,11 @@ EMC_Delete( char *testFileName, IOR_param_t * param ) { * request more data than the header actually takes? */ -static -IOR_offset_t -S3_GetFileSize(IOR_param_t * param, - MPI_Comm testComm, - char * testFileName) { - - if (param->verbose >= VERBOSE_2) { - printf("-> S3_GetFileSize(%s)\n", testFileName); - } +static IOR_offset_t S3_GetFileSize(aiori_mod_opt_t * options, char * testFileName) { + s3_options_t * param = (s3_options_t*) options; + //if (param->verbose >= VERBOSE_2) { + // printf("-> S3_GetFileSize(%s)\n", testFileName); + //} IOR_offset_t aggFileSizeFromStat; /* i.e. "long long int" */ IOR_offset_t tmpMin, tmpMax, tmpSum; @@ -1442,63 +1388,9 @@ S3_GetFileSize(IOR_param_t * param, if ( ! AWS4C_OK(param->io_buf) ) { fprintf(stderr, "rank %d: couldn't stat '%s': %s\n", rank, testFileName, param->io_buf->result); - MPI_Abort(param->testComm, 1); + MPI_Abort(testComm, 1); } aggFileSizeFromStat = param->io_buf->contentLen; - if (param->verbose >= VERBOSE_2) { - printf("\trank %d: file-size %llu\n", rank, aggFileSizeFromStat); - } - - if ( param->filePerProc == TRUE ) { - if (param->verbose >= VERBOSE_2) { - printf("\tall-reduce (1)\n"); - } - MPI_CHECK(MPI_Allreduce(&aggFileSizeFromStat, - &tmpSum, /* sum */ - 1, - MPI_LONG_LONG_INT, - MPI_SUM, - testComm ), - "cannot total data moved" ); - - aggFileSizeFromStat = tmpSum; - } - else { - if (param->verbose >= VERBOSE_2) { - printf("\tall-reduce (2a)\n"); - } - MPI_CHECK(MPI_Allreduce(&aggFileSizeFromStat, - &tmpMin, /* min */ - 1, - MPI_LONG_LONG_INT, - MPI_MIN, - testComm ), - "cannot total data moved" ); - - if (param->verbose >= VERBOSE_2) { - printf("\tall-reduce (2b)\n"); - } - MPI_CHECK(MPI_Allreduce(&aggFileSizeFromStat, - &tmpMax, /* max */ - 1, - MPI_LONG_LONG_INT, - MPI_MAX, - testComm ), - "cannot total data moved" ); - - if ( tmpMin != tmpMax ) { - if ( rank == 0 ) { - WARN( "inconsistent file size by different tasks" ); - } - - /* incorrect, but now consistent across tasks */ - aggFileSizeFromStat = tmpMin; - } - } - - if (param->verbose >= VERBOSE_2) { - printf("<- S3_GetFileSize [%llu]\n", aggFileSizeFromStat); - } return ( aggFileSizeFromStat ); } diff --git a/src/ior.c b/src/ior.c index 196f6ec..6601125 100755 --- a/src/ior.c +++ b/src/ior.c @@ -246,7 +246,6 @@ void init_IOR_Param_t(IOR_param_t * p) p->hdfs_block_size = 0; p->URI = NULL; - p->part_number = 0; } static void diff --git a/src/ior.h b/src/ior.h index ac258e0..843884d 100755 --- a/src/ior.h +++ b/src/ior.h @@ -168,9 +168,7 @@ typedef struct int hdfs_block_size; /* internal blk-size. (0 gets default) */ char* URI; /* "path" to target object */ - size_t part_number; /* multi-part upload increment (PER-RANK!) */ - char* UploadId; /* key for multi-part-uploads */ - + /* RADOS variables */ rados_t rados_cluster; /* RADOS cluster handle */ rados_ioctx_t rados_ioctx; /* I/O context for our pool in the RADOS cluster */