Merge pull request #11 from VI4IO/master

StoneWalling with WearOutPhase
master
Glenn K. Lockwood 2017-11-15 00:09:22 -07:00 committed by GitHub
commit 3aa1f4c9b7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 101 additions and 60 deletions

146
src/ior.c
View File

@ -66,7 +66,7 @@ static void ShowTest(IOR_param_t *);
static void PrintLongSummaryAllTests(IOR_test_t *tests_head); static void PrintLongSummaryAllTests(IOR_test_t *tests_head);
static void TestIoSys(IOR_test_t *); static void TestIoSys(IOR_test_t *);
static void ValidateTests(IOR_param_t *); static void ValidateTests(IOR_param_t *);
static IOR_offset_t WriteOrRead(IOR_param_t * test, void *fd, int access, IOR_io_buffers* ioBuffers); static IOR_offset_t WriteOrRead(IOR_param_t * test, IOR_results_t * results, void *fd, int access, IOR_io_buffers* ioBuffers);
static void WriteTimes(IOR_param_t *, double **, int, int); static void WriteTimes(IOR_param_t *, double **, int, int);
/********************************** M A I N ***********************************/ /********************************** M A I N ***********************************/
@ -142,8 +142,11 @@ int main(int argc, char **argv)
sleep(5); sleep(5);
printf("\trank %d: awake.\n", rank); printf("\trank %d: awake.\n", rank);
} }
TestIoSys(tptr); TestIoSys(tptr);
if(rank == 0 && tptr->params.stoneWallingWearOut){
fprintf(stdout, "Pairs deadlineForStonewallingaccessed: %lld\n", (long long) tptr->results->pairs_accessed);
}
} }
if (verbose < 0) if (verbose < 0)
@ -333,6 +336,10 @@ static void CheckFileSize(IOR_test_t *test, IOR_offset_t dataMoved, int rep)
fprintf(stdout, fprintf(stdout,
"WARNING: Using actual aggregate bytes moved = %lld.\n", "WARNING: Using actual aggregate bytes moved = %lld.\n",
(long long) results->aggFileSizeFromXfer[rep]); (long long) results->aggFileSizeFromXfer[rep]);
if(params->deadlineForStonewalling){
fprintf(stdout,
"WARNING: maybe caused by deadlineForStonewalling\n");
}
} }
} }
} }
@ -713,6 +720,8 @@ static void DisplayUsage(char **argv)
" -C reorderTasks -- changes task ordering to n+1 ordering for readback", " -C reorderTasks -- changes task ordering to n+1 ordering for readback",
" -d N interTestDelay -- delay between reps in seconds", " -d N interTestDelay -- delay between reps in seconds",
" -D N deadlineForStonewalling -- seconds before stopping write or read phase", " -D N deadlineForStonewalling -- seconds before stopping write or read phase",
" -O stoneWallingWearOut=1 -- once the stonewalling timout is over, all process finish to access the amount of data",
" -O stoneWallingWearOutIterations=N -- stop after processing this number of iterations, needed for reading data back written with stoneWallingWearOut",
" -e fsync -- perform fsync upon POSIX write close", " -e fsync -- perform fsync upon POSIX write close",
" -E useExistingTestFile -- do not remove test file before write access", " -E useExistingTestFile -- do not remove test file before write access",
" -f S scriptFile -- test script name", " -f S scriptFile -- test script name",
@ -1523,8 +1532,8 @@ static void ShowSetup(IOR_param_t *params)
} }
#endif /* HAVE_LUSTRE_LUSTRE_USER_H */ #endif /* HAVE_LUSTRE_LUSTRE_USER_H */
if (params->deadlineForStonewalling > 0) { if (params->deadlineForStonewalling > 0) {
printf("\tUsing stonewalling = %d second(s)\n", printf("\tUsing stonewalling = %d second(s)%s\n",
params->deadlineForStonewalling); params->deadlineForStonewalling, params->stoneWallingWearOut ? " with phase out" : "");
} }
fflush(stdout); fflush(stdout);
} }
@ -1544,6 +1553,7 @@ static void ShowTest(IOR_param_t * test)
fprintf(stdout, "\t%s=%s\n", "hintsFileName", test->hintsFileName); fprintf(stdout, "\t%s=%s\n", "hintsFileName", test->hintsFileName);
fprintf(stdout, "\t%s=%d\n", "deadlineForStonewall", fprintf(stdout, "\t%s=%d\n", "deadlineForStonewall",
test->deadlineForStonewalling); test->deadlineForStonewalling);
fprintf(stdout, "\t%s=%d\n", "stoneWallingWearOut", test->stoneWallingWearOut);
fprintf(stdout, "\t%s=%d\n", "maxTimeDuration", test->maxTimeDuration); fprintf(stdout, "\t%s=%d\n", "maxTimeDuration", test->maxTimeDuration);
fprintf(stdout, "\t%s=%d\n", "outlierThreshold", fprintf(stdout, "\t%s=%d\n", "outlierThreshold",
test->outlierThreshold); test->outlierThreshold);
@ -2045,7 +2055,7 @@ static void TestIoSys(IOR_test_t *test)
CurrentTimeString()); CurrentTimeString());
} }
timer[2][rep] = GetTimeStamp(); timer[2][rep] = GetTimeStamp();
dataMoved = WriteOrRead(params, fd, WRITE, &ioBuffers); dataMoved = WriteOrRead(params, results, fd, WRITE, &ioBuffers);
if (params->verbose >= VERBOSE_4) { if (params->verbose >= VERBOSE_4) {
printf("* data moved = %llu\n", dataMoved); printf("* data moved = %llu\n", dataMoved);
fflush(stdout); fflush(stdout);
@ -2098,7 +2108,7 @@ static void TestIoSys(IOR_test_t *test)
GetTestFileName(testFileName, params); GetTestFileName(testFileName, params);
params->open = WRITECHECK; params->open = WRITECHECK;
fd = backend->open(testFileName, params); fd = backend->open(testFileName, params);
dataMoved = WriteOrRead(params, fd, WRITECHECK, &ioBuffers); dataMoved = WriteOrRead(params, results, fd, WRITECHECK, &ioBuffers);
backend->close(fd, params); backend->close(fd, params);
rankOffset = 0; rankOffset = 0;
} }
@ -2170,7 +2180,7 @@ static void TestIoSys(IOR_test_t *test)
CurrentTimeString()); CurrentTimeString());
} }
timer[8][rep] = GetTimeStamp(); timer[8][rep] = GetTimeStamp();
dataMoved = WriteOrRead(params, fd, operation_flag, &ioBuffers); dataMoved = WriteOrRead(params, results, fd, operation_flag, &ioBuffers);
timer[9][rep] = GetTimeStamp(); timer[9][rep] = GetTimeStamp();
if (params->intraTestBarriers) if (params->intraTestBarriers)
MPI_CHECK(MPI_Barrier(testComm), MPI_CHECK(MPI_Barrier(testComm),
@ -2507,22 +2517,71 @@ static IOR_offset_t *GetOffsetArrayRandom(IOR_param_t * test, int pretendRank,
return (offsetArray); return (offsetArray);
} }
/*
 * Perform a single transfer of test->transferSize bytes at the file offset
 * stored in offsetArray[pairCnt], and return the amount the backend reports
 * as transferred.
 *
 *   pairCnt       - index into offsetArray selecting the offset to access
 *   offsetArray   - offsets to access; only entry [pairCnt] is read here
 *   pretendRank   - rank value forwarded to FillBuffer() when building the
 *                   offset-based data pattern
 *   transferCount - running counter; incremented here only for WRITECHECK
 *                   and forwarded to CompareBuffers()
 *   errors        - accumulator; the result of CompareBuffers() is added
 *                   to it for WRITECHECK and READCHECK
 *   test          - test parameters; test->offset is overwritten with the
 *                   offset being accessed
 *   fd            - file handle forwarded unchanged to backend->xfer()
 *                   NOTE(review): the visible caller passes a `void *`
 *                   here -- confirm the intended parameter type
 *   ioBuffers     - the data, write-check and read-check buffers
 *   access        - WRITE, READ, WRITECHECK or READCHECK
 *
 * A transfer whose size differs from the requested size is reported
 * through ERR().  For an access value other than the four handled below no
 * I/O is performed and 0 is returned.
 */
static IOR_offset_t WriteOrReadSingle(IOR_offset_t pairCnt, IOR_offset_t *offsetArray, int pretendRank,
        IOR_offset_t * transferCount, int * errors, IOR_param_t * test, int * fd, IOR_io_buffers* ioBuffers, int access){
        /*
         * Start from 0 so that an unhandled access mode yields a defined
         * result; the caller adds the return value to its byte total.
         */
        IOR_offset_t amtXferred = 0;
        IOR_offset_t transfer;

        void *buffer = ioBuffers->buffer;
        void *checkBuffer = ioBuffers->checkBuffer;
        void *readCheckBuffer = ioBuffers->readCheckBuffer;

        test->offset = offsetArray[pairCnt];

        transfer = test->transferSize;
        if (access == WRITE) {
                /*
                 * fills each transfer with a unique pattern
                 * containing the offset into the file
                 */
                if (test->storeFileOffset == TRUE) {
                        FillBuffer(buffer, test, test->offset, pretendRank);
                }
                amtXferred =
                    backend->xfer(access, fd, buffer, transfer, test);
                if (amtXferred != transfer)
                        ERR("cannot write to file");
        } else if (access == READ) {
                amtXferred =
                    backend->xfer(access, fd, buffer, transfer, test);
                if (amtXferred != transfer)
                        ERR("cannot read from file");
        } else if (access == WRITECHECK) {
                /*
                 * pre-fill the check buffer with a constant byte before
                 * reading the data back into it for comparison
                 */
                memset(checkBuffer, 'a', transfer);

                amtXferred =
                    backend->xfer(access, fd, checkBuffer, transfer,
                                  test);
                if (amtXferred != transfer)
                        ERR("cannot read from file write check");
                (*transferCount)++;
                *errors += CompareBuffers(buffer, checkBuffer, transfer,
                                          *transferCount, test,
                                          WRITECHECK);
        } else if (access == READCHECK) {
                amtXferred = backend->xfer(access, fd, buffer, transfer, test);
                if (amtXferred != transfer){
                        ERR("cannot read from file");
                }
                /* rebuild the expected pattern, then compare with the data read */
                if (test->storeFileOffset == TRUE) {
                        FillBuffer(readCheckBuffer, test, test->offset, pretendRank);
                }
                *errors += CompareBuffers(readCheckBuffer, buffer, transfer, *transferCount, test, READCHECK);
        }
        return amtXferred;
}
/* /*
* Write or Read data to file(s). This loops through the strides, writing * Write or Read data to file(s). This loops through the strides, writing
* out the data to each block in transfer sizes, until the remainder left is 0. * out the data to each block in transfer sizes, until the remainder left is 0.
*/ */
static IOR_offset_t WriteOrRead(IOR_param_t * test, void *fd, int access, IOR_io_buffers* ioBuffers) static IOR_offset_t WriteOrRead(IOR_param_t * test, IOR_results_t * results, void *fd, int access, IOR_io_buffers* ioBuffers)
{ {
int errors = 0; int errors = 0;
IOR_offset_t amtXferred; IOR_offset_t amtXferred;
IOR_offset_t transfer;
IOR_offset_t transferCount = 0; IOR_offset_t transferCount = 0;
IOR_offset_t pairCnt = 0; IOR_offset_t pairCnt = 0;
IOR_offset_t *offsetArray; IOR_offset_t *offsetArray;
int pretendRank; int pretendRank;
void *buffer = ioBuffers->buffer;
void *checkBuffer = ioBuffers->checkBuffer;
void *readCheckBuffer = ioBuffers->readCheckBuffer;
IOR_offset_t dataMoved = 0; /* for data rate calculation */ IOR_offset_t dataMoved = 0; /* for data rate calculation */
double startForStonewall; double startForStonewall;
int hitStonewall; int hitStonewall;
@ -2544,55 +2603,30 @@ static IOR_offset_t WriteOrRead(IOR_param_t * test, void *fd, int access, IOR_io
> test->deadlineForStonewalling)); > test->deadlineForStonewalling));
/* loop over offsets to access */ /* loop over offsets to access */
while ((offsetArray[pairCnt] != -1) && !hitStonewall) { while ((offsetArray[pairCnt] != -1) && !hitStonewall ) {
test->offset = offsetArray[pairCnt]; dataMoved += WriteOrReadSingle(pairCnt, offsetArray, pretendRank, & transferCount, & errors, test, fd, ioBuffers, access);
transfer = test->transferSize;
if (access == WRITE) {
/*
* fills each transfer with a unique pattern
* containing the offset into the file
*/
if (test->storeFileOffset == TRUE) {
FillBuffer(buffer, test, test->offset, pretendRank);
}
amtXferred =
backend->xfer(access, fd, buffer, transfer, test);
if (amtXferred != transfer)
ERR("cannot write to file");
} else if (access == READ) {
amtXferred =
backend->xfer(access, fd, buffer, transfer, test);
if (amtXferred != transfer)
ERR("cannot read from file");
} else if (access == WRITECHECK) {
memset(checkBuffer, 'a', transfer);
amtXferred =
backend->xfer(access, fd, checkBuffer, transfer,
test);
if (amtXferred != transfer)
ERR("cannot read from file write check");
transferCount++;
errors += CompareBuffers(buffer, checkBuffer, transfer,
transferCount, test,
WRITECHECK);
} else if (access == READCHECK) {
amtXferred = backend->xfer(access, fd, buffer, transfer, test);
if (amtXferred != transfer){
ERR("cannot read from file");
}
if (test->storeFileOffset == TRUE) {
FillBuffer(readCheckBuffer, test, test->offset, pretendRank);
}
errors += CompareBuffers(readCheckBuffer, buffer, transfer, transferCount, test, READCHECK);
}
dataMoved += amtXferred;
pairCnt++; pairCnt++;
hitStonewall = ((test->deadlineForStonewalling != 0) hitStonewall = ((test->deadlineForStonewalling != 0)
&& ((GetTimeStamp() - startForStonewall) && ((GetTimeStamp() - startForStonewall)
> test->deadlineForStonewalling)); > test->deadlineForStonewalling)) || (test->stoneWallingWearOutIterations != 0 && pairCnt == test->stoneWallingWearOutIterations) ;
} }
if (test->stoneWallingWearOut){
MPI_CHECK(MPI_Allreduce(& pairCnt, &results->pairs_accessed,
1, MPI_LONG_LONG_INT, MPI_MAX, testComm), "cannot reduce pairs moved");
if (verbose >= VERBOSE_1){
printf("%d: stonewalling pairs accessed globally: %lld this rank: %lld\n", rank, (long long) results->pairs_accessed, (long long) pairCnt);
}
if(pairCnt != results->pairs_accessed){
// some work needs still to be done !
for(; pairCnt < results->pairs_accessed; pairCnt++ ) {
dataMoved += WriteOrReadSingle(pairCnt, offsetArray, pretendRank, & transferCount, & errors, test, fd, ioBuffers, access);
}
}
}else{
results->pairs_accessed = pairCnt;
}
totalErrorCount += CountErrors(test, access, errors); totalErrorCount += CountErrors(test, access, errors);

