New stonewalling option to create a wear out phase.

Once a process hits the stonewall (time limit), all processes determine the maximum number of pairs read/written by any single process.
Each process then continues to read/write until that maximum number of pairs is reached; this simulates the wear out.
master
Julian Kunkel 2017-10-20 18:02:24 +02:00
parent 2ce37ae90a
commit aa0b8c92d2
3 changed files with 98 additions and 58 deletions

139
src/ior.c
View File

@ -93,7 +93,7 @@ static void ShowTest(IOR_param_t *);
static void PrintLongSummaryAllTests(IOR_test_t *tests_head); static void PrintLongSummaryAllTests(IOR_test_t *tests_head);
static void TestIoSys(IOR_test_t *); static void TestIoSys(IOR_test_t *);
static void ValidateTests(IOR_param_t *); static void ValidateTests(IOR_param_t *);
static IOR_offset_t WriteOrRead(IOR_param_t * test, void *fd, int access, IOR_io_buffers* ioBuffers); static IOR_offset_t WriteOrRead(IOR_param_t * test, IOR_results_t * results, void *fd, int access, IOR_io_buffers* ioBuffers);
static void WriteTimes(IOR_param_t *, double **, int, int); static void WriteTimes(IOR_param_t *, double **, int, int);
/********************************** M A I N ***********************************/ /********************************** M A I N ***********************************/
@ -169,6 +169,9 @@ int main(int argc, char **argv)
sleep(5); sleep(5);
printf("\trank %d: awake.\n", rank); printf("\trank %d: awake.\n", rank);
} }
if(rank == 0 && tptr->params.stoneWallingWearOut){
fprintf(stdout, "Pairs deadlineForStonewallingaccessed: %lld\n", (long long) tptr->results->pairs_accessed);
}
TestIoSys(tptr); TestIoSys(tptr);
} }
@ -366,6 +369,10 @@ static void CheckFileSize(IOR_test_t *test, IOR_offset_t dataMoved, int rep)
fprintf(stdout, fprintf(stdout,
"WARNING: Using actual aggregate bytes moved = %lld.\n", "WARNING: Using actual aggregate bytes moved = %lld.\n",
(long long) results->aggFileSizeFromXfer[rep]); (long long) results->aggFileSizeFromXfer[rep]);
if(params->deadlineForStonewalling){
fprintf(stdout,
"WARNING: maybe caused by deadlineForStonewalling\n");
}
} }
} }
} }
@ -785,6 +792,7 @@ static void DisplayUsage(char **argv)
" -W checkWrite -- check read after write", " -W checkWrite -- check read after write",
" -x singleXferAttempt -- do not retry transfer if incomplete", " -x singleXferAttempt -- do not retry transfer if incomplete",
" -X N reorderTasksRandomSeed -- random seed for -Z option", " -X N reorderTasksRandomSeed -- random seed for -Z option",
" -y stoneWallingWearOut -- once the stonewalling timout is over, all process finish to access the amount of data",
" -Y fsyncPerWrite -- perform fsync after each POSIX write", " -Y fsyncPerWrite -- perform fsync after each POSIX write",
" -z randomOffset -- access is to random, not sequential, offsets within a file", " -z randomOffset -- access is to random, not sequential, offsets within a file",
" -Z reorderTasksRandom -- changes task ordering to random ordering for readback", " -Z reorderTasksRandom -- changes task ordering to random ordering for readback",
@ -1556,8 +1564,8 @@ static void ShowSetup(IOR_param_t *params)
} }
#endif /* HAVE_LUSTRE_LUSTRE_USER_H */ #endif /* HAVE_LUSTRE_LUSTRE_USER_H */
if (params->deadlineForStonewalling > 0) { if (params->deadlineForStonewalling > 0) {
printf("\tUsing stonewalling = %d second(s)\n", printf("\tUsing stonewalling = %d second(s)%s\n",
params->deadlineForStonewalling); params->deadlineForStonewalling, params->stoneWallingWearOut ? " with phase out" : "");
} }
fflush(stdout); fflush(stdout);
} }
@ -1577,6 +1585,7 @@ static void ShowTest(IOR_param_t * test)
fprintf(stdout, "\t%s=%s\n", "hintsFileName", test->hintsFileName); fprintf(stdout, "\t%s=%s\n", "hintsFileName", test->hintsFileName);
fprintf(stdout, "\t%s=%d\n", "deadlineForStonewall", fprintf(stdout, "\t%s=%d\n", "deadlineForStonewall",
test->deadlineForStonewalling); test->deadlineForStonewalling);
fprintf(stdout, "\t%s=%d\n", "stoneWallingWearOut", test->stoneWallingWearOut);
fprintf(stdout, "\t%s=%d\n", "maxTimeDuration", test->maxTimeDuration); fprintf(stdout, "\t%s=%d\n", "maxTimeDuration", test->maxTimeDuration);
fprintf(stdout, "\t%s=%d\n", "outlierThreshold", fprintf(stdout, "\t%s=%d\n", "outlierThreshold",
test->outlierThreshold); test->outlierThreshold);
@ -2078,7 +2087,7 @@ static void TestIoSys(IOR_test_t *test)
CurrentTimeString()); CurrentTimeString());
} }
timer[2][rep] = GetTimeStamp(); timer[2][rep] = GetTimeStamp();
dataMoved = WriteOrRead(params, fd, WRITE, &ioBuffers); dataMoved = WriteOrRead(params, results, fd, WRITE, &ioBuffers);
if (params->verbose >= VERBOSE_4) { if (params->verbose >= VERBOSE_4) {
printf("* data moved = %llu\n", dataMoved); printf("* data moved = %llu\n", dataMoved);
fflush(stdout); fflush(stdout);
@ -2131,7 +2140,7 @@ static void TestIoSys(IOR_test_t *test)
GetTestFileName(testFileName, params); GetTestFileName(testFileName, params);
params->open = WRITECHECK; params->open = WRITECHECK;
fd = backend->open(testFileName, params); fd = backend->open(testFileName, params);
dataMoved = WriteOrRead(params, fd, WRITECHECK, &ioBuffers); dataMoved = WriteOrRead(params, results, fd, WRITECHECK, &ioBuffers);
backend->close(fd, params); backend->close(fd, params);
rankOffset = 0; rankOffset = 0;
} }
@ -2203,7 +2212,7 @@ static void TestIoSys(IOR_test_t *test)
CurrentTimeString()); CurrentTimeString());
} }
timer[8][rep] = GetTimeStamp(); timer[8][rep] = GetTimeStamp();
dataMoved = WriteOrRead(params, fd, operation_flag, &ioBuffers); dataMoved = WriteOrRead(params, results, fd, operation_flag, &ioBuffers);
timer[9][rep] = GetTimeStamp(); timer[9][rep] = GetTimeStamp();
if (params->intraTestBarriers) if (params->intraTestBarriers)
MPI_CHECK(MPI_Barrier(testComm), MPI_CHECK(MPI_Barrier(testComm),
@ -2540,22 +2549,71 @@ static IOR_offset_t *GetOffsetArrayRandom(IOR_param_t * test, int pretendRank,
return (offsetArray); return (offsetArray);
} }
/*
 * Perform a single transfer (one offset/"pair") of the requested access kind.
 *
 * pairCnt       - index into offsetArray selecting the offset to access
 * offsetArray   - array of file offsets; offsetArray[pairCnt] is stored
 *                 into test->offset before the transfer
 * pretendRank   - rank value handed to FillBuffer() when the file offset
 *                 is used as stored signature
 * transferCount - in/out counter; incremented for WRITECHECK and passed to
 *                 CompareBuffers() for WRITECHECK and READCHECK
 * errors        - in/out; the result of CompareBuffers() is added to it
 * test          - test parameters; test->offset is overwritten here
 * fd            - file handle passed through unchanged to backend->xfer()
 * ioBuffers     - the transfer, check and read-check buffers
 * access        - one of WRITE, READ, WRITECHECK, READCHECK
 *
 * Returns the amount reported by backend->xfer(), or 0 when `access` is
 * none of the four handled kinds (in which case no transfer is issued).
 * A short transfer is reported through ERR().
 * NOTE(review): ERR() is presumably fatal - confirm against its definition.
 */
static IOR_offset_t WriteOrReadSingle(IOR_offset_t pairCnt, IOR_offset_t *offsetArray, int pretendRank,
  IOR_offset_t * transferCount, int * errors, IOR_param_t * test, int * fd, IOR_io_buffers* ioBuffers, int access){
        /*
         * Initialised so that an unhandled access kind returns a defined
         * value instead of reading an indeterminate automatic variable.
         */
        IOR_offset_t amtXferred = 0;
        IOR_offset_t transfer;

        void *buffer = ioBuffers->buffer;
        void *checkBuffer = ioBuffers->checkBuffer;
        void *readCheckBuffer = ioBuffers->readCheckBuffer;

        test->offset = offsetArray[pairCnt];
        transfer = test->transferSize;

        if (access == WRITE) {
                /*
                 * fills each transfer with a unique pattern
                 * containing the offset into the file
                 */
                if (test->storeFileOffset == TRUE) {
                        FillBuffer(buffer, test, test->offset, pretendRank);
                }
                amtXferred = backend->xfer(access, fd, buffer, transfer, test);
                if (amtXferred != transfer) {
                        ERR("cannot write to file");
                }
        } else if (access == READ) {
                amtXferred = backend->xfer(access, fd, buffer, transfer, test);
                if (amtXferred != transfer) {
                        ERR("cannot read from file");
                }
        } else if (access == WRITECHECK) {
                /* pre-fill the check buffer so stale data cannot match */
                memset(checkBuffer, 'a', transfer);

                amtXferred = backend->xfer(access, fd, checkBuffer, transfer, test);
                if (amtXferred != transfer) {
                        ERR("cannot read from file write check");
                }
                (*transferCount)++;
                *errors += CompareBuffers(buffer, checkBuffer, transfer,
                                          *transferCount, test,
                                          WRITECHECK);
        } else if (access == READCHECK) {
                amtXferred = backend->xfer(access, fd, buffer, transfer, test);
                if (amtXferred != transfer) {
                        ERR("cannot read from file");
                }
                /* regenerate the expected pattern for this offset */
                if (test->storeFileOffset == TRUE) {
                        FillBuffer(readCheckBuffer, test, test->offset, pretendRank);
                }
                *errors += CompareBuffers(readCheckBuffer, buffer, transfer, *transferCount, test, READCHECK);
        }
        return amtXferred;
}
/* /*
* Write or Read data to file(s). This loops through the strides, writing * Write or Read data to file(s). This loops through the strides, writing
* out the data to each block in transfer sizes, until the remainder left is 0. * out the data to each block in transfer sizes, until the remainder left is 0.
*/ */
static IOR_offset_t WriteOrRead(IOR_param_t * test, void *fd, int access, IOR_io_buffers* ioBuffers) static IOR_offset_t WriteOrRead(IOR_param_t * test, IOR_results_t * results, void *fd, int access, IOR_io_buffers* ioBuffers)
{ {
int errors = 0; int errors = 0;
IOR_offset_t amtXferred; IOR_offset_t amtXferred;
IOR_offset_t transfer;
IOR_offset_t transferCount = 0; IOR_offset_t transferCount = 0;
IOR_offset_t pairCnt = 0; IOR_offset_t pairCnt = 0;
IOR_offset_t *offsetArray; IOR_offset_t *offsetArray;
int pretendRank; int pretendRank;
void *buffer = ioBuffers->buffer;
void *checkBuffer = ioBuffers->checkBuffer;
void *readCheckBuffer = ioBuffers->readCheckBuffer;
IOR_offset_t dataMoved = 0; /* for data rate calculation */ IOR_offset_t dataMoved = 0; /* for data rate calculation */
double startForStonewall; double startForStonewall;
int hitStonewall; int hitStonewall;
@ -2578,54 +2636,29 @@ static IOR_offset_t WriteOrRead(IOR_param_t * test, void *fd, int access, IOR_io
/* loop over offsets to access */ /* loop over offsets to access */
while ((offsetArray[pairCnt] != -1) && !hitStonewall) { while ((offsetArray[pairCnt] != -1) && !hitStonewall) {
test->offset = offsetArray[pairCnt]; dataMoved += WriteOrReadSingle(pairCnt, offsetArray, pretendRank, & transferCount, & errors, test, fd, ioBuffers, access);
transfer = test->transferSize;
if (access == WRITE) {
/*
* fills each transfer with a unique pattern
* containing the offset into the file
*/
if (test->storeFileOffset == TRUE) {
FillBuffer(buffer, test, test->offset, pretendRank);
}
amtXferred =
backend->xfer(access, fd, buffer, transfer, test);
if (amtXferred != transfer)
ERR("cannot write to file");
} else if (access == READ) {
amtXferred =
backend->xfer(access, fd, buffer, transfer, test);
if (amtXferred != transfer)
ERR("cannot read from file");
} else if (access == WRITECHECK) {
memset(checkBuffer, 'a', transfer);
amtXferred =
backend->xfer(access, fd, checkBuffer, transfer,
test);
if (amtXferred != transfer)
ERR("cannot read from file write check");
transferCount++;
errors += CompareBuffers(buffer, checkBuffer, transfer,
transferCount, test,
WRITECHECK);
} else if (access == READCHECK) {
amtXferred = backend->xfer(access, fd, buffer, transfer, test);
if (amtXferred != transfer){
ERR("cannot read from file");
}
if (test->storeFileOffset == TRUE) {
FillBuffer(readCheckBuffer, test, test->offset, pretendRank);
}
errors += CompareBuffers(readCheckBuffer, buffer, transfer, transferCount, test, READCHECK);
}
dataMoved += amtXferred;
pairCnt++; pairCnt++;
hitStonewall = ((test->deadlineForStonewalling != 0) hitStonewall = ((test->deadlineForStonewalling != 0)
&& ((GetTimeStamp() - startForStonewall) && ((GetTimeStamp() - startForStonewall)
> test->deadlineForStonewalling)); > test->deadlineForStonewalling));
} }
if (test->stoneWallingWearOut){
MPI_CHECK(MPI_Allreduce(& pairCnt, &results->pairs_accessed,
1, MPI_LONG_LONG_INT, MPI_MAX, testComm), "cannot reduce pairs moved");
if (verbose >= VERBOSE_1){
printf("%d: stonewalling pairs accessed globally: %lld this rank: %lld\n", rank, (long long) results->pairs_accessed, (long long) pairCnt);
}
if(pairCnt != results->pairs_accessed){
// some work needs still to be done !
for(; pairCnt < results->pairs_accessed; pairCnt++ ) {
dataMoved += WriteOrReadSingle(pairCnt, offsetArray, pretendRank, & transferCount, & errors, test, fd, ioBuffers, access);
}
}
}else{
results->pairs_accessed = pairCnt;
}
totalErrorCount += CountErrors(test, access, errors); totalErrorCount += CountErrors(test, access, errors);

View File

@ -5,7 +5,7 @@
* 8-chars of indenting is ridiculous. If you really want 8-spaces, * 8-chars of indenting is ridiculous. If you really want 8-spaces,
* then change the mode-line to use tabs, and configure your personal * then change the mode-line to use tabs, and configure your personal
* editor environment to use 8-space tab-stops. * editor environment to use 8-space tab-stops.
* *
*/ */
/******************************************************************************\ /******************************************************************************\
* * * *
@ -67,7 +67,7 @@ enum PACKET_TYPE
/* A struct to hold the buffers so we can pass 1 pointer around instead of 3 /* A struct to hold the buffers so we can pass 1 pointer around instead of 3
*/ */
typedef struct IO_BUFFERS typedef struct IO_BUFFERS
{ {
void* buffer; void* buffer;
void* checkBuffer; void* checkBuffer;
@ -141,6 +141,7 @@ typedef struct
int useExistingTestFile; /* do not delete test file before access */ int useExistingTestFile; /* do not delete test file before access */
int storeFileOffset; /* use file offset as stored signature */ int storeFileOffset; /* use file offset as stored signature */
int deadlineForStonewalling; /* max time in seconds to run any test phase */ int deadlineForStonewalling; /* max time in seconds to run any test phase */
int stoneWallingWearOut; /* wear out the stonewalling, once the timeout is over, each process has to write the same amount */
int maxTimeDuration; /* max time in minutes to run each test */ int maxTimeDuration; /* max time in minutes to run each test */
int outlierThreshold; /* warn on outlier N seconds from mean */ int outlierThreshold; /* warn on outlier N seconds from mean */
int verbose; /* verbosity */ int verbose; /* verbosity */
@ -215,6 +216,7 @@ typedef struct
typedef struct { typedef struct {
double *writeTime; double *writeTime;
double *readTime; double *readTime;
size_t pairs_accessed; // number of I/Os done, useful for deadlineForStonewalling
IOR_offset_t *aggFileSizeFromStat; IOR_offset_t *aggFileSizeFromStat;
IOR_offset_t *aggFileSizeFromXfer; IOR_offset_t *aggFileSizeFromXfer;
IOR_offset_t *aggFileSizeForBW; IOR_offset_t *aggFileSizeForBW;

View File

@ -169,6 +169,8 @@ void DecodeDirective(char *line, IOR_param_t *params)
strcpy(params->hintsFileName, value); strcpy(params->hintsFileName, value);
} else if (strcasecmp(option, "deadlineforstonewalling") == 0) { } else if (strcasecmp(option, "deadlineforstonewalling") == 0) {
params->deadlineForStonewalling = atoi(value); params->deadlineForStonewalling = atoi(value);
} else if (strcasecmp(option, "stoneWallingWearOut") == 0) {
params->stoneWallingWearOut = atoi(value);
} else if (strcasecmp(option, "maxtimeduration") == 0) { } else if (strcasecmp(option, "maxtimeduration") == 0) {
params->maxTimeDuration = atoi(value); params->maxTimeDuration = atoi(value);
} else if (strcasecmp(option, "outlierthreshold") == 0) { } else if (strcasecmp(option, "outlierthreshold") == 0) {
@ -430,7 +432,7 @@ IOR_test_t *ReadConfigScript(char *scriptName)
IOR_test_t *ParseCommandLine(int argc, char **argv) IOR_test_t *ParseCommandLine(int argc, char **argv)
{ {
static const char *opts = static const char *opts =
"a:A:b:BcCd:D:eEf:FgG:hHi:Ij:J:kKl:mM:nN:o:O:pPqQ:rRs:St:T:uU:vVwWxX:YzZ"; "a:A:b:BcCd:D:eEf:FgG:hHi:Ij:J:kKl:mM:nN:o:O:pPqQ:rRs:St:T:uU:vVwWxX:YzZy";
int c, i; int c, i;
static IOR_test_t *tests = NULL; static IOR_test_t *tests = NULL;
@ -523,13 +525,13 @@ IOR_test_t *ParseCommandLine(int argc, char **argv)
initialTestParams.dataPacketType = incompressible; initialTestParams.dataPacketType = incompressible;
break; break;
case 't': /* timestamp */ case 't': /* timestamp */
initialTestParams.dataPacketType = timestamp; initialTestParams.dataPacketType = timestamp;
break; break;
case 'o': /* offset packet */ case 'o': /* offset packet */
initialTestParams.storeFileOffset = TRUE; initialTestParams.storeFileOffset = TRUE;
initialTestParams.dataPacketType = offset; initialTestParams.dataPacketType = offset;
break; break;
default: default:
fprintf(stdout, fprintf(stdout,
"Unknown arguement for -l %s generic assumed\n", optarg); "Unknown arguement for -l %s generic assumed\n", optarg);
break; break;
@ -610,6 +612,9 @@ IOR_test_t *ParseCommandLine(int argc, char **argv)
case 'X': case 'X':
initialTestParams.reorderTasksRandomSeed = atoi(optarg); initialTestParams.reorderTasksRandomSeed = atoi(optarg);
break; break;
case 'y':
initialTestParams.stoneWallingWearOut = TRUE;
break;
case 'Y': case 'Y':
initialTestParams.fsyncPerWrite = TRUE; initialTestParams.fsyncPerWrite = TRUE;
break; break;