Algorithms 'S3', 'S3_plus', and 'S3_EMC' all available.
These are variants on S3. S3 uses the "pure" S3 interface, e.g. using Multi-Part-Upload. The "plus" variant enables EMC-extensions in the aws4c library. This allows the N:N case to use "append", in the case where "transfer_size" != "block_size" for IOR. In pure S3, the N:N case will fail, because the EMC-extensions won't be enabled, and appending (which attempts to use the EMC byte-range tricks to do this) will throw an error. In the S3_EMC alg, N:1 uses EMCs other byte-range tricks to write different parts of an N:1 file, and also uses append to write the parts of an N:N file. Preliminary tests show these EMC extensions look to improve BW by ~20%. I put all three algs in aiori-S3.c, because it seemed some code was getting reused. Not sure if that's still going to make sense after the TBD, below. TBD: Recently realized that the "pure' S3 shouldn't be trying to use appends for anything. In the N:N case, it should just use MPU, within each file. Then, there's no need for S3_plus. We just have S3, which does MPU for all writes where transfer_size != block_size, and uses (standard) byte-range reads for reading. Then S3_EMC uses "appends for N:N writes, and byte-range writes for N:1 writes. This separates the code for the two algs a little more, but we might still want them in the same file.master
parent
2f066624f0
commit
b26f308191
379
src/aiori-S3.c
379
src/aiori-S3.c
|
@ -47,16 +47,49 @@
|
||||||
* OF SUCH DAMAGE.
|
* OF SUCH DAMAGE.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
/******************************************************************************\
|
/******************************************************************************
|
||||||
*
|
*
|
||||||
* Implement of abstract I/O interface for HDFS.
|
* Implementation of abstract IOR interface, for the Amazon S3 API.
|
||||||
*
|
* EMC/ViPR supports some useful extensions to S3, which we also implement
|
||||||
* HDFS has the added concept of a "File System Handle" which has to be
|
* here. There are 3 different mixes:
|
||||||
* connected before files are opened. We store this in the IOR_param_t
|
*
|
||||||
* object that is always passed to our functions. The thing that callers
|
* (1) "Pure S3" uses S3 "Multi-Part Upload" to do N:1 writes. N:N writes
|
||||||
* think of as the "fd" is an hdfsFile, (a pointer).
|
* fail, in the case where IOR "transfer-size" differs from
|
||||||
*
|
* "block-size', because this implies an "append", and append is not
|
||||||
\******************************************************************************/
|
* supported in S3. [TBD: The spec also says multi-part upload can't
|
||||||
|
* have any individual part greater than 5MB, or more then 10k total
|
||||||
|
* parts. Failing these conditions may produce obscure errors. Should
|
||||||
|
* we enforce? ]
|
||||||
|
*
|
||||||
|
* --> Select this option with the '-a S3' command-line arg to IOR
|
||||||
|
*
|
||||||
|
*
|
||||||
|
* (2) "S3 + EMC append" uses S3 Multi-Part Upload for N:1, like pure S3,
|
||||||
|
* but also allows appends in the N:N case, via the EMC byte-range
|
||||||
|
* write support. This also does away with constraints on the number
|
||||||
|
* or size of parts to S3 Multi-Part Upload.
|
||||||
|
*
|
||||||
|
* --> Select this option with the '-a S3_plus' command-line arg to IOR.
|
||||||
|
* ior.c will then set the IOR_S3_EMC_EXT flag, which will cause
|
||||||
|
* s3_connect to initialize with EMC-extensions enabled.
|
||||||
|
*
|
||||||
|
*
|
||||||
|
* (3) "EMC S3 Extensions" uses the EMC byte-range support for N:1
|
||||||
|
* writes, eliminating Multi-Part Upload. EMC expects this will
|
||||||
|
* perform better than MPU, and it avoids some problems that are
|
||||||
|
* imposed by the S3 MPU spec. [See comments at EMC_Xfer().]
|
||||||
|
*
|
||||||
|
* --> Select this option with the '-a EMC_S3' command-line arg to IOR
|
||||||
|
*
|
||||||
|
*
|
||||||
|
* NOTE: Putting EMC's S3-extensions in the same file with the S3 API
|
||||||
|
* allows us to share some code that would otherwise be duplicated
|
||||||
|
* (e.g. s3_connect(), etc). This should help us avoid losing bug
|
||||||
|
* fixes that are discovered in one interface or the other. In some
|
||||||
|
* cases, S3 is incapable of supporting all the needs of IOR. (For
|
||||||
|
* example, see notes about "append", above S3_Xfer().
|
||||||
|
*
|
||||||
|
******************************************************************************/
|
||||||
|
|
||||||
#ifdef HAVE_CONFIG_H
|
#ifdef HAVE_CONFIG_H
|
||||||
# include "config.h"
|
# include "config.h"
|
||||||
|
@ -88,18 +121,26 @@
|
||||||
|
|
||||||
|
|
||||||
/**************************** P R O T O T Y P E S *****************************/
|
/**************************** P R O T O T Y P E S *****************************/
|
||||||
static void *S3_Create(char *, IOR_param_t *);
|
static void* S3_Create(char*, IOR_param_t*);
|
||||||
static void *S3_Open(char *, IOR_param_t *);
|
static void* S3_Open(char*, IOR_param_t*);
|
||||||
static IOR_offset_t S3_Xfer(int, void *, IOR_size_t *,
|
static IOR_offset_t S3_Xfer(int, void*, IOR_size_t*, IOR_offset_t, IOR_param_t*);
|
||||||
IOR_offset_t, IOR_param_t *);
|
static void S3_Close(void*, IOR_param_t*);
|
||||||
static void S3_Close(void *, IOR_param_t *);
|
|
||||||
static void S3_Delete(char *, IOR_param_t *);
|
static void* EMC_Create(char*, IOR_param_t*);
|
||||||
static void S3_SetVersion(IOR_param_t *);
|
static void* EMC_Open(char*, IOR_param_t*);
|
||||||
static void S3_Fsync(void *, IOR_param_t *);
|
static IOR_offset_t EMC_Xfer(int, void*, IOR_size_t*, IOR_offset_t, IOR_param_t*);
|
||||||
static IOR_offset_t S3_GetFileSize(IOR_param_t *, MPI_Comm, char *);
|
static void EMC_Close(void*, IOR_param_t*);
|
||||||
|
|
||||||
|
static void S3_Delete(char*, IOR_param_t*);
|
||||||
|
static void S3_SetVersion(IOR_param_t*);
|
||||||
|
static void S3_Fsync(void*, IOR_param_t*);
|
||||||
|
static IOR_offset_t S3_GetFileSize(IOR_param_t*, MPI_Comm, char*);
|
||||||
|
|
||||||
/************************** D E C L A R A T I O N S ***************************/
|
/************************** D E C L A R A T I O N S ***************************/
|
||||||
|
|
||||||
|
// "Pure S3"
|
||||||
|
// N:1 writes use multi-part upload
|
||||||
|
// N:N fails if "transfer-size" != "block-size" (because that requires "append")
|
||||||
ior_aiori_t s3_aiori = {
|
ior_aiori_t s3_aiori = {
|
||||||
"S3",
|
"S3",
|
||||||
S3_Create,
|
S3_Create,
|
||||||
|
@ -112,6 +153,37 @@ ior_aiori_t s3_aiori = {
|
||||||
S3_GetFileSize
|
S3_GetFileSize
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// "S3", plus EMC-extensions enabled
|
||||||
|
// N:1 writes use multi-part upload
|
||||||
|
// N:N succeeds (because EMC-extensions support "append")
|
||||||
|
ior_aiori_t s3_plus_aiori = {
|
||||||
|
"S3_plus",
|
||||||
|
S3_Create,
|
||||||
|
S3_Open,
|
||||||
|
S3_Xfer,
|
||||||
|
S3_Close,
|
||||||
|
S3_Delete,
|
||||||
|
S3_SetVersion,
|
||||||
|
S3_Fsync,
|
||||||
|
S3_GetFileSize
|
||||||
|
};
|
||||||
|
|
||||||
|
// Use EMC-extensions for N:1 write, as well
|
||||||
|
// N:1 writes use EMC byte-range
|
||||||
|
// N:N succeeds because EMC-extensions support "append"
|
||||||
|
ior_aiori_t s3_emc_aiori = {
|
||||||
|
"S3_EMC",
|
||||||
|
EMC_Create,
|
||||||
|
EMC_Open,
|
||||||
|
EMC_Xfer,
|
||||||
|
EMC_Close,
|
||||||
|
S3_Delete,
|
||||||
|
S3_SetVersion,
|
||||||
|
S3_Fsync,
|
||||||
|
S3_GetFileSize
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
/* modelled on similar macros in iordef.h */
|
/* modelled on similar macros in iordef.h */
|
||||||
#define CURL_ERR(MSG, CURL_ERRNO, PARAM) \
|
#define CURL_ERR(MSG, CURL_ERRNO, PARAM) \
|
||||||
do { \
|
do { \
|
||||||
|
@ -214,7 +286,6 @@ s3_connect( IOR_param_t* param ) {
|
||||||
snprintf(buff, BUFF_SIZE, "10.140.0.%d:9020", 15 + (rank % 4));
|
snprintf(buff, BUFF_SIZE, "10.140.0.%d:9020", 15 + (rank % 4));
|
||||||
s3_set_host(buff);
|
s3_set_host(buff);
|
||||||
|
|
||||||
|
|
||||||
// make sure test-bucket exists
|
// make sure test-bucket exists
|
||||||
s3_set_bucket((char*)bucket_name);
|
s3_set_bucket((char*)bucket_name);
|
||||||
AWS4C_CHECK( s3_head(param->io_buf, "") );
|
AWS4C_CHECK( s3_head(param->io_buf, "") );
|
||||||
|
@ -229,6 +300,9 @@ s3_connect( IOR_param_t* param ) {
|
||||||
AWS4C_CHECK_OK( param->io_buf );
|
AWS4C_CHECK_OK( param->io_buf );
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Maybe allow EMC extensions to S3
|
||||||
|
s3_enable_EMC_extensions(param->curl_flags & IOR_CURL_S3_EMC_EXT);
|
||||||
|
|
||||||
// don't perform these inits more than once
|
// don't perform these inits more than once
|
||||||
param->curl_flags |= IOR_CURL_INIT;
|
param->curl_flags |= IOR_CURL_INIT;
|
||||||
|
|
||||||
|
@ -253,6 +327,18 @@ s3_disconnect( IOR_param_t* param ) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
// After finalizing an S3 multi-part-upload, you must reset some things
|
||||||
|
// before you can use multi-part-upload again. This will also avoid (one
|
||||||
|
// particular set of) memory-leaks.
|
||||||
|
void
|
||||||
|
s3_MPU_reset(IOR_param_t* param) {
|
||||||
|
aws_iobuf_reset(param->io_buf);
|
||||||
|
aws_iobuf_reset(param->etags);
|
||||||
|
param->part_number = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/* ---------------------------------------------------------------------------
|
/* ---------------------------------------------------------------------------
|
||||||
* direct support for the IOR S3 interface
|
* direct support for the IOR S3 interface
|
||||||
* ---------------------------------------------------------------------------
|
* ---------------------------------------------------------------------------
|
||||||
|
@ -282,9 +368,10 @@ s3_disconnect( IOR_param_t* param ) {
|
||||||
|
|
||||||
static
|
static
|
||||||
void *
|
void *
|
||||||
S3_Create_Or_Open(char* testFileName,
|
S3_Create_Or_Open_internal(char* testFileName,
|
||||||
IOR_param_t* param,
|
IOR_param_t* param,
|
||||||
unsigned char createFile ) {
|
unsigned char createFile,
|
||||||
|
int multi_part_upload_p ) {
|
||||||
|
|
||||||
if (param->verbose >= VERBOSE_2) {
|
if (param->verbose >= VERBOSE_2) {
|
||||||
printf("-> S3_Create_Or_Open\n");
|
printf("-> S3_Create_Or_Open\n");
|
||||||
|
@ -304,8 +391,10 @@ S3_Create_Or_Open(char* testFileName,
|
||||||
|
|
||||||
/* check whether object needs reset to zero-length */
|
/* check whether object needs reset to zero-length */
|
||||||
int needs_reset = 0;
|
int needs_reset = 0;
|
||||||
|
if (! multi_part_upload_p)
|
||||||
|
needs_reset = 1; /* so "append" can work */
|
||||||
if ( param->openFlags & IOR_TRUNC )
|
if ( param->openFlags & IOR_TRUNC )
|
||||||
needs_reset = 1;
|
needs_reset = 1; /* so "append" can work */
|
||||||
else if (createFile) {
|
else if (createFile) {
|
||||||
AWS4C_CHECK( s3_head(param->io_buf, testFileName) );
|
AWS4C_CHECK( s3_head(param->io_buf, testFileName) );
|
||||||
if ( ! AWS4C_OK(param->io_buf) )
|
if ( ! AWS4C_OK(param->io_buf) )
|
||||||
|
@ -318,18 +407,17 @@ S3_Create_Or_Open(char* testFileName,
|
||||||
/* initializations for N:N writes */
|
/* initializations for N:N writes */
|
||||||
if ( param->filePerProc ) {
|
if ( param->filePerProc ) {
|
||||||
|
|
||||||
/* maybe reset to zero-length */
|
/* maybe reset to zero-length, so "append" can work */
|
||||||
if (needs_reset) {
|
if (needs_reset) {
|
||||||
aws_iobuf_reset(param->io_buf);
|
aws_iobuf_reset(param->io_buf);
|
||||||
AWS4C_CHECK( s3_put(param->io_buf, testFileName) );
|
AWS4C_CHECK( s3_put(param->io_buf, testFileName) );
|
||||||
|
AWS4C_CHECK_OK( param->io_buf );
|
||||||
}
|
}
|
||||||
|
|
||||||
// MPI_CHECK(MPI_Barrier(param->testComm), "barrier error");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/* initializations for N:1 writes */
|
/* initializations for N:1 writes using multi-part upload */
|
||||||
else {
|
else if (multi_part_upload_p) {
|
||||||
|
|
||||||
/* rank0 initiates multi-part upload. The response from the server
|
/* rank0 initiates multi-part upload. The response from the server
|
||||||
includes an "uploadId", which must be used by all ranks, when
|
includes an "uploadId", which must be used by all ranks, when
|
||||||
|
@ -346,6 +434,7 @@ S3_Create_Or_Open(char* testFileName,
|
||||||
snprintf(buff, BUFF_SIZE, "%s?uploads", testFileName);
|
snprintf(buff, BUFF_SIZE, "%s?uploads", testFileName);
|
||||||
IOBuf* response = aws_iobuf_new();
|
IOBuf* response = aws_iobuf_new();
|
||||||
AWS4C_CHECK( s3_post2(param->io_buf, buff, NULL, response) );
|
AWS4C_CHECK( s3_post2(param->io_buf, buff, NULL, response) );
|
||||||
|
AWS4C_CHECK_OK( param->io_buf );
|
||||||
|
|
||||||
// parse XML returned from server, into a tree structure
|
// parse XML returned from server, into a tree structure
|
||||||
aws_iobuf_realloc(response);
|
aws_iobuf_realloc(response);
|
||||||
|
@ -387,6 +476,17 @@ S3_Create_Or_Open(char* testFileName,
|
||||||
// recv UploadID from Rank 0
|
// recv UploadID from Rank 0
|
||||||
MPI_Bcast(param->UploadId, MAX_UPLOAD_ID_SIZE, MPI_BYTE, 0, param->testComm);
|
MPI_Bcast(param->UploadId, MAX_UPLOAD_ID_SIZE, MPI_BYTE, 0, param->testComm);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* initializations for N:1 writes using EMC byte-range extensions */
|
||||||
|
else {
|
||||||
|
|
||||||
|
/* maybe reset to zero-length, so "append" can work */
|
||||||
|
if (needs_reset) {
|
||||||
|
aws_iobuf_reset(param->io_buf);
|
||||||
|
AWS4C_CHECK( s3_put(param->io_buf, testFileName) );
|
||||||
|
AWS4C_CHECK_OK( param->io_buf );
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -397,6 +497,7 @@ S3_Create_Or_Open(char* testFileName,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
static
|
static
|
||||||
void *
|
void *
|
||||||
S3_Create( char *testFileName, IOR_param_t * param ) {
|
S3_Create( char *testFileName, IOR_param_t * param ) {
|
||||||
|
@ -407,8 +508,25 @@ S3_Create( char *testFileName, IOR_param_t * param ) {
|
||||||
if (param->verbose >= VERBOSE_2) {
|
if (param->verbose >= VERBOSE_2) {
|
||||||
printf("<- S3_Create\n");
|
printf("<- S3_Create\n");
|
||||||
}
|
}
|
||||||
return S3_Create_Or_Open( testFileName, param, TRUE );
|
return S3_Create_Or_Open_internal( testFileName, param, TRUE, TRUE );
|
||||||
}
|
}
|
||||||
|
static
|
||||||
|
void *
|
||||||
|
EMC_Create( char *testFileName, IOR_param_t * param ) {
|
||||||
|
if (param->verbose >= VERBOSE_2) {
|
||||||
|
printf("-> EMC_Create\n");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (param->verbose >= VERBOSE_2) {
|
||||||
|
printf("<- EMC_Create\n");
|
||||||
|
}
|
||||||
|
return S3_Create_Or_Open_internal( testFileName, param, TRUE, FALSE );
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
static
|
static
|
||||||
void *
|
void *
|
||||||
|
@ -421,22 +539,54 @@ S3_Open( char *testFileName, IOR_param_t * param ) {
|
||||||
if (param->verbose >= VERBOSE_2) {
|
if (param->verbose >= VERBOSE_2) {
|
||||||
printf("<- S3_Open( ... TRUE)\n");
|
printf("<- S3_Open( ... TRUE)\n");
|
||||||
}
|
}
|
||||||
return S3_Create_Or_Open( testFileName, param, TRUE );
|
return S3_Create_Or_Open_internal( testFileName, param, TRUE, TRUE );
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
if (param->verbose >= VERBOSE_2) {
|
if (param->verbose >= VERBOSE_2) {
|
||||||
printf("<- S3_Open( ... FALSE)\n");
|
printf("<- S3_Open( ... FALSE)\n");
|
||||||
}
|
}
|
||||||
return S3_Create_Or_Open( testFileName, param, FALSE );
|
return S3_Create_Or_Open_internal( testFileName, param, FALSE, TRUE );
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
static
|
||||||
|
void *
|
||||||
|
EMC_Open( char *testFileName, IOR_param_t * param ) {
|
||||||
|
if (param->verbose >= VERBOSE_2) {
|
||||||
|
printf("-> S3_Open\n");
|
||||||
|
}
|
||||||
|
|
||||||
|
if ( param->openFlags & IOR_CREAT ) {
|
||||||
|
if (param->verbose >= VERBOSE_2) {
|
||||||
|
printf("<- EMC_Open( ... TRUE)\n");
|
||||||
|
}
|
||||||
|
return S3_Create_Or_Open_internal( testFileName, param, TRUE, FALSE );
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
if (param->verbose >= VERBOSE_2) {
|
||||||
|
printf("<- EMC_Open( ... FALSE)\n");
|
||||||
|
}
|
||||||
|
return S3_Create_Or_Open_internal( testFileName, param, FALSE, FALSE );
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* transfer (more) data to an object. <file> is just the obj name.
|
* transfer (more) data to an object. <file> is just the obj name.
|
||||||
*
|
*
|
||||||
* For N:1, param->offset is understood as offset for a given client to
|
* For N:1, param->offset is understood as offset for a given client to
|
||||||
* write into the "file". This translates to a byte-range in the HTTP
|
* write into the "file". This translates to a byte-range in the HTTP
|
||||||
* request.
|
* request. Each write in the N:1 case is treated as a complete "part",
|
||||||
|
* so there is no such thing as a partial write.
|
||||||
|
*
|
||||||
|
* For N:N, when IOR "transfer-size" differs from "block-size", IOR treats
|
||||||
|
* Xfer as a partial write (i.e. there are multiple calls to XFER, to write
|
||||||
|
* any one of the "N" objects, as a series of "append" operations). This
|
||||||
|
* is not supported in S3/REST. Therefore, we depend on an EMC extension,
|
||||||
|
* in this case. This EMC extension allows appends using a byte-range
|
||||||
|
* header spec of "Range: bytes=-1-". aws4c now provides
|
||||||
|
* s3_enable_EMC_extensions(), to allow this behavior. If EMC-extensions
|
||||||
|
* are not enabled, the aws4c library will generate a run-time error, in
|
||||||
|
* this case.
|
||||||
*
|
*
|
||||||
* Each write-request returns an ETag which is a hash of the data. (The
|
* Each write-request returns an ETag which is a hash of the data. (The
|
||||||
* ETag could also be computed directly, if we wanted.) We must save the
|
* ETag could also be computed directly, if we wanted.) We must save the
|
||||||
|
@ -459,14 +609,48 @@ S3_Open( char *testFileName, IOR_param_t * param ) {
|
||||||
* request, to transfer any amount of data. (But see above, re EMC
|
* request, to transfer any amount of data. (But see above, re EMC
|
||||||
* support for "append".)
|
* support for "append".)
|
||||||
*/
|
*/
|
||||||
|
/* In the EMC case, instead of Multi-Part Upload we can use HTTP
|
||||||
|
* "byte-range" headers to write parts of a single object. This appears to
|
||||||
|
* have several advantages over the S3 MPU spec:
|
||||||
|
*
|
||||||
|
* (a) no need for a special "open" operation, to capture an "UploadID".
|
||||||
|
* Instead we simply write byte-ranges, and the server-side resolves
|
||||||
|
* any races, producing a single winner. In the IOR case, there should
|
||||||
|
* be no races, anyhow.
|
||||||
|
*
|
||||||
|
* (b) individual write operations don't have to refer to an ID, or to
|
||||||
|
* parse and save ETags returned from every write.
|
||||||
|
*
|
||||||
|
* (c) no need for a special "close" operation, in which all the saved
|
||||||
|
* ETags are gathered at a single rank, placed into XML, and shipped to
|
||||||
|
* the server, to finalize the MPU. That special close appears to
|
||||||
|
* impose two scaling problems: (1) requires all ETags to be shipped at
|
||||||
|
* the BW available to a single process, (1) requires either that they
|
||||||
|
* all fit into memory of a single process, or be written to disk
|
||||||
|
* (imposes additional BW contraints), or make a more-complex
|
||||||
|
* interaction with a threaded curl writefunction, to present the
|
||||||
|
* appearance of a single thread to curl, whilst allowing streaming
|
||||||
|
* reception of non-local ETags.
|
||||||
|
*
|
||||||
|
* (d) no constraints on the number or size of individual parts. (These
|
||||||
|
* exist in the S3 spec, the EMC's impl of the S3 multi-part upload is
|
||||||
|
* also free of these constraints.)
|
||||||
|
*
|
||||||
|
* Instead, parallel processes can write any number and/or size of updates,
|
||||||
|
* using a "byte-range" header. After each write returns, that part of the
|
||||||
|
* global object is visible to any reader. Places that are not updated
|
||||||
|
* read as zeros.
|
||||||
|
*/
|
||||||
|
|
||||||
|
|
||||||
static
|
static
|
||||||
IOR_offset_t
|
IOR_offset_t
|
||||||
S3_Xfer(int access,
|
S3_Xfer_internal(int access,
|
||||||
void* file,
|
void* file,
|
||||||
IOR_size_t* buffer,
|
IOR_size_t* buffer,
|
||||||
IOR_offset_t length,
|
IOR_offset_t length,
|
||||||
IOR_param_t* param) {
|
IOR_param_t* param,
|
||||||
|
int multi_part_upload_p ) {
|
||||||
|
|
||||||
if (param->verbose >= VERBOSE_2) {
|
if (param->verbose >= VERBOSE_2) {
|
||||||
printf("-> S3_Xfer(acc:%d, target:%s, buf:0x%llx, len:%llu, 0x%llx)\n",
|
printf("-> S3_Xfer(acc:%d, target:%s, buf:0x%llx, len:%llu, 0x%llx)\n",
|
||||||
|
@ -490,23 +674,22 @@ S3_Xfer(int access,
|
||||||
|
|
||||||
if (param->filePerProc) { // N:N
|
if (param->filePerProc) { // N:N
|
||||||
|
|
||||||
// DEBUGGING: can we use the new emc_put_append() to append to an object?
|
// NOTE: You must call 's3_enable_EMC_extensions(1)' to let this work:
|
||||||
s3_enable_EMC_extensions(1);
|
|
||||||
s3_set_byte_range(-1,-1); // produces header "Range: bytes=-1-"
|
s3_set_byte_range(-1,-1); // produces header "Range: bytes=-1-"
|
||||||
|
|
||||||
// For performance, we append <data_ptr> directly into the linked list
|
// For performance, we append <data_ptr> directly into the linked list
|
||||||
// of data in param->io_buf. We are "appending" rather than
|
// of data in param->io_buf. We are "appending" rather than
|
||||||
// "extending", so the added buffer is seen as written data, rather
|
// "extending", so the added buffer is seen as written data, rather
|
||||||
// than empty storage.
|
// than empty storage.
|
||||||
|
|
||||||
aws_iobuf_reset(param->io_buf);
|
aws_iobuf_reset(param->io_buf);
|
||||||
aws_iobuf_append_static(param->io_buf, data_ptr, remaining);
|
aws_iobuf_append_static(param->io_buf, data_ptr, remaining);
|
||||||
AWS4C_CHECK( s3_put(param->io_buf, file) );
|
AWS4C_CHECK( s3_put(param->io_buf, file) );
|
||||||
|
AWS4C_CHECK_OK( param->io_buf );
|
||||||
|
|
||||||
// drop ptrs to <data_ptr>, in param->io_buf
|
// drop ptrs to <data_ptr>, in param->io_buf
|
||||||
aws_iobuf_reset(param->io_buf);
|
aws_iobuf_reset(param->io_buf);
|
||||||
}
|
}
|
||||||
else { // N:1
|
else if (multi_part_upload_p) { // N:1 (with MPU)
|
||||||
|
|
||||||
// Ordering of the part-numbers imposes a global ordering on
|
// Ordering of the part-numbers imposes a global ordering on
|
||||||
// the components of the final object. param->part_number
|
// the components of the final object. param->part_number
|
||||||
|
@ -537,6 +720,7 @@ S3_Xfer(int access,
|
||||||
aws_iobuf_reset(param->io_buf);
|
aws_iobuf_reset(param->io_buf);
|
||||||
aws_iobuf_append_static(param->io_buf, data_ptr, remaining);
|
aws_iobuf_append_static(param->io_buf, data_ptr, remaining);
|
||||||
AWS4C_CHECK( s3_put(param->io_buf, buff) );
|
AWS4C_CHECK( s3_put(param->io_buf, buff) );
|
||||||
|
AWS4C_CHECK_OK( param->io_buf );
|
||||||
|
|
||||||
if (verbose >= VERBOSE_4) {
|
if (verbose >= VERBOSE_4) {
|
||||||
printf("rank %d: read ETag = '%s'\n", rank, param->io_buf->eTag);
|
printf("rank %d: read ETag = '%s'\n", rank, param->io_buf->eTag);
|
||||||
|
@ -562,6 +746,23 @@ S3_Xfer(int access,
|
||||||
// drop ptrs to <data_ptr>, in param->io_buf
|
// drop ptrs to <data_ptr>, in param->io_buf
|
||||||
aws_iobuf_reset(param->io_buf);
|
aws_iobuf_reset(param->io_buf);
|
||||||
}
|
}
|
||||||
|
else { // N:1 (use EMC's byte-range write support, instead of MPU)
|
||||||
|
|
||||||
|
// NOTE: You must call 's3_enable_EMC_extensions(1)' to let this work:
|
||||||
|
s3_set_byte_range(offset, remaining); // produces appropriate byte-range header
|
||||||
|
|
||||||
|
// For performance, we append <data_ptr> directly into the linked list
|
||||||
|
// of data in param->io_buf. We are "appending" rather than
|
||||||
|
// "extending", so the added buffer is seen as written data, rather
|
||||||
|
// than empty storage.
|
||||||
|
aws_iobuf_reset(param->io_buf);
|
||||||
|
aws_iobuf_append_static(param->io_buf, data_ptr, remaining);
|
||||||
|
AWS4C_CHECK( s3_put(param->io_buf, file) );
|
||||||
|
AWS4C_CHECK_OK( param->io_buf );
|
||||||
|
|
||||||
|
// drop ptrs to <data_ptr>, in param->io_buf
|
||||||
|
aws_iobuf_reset(param->io_buf);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
if ( param->fsyncPerWrite == TRUE ) {
|
if ( param->fsyncPerWrite == TRUE ) {
|
||||||
|
@ -589,6 +790,12 @@ S3_Xfer(int access,
|
||||||
aws_iobuf_reset(param->io_buf);
|
aws_iobuf_reset(param->io_buf);
|
||||||
aws_iobuf_extend_static(param->io_buf, data_ptr, remaining);
|
aws_iobuf_extend_static(param->io_buf, data_ptr, remaining);
|
||||||
AWS4C_CHECK( s3_get(param->io_buf, file) );
|
AWS4C_CHECK( s3_get(param->io_buf, file) );
|
||||||
|
if (param->io_buf->code != 206) { /* '206 Partial Content' */
|
||||||
|
snprintf(buff, BUFF_SIZE,
|
||||||
|
"ERROR: Unexpected result (%d, '%s') at %s, line %d\n",
|
||||||
|
param->io_buf->code, param->io_buf->result, __FILE__, __LINE__);
|
||||||
|
ERR_SIMPLE(buff);
|
||||||
|
}
|
||||||
|
|
||||||
// drop ptrs to <data_ptr>, in param->io_buf
|
// drop ptrs to <data_ptr>, in param->io_buf
|
||||||
aws_iobuf_reset(param->io_buf);
|
aws_iobuf_reset(param->io_buf);
|
||||||
|
@ -601,6 +808,30 @@ S3_Xfer(int access,
|
||||||
return ( length );
|
return ( length );
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static
|
||||||
|
IOR_offset_t
|
||||||
|
S3_Xfer(int access,
|
||||||
|
void* file,
|
||||||
|
IOR_size_t* buffer,
|
||||||
|
IOR_offset_t length,
|
||||||
|
IOR_param_t* param ) {
|
||||||
|
S3_Xfer_internal(access, file, buffer, length, param, TRUE);
|
||||||
|
}
|
||||||
|
static
|
||||||
|
IOR_offset_t
|
||||||
|
EMC_Xfer(int access,
|
||||||
|
void* file,
|
||||||
|
IOR_size_t* buffer,
|
||||||
|
IOR_offset_t length,
|
||||||
|
IOR_param_t* param ) {
|
||||||
|
S3_Xfer_internal(access, file, buffer, length, param, FALSE);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Does this even mean anything, for HTTP/S3 ?
|
* Does this even mean anything, for HTTP/S3 ?
|
||||||
*
|
*
|
||||||
|
@ -673,7 +904,9 @@ S3_Fsync( void *fd, IOR_param_t * param ) {
|
||||||
|
|
||||||
static
|
static
|
||||||
void
|
void
|
||||||
S3_Close( void *fd, IOR_param_t * param ) {
|
S3_Close_internal( void* fd,
|
||||||
|
IOR_param_t* param,
|
||||||
|
int multi_part_upload_p ) {
|
||||||
|
|
||||||
if (param->verbose >= VERBOSE_2) {
|
if (param->verbose >= VERBOSE_2) {
|
||||||
printf("-> S3_Close\n");
|
printf("-> S3_Close\n");
|
||||||
|
@ -682,8 +915,13 @@ S3_Close( void *fd, IOR_param_t * param ) {
|
||||||
|
|
||||||
if (param->open == WRITE) {
|
if (param->open == WRITE) {
|
||||||
|
|
||||||
// closing N:1 write
|
// closing N:N write
|
||||||
if (!param->filePerProc) {
|
if (param->filePerProc) {
|
||||||
|
// nothing to do ...
|
||||||
|
}
|
||||||
|
|
||||||
|
// closing N:1 write (with Multi-Part Upload)
|
||||||
|
else if (multi_part_upload_p) {
|
||||||
|
|
||||||
MPI_Datatype mpi_size_t;
|
MPI_Datatype mpi_size_t;
|
||||||
if (sizeof(size_t) == sizeof(int))
|
if (sizeof(size_t) == sizeof(int))
|
||||||
|
@ -830,6 +1068,9 @@ S3_Close( void *fd, IOR_param_t * param ) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// --- reset associated info. allows another MPU, and frees memory.
|
||||||
|
s3_MPU_reset(param);
|
||||||
|
|
||||||
// Don't you non-zero ranks go trying to stat the N:1 file until
|
// Don't you non-zero ranks go trying to stat the N:1 file until
|
||||||
// rank0 has finished the S3 multi-part finalize. It will not appear
|
// rank0 has finished the S3 multi-part finalize. It will not appear
|
||||||
// to exist, until then.
|
// to exist, until then.
|
||||||
|
@ -847,8 +1088,28 @@ S3_Close( void *fd, IOR_param_t * param ) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static
|
||||||
|
void
|
||||||
|
S3_Close( void* fd,
|
||||||
|
IOR_param_t* param ) {
|
||||||
|
S3_Close_internal(fd, param, TRUE);
|
||||||
|
}
|
||||||
|
static
|
||||||
|
void
|
||||||
|
EMC_Close( void* fd,
|
||||||
|
IOR_param_t* param ) {
|
||||||
|
S3_Close_internal(fd, param, FALSE);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Delete an object through the S3 interface.
|
* Delete an object through the S3 interface.
|
||||||
|
*
|
||||||
|
* The only reason we separate out EMC version, is because EMC bug means a
|
||||||
|
* file that was written with appends can't be deleted, recreated, and then
|
||||||
|
* successfully read.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
static
|
static
|
||||||
|
@ -863,11 +1124,47 @@ S3_Delete( char *testFileName, IOR_param_t * param ) {
|
||||||
s3_connect( param );
|
s3_connect( param );
|
||||||
|
|
||||||
AWS4C_CHECK( s3_delete(param->io_buf, testFileName) );
|
AWS4C_CHECK( s3_delete(param->io_buf, testFileName) );
|
||||||
|
AWS4C_CHECK_OK( param->io_buf );
|
||||||
|
|
||||||
if (param->verbose >= VERBOSE_2)
|
if (param->verbose >= VERBOSE_2)
|
||||||
printf("<- S3_Delete\n");
|
printf("<- S3_Delete\n");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static
|
||||||
|
void
|
||||||
|
EMC_Delete( char *testFileName, IOR_param_t * param ) {
|
||||||
|
|
||||||
|
if (param->verbose >= VERBOSE_2) {
|
||||||
|
printf("-> EMC_Delete(%s)\n", testFileName);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* maybe initialize curl */
|
||||||
|
s3_connect( param );
|
||||||
|
|
||||||
|
#if 0
|
||||||
|
// EMC BUG: If file was written with appends, and is deleted,
|
||||||
|
// Then any future recreation will result in an object that can't be read.
|
||||||
|
// this
|
||||||
|
AWS4C_CHECK( s3_delete(param->io_buf, testFileName) );
|
||||||
|
#else
|
||||||
|
// just replace with a zero-length object for now
|
||||||
|
aws_iobuf_reset(param->io_buf);
|
||||||
|
AWS4C_CHECK ( s3_put(param->io_buf, testFileName) );
|
||||||
|
#endif
|
||||||
|
|
||||||
|
AWS4C_CHECK_OK( param->io_buf );
|
||||||
|
|
||||||
|
if (param->verbose >= VERBOSE_2)
|
||||||
|
printf("<- EMC_Delete\n");
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Determine API version.
|
* Determine API version.
|
||||||
*/
|
*/
|
||||||
|
|
16
src/aiori.h
16
src/aiori.h
|
@ -63,13 +63,15 @@ typedef struct ior_aiori {
|
||||||
IOR_offset_t (*get_file_size)(IOR_param_t *, MPI_Comm, char *);
|
IOR_offset_t (*get_file_size)(IOR_param_t *, MPI_Comm, char *);
|
||||||
} ior_aiori_t;
|
} ior_aiori_t;
|
||||||
|
|
||||||
ior_aiori_t hdf5_aiori;
|
extern ior_aiori_t hdf5_aiori;
|
||||||
ior_aiori_t hdfs_aiori;
|
extern ior_aiori_t hdfs_aiori;
|
||||||
ior_aiori_t mpiio_aiori;
|
extern ior_aiori_t mpiio_aiori;
|
||||||
ior_aiori_t ncmpi_aiori;
|
extern ior_aiori_t ncmpi_aiori;
|
||||||
ior_aiori_t posix_aiori;
|
extern ior_aiori_t posix_aiori;
|
||||||
ior_aiori_t plfs_aiori;
|
extern ior_aiori_t plfs_aiori;
|
||||||
ior_aiori_t s3_aiori;
|
extern ior_aiori_t s3_aiori;
|
||||||
|
extern ior_aiori_t s3_plus_aiori;
|
||||||
|
extern ior_aiori_t s3_emc_aiori;
|
||||||
|
|
||||||
|
|
||||||
IOR_offset_t MPIIO_GetFileSize(IOR_param_t * test, MPI_Comm testComm,
|
IOR_offset_t MPIIO_GetFileSize(IOR_param_t * test, MPI_Comm testComm,
|
||||||
|
|
23
src/ior.c
23
src/ior.c
|
@ -73,6 +73,8 @@ ior_aiori_t *available_aiori[] = {
|
||||||
#endif
|
#endif
|
||||||
#ifdef USE_S3_AIORI
|
#ifdef USE_S3_AIORI
|
||||||
&s3_aiori,
|
&s3_aiori,
|
||||||
|
&s3_plus_aiori,
|
||||||
|
&s3_emc_aiori,
|
||||||
#endif
|
#endif
|
||||||
NULL
|
NULL
|
||||||
};
|
};
|
||||||
|
@ -245,7 +247,7 @@ void init_IOR_Param_t(IOR_param_t * p)
|
||||||
* Bind the global "backend" pointer to the requested backend AIORI's
|
* Bind the global "backend" pointer to the requested backend AIORI's
|
||||||
* function table.
|
* function table.
|
||||||
*/
|
*/
|
||||||
static void AioriBind(char *api)
|
static void AioriBind(char* api, IOR_param_t* param)
|
||||||
{
|
{
|
||||||
ior_aiori_t **tmp;
|
ior_aiori_t **tmp;
|
||||||
|
|
||||||
|
@ -260,6 +262,14 @@ static void AioriBind(char *api)
|
||||||
if (backend == NULL) {
|
if (backend == NULL) {
|
||||||
ERR("unrecognized IO API");
|
ERR("unrecognized IO API");
|
||||||
}
|
}
|
||||||
|
else if (! strncmp(api, "S3", 2)) {
|
||||||
|
if (! strcmp(api, "S3_plus"))
|
||||||
|
param->curl_flags |= IOR_CURL_S3_EMC_EXT;
|
||||||
|
else if (! strcmp(api, "S3_EMC"))
|
||||||
|
param->curl_flags |= IOR_CURL_S3_EMC_EXT;
|
||||||
|
else
|
||||||
|
param->curl_flags &= ~(IOR_CURL_S3_EMC_EXT);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1303,6 +1313,7 @@ static void PrintRemoveTiming(double start, double finish, int rep)
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Check for file(s), then remove all files if file-per-proc, else single file.
|
* Check for file(s), then remove all files if file-per-proc, else single file.
|
||||||
|
*
|
||||||
*/
|
*/
|
||||||
static void RemoveFile(char *testFileName, int filePerProc, IOR_param_t * test)
|
static void RemoveFile(char *testFileName, int filePerProc, IOR_param_t * test)
|
||||||
{
|
{
|
||||||
|
@ -1322,6 +1333,12 @@ static void RemoveFile(char *testFileName, int filePerProc, IOR_param_t * test)
|
||||||
GetTestFileName(testFileName, test);
|
GetTestFileName(testFileName, test);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
// BUG: "access()" assumes a POSIX filesystem. Maybe use
|
||||||
|
// backend->get_file_size(), instead, (and catch
|
||||||
|
// errors), or extend the aiori struct to include
|
||||||
|
// something to safely check for existence of the
|
||||||
|
// "file".
|
||||||
|
//
|
||||||
if ((rank == 0) && (access(testFileName, F_OK) == 0)) {
|
if ((rank == 0) && (access(testFileName, F_OK) == 0)) {
|
||||||
backend->delete(testFileName, test);
|
backend->delete(testFileName, test);
|
||||||
}
|
}
|
||||||
|
@ -2016,7 +2033,7 @@ static void TestIoSys(IOR_test_t *test)
|
||||||
}
|
}
|
||||||
|
|
||||||
/* bind I/O calls to specific API */
|
/* bind I/O calls to specific API */
|
||||||
AioriBind(params->api);
|
AioriBind(params->api, params);
|
||||||
|
|
||||||
/* show test setup */
|
/* show test setup */
|
||||||
if (rank == 0 && verbose >= VERBOSE_0)
|
if (rank == 0 && verbose >= VERBOSE_0)
|
||||||
|
@ -2314,7 +2331,7 @@ static void ValidTests(IOR_param_t * test)
|
||||||
|
|
||||||
init_IOR_Param_t(&defaults);
|
init_IOR_Param_t(&defaults);
|
||||||
/* get the version of the tests */
|
/* get the version of the tests */
|
||||||
AioriBind(test->api);
|
AioriBind(test->api, test);
|
||||||
backend->set_version(test);
|
backend->set_version(test);
|
||||||
|
|
||||||
if (test->repetitions <= 0)
|
if (test->repetitions <= 0)
|
||||||
|
|
|
@ -150,8 +150,9 @@ typedef struct
|
||||||
|
|
||||||
/* REST/S3 variables */
|
/* REST/S3 variables */
|
||||||
// CURL* curl; /* for libcurl "easy" fns (now managed by aws4c) */
|
// CURL* curl; /* for libcurl "easy" fns (now managed by aws4c) */
|
||||||
# define IOR_CURL_INIT 0x01
|
# define IOR_CURL_INIT 0x01 /* curl top-level inits were perfomed once? */
|
||||||
# define IOR_CURL_NOCONTINUE 0x02
|
# define IOR_CURL_NOCONTINUE 0x02
|
||||||
|
# define IOR_CURL_S3_EMC_EXT 0x04 /* allow EMC extensions to S3? */
|
||||||
char curl_flags;
|
char curl_flags;
|
||||||
char* URI; /* "path" to target object */
|
char* URI; /* "path" to target object */
|
||||||
IOBuf* io_buf; /* aws4c places parsed header values here */
|
IOBuf* io_buf; /* aws4c places parsed header values here */
|
||||||
|
|
Loading…
Reference in New Issue