diff --git a/src/aiori-S3.c b/src/aiori-S3.c
index fb37008..1784e62 100644
--- a/src/aiori-S3.c
+++ b/src/aiori-S3.c
@@ -64,17 +64,7 @@
  *     --> Select this option with the '-a S3' command-line arg to IOR
  *
  *
- * (2) "S3 + EMC append" uses S3 Multi-Part Upload for N:1, like pure S3,
- *     but also allows appends in the N:N case, via the EMC byte-range
- *     write support.  This also does away with constraints on the number
- *     or size of parts to S3 Multi-Part Upload.
- *
- *     --> Select this option with the '-a S3_plus' command-line arg to IOR.
- *         ior.c will then set the IOR_S3_EMC_EXT flag, which will cause
- *         s3_connect to initialize with EMC-extensions enabled.
- *
- *
- * (3) "EMC S3 Extensions" uses the EMC byte-range support for N:1
+ * (2) "EMC S3 Extensions" uses the EMC byte-range support for N:1
  *     writes, eliminating Multi-Part Upload.  EMC expects this will
  *     perform better than MPU, and it avoids some problems that are
  *     imposed by the S3 MPU spec.  [See comments at EMC_Xfer().]
@@ -84,10 +74,10 @@
  *
  * NOTE: Putting EMC's S3-extensions in the same file with the S3 API
  *       allows us to share some code that would otherwise be duplicated
- *       (e.g. s3_connect(), etc).  This should help us avoid losing bug
- *       fixes that are discovered in one interface or the other.  In some
- *       cases, S3 is incapable of supporting all the needs of IOR.  (For
- *       example, see notes about "append", above S3_Xfer().
+ *       (e.g. s3_connect(), etc).  This should also help us avoid losing
+ *       bug fixes that are discovered in one interface or the other.  In
+ *       some cases, S3 is incapable of supporting all the needs of IOR.
+ *       (For example, see notes about "append", above S3_Xfer().)
  *
  ******************************************************************************/

@@ -283,6 +273,7 @@ s3_connect( IOR_param_t* param ) {
    aws_iobuf_growth_size(param->etags, 1024*1024*8);

    // our hosts are currently 10.140.0.15 - 10.140.0.18
+   // TBD: Try DNS-round-robin server at vi-lb.ccstar.lanl.gov
    snprintf(buff, BUFF_SIZE, "10.140.0.%d:9020", 15 + (rank % 4));
    s3_set_host(buff);

@@ -347,16 +338,11 @@ s3_MPU_reset(IOR_param_t* param) {
 /*
  * One doesn't "open" an object, in REST semantics.  All we really care
  * about is whether caller expects the object to have zero-size, when we
- * return.  If so, we have to delete it, then recreate it empty.
- *
- * NOTE: Similarly, there's no file-descriptor to return.  On the other
- *       hand, we keep needing the file *NAME*.  Therefore, we will return
- *       the file-name, and let IOR pass it around to our functions, in
- *       place of its usual file-descriptor argument.
+ * return.  If so, we conceptually delete it, then recreate it empty.
  *
  * ISSUE: If the object is going to receive "appends" (supported in EMC S3
  *        extensions), the object has to exist before the first append
- *        operation.  On the other hand, There appears to be a bug in the
+ *        operation.  On the other hand, there appears to be a bug in the
  *        EMC implementation, such that if an object ever receives appends,
  *        and then is deleted, and then recreated, the recreated object will
  *        always return "500 Server Error" on GET (whether it has been
@@ -364,6 +350,13 @@ s3_MPU_reset(IOR_param_t* param) {
  *
  *        Therefore, a safer thing to do here is write zero-length contents,
  *        instead of deleting.
+ *
+ * NOTE: There's also no file-descriptor to return, in REST semantics.  On
+ *       the other hand, we keep needing the file *NAME*.  Therefore, we
+ *       will return the file-name, and let IOR pass it around to our
+ *       functions, in place of what IOR understands to be a
+ *       file-descriptor.
+ *
  */
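Aside: a minimal sketch of the name-as-fd convention described in the NOTE
above.  Function names here are invented for illustration; the real code
returns testFileName from S3_Create_Or_Open_internal() and casts the void*
back to char* in S3_Xfer() and S3_Close().

    #include <stdio.h>

    static void* demo_open(char* name) {
       return name;                     /* no descriptor in REST; hand back the NAME */
    }
    static void demo_xfer(void* fd) {
       printf("PUT %s\n", (char*)fd);   /* recover the name; build the REST URL from it */
    }

    int main(void) {
       void* fd = demo_open("bucket/object");
       demo_xfer(fd);
       return 0;
    }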
 static
@@ -374,7 +367,8 @@ S3_Create_Or_Open_internal(char* testFileName,
                            int           multi_part_upload_p ) {

    if (param->verbose >= VERBOSE_2) {
-      printf("-> S3_Create_Or_Open\n");
+      printf("-> S3_Create_Or_Open('%s', ,%d, %d)\n",
+             testFileName, createFile, multi_part_upload_p);
    }

    /* initialize curl, if needed */
@@ -388,46 +382,38 @@ S3_Create_Or_Open_internal(char* testFileName,
       fprintf( stdout, "Direct I/O mode is not implemented in S3\n" );
    }

+   // easier to think
+   int n_to_n = param->filePerProc;
+   int n_to_1 = ! n_to_n;

    /* check whether object needs reset to zero-length */
    int needs_reset = 0;
    if (! multi_part_upload_p)
       needs_reset = 1;           /* so "append" can work */
-   if ( param->openFlags & IOR_TRUNC )
+   else if ( param->openFlags & IOR_TRUNC )
       needs_reset = 1;           /* so "append" can work */
    else if (createFile) {
-      AWS4C_CHECK( s3_head(param->io_buf, testFileName) );
-      if ( ! AWS4C_OK(param->io_buf) )
+      // AWS4C_CHECK( s3_head(param->io_buf, testFileName) );
+      // if ( ! AWS4C_OK(param->io_buf) )
          needs_reset = 1;
    }

-   if ( param->open == WRITE ) {
-      /* initializations for N:N writes */
-      if ( param->filePerProc ) {
+   /* initializations for N:1 or N:N writes using multi-part upload */
+   if (multi_part_upload_p) {

-         /* maybe reset to zero-length, so "append" can work */
-         if (needs_reset) {
-            aws_iobuf_reset(param->io_buf);
-            AWS4C_CHECK( s3_put(param->io_buf, testFileName) );
-            AWS4C_CHECK_OK( param->io_buf );
-         }
-      }
-
-      /* initializations for N:1 writes using multi-part upload */
-      else if (multi_part_upload_p) {
-
-         /* rank0 initiates multi-part upload.  The response from the server
-            includes an "uploadId", which must be used by all ranks, when
-            uploading parts. */
-         if (rank == 0) {
+      // For N:N, all ranks do their own MPU open/close.  For N:1, only
+      // rank0 does that.  Either way, the response from the server
+      // includes an "uploadId", which must be used to upload parts to
+      // the same object.
+      if ( n_to_n || (rank == 0) ) {

          // rank0 handles truncate
          if ( needs_reset) {
             aws_iobuf_reset(param->io_buf);
-            AWS4C_CHECK( s3_put(param->io_buf, testFileName) );
+            AWS4C_CHECK( s3_put(param->io_buf, testFileName) );   /* 0-length write */
+            AWS4C_CHECK_OK( param->io_buf );
          }

          // POST request with URL+"?uploads" initiates multi-part upload
@@ -450,7 +436,7 @@ S3_Create_Or_Open_internal(char* testFileName,
          if (! upload_id)
             ERR_SIMPLE("couldn't find 'UploadId' in returned XML\n");

-         if (param->verbose >= VERBOSE_4)
+         if (param->verbose >= VERBOSE_3)
             printf("got UploadId = '%s'\n", upload_id);

          const size_t upload_id_len = strlen(upload_id);
@@ -469,19 +455,26 @@ S3_Create_Or_Open_internal(char* testFileName,
          xmlFreeDoc(doc);
          aws_iobuf_free(response);

-         // share UploadId across all ranks
-         MPI_Bcast(param->UploadId, MAX_UPLOAD_ID_SIZE, MPI_BYTE, 0, param->testComm);
+         // For N:1, share UploadId across all ranks
+         if (n_to_1)
+            MPI_Bcast(param->UploadId, MAX_UPLOAD_ID_SIZE, MPI_BYTE, 0, param->testComm);
       }
       else
-         // recv UploadID from Rank 0
+         // N:1, and we're not rank0.  recv UploadID from Rank 0
         MPI_Bcast(param->UploadId, MAX_UPLOAD_ID_SIZE, MPI_BYTE, 0, param->testComm);
    }
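Aside: the UploadId hand-off above reduces to broadcasting a fixed-size
buffer.  A runnable sketch, with the id value and buffer size invented for
illustration (the real code uses param->UploadId / MAX_UPLOAD_ID_SIZE):

    #include <mpi.h>
    #include <stdio.h>
    #include <string.h>

    #define MAX_ID 256

    int main(int argc, char** argv) {
       char upload_id[MAX_ID] = {0};
       int  rank;

       MPI_Init(&argc, &argv);
       MPI_Comm_rank(MPI_COMM_WORLD, &rank);

       if (rank == 0)
          strcpy(upload_id, "2~example-id");   /* rank0 parsed this from the POST response XML */

       /* all ranks must upload their parts under the same id */
       MPI_Bcast(upload_id, MAX_ID, MPI_BYTE, 0, MPI_COMM_WORLD);

       printf("rank %d uses UploadId '%s'\n", rank, upload_id);
       MPI_Finalize();
       return 0;
    }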
-      /* initializations for N:1 writes using EMC byte-range extensions */
+   /* initializations for N:N or N:1 writes using EMC byte-range extensions */
    else {

       /* maybe reset to zero-length, so "append" can work */
       if (needs_reset) {
+
+         if (verbose >= VERBOSE_3) {
+            fprintf( stdout, "rank %d resetting\n",
+                     rank);
+         }
+
          aws_iobuf_reset(param->io_buf);
          AWS4C_CHECK( s3_put(param->io_buf, testFileName) );
          AWS4C_CHECK_OK( param->io_buf );

@@ -662,47 +655,67 @@ S3_Xfer_internal(int access,
    char*     data_ptr = (char *)buffer;
    off_t     offset = param->offset;

+   // easier to think
+   int n_to_n    = param->filePerProc;
+   int n_to_1    = (! n_to_n);
+   int segmented = (param->segmentCount == 1);
+
    if (access == WRITE) {	/* WRITE */

-      if (verbose >= VERBOSE_4) {
-         fprintf( stdout, "task %d writing to offset %lld\n",
+      if (verbose >= VERBOSE_3) {
+         fprintf( stdout, "rank %d writing length=%lld to offset %lld\n",
                   rank,
+                  remaining,
                   param->offset + length - remaining);
       }

-      if (param->filePerProc) { // N:N
+      if (multi_part_upload_p) {

-         // NOTE: You must call 's3_enable_EMC_extensions(1)' to let this work:
-         s3_set_byte_range(-1,-1); // produces header "Range: bytes=-1-"
-
-         // For performance, we append <data_ptr> directly into the linked list
-         // of data in param->io_buf.  We are "appending" rather than
-         // "extending", so the added buffer is seen as written data, rather
-         // than empty storage.
-         aws_iobuf_reset(param->io_buf);
-         aws_iobuf_append_static(param->io_buf, data_ptr, remaining);
-         AWS4C_CHECK( s3_put(param->io_buf, file) );
-         AWS4C_CHECK_OK( param->io_buf );
-
-         // drop ptrs to <data_ptr>, in param->io_buf
-         aws_iobuf_reset(param->io_buf);
-      }
-      else if (multi_part_upload_p) { // N:1 (with MPU)
-
-         // Ordering of the part-numbers imposes a global ordering on
-         // the components of the final object.  param->part_number
-         // is incremented by 1 per write, on each rank.  This lets us
-         // use it to compute a global part-numbering.
-         //
+         // For N:1, part-numbers must have a global ordering for the
+         // components of the final object.  param->part_number is
+         // incremented by 1 per write, on each rank.  This lets us use it
+         // to compute a global part-numbering.
+         //
+         // In the N:N case, we only need to increment part-numbers within
+         // each rank.
+         //
+         // In the N:1 case, the global order of part-numbers we're writing
+         // depends on whether we're writing strided or segmented, in
+         // other words, how <offset> and <length> are actually
+         // positioning the parts being written.  [See discussion at
+         // S3_Close_internal().]
+         //
          // NOTE: 's3curl.pl --debug' shows StringToSign having partNumber
          //       first, even if I put uploadId first in the URL.  Maybe
          //       that's what the server will do.  GetStringToSign() in
          //       aws4c is not clever about this, so we spoon-feed args in
         //       the proper order.

-         size_t part_number = (param->part_number++ * numTasksWorld) + rank;
+         size_t part_number;
+         if (n_to_1) {
+            if (segmented) {      // segmented
+               size_t parts_per_rank = param->blockSize / param->transferSize;
+               part_number = (rank * parts_per_rank) + param->part_number;
+            }
+            else                  // strided
+               part_number = (param->part_number * param->numTasks) + rank;
+         }
+         else
+            part_number = param->part_number;
+         ++ param->part_number;
+
+         // if (verbose >= VERBOSE_3) {
+         //    fprintf( stdout, "rank %d of %d writing (%s,%s) part_number %lld\n",
+         //             rank,
+         //             param->numTasks,
+         //             (n_to_1 ? "N:1" : "N:N"),
+         //             (segmented ? "segmented" : "strided"),
+         //             part_number);
+         // }
+
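Aside: a worked example of the N:1 part-numbering just computed above.  A
sketch only — N and P are arbitrary here; in the patch they correspond to
param->numTasks and blockSize/transferSize.  It prints the global part
number each rank assigns to its k-th write, for both layouts:

    #include <stdio.h>

    int main(void) {
       int N = 4, P = 2;   /* numTasks, parts per rank */
       int rank, k;
       for (rank = 0; rank < N; ++rank)
          for (k = 0; k < P; ++k)
             printf("rank %d, write %d: segmented part %2d, strided part %2d\n",
                    rank, k,
                    (rank * P) + k,    /* segmented: each rank's parts are consecutive */
                    (k * N) + rank);   /* strided:   rank r owns parts r, N+r, 2N+r, ... */
       return 0;
    }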
          snprintf(buff, BUFF_SIZE,
                   "%s?partNumber=%d&uploadId=%s",
                   fname, part_number, param->UploadId);
@@ -722,14 +735,30 @@ S3_Xfer_internal(int access,
          AWS4C_CHECK   ( s3_put(param->io_buf, buff) );
          AWS4C_CHECK_OK( param->io_buf );

-         if (verbose >= VERBOSE_4) {
-            printf("rank %d: read ETag = '%s'\n", rank, param->io_buf->eTag);
-            if (strlen(param->io_buf->eTag) != ETAG_SIZE+2) { /* quotes at both ends */
+         // if (verbose >= VERBOSE_3) {
+         //    printf("rank %d: read ETag = '%s'\n", rank, param->io_buf->eTag);
+         //    if (strlen(param->io_buf->eTag) != ETAG_SIZE+2) { /* quotes at both ends */
+         //       fprintf(stderr, "Rank %d: ERROR: expected ETag to be %d hex digits\n",
+         //               rank, ETAG_SIZE);
+         //       exit(1);
+         //    }
+         // }
+
+         if (verbose >= VERBOSE_3) {
+            fprintf( stdout, "rank %d of %d (%s,%s) offset %lld, part# %lld --> ETag %s\n",
+                     rank,
+                     param->numTasks,
+                     (n_to_1 ? "N:1" : "N:N"),
+                     (segmented ? "segmented" : "strided"),
+                     offset,
+                     part_number,
+                     param->io_buf->eTag); // incl quote-marks at [0] and [len-1]
+         }
+         if (strlen(param->io_buf->eTag) != ETAG_SIZE+2) { /* quotes at both ends */
             fprintf(stderr, "Rank %d: ERROR: expected ETag to be %d hex digits\n",
                     rank, ETAG_SIZE);
             exit(1);
-            }
          }

          // save the eTag for later
          //
@@ -746,10 +775,15 @@ S3_Xfer_internal(int access,
          // drop ptrs to <data_ptr>, in param->io_buf
          aws_iobuf_reset(param->io_buf);
       }
-      else {    // N:1 (use EMC's byte-range write support, instead of MPU)
+      else {    // use EMC's byte-range write-support, instead of MPU

-         // NOTE: You must call 's3_enable_EMC_extensions(1)' to let this work:
-         s3_set_byte_range(offset, remaining); // produces appropriate byte-range header
+         // NOTE: You must call 's3_enable_EMC_extensions(1)' for
+         //       byte-ranges to work for writes.
+         if (n_to_n)
+            s3_set_byte_range(-1,-1); // EMC header "Range: bytes=-1-" means "append"
+         else
+            s3_set_byte_range(offset, remaining);

          // For performance, we append <data_ptr> directly into the linked list
          // of data in param->io_buf.  We are "appending" rather than
@@ -757,7 +791,7 @@ S3_Xfer_internal(int access,
          // than empty storage.
          aws_iobuf_reset(param->io_buf);
          aws_iobuf_append_static(param->io_buf, data_ptr, remaining);
-         AWS4C_CHECK( s3_put(param->io_buf, file) );
+         AWS4C_CHECK ( s3_put(param->io_buf, file) );
          AWS4C_CHECK_OK( param->io_buf );

          // drop ptrs to <data_ptr>, in param->io_buf
          aws_iobuf_reset(param->io_buf);
       }
@@ -772,13 +806,14 @@ S3_Xfer_internal(int access,
    }
    else {				/* READ or CHECK */

-      if (verbose >= VERBOSE_4) {
-         fprintf( stdout, "task %d reading from offset %lld\n",
+      if (verbose >= VERBOSE_3) {
+         fprintf( stdout, "rank %d reading from offset %lld\n",
                   rank,
                   param->offset + length - remaining );
       }

       // read specific byte-range from the object
+      // [This is included in the "pure" S3 spec.]
       s3_set_byte_range(offset, remaining);

       // For performance, we append <data_ptr> directly into the linked
@@ -792,12 +827,12 @@ S3_Xfer_internal(int access,
       AWS4C_CHECK( s3_get(param->io_buf, file) );
       if (param->io_buf->code != 206) { /* '206 Partial Content' */
          snprintf(buff, BUFF_SIZE,
-                  "ERROR: Unexpected result (%d, '%s') at %s, line %d\n",
-                  param->io_buf->code, param->io_buf->result, __FILE__, __LINE__);
+                  "Unexpected result (%d, '%s')",
+                  param->io_buf->code, param->io_buf->result);
          ERR_SIMPLE(buff);
       }

-      // drop ptrs to <data_ptr>, in param->io_buf
+      // drop refs to <data_ptr>, in param->io_buf
       aws_iobuf_reset(param->io_buf);
    }

@@ -908,173 +943,282 @@ S3_Close_internal( void* fd,
                    IOR_param_t* param,
                    int multi_part_upload_p ) {

-   if (param->verbose >= VERBOSE_2) {
-      printf("-> S3_Close\n");
-   }

    char* fname = (char*)fd; /* see NOTE above S3_Create_Or_Open() */

+   // easier to think
+   int n_to_n    = param->filePerProc;
+   int n_to_1    = (! n_to_n);
+   int segmented = (param->segmentCount == 1);
+
+   if (param->verbose >= VERBOSE_2) {
+      printf("-> S3_Close('%s', ,%d) %s\n",
+             fname,
+             multi_part_upload_p,
+             ((n_to_n) ? "N:N" : ((segmented) ? "N:1(seg)" : "N:1(str)")));
+   }
+
    if (param->open == WRITE) {

-      // closing N:N write
-      if (param->filePerProc) {
-         // nothing to do ...
-      }
-      // closing N:1 write (with Multi-Part Upload)
-      else if (multi_part_upload_p) {
+      // finalizing Multi-Part Upload (for N:1 or N:N)
+      if (multi_part_upload_p) {

-         MPI_Datatype mpi_size_t;
-         if (sizeof(size_t) == sizeof(int))
-            mpi_size_t = MPI_INT;
-         else if (sizeof(size_t) == sizeof(long))
-            mpi_size_t = MPI_LONG;
-         else
-            mpi_size_t = MPI_LONG_LONG;

-         // Everybody should have the same number of ETags (?)
-         size_t etag_data_size = param->etags->write_count;  /* size of local ETag data */
-         size_t etag_count     = etag_data_size / ETAG_SIZE; /* number of local etags */
-         size_t etag_count_max = 0;                          /* highest number on any proc */
+         size_t etag_data_size = param->etags->write_count;  /* local ETag data (bytes) */
+         size_t etags_per_rank = etag_data_size / ETAG_SIZE; /* number of local etags */

-         MPI_Allreduce(&etag_count, &etag_count_max,
-                       1, mpi_size_t, MPI_MAX, param->testComm);
-         if (etag_count != etag_count_max) {
-            printf("Rank %d: etag count mismatch: max:%d, mine:%d\n",
-                   rank, etag_count_max, etag_count);
-            MPI_Abort(param->testComm, 1);
-         }
+         // --- create XML containing ETags in an IOBuf for "close" request
+         IOBuf* xml = NULL;

+         if (n_to_1) {
+
+            // for N:1, gather all Etags at Rank0
+            MPI_Datatype mpi_size_t;
+            if (sizeof(size_t) == sizeof(int))
+               mpi_size_t = MPI_INT;
+            else if (sizeof(size_t) == sizeof(long))
+               mpi_size_t = MPI_LONG;
+            else
+               mpi_size_t = MPI_LONG_LONG;
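Aside: the datatype-matching above can be exercised standalone.  A sketch
under the patch's own approach (MPI_UNSIGNED_LONG or MPI_AINT might be
reasonable alternatives; the if-chain mirrors the code):

    #include <mpi.h>
    #include <stdio.h>

    int main(int argc, char** argv) {
       MPI_Init(&argc, &argv);

       MPI_Datatype mpi_size_t;
       if      (sizeof(size_t) == sizeof(int))  mpi_size_t = MPI_INT;
       else if (sizeof(size_t) == sizeof(long)) mpi_size_t = MPI_LONG;
       else                                     mpi_size_t = MPI_LONG_LONG;

       size_t mine = 42, max = 0;   /* e.g. a per-rank ETag count */
       MPI_Allreduce(&mine, &max, 1, mpi_size_t, MPI_MAX, MPI_COMM_WORLD);
       printf("max = %zu\n", max);

       MPI_Finalize();
       return 0;
    }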
+
+            // Everybody should have the same number of ETags (?)
+            size_t etag_count_max = 0;     /* highest number on any proc */
+            MPI_Allreduce(&etags_per_rank, &etag_count_max,
+                          1, mpi_size_t, MPI_MAX, param->testComm);
+            if (etags_per_rank != etag_count_max) {
+               printf("Rank %d: etag count mismatch: max:%d, mine:%d\n",
+                      rank, etag_count_max, etags_per_rank);
                MPI_Abort(param->testComm, 1);
             }

-         // collect ETag data at Rank0
-         aws_iobuf_realloc(param->etags);            /* force single contiguous buffer */
-         char* etag_data = param->etags->first->buf; /* ptr to contiguous data */
-         if (rank != 0) {
-            MPI_Gather(etag_data, etag_data_size, MPI_BYTE,
-                       NULL, etag_data_size, MPI_BYTE, 0, MPI_COMM_WORLD);
-         }
-         else {
-            char* etag_ptr;
-            int   rnk;
-            int   i;
-
-            char* etag_vec = (char*)malloc((numTasksWorld * etag_data_size) +1);
-            if (! etag_vec) {
-               fprintf(stderr, "rank 0 failed to malloc %d bytes\n",
-                       numTasksWorld * etag_data_size);
+            // collect ETag data at Rank0
+            aws_iobuf_realloc(param->etags);              /* force single contiguous buffer */
+            char* etag_data = param->etags->first->buf;   /* per-rank data, contiguous */
+
+            if (rank == 0) {
+               char* etag_ptr;
+               int   rnk;
+               int   i;
+
+               char* etag_vec = (char*)malloc((param->numTasks * etag_data_size) +1);
+               if (! etag_vec) {
+                  fprintf(stderr, "rank 0 failed to malloc %d bytes\n",
+                          param->numTasks * etag_data_size);
                   MPI_Abort(param->testComm, 1);
                }

-            MPI_Gather(etag_data, etag_data_size, MPI_BYTE,
-                       etag_vec, etag_data_size, MPI_BYTE, 0, MPI_COMM_WORLD);
+               MPI_Gather(etag_data, etag_data_size, MPI_BYTE,
+                          etag_vec,  etag_data_size, MPI_BYTE, 0, MPI_COMM_WORLD);

-            // --- debugging: show the gathered etag data
-            //     (This shows the raw concatenated etag-data from each node.)
-            if (param->verbose >= VERBOSE_4) {
-               printf("rank 0: gathered %d etags from all ranks:\n", etag_count);
-               etag_ptr=etag_vec;
-               for (rnk=0; rnk<numTasksWorld; ++rnk) {
-                  printf("\t[%d]: '", rnk);
-
-                  int ii;
-                  for (ii=0; ii<etag_data_size; ++ii)
-                     printf("%c", etag_ptr[ii]);
-                  printf("'\n");
-
-                  etag_ptr += etag_data_size;
-               }
-            }
+               // --- debugging: show the gathered etag data
+               //     (This shows the raw concatenated etag-data from each node.)
+               if (param->verbose >= VERBOSE_4) {
+
+                  printf("rank 0: gathered %d etags from all ranks:\n", etags_per_rank);
+                  etag_ptr=etag_vec;
+                  for (rnk=0; rnk<param->numTasks; ++rnk) {
+                     printf("\t[%d]: '", rnk);
+
+                     int ii;
+                     for (ii=0; ii<etag_data_size; ++ii)
+                        printf("%c", etag_ptr[ii]);
+                     printf("'\n");
+
+                     etag_ptr += etag_data_size;
+                  }
+               }
+
+               // add XML for *all* the parts.  The XML must be ordered by
+               // part-number.  Each rank wrote <etags_per_rank> parts,
+               // locally.  At rank0, the etags for each rank are now
+               // stored as a contiguous block of text, with the blocks
+               // stored in rank order in etag_vec.  In other words, our
+               // internal rep at rank 0 matches the "segmented" format.
+               // From this, we must select etags in an order matching how
+               // they appear in the actual object, and give sequential
+               // part-numbers to the resulting sequence.
+               //
+               // That ordering of parts in the actual written object
+               // varies according to whether we wrote in the "segmented"
+               // or "strided" format.
+               //
+               //     supposing N ranks, and P parts per rank:
+               //
+               // segmented:
+               //
+               //     all parts for a given rank are consecutive.
+               //     rank r writes these parts:
+               //
+               //     rP, rP+1, ... (r+1)P -1
+               //
+               //     i.e. rank0 writes parts 0,1,2,3 ... P-1
+               //
+               //
+               // strided:
+               //
+               //     rank r writes every N-th part, starting with r.
+               //
+               //     r, N+r, ... (P-1)N + r
+               //
+               //     i.e. rank0 writes parts 0,N,2N,3N ... (P-1)N
+               //
+               //
+               // NOTE: If we knew ahead of time how many parts each rank was
+               //       going to write, we could assign part-number ranges, per
+               //       rank, and then have nice locality here.
+               //
+               //       Alternatively, we could have everyone format their own
+               //       XML text and send that, instead of just the tags.  This
+               //       would increase the amount of data being sent, but would
+               //       reduce the work for rank0 to format everything.
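Aside: a worked example of the two traversal orders described above.  A
sketch only: 2 ranks, 3 parts per rank, and 1-byte "ETags" so the layout is
visible; etag_vec is rank-major, as MPI_Gather leaves it.

    #include <stdio.h>

    int main(void) {
       int N = 2, P = 3;
       const char* etag_vec = "abc"   /* rank0's etags, in write order */
                              "def";  /* rank1's etags */
       int i, j;

       /* segmented object: parts appear rank-by-rank, so walk rank-major */
       printf("segmented: ");
       for (i = 0; i < N; ++i)            /* i_max = numTasks       */
          for (j = 0; j < P; ++j)         /* j_max = etags_per_rank */
             putchar(etag_vec[i*P + j]);  /* stride = one ETag      */

       /* strided object: parts interleave across ranks, so walk part-major */
       printf("\nstrided:   ");
       for (i = 0; i < P; ++i)            /* i_max = etags_per_rank    */
          for (j = 0; j < N; ++j)         /* j_max = numTasks          */
             putchar(etag_vec[j*P + i]);  /* stride = one rank's block */
       putchar('\n');                     /* prints abcdef, then adbecf */
       return 0;
    }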
+               size_t i_max;            // outer-loop
+               size_t j_max;            // inner loop
+               size_t start_multiplier; // initial offset in collected data
+               size_t stride;           // in etag_vec
+
+               if (segmented) {          // segmented
+                  i_max            = param->numTasks;
+                  j_max            = etags_per_rank;
+                  start_multiplier = etag_data_size;   /* one rank's-worth of Etag data */
+                  stride           = ETAG_SIZE;        /* one ETag */
+               }
+               else {                    // strided
+                  i_max            = etags_per_rank;
+                  j_max            = param->numTasks;
+                  start_multiplier = ETAG_SIZE;        /* one ETag */
+                  stride           = etag_data_size;   /* one rank's-worth of Etag data */
+               }
+
+               xml = aws_iobuf_new();
+               aws_iobuf_growth_size(xml, 1024 * 8);
+
+               // write XML header ...
+               aws_iobuf_append_str(xml, "<CompleteMultipartUpload>\n");
+
+               int part = 0;
+               int j;
+               for (i=0; i<i_max; ++i) {
+                  etag_ptr = etag_vec + (i * start_multiplier);
+
+                  for (j=0; j<j_max; ++j) {
+
+                     // etags were saved as contiguous text.  Extract the next one.
+                     char etag[ETAG_SIZE +1];
+                     memcpy(etag, etag_ptr, ETAG_SIZE);
+                     etag[ETAG_SIZE] = 0;
+
+                     // write XML for next part, with Etag ...
+                     snprintf(buff, BUFF_SIZE,
+                              "  <Part>\n"
+                              "    <PartNumber>%d</PartNumber>\n"
+                              "    <ETag>%s</ETag>\n"
+                              "  </Part>\n",
+                              part, etag);
+
+                     aws_iobuf_append_str(xml, buff);
+
+                     etag_ptr += stride;
+                     ++ part;
+                  }
+               }
+
+               // write XML tail ...
+               aws_iobuf_append_str(xml, "</CompleteMultipartUpload>\n");
+            }
+
+            else {
+               MPI_Gather(etag_data, etag_data_size, MPI_BYTE,
+                          NULL, etag_data_size, MPI_BYTE, 0, MPI_COMM_WORLD);
+            }
+         }
+
+         else {   /* N:N */
+
+            xml = aws_iobuf_new();
             aws_iobuf_growth_size(xml, 1024 * 8);

             // write XML header ...
             aws_iobuf_append_str(xml, "<CompleteMultipartUpload>\n");

-            // add XML for *all* the parts.  The XML must be ordered by
-            // part-number.  Each rank wrote <etag_count> parts.  The etags
-            // for each rank are staored as a continguous block of text, with
-            // the blocks stored in rank order in etag_vec.  We must therefore
-            // access them in the worst possible way, regarding locality.
-            //
-            // NOTE: If we knew ahead of time how many parts each rank was
-            //       going to write, we could assign part-number ranges, per
-            //       rank, and then have nice locality here.
-            //
-            //       Alternatively, we could have everyone format their own
-            //       XML text and send that, instead of just the tags.  This
-            //       would increase the amount of data being sent, but would
-            //       reduce the work for rank0 to format everything.
-
-            int part = 0;
-            for (i=0; i<etag_count; ++i) {
+            char etag[ETAG_SIZE +1];
+            int  part = 0;
+            int  i;
+            for (i=0; i<etags_per_rank; ++i) {

+               // TBD: Instead of reading into etag, then sprintf'ing, then
+               //      copying into xml, we could just read directly into xml
+               int sz = aws_iobuf_get_raw(param->etags, etag, ETAG_SIZE);
+               if (sz != ETAG_SIZE) {
                   snprintf(buff, BUFF_SIZE,
-                           "  <Part>\n"
-                           "    <PartNumber>%d</PartNumber>\n"
-                           "    <ETag>%s</ETag>\n"
-                           "  </Part>\n",
-                           part, etag);
-
-               aws_iobuf_append_str(xml, buff);
-
-               etag_ptr += etag_data_size;
-               ++ part;
+                           "Rank %d: read of ETag %d had length %d (not %d)\n",
+                           rank, i, sz, ETAG_SIZE);
+                  ERR_SIMPLE(buff);
                }
+               etag[ETAG_SIZE] = 0;
+
+               // write XML for next part, with Etag ...
+               snprintf(buff, BUFF_SIZE,
+                        "  <Part>\n"
+                        "    <PartNumber>%d</PartNumber>\n"
+                        "    <ETag>%s</ETag>\n"
+                        "  </Part>\n",
+                        part, etag);
+
+               aws_iobuf_append_str(xml, buff);
+
+               ++ part;
             }

             // write XML tail ...
             aws_iobuf_append_str(xml, "</CompleteMultipartUpload>\n");
+         }

+         // send request to finalize MPU
+         if (n_to_n || (rank == 0)) {

             // DEBUGGING: show the XML we constructed
-            if (param->verbose >= VERBOSE_4)
+            if (param->verbose >= VERBOSE_3)
                debug_iobuf(xml, 1, 1);

             // --- POST our XML to the server.
             snprintf(buff, BUFF_SIZE,
-                     "%s?uploadId=%s",
-                     fname, param->UploadId);
+                     "%s?uploadId=%s",
+                     fname, param->UploadId);

-#if 1
             AWS4C_CHECK   ( s3_post(xml, buff) );
             AWS4C_CHECK_OK( xml );
-#else
-            IOBuf* response = aws_iobuf_new();
-            aws_iobuf_reset(response);
-            AWS4C_CHECK( s3_post2(xml, buff, NULL, response) );
-            if (! AWS4C_OK(param->io_buf) ) {
-               fprintf(stderr, "rank %d: POST '%s' failed: %s\n",
-                       rank, buff, param->io_buf->result);
-               int sz;
-               for (sz = aws_iobuf_getline(response, buff, BUFF_SIZE);
-                    sz;
-                    sz = aws_iobuf_getline(response, buff, BUFF_SIZE)) {
-                  printf("-- %s\n", buff);
-               }
-               MPI_Abort(param->testComm, 1);
-            }
-            aws_iobuf_free(response);
-#endif

             aws_iobuf_free(xml);
          }

-         // --- reset associated info.  allows another MPU, and frees memory.
+         // everybody reset MPU info.  Allows another MPU, and frees memory.
          s3_MPU_reset(param);

-         // Don't you non-zero ranks go trying to stat the N:1 file until
-         // rank0 has finished the S3 multi-part finalize.  It will not appear
-         // to exist, until then.
-         MPI_CHECK(MPI_Barrier(param->testComm), "barrier error");
+         // Everybody meetup, so non-zero ranks won't go trying to stat the
+         // N:1 file until rank0 has finished the S3 multi-part finalize.
+         // The object will not appear to exist, until then.
+         if (n_to_1)
+            MPI_CHECK(MPI_Barrier(param->testComm), "barrier error");
+      }
+      else {
+
+         // No finalization is needed, when using EMC's byte-range writing
+         // support.  However, we do need to make sure everyone has
+         // finished writing, before anyone starts reading.
+         if (n_to_1) {
+            MPI_CHECK(MPI_Barrier(param->testComm), "barrier error");
+            if (param->verbose >= VERBOSE_2)
+               printf("rank %d: passed barrier\n", rank);
+         }
+      }

       // After writing, reset the CURL connection, so that caches won't be
@@ -1123,7 +1267,17 @@ S3_Delete( char *testFileName, IOR_param_t * param ) {

    /* maybe initialize curl */
    s3_connect( param );

+#if 0
+   // EMC BUG: If the file was written with appends, and is deleted, then any
+   //          future recreation will result in an object that can't be read.
+   //          So, the delete is disabled, for now:
    AWS4C_CHECK( s3_delete(param->io_buf, testFileName) );
+#else
+   // just replace with a zero-length object for now
+   aws_iobuf_reset(param->io_buf);
+   AWS4C_CHECK   ( s3_put(param->io_buf, testFileName) );
+#endif
+
    AWS4C_CHECK_OK( param->io_buf );

    if (param->verbose >= VERBOSE_2)

@@ -1218,6 +1372,9 @@ S3_GetFileSize(IOR_param_t * param,
    }
    aggFileSizeFromStat = param->io_buf->contentLen;
+   if (param->verbose >= VERBOSE_2) {
+      printf("\trank %d: file-size %llu\n", rank, aggFileSizeFromStat);
+   }

    if ( param->filePerProc == TRUE ) {
       if (param->verbose >= VERBOSE_2) {
diff --git a/src/ior.c b/src/ior.c
index ff11af7..3775586 100644
--- a/src/ior.c
+++ b/src/ior.c
@@ -263,9 +263,7 @@ static void AioriBind(char* api, IOR_param_t* param)
         ERR("unrecognized IO API");

     } else if (! strncmp(api, "S3", 2)) {
-        if (! strcmp(api, "S3_plus"))
-            param->curl_flags |= IOR_CURL_S3_EMC_EXT;
-        else if (! strcmp(api, "S3_EMC"))
+        if (! strcmp(api, "S3_EMC"))
             param->curl_flags |= IOR_CURL_S3_EMC_EXT;
         else
             param->curl_flags &= ~(IOR_CURL_S3_EMC_EXT);

@@ -742,7 +740,7 @@ static void DisplayUsage(char **argv)
 {
     char *opts[] = {
         "OPTIONS:",
-        " -a S  api --  API for I/O [POSIX|MPIIO|HDF5|NCMPI]",
+        " -a S  api --  API for I/O [POSIX|MPIIO|HDF5|HDFS|S3|S3_EMC|NCMPI]",
         " -A N  refNum -- user supplied reference number to include in the summary",
         " -b N  blockSize -- contiguous bytes to write per task  (e.g.: 8, 4k, 2m, 1g)",
         " -B    useO_DIRECT -- uses O_DIRECT for POSIX, bypassing I/O buffers",
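Usage, for reference: with S3_plus gone, a run would presumably be launched
as 'ior -a S3 ...' (pure S3, via Multi-Part Upload) or 'ior -a S3_EMC ...'
(EMC byte-range extensions), matching the strings AioriBind() checks above.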