View File

@ -5,7 +5,7 @@
* 8-chars of indenting is ridiculous. If you really want 8-spaces, * 8-chars of indenting is ridiculous. If you really want 8-spaces,
* then change the mode-line to use tabs, and configure your personal * then change the mode-line to use tabs, and configure your personal
* editor environment to use 8-space tab-stops. * editor environment to use 8-space tab-stops.
* *
*/ */
/******************************************************************************\ /******************************************************************************\
* * * *
@ -67,7 +67,7 @@ enum PACKET_TYPE
/* A struct to hold the buffers so we can pass 1 pointer around instead of 3 /* A struct to hold the buffers so we can pass 1 pointer around instead of 3
*/ */
typedef struct IO_BUFFERS typedef struct IO_BUFFERS
{ {
void* buffer; void* buffer;
void* checkBuffer; void* checkBuffer;
@ -141,6 +141,8 @@ typedef struct
int useExistingTestFile; /* do not delete test file before access */ int useExistingTestFile; /* do not delete test file before access */
int storeFileOffset; /* use file offset as stored signature */ int storeFileOffset; /* use file offset as stored signature */
int deadlineForStonewalling; /* max time in seconds to run any test phase */ int deadlineForStonewalling; /* max time in seconds to run any test phase */
int stoneWallingWearOut; /* wear out the stonewalling, once the timout is over, each process has to write the same amount */
int stoneWallingWearOutIterations; /* the number of iterations for the stonewallingWearOut, needed for readBack */
int maxTimeDuration; /* max time in minutes to run each test */ int maxTimeDuration; /* max time in minutes to run each test */
int outlierThreshold; /* warn on outlier N seconds from mean */ int outlierThreshold; /* warn on outlier N seconds from mean */
int verbose; /* verbosity */ int verbose; /* verbosity */
@ -218,6 +220,7 @@ typedef struct
typedef struct { typedef struct {
double *writeTime; double *writeTime;
double *readTime; double *readTime;
size_t pairs_accessed; // number of I/Os done, useful for deadlineForStonewalling
IOR_offset_t *aggFileSizeFromStat; IOR_offset_t *aggFileSizeFromStat;
IOR_offset_t *aggFileSizeFromXfer; IOR_offset_t *aggFileSizeFromXfer;
IOR_offset_t *aggFileSizeForBW; IOR_offset_t *aggFileSizeForBW;

View File

@ -171,6 +171,10 @@ void DecodeDirective(char *line, IOR_param_t *params)
strcpy(params->hintsFileName, value); strcpy(params->hintsFileName, value);
} else if (strcasecmp(option, "deadlineforstonewalling") == 0) { } else if (strcasecmp(option, "deadlineforstonewalling") == 0) {
params->deadlineForStonewalling = atoi(value); params->deadlineForStonewalling = atoi(value);
} else if (strcasecmp(option, "stoneWallingWearOut") == 0) {
params->stoneWallingWearOut = atoi(value);
} else if (strcasecmp(option, "stoneWallingWearOutIterations") == 0) {
params->stoneWallingWearOutIterations = atoi(value);
} else if (strcasecmp(option, "maxtimeduration") == 0) { } else if (strcasecmp(option, "maxtimeduration") == 0) {
params->maxTimeDuration = atoi(value); params->maxTimeDuration = atoi(value);
} else if (strcasecmp(option, "outlierthreshold") == 0) { } else if (strcasecmp(option, "outlierthreshold") == 0) {
@ -539,13 +543,13 @@ IOR_test_t *ParseCommandLine(int argc, char **argv)
initialTestParams.dataPacketType = incompressible; initialTestParams.dataPacketType = incompressible;
break; break;
case 't': /* timestamp */ case 't': /* timestamp */
initialTestParams.dataPacketType = timestamp; initialTestParams.dataPacketType = timestamp;
break; break;
case 'o': /* offset packet */ case 'o': /* offset packet */
initialTestParams.storeFileOffset = TRUE; initialTestParams.storeFileOffset = TRUE;
initialTestParams.dataPacketType = offset; initialTestParams.dataPacketType = offset;
break; break;
default: default:
fprintf(stdout, fprintf(stdout,
"Unknown arguement for -l %s generic assumed\n", optarg); "Unknown arguement for -l %s generic assumed\n", optarg);
break; break;