mdtest/src/ior.c

2146 lines
85 KiB
C
Raw Normal View History

/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
* vim:expandtab:shiftwidth=8:tabstop=8:
*/
2011-06-17 23:20:43 +04:00
/******************************************************************************\
* *
* Copyright (c) 2003, The Regents of the University of California *
* See the file COPYRIGHT for a complete copyright notice and license. *
* *
\******************************************************************************/
#ifdef HAVE_CONFIG_H
2014-07-31 03:17:21 +04:00
# include "config.h"
#endif
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <ctype.h> /* tolower() */
#include <errno.h>
2011-06-17 23:20:43 +04:00
#include <math.h>
#include <mpi.h>
#include <string.h>
#include <sys/stat.h> /* struct stat */
2011-06-17 23:20:43 +04:00
#include <time.h>
2014-07-31 03:17:21 +04:00
2011-06-17 23:20:43 +04:00
#ifndef _WIN32
2014-07-31 03:17:21 +04:00
# include <sys/time.h> /* gettimeofday() */
# include <sys/utsname.h> /* uname() */
2011-06-17 23:20:43 +04:00
#endif
2014-07-31 03:17:21 +04:00
#include <assert.h>
2011-06-17 23:20:43 +04:00
#include "ior.h"
2018-07-08 15:38:05 +03:00
#include "ior-internal.h"
#include "aiori.h"
#include "utilities.h"
#include "parse_options.h"
2011-06-17 23:20:43 +04:00
/* file scope globals */
extern char **environ;

/* data-check errors accumulated by CountErrors() */
static int totalErrorCount;

/* function table of the I/O backend selected in AioriBind() */
static const ior_aiori_t *backend;

/* forward declarations of helpers private to this file */
static void DestroyTests(IOR_test_t *tests_head);
static void DisplayUsage(char **argv);
static char *PrependDir(IOR_param_t *test, char *rootDir);
static char **ParseFileName(char *name, int *count);
static IOR_test_t *SetupTests(int, char **);
static void TestIoSys(IOR_test_t *);
static void ValidateTests(IOR_param_t *);
static IOR_offset_t WriteOrRead(IOR_param_t *test, IOR_results_t *results,
                                void *fd, int access,
                                IOR_io_buffers *ioBuffers);
static void WriteTimes(IOR_param_t *, double **, int, int);
2018-07-07 13:42:21 +03:00
/*
 * Library entry point: set up and run every test described by argc/argv
 * on the communicator world_com, sending log and result output to
 * world_out.
 *
 * Returns the head of the test list.  This function neither destroys the
 * list nor finalizes MPI.
 */
IOR_test_t *ior_run(int argc, char **argv, MPI_Comm world_com, FILE *world_out)
{
        IOR_test_t *testList;
        IOR_test_t *current;

        /* all output goes to the stream supplied by the caller */
        out_logfile = world_out;
        out_resultfile = world_out;
        mpi_comm_world = world_com;

        MPI_CHECK(MPI_Comm_size(mpi_comm_world, &numTasksWorld),
                  "cannot get number of tasks");
        MPI_CHECK(MPI_Comm_rank(mpi_comm_world, &rank), "cannot get rank");
        PrintEarlyHeader();

        /* Sanity check, we were compiled with SOME backend, right? */
        if (aiori_count() == 0) {
                ERR("No IO backends compiled into ior. "
                    "Run 'configure --with-<backend>', and recompile.");
        }

        /* setup tests, and validate parameters */
        testList = SetupTests(argc, argv);
        verbose = testList->params.verbose;
        testList->params.testComm = world_com;

        /* check for commandline 'help' request */
        if (rank == 0 && testList->params.showHelp == TRUE) {
                DisplayUsage(argv);
        }

        PrintHeader(argc, argv);

        /* perform each test */
        current = testList;
        while (current != NULL) {
                /* the error counter is per test in this entry point */
                totalErrorCount = 0;
                verbose = current->params.verbose;
                current->params.testComm = world_com;

                if (rank == 0) {
                        if (verbose >= VERBOSE_0) {
                                ShowTestInfo(&current->params);
                        }
                        if (verbose >= VERBOSE_3) {
                                ShowTest(&current->params);
                        }
                }

                TestIoSys(current);
                current->results->errors = totalErrorCount;
                ShowTestEnd(current);

                current = current->next;
        }

        PrintLongSummaryAllTests(testList);

        /* display finish time */
        PrintTestEnds();

        return testList;
}
2018-07-07 13:42:21 +03:00
int ior_main(int argc, char **argv)
{
int i;
IOR_test_t *tests_head;
IOR_test_t *tptr;
2018-07-08 15:07:32 +03:00
2018-07-07 13:42:21 +03:00
out_logfile = stdout;
2018-07-08 15:47:55 +03:00
out_resultfile = stdout;
2018-07-07 13:42:21 +03:00
/*
* check -h option from commandline without starting MPI;
* if the help option is requested in a script file (showHelp=TRUE),
* the help output will be displayed in the MPI job
*/
for (i = 1; i < argc; i++) {
if (strcmp(argv[i], "-h") == 0) {
DisplayUsage(argv);
return (0);
}
}
#ifdef USE_S3_AIORI
/* This is supposed to be done before *any* threads are created.
* Could MPI_Init() create threads (or call multi-threaded
* libraries)? We'll assume so. */
AWS4C_CHECK( aws_init() );
#endif
/* start the MPI code */
MPI_CHECK(MPI_Init(&argc, &argv), "cannot initialize MPI");
mpi_comm_world = MPI_COMM_WORLD;
MPI_CHECK(MPI_Comm_size(mpi_comm_world, &numTasksWorld),
"cannot get number of tasks");
MPI_CHECK(MPI_Comm_rank(mpi_comm_world, &rank), "cannot get rank");
2018-07-08 15:07:32 +03:00
2018-07-07 13:42:21 +03:00
PrintEarlyHeader();
/* set error-handling */
/*MPI_CHECK(MPI_Errhandler_set(mpi_comm_world, MPI_ERRORS_RETURN),
"cannot set errhandler"); */
/* Sanity check, we were compiled with SOME backend, right? */
if (0 == aiori_count ()) {
ERR("No IO backends compiled into ior. "
"Run 'configure --with-<backend>', and recompile.");
}
/* setup tests, and validate parameters */
tests_head = SetupTests(argc, argv);
verbose = tests_head->params.verbose;
tests_head->params.testComm = mpi_comm_world;
/* check for commandline 'help' request */
if (tests_head->params.showHelp == TRUE) {
if( rank == 0 ){
DisplayUsage(argv);
}
MPI_Finalize();
exit(0);
}
PrintHeader(argc, argv);
/* perform each test */
for (tptr = tests_head; tptr != NULL; tptr = tptr->next) {
verbose = tptr->params.verbose;
if (rank == 0 && verbose >= VERBOSE_0) {
ShowTestInfo(&tptr->params);
}
if (rank == 0 && verbose >= VERBOSE_3) {
ShowTest(&tptr->params);
}
2018-07-07 13:42:21 +03:00
// This is useful for trapping a running MPI process. While
// this is sleeping, run the script 'testing/hdfs/gdb.attach'
if (verbose >= VERBOSE_4) {
fprintf(out_logfile, "\trank %d: sleeping\n", rank);
sleep(5);
fprintf(out_logfile, "\trank %d: awake.\n", rank);
}
TestIoSys(tptr);
ShowTestEnd(tptr);
2018-07-07 13:42:21 +03:00
}
if (verbose < 0)
/* always print final summary */
verbose = 0;
PrintLongSummaryAllTests(tests_head);
/* display finish time */
PrintTestEnds();
2018-07-07 13:42:21 +03:00
DestroyTests(tests_head);
MPI_CHECK(MPI_Finalize(), "cannot finalize MPI");
#ifdef USE_S3_AIORI
/* done once per program, after exiting all threads.
* NOTE: This fn doesn't return a value that can be checked for success. */
aws_cleanup();
#endif
return totalErrorCount;
2011-11-12 03:11:28 +04:00
}
2011-06-17 23:20:43 +04:00
/***************************** F U N C T I O N S ******************************/
/*
 * Initialize an IOR_param_t structure to the defaults.
 *
 * The structure is zeroed first, then every non-zero default is filled
 * in.  String fields are copied with a bound one byte short of the field
 * size so that the zero fill always leaves a terminating NUL, even when
 * the source (for example $USER) is too long.
 *
 * NOTE(review): assumes api, platform and hdfs_user hold at least MAX_STR
 * bytes and testFileName at least MAX_PATHLEN bytes -- confirm in ior.h.
 */
void init_IOR_Param_t(IOR_param_t * p)
{
        const char *default_aiori = aiori_default ();
        char *hdfs_user;

        assert (NULL != default_aiori);

        memset(p, 0, sizeof(IOR_param_t));

        p->mode = IOR_IRUSR | IOR_IWUSR | IOR_IRGRP | IOR_IWGRP;
        p->openFlags = IOR_RDWR | IOR_CREAT;

        /* size - 1: keep the NUL written by the memset() above */
        strncpy(p->api, default_aiori, MAX_STR - 1);
        strncpy(p->platform, "HOST(OSTYPE)", MAX_STR - 1);
        strncpy(p->testFileName, "testFile", MAX_PATHLEN - 1);

        p->nodes = 1;
        p->tasksPerNode = 1;
        p->repetitions = 1;
        p->repCounter = -1;
        p->open = WRITE;
        p->taskPerNodeOffset = 1;
        p->segmentCount = 1;
        p->blockSize = 1048576;
        p->transferSize = 262144;
        p->randomSeed = -1;
        p->incompressibleSeed = 573;
        p->testComm = mpi_comm_world;
        p->setAlignment = 1;
        p->lustre_start_ost = -1;

        /* the HDFS user defaults to $USER, or the empty string if unset */
        hdfs_user = getenv("USER");
        if (!hdfs_user)
                hdfs_user = "";
        strncpy(p->hdfs_user, hdfs_user, MAX_STR - 1);
        p->hdfs_name_node = "default";
        p->hdfs_name_node_port = 0;     /* ??? */
        p->hdfs_fs = NULL;
        p->hdfs_replicas = 0;           /* invokes the default */
        p->hdfs_block_size = 0;

        // p->curl = NULL;
        p->URI = NULL;
        p->curl_flags = 0;
        p->io_buf = NULL;
        p->etags = NULL;
        p->part_number = 0;

        p->beegfs_numTargets = -1;
        p->beegfs_chunkSize = -1;

        p->mmap_ptr = NULL;
}
/**
 * Point the file-scope "backend" at the function table of the AIORI
 * named by api.  For API names starting with "S3" the EMC-extension bit
 * in param->curl_flags is set when the name is "S3_EMC" (compared without
 * regard to case) and cleared otherwise.  An unknown API name is reported
 * through ERR().
 */
static void AioriBind(char* api, IOR_param_t* param)
{
        backend = aiori_select (api);
        if (backend == NULL) {
                ERR("unrecognized IO API");
                return;
        }

        /* only the S3 family touches the curl flags */
        if (strncmp(api, "S3", 2) != 0)
                return;

        if (strcasecmp(api, "S3_EMC") == 0)
                param->curl_flags |= IOR_CURL_S3_EMC_EXT;
        else
                param->curl_flags &= ~(IOR_CURL_S3_EMC_EXT);
}
2011-06-17 23:20:43 +04:00
static void
DisplayOutliers(int numTasks,
2011-06-17 23:20:43 +04:00
double timerVal,
char *timeString, int access, int outlierThreshold)
2011-06-17 23:20:43 +04:00
{
char accessString[MAX_STR];
double sum, mean, sqrDiff, var, sd;
/* for local timerVal, don't compensate for wall clock delta */
timerVal += wall_clock_delta;
MPI_CHECK(MPI_Allreduce
(&timerVal, &sum, 1, MPI_DOUBLE, MPI_SUM, testComm),
"MPI_Allreduce()");
mean = sum / numTasks;
sqrDiff = pow((mean - timerVal), 2);
MPI_CHECK(MPI_Allreduce
(&sqrDiff, &var, 1, MPI_DOUBLE, MPI_SUM, testComm),
"MPI_Allreduce()");
var = var / numTasks;
sd = sqrt(var);
2011-06-17 23:20:43 +04:00
if (access == WRITE) {
strcpy(accessString, "write");
} else { /* READ */
strcpy(accessString, "read");
}
if (fabs(timerVal - mean) > (double)outlierThreshold) {
2018-07-07 13:42:21 +03:00
fprintf(out_logfile, "WARNING: for task %d, %s %s is %f\n",
rank, accessString, timeString, timerVal);
2018-07-07 13:42:21 +03:00
fprintf(out_logfile, " (mean=%f, stddev=%f)\n", mean, sd);
fflush(out_logfile);
}
}
2011-06-17 23:20:43 +04:00
/*
 * Check for outliers in start/end times and elapsed create/xfer/close times.
 * Write timers occupy rows 0..5 of timer[], read timers rows 6..11.
 */
static void CheckForOutliers(IOR_param_t * test, double **timer, int rep,
                             int access)
{
        /* first timer row of the phase under test */
        const int base = (access == WRITE) ? 0 : 6;

        double startTime = timer[base + 0][rep];
        double createTime = timer[base + 1][rep] - timer[base + 0][rep];
        double xferTime = timer[base + 3][rep] - timer[base + 2][rep];
        double closeTime = timer[base + 5][rep] - timer[base + 4][rep];
        double endTime = timer[base + 5][rep];

        DisplayOutliers(test->numTasks, startTime,
                        "start time", access, test->outlierThreshold);
        DisplayOutliers(test->numTasks, createTime,
                        "elapsed create time", access, test->outlierThreshold);
        DisplayOutliers(test->numTasks, xferTime,
                        "elapsed transfer time", access,
                        test->outlierThreshold);
        DisplayOutliers(test->numTasks, closeTime,
                        "elapsed close time", access, test->outlierThreshold);
        DisplayOutliers(test->numTasks, endTime, "end time",
                        access, test->outlierThreshold);
}
2011-06-17 23:20:43 +04:00
/*
 * Sum the bytes moved by all tasks into aggFileSizeFromXfer[rep] and use
 * that sum as the size for bandwidth calculation.  For every API except
 * HDF5 and NCMPI, rank 0 warns when the summed size differs from the
 * expected size or from the size recorded in aggFileSizeFromStat[rep].
 */
static void CheckFileSize(IOR_test_t *test, IOR_offset_t dataMoved, int rep)
{
        IOR_param_t *cfg = &test->params;
        IOR_results_t *res = test->results;
        int sizeCheckedApi;

        MPI_CHECK(MPI_Allreduce(&dataMoved, &res->aggFileSizeFromXfer[rep],
                                1, MPI_LONG_LONG_INT, MPI_SUM, testComm),
                  "cannot total data moved");

        sizeCheckedApi = strcasecmp(cfg->api, "HDF5") != 0
                         && strcasecmp(cfg->api, "NCMPI") != 0;

        if (sizeCheckedApi
            && verbose >= VERBOSE_0 && rank == 0
            && (cfg->expectedAggFileSize != res->aggFileSizeFromXfer[rep]
                || res->aggFileSizeFromStat[rep]
                   != res->aggFileSizeFromXfer[rep])) {
                fprintf(out_logfile,
                        "WARNING: Expected aggregate file size = %lld.\n",
                        (long long) cfg->expectedAggFileSize);
                fprintf(out_logfile,
                        "WARNING: Stat() of aggregate file size = %lld.\n",
                        (long long) res->aggFileSizeFromStat[rep]);
                fprintf(out_logfile,
                        "WARNING: Using actual aggregate bytes moved = %lld.\n",
                        (long long) res->aggFileSizeFromXfer[rep]);
                if (cfg->deadlineForStonewalling) {
                        fprintf(out_logfile,
                                "WARNING: maybe caused by deadlineForStonewalling\n");
                }
        }

        res->aggFileSizeForBW[rep] = res->aggFileSizeFromXfer[rep];
}
2011-06-17 23:20:43 +04:00
/*
 * Compare buffers after reading/writing each transfer.  Displays only first
 * difference in buffers and returns total errors counted.
 *
 * expectedBuffer / unknownBuffer: buffers compared as arrays of 8-byte words
 * size:          number of bytes to compare
 * transferCount: transfer number, used in messages only
 * test:          supplies the file offset, quitOnError and the file name
 * access:        must be WRITECHECK or READCHECK, anything else is an ERR()
 *
 * Returns the number of words that differ.
 */
static size_t
CompareBuffers(void *expectedBuffer,
               void *unknownBuffer,
               size_t size,
               IOR_offset_t transferCount, IOR_param_t *test, int access)
{
        char testFileName[MAX_PATHLEN];
        const char *bufferLabel1 = "Expected: ";
        const char *bufferLabel2 = "Actual: ";
        size_t i, j, length;
        size_t first = 0, last = 0;     /* only meaningful once inError is set */
        size_t errorCount = 0;
        int inError = 0;
        unsigned long long *goodbuf = (unsigned long long *)expectedBuffer;
        unsigned long long *testbuf = (unsigned long long *)unknownBuffer;

        if (access != WRITECHECK && access != READCHECK) {
                ERR("incorrect argument for CompareBuffers()");
        }

        length = size / sizeof(IOR_size_t);
        if (verbose >= VERBOSE_3) {
                fprintf(out_logfile,
                        "[%d] At file byte offset %lld, comparing %llu-byte transfer\n",
                        rank, (long long)test->offset,
                        (unsigned long long)size);
        }

        for (i = 0; i < length; i++) {
                if (testbuf[i] != goodbuf[i]) {
                        errorCount++;
                        if (verbose >= VERBOSE_2) {
                                fprintf(out_logfile,
                                        "[%d] At transfer buffer #%lld, index #%lld (file byte offset %lld):\n",
                                        rank, (long long)(transferCount - 1),
                                        (long long)i,
                                        (long long)(test->offset +
                                                    (IOR_size_t) (i * sizeof(IOR_size_t))));
                                fprintf(out_logfile, "[%d] %s0x", rank, bufferLabel1);
                                fprintf(out_logfile, "%016llx\n", goodbuf[i]);
                                fprintf(out_logfile, "[%d] %s0x", rank, bufferLabel2);
                                fprintf(out_logfile, "%016llx\n", testbuf[i]);
                        }
                        if (!inError) {
                                inError = 1;
                                first = i;
                        }
                        last = i;
                } else if (verbose >= VERBOSE_5 && i % 4 == 0) {
                        fprintf(out_logfile,
                                "[%d] PASSED offset = %lld bytes, transfer %lld\n",
                                rank,
                                (long long)((i * sizeof(unsigned long long)) +
                                            test->offset),
                                (long long)transferCount);
                        /* dump up to four words, never past the buffer end */
                        fprintf(out_logfile, "[%d] GOOD %s0x", rank, bufferLabel1);
                        for (j = 0; j < 4 && i + j < length; j++)
                                fprintf(out_logfile, "%016llx ", goodbuf[i + j]);
                        fprintf(out_logfile, "\n[%d] GOOD %s0x", rank, bufferLabel2);
                        for (j = 0; j < 4 && i + j < length; j++)
                                fprintf(out_logfile, "%016llx ", testbuf[i + j]);
                        fprintf(out_logfile, "\n");
                }
        }

        if (inError) {
                GetTestFileName(testFileName, test);
                fprintf(out_logfile,
                        "[%d] FAILED comparison of buffer containing %d-byte ints:\n",
                        rank, (int)sizeof(unsigned long long int));
                fprintf(out_logfile, "[%d] File name = %s\n", rank, testFileName);
                fprintf(out_logfile, "[%d] In transfer %lld, ", rank,
                        (long long)transferCount);
                fprintf(out_logfile,
                        "%lld errors between buffer indices %lld and %lld.\n",
                        (long long)errorCount, (long long)first,
                        (long long)last);
                fprintf(out_logfile, "[%d] File byte offset = %lld:\n", rank,
                        (long long)((first * sizeof(unsigned long long)) +
                                    test->offset));

                /* show at most four words starting at the first mismatch */
                fprintf(out_logfile, "[%d] %s0x", rank, bufferLabel1);
                for (j = first; j < length && j < first + 4; j++)
                        fprintf(out_logfile, "%016llx ", goodbuf[j]);
                if (j == length)
                        fprintf(out_logfile, "[end of buffer]");
                fprintf(out_logfile, "\n[%d] %s0x", rank, bufferLabel2);
                for (j = first; j < length && j < first + 4; j++)
                        fprintf(out_logfile, "%016llx ", testbuf[j]);
                if (j == length)
                        fprintf(out_logfile, "[end of buffer]");
                fprintf(out_logfile, "\n");
                if (test->quitOnError == TRUE)
                        ERR("data check error, aborting execution");
        }
        return (errorCount);
}
2011-11-12 03:11:28 +04:00
/*
 * Count all errors across all tasks; report errors found.
 *
 * Does nothing and returns 0 unless checkWrite or checkRead is enabled.
 * Otherwise the per-task counts are summed on rank 0 and broadcast; a
 * non-zero sum is added to totalErrorCount and flags test->errorFound.
 * Rank 0 prints the warning and, on a negative (overflowed) sum, reports
 * -1 instead.
 */
static int CountErrors(IOR_param_t * test, int access, int errors)
{
        int globalErrors = 0;

        if (!test->checkWrite && !test->checkRead)
                return (globalErrors);

        MPI_CHECK(MPI_Reduce(&errors, &globalErrors, 1, MPI_INT, MPI_SUM,
                             0, testComm), "cannot reduce errors");
        MPI_CHECK(MPI_Bcast(&globalErrors, 1, MPI_INT, 0, testComm),
                  "cannot broadcast allErrors value");

        if (globalErrors == 0)
                return (globalErrors);

        totalErrorCount += globalErrors;
        test->errorFound = TRUE;

        if (rank == 0) {
                if (globalErrors < 0) {
                        WARN("overflow in errors counted");
                        globalErrors = -1;
                }
                fprintf(out_logfile, "WARNING: incorrect data on %s (%d errors found).\n",
                        access == WRITECHECK ? "write" : "read", globalErrors);
                fprintf(out_logfile,
                        "Used Time Stamp %u (0x%x) for Data Signature\n",
                        test->timeStampSignatureValue,
                        test->timeStampSignatureValue);
        }
        return (globalErrors);
}
2011-06-17 23:20:43 +04:00
/*
 * Allocate a page-aligned (required by O_DIRECT) buffer of "size" bytes.
 *
 * The block obtained from malloc() is over-allocated by one page plus one
 * pointer.  The address of that block is stored in the pointer-sized slot
 * directly in front of the returned address, which is where
 * aligned_buffer_free() looks for it.
 */
static void *aligned_buffer_alloc(size_t size)
{
        const size_t pageBytes = getpagesize();
        const size_t offsetMask = pageBytes - 1;
        char *raw;
        char *afterSlot;
        char *pageStart;
        void **backlink;

        raw = malloc(size + pageBytes + sizeof(void *));
        if (raw == NULL)
                ERR("out of memory");

        /* leave room for the back pointer, then round up to a page */
        afterSlot = raw + sizeof(char *);
        pageStart = afterSlot + pageBytes - ((size_t) afterSlot & offsetMask);

        /* remember the malloc()ed address just below the aligned one */
        backlink = (void **)(pageStart - sizeof(void *));
        *backlink = raw;

        return (void *)pageStart;
}
2011-06-17 23:20:43 +04:00
/*
 * Free a buffer allocated by aligned_buffer_alloc().
 *
 * The address handed to free() is read from the pointer-sized slot that
 * sits immediately in front of buf.
 */
static void aligned_buffer_free(void *buf)
{
        char *slot = (char *)buf - sizeof(char *);
        void *original = *(void **)slot;

        free(original);
}
/*
 * Allocate test->results and its per-repetition arrays, unless results
 * are already attached to the test.
 *
 * Everything is allocated with calloc() so that the structure and all
 * five arrays start out zeroed.  At least one element is requested per
 * array so that a repetition count of zero is not reported as an
 * out-of-memory error.
 */
void AllocResults(IOR_test_t *test)
{
        int reps;
        size_t count;
        IOR_results_t *res;

        if (test->results != NULL)
                return;

        reps = test->params.repetitions;
        count = (reps > 0) ? (size_t)reps : 1;

        res = calloc(1, sizeof(*res));
        if (res == NULL)
                ERR("malloc of IOR_results_t failed");
        test->results = res;

        res->writeTime = calloc(count, sizeof(*res->writeTime));
        if (res->writeTime == NULL)
                ERR("malloc of writeTime array failed");

        res->readTime = calloc(count, sizeof(*res->readTime));
        if (res->readTime == NULL)
                ERR("malloc of readTime array failed");

        res->aggFileSizeFromStat =
                calloc(count, sizeof(*res->aggFileSizeFromStat));
        if (res->aggFileSizeFromStat == NULL)
                ERR("malloc of aggFileSizeFromStat failed");

        res->aggFileSizeFromXfer =
                calloc(count, sizeof(*res->aggFileSizeFromXfer));
        if (res->aggFileSizeFromXfer == NULL)
                ERR("malloc of aggFileSizeFromXfer failed");

        res->aggFileSizeForBW =
                calloc(count, sizeof(*res->aggFileSizeForBW));
        if (res->aggFileSizeForBW == NULL)
                ERR("malloc of aggFileSizeForBW failed");
}
/*
 * Release the results attached to a test, if any, and reset
 * test->results to NULL.  The reset matters because AllocResults() uses
 * "results != NULL" to decide that nothing needs to be allocated.
 */
void FreeResults(IOR_test_t *test)
{
        if (test->results == NULL)
                return;

        free(test->results->aggFileSizeFromStat);
        free(test->results->aggFileSizeFromXfer);
        free(test->results->aggFileSizeForBW);
        free(test->results->readTime);
        free(test->results->writeTime);
        free(test->results);
        test->results = NULL;
}
/**
 * Create new test for list of tests.
 *
 * The new test starts as a copy of init_params.  Its platform name, node
 * count, tasks-per-node and id are then filled in; it has no results and
 * no successor yet.
 */
IOR_test_t *CreateTest(IOR_param_t *init_params, int test_num)
{
        IOR_test_t *created = (IOR_test_t *) malloc(sizeof(IOR_test_t));

        if (created == NULL)
                ERR("malloc() of IOR_test_t failed");

        created->next = NULL;
        created->results = NULL;

        created->params = *init_params;
        GetPlatformName(created->params.platform);
        created->params.nodes = init_params->numTasks / tasksPerNode;
        created->params.tasksPerNode = tasksPerNode;
        created->params.id = test_num;

        return created;
}
/*
 * Release one test: first its results, then the test itself.
 */
static void DestroyTest(IOR_test_t *test)
{
        /* the results hang off the test, so they have to go first */
        FreeResults(test);

        free(test);
}
/*
 * Release every test of the list starting at tests_head.
 */
static void DestroyTests(IOR_test_t *tests_head)
{
        IOR_test_t *cur = tests_head;

        while (cur != NULL) {
                /* fetch the successor before cur is freed */
                IOR_test_t *following = cur->next;

                DestroyTest(cur);
                cur = following;
        }
}
2011-06-17 23:20:43 +04:00
/*
* Display usage of script file.
*/
static void DisplayUsage(char **argv)
2011-06-17 23:20:43 +04:00
{
char *opts[] = {
"OPTIONS:",
" -a S api -- API for I/O [POSIX|MMAP|MPIIO|HDF5|HDFS|S3|S3_EMC|NCMPI]",
2012-01-14 02:05:13 +04:00
" -A N refNum -- user supplied reference number to include in the summary",
" -b N blockSize -- contiguous bytes to write per task (e.g.: 8, 4k, 2m, 1g)",
" -B useO_DIRECT -- uses O_DIRECT for POSIX, bypassing I/O buffers",
" -c collective -- collective I/O",
" -C reorderTasks -- changes task ordering to n+1 ordering for readback",
" -d N interTestDelay -- delay between reps in seconds",
" -D N deadlineForStonewalling -- seconds before stopping write or read phase",
" -O stoneWallingWearOut=1 -- once the stonewalling timout is over, all process finish to access the amount of data",
" -O stoneWallingWearOutIterations=N -- stop after processing this number of iterations, needed for reading data back written with stoneWallingWearOut",
2018-07-08 02:16:30 +03:00
" -O stoneWallingStatusFile=FILE -- this file keeps the number of iterations from stonewalling during write and allows to use them for read",
" -e fsync -- perform fsync/msync upon POSIX/MMAP write close",
" -E useExistingTestFile -- do not remove test file before write access",
" -f S scriptFile -- test script name",
" -F filePerProc -- file-per-process",
" -g intraTestBarriers -- use barriers between open, write/read, and close",
" -G N setTimeStampSignature -- set value for time stamp signature/random seed",
" -h showHelp -- displays options and help",
" -H showHints -- show hints",
" -i N repetitions -- number of repetitions of test",
" -I individualDataSets -- datasets not shared by all procs [not working]",
" -j N outlierThreshold -- warn on outlier N seconds from mean",
" -J N setAlignment -- HDF5 alignment in bytes (e.g.: 8, 4k, 2m, 1g)",
" -k keepFile -- don't remove the test file(s) on program exit",
" -K keepFileWithError -- keep error-filled file(s) after data-checking",
" -l datapacket type-- type of packet that will be created [offset|incompressible|timestamp|o|i|t]",
" -m multiFile -- use number of reps (-i) for multiple file count",
2012-01-14 03:47:37 +04:00
" -M N memoryPerNode -- hog memory on the node (e.g.: 2g, 75%)",
" -n noFill -- no fill in HDF5 file creation",
" -N N numTasks -- number of tasks that should participate in the test",
" -o S testFile -- full name for test",
" -O S string of IOR directives (e.g. -O checkRead=1,lustreStripeCount=32)",
" -p preallocate -- preallocate file size",
" -P useSharedFilePointer -- use shared file pointer [not working]",
" -q quitOnError -- during file error-checking, abort on error",
2012-01-13 08:49:56 +04:00
" -Q N taskPerNodeOffset for read tests use with -C & -Z options (-C constant N, -Z at least N)",
" -r readFile -- read existing file",
" -R checkRead -- verify that the output of read matches the expected signature (used with -G)",
" -s N segmentCount -- number of segments",
" -S useStridedDatatype -- put strided access into datatype [not working]",
" -t N transferSize -- size of transfer in bytes (e.g.: 8, 4k, 2m, 1g)",
2018-07-07 14:20:29 +03:00
" -T N maxTimeDuration -- max time in minutes executing repeated test; it aborts only between iterations and not within a test!",
" -u uniqueDir -- use unique directory name for each file-per-process",
" -U S hintsFileName -- full name for hints file",
" -v verbose -- output information (repeating flag increases level)",
" -V useFileView -- use MPI_File_set_view",
" -w writeFile -- write file",
" -W checkWrite -- check read after write",
" -x singleXferAttempt -- do not retry transfer if incomplete",
2012-01-13 08:49:56 +04:00
" -X N reorderTasksRandomSeed -- random seed for -Z option",
" -Y fsyncPerWrite -- perform fsync/msync after each POSIX/MMAP write",
" -z randomOffset -- access is to random, not sequential, offsets within a file",
2012-01-13 08:49:56 +04:00
" -Z reorderTasksRandom -- changes task ordering to random ordering for readback",
2018-07-08 15:47:55 +03:00
" -O summaryFile=FILE -- store result data into this file",
" -O summaryFormat=[default,JSON,CSV] -- use the format for outputing the summary",
" ",
" NOTE: S is a string, N is an integer number.",
" ",
""
};
int i = 0;
2018-07-07 13:42:21 +03:00
fprintf(out_logfile, "Usage: %s [OPTIONS]\n\n", *argv);
for (i = 0; strlen(opts[i]) > 0; i++)
2018-07-07 13:42:21 +03:00
fprintf(out_logfile, "%s\n", opts[i]);
2011-06-17 23:20:43 +04:00
return;
2011-11-12 03:11:28 +04:00
}
2011-06-17 23:20:43 +04:00
/*
* Distribute IOR_HINTs to all tasks' environments.
*/
void DistributeHints(void)
2011-06-17 23:20:43 +04:00
{
char hint[MAX_HINTS][MAX_STR], fullHint[MAX_STR], hintVariable[MAX_STR];
int hintCount = 0, i;
if (rank == 0) {
for (i = 0; environ[i] != NULL; i++) {
if (strncmp(environ[i], "IOR_HINT", strlen("IOR_HINT"))
== 0) {
hintCount++;
if (hintCount == MAX_HINTS) {
WARN("exceeded max hints; reset MAX_HINTS and recompile");
hintCount = MAX_HINTS;
break;
}
/* assume no IOR_HINT is greater than MAX_STR in length */
strncpy(hint[hintCount - 1], environ[i],
MAX_STR - 1);
}
2011-06-17 23:20:43 +04:00
}
}
MPI_CHECK(MPI_Bcast(&hintCount, sizeof(hintCount), MPI_BYTE,
2011-06-17 23:20:43 +04:00
0, MPI_COMM_WORLD), "cannot broadcast hints");
for (i = 0; i < hintCount; i++) {
MPI_CHECK(MPI_Bcast(&hint[i], MAX_STR, MPI_BYTE,
0, MPI_COMM_WORLD),
"cannot broadcast hints");
strcpy(fullHint, hint[i]);
strcpy(hintVariable, strtok(fullHint, "="));
if (getenv(hintVariable) == NULL) {
/* doesn't exist in this task's environment; better set it */
if (putenv(hint[i]) != 0)
WARN("cannot set environment variable");
}
2011-06-17 23:20:43 +04:00
}
2011-11-12 03:11:28 +04:00
}
2011-06-17 23:20:43 +04:00
/*
 * FillBuffer(), defined after FillIncompressibleBuffer() below:
 * Fill buffer, which is transfer size bytes long, with known 8-byte long long
 * int values. In even-numbered 8-byte long long ints, store MPI task in high
 * bits and timestamp signature in low bits. In odd-numbered 8-byte long long
 * ints, store transfer offset. If storeFileOffset option is used, the file
 * (not transfer) offset is stored instead.
 */
/*
 * Fill one transfer-sized buffer with 8-byte words assembled from two
 * successive rand_r() values, advancing test->incompressibleSeed.
 */
static void
FillIncompressibleBuffer(void* buffer, IOR_param_t * test)
{
        unsigned long long *words = (unsigned long long *)buffer;
        size_t idx;

        for (idx = 0; idx < test->transferSize / sizeof(unsigned long long); idx++) {
                /* the first draw supplies the upper half, the second the lower */
                unsigned long long upper =
                        (unsigned long long) rand_r(&test->incompressibleSeed);
                unsigned long long lower =
                        (unsigned long long) rand_r(&test->incompressibleSeed);

                words[idx] = (upper << 32) | lower;
        }
}
/* While TRUE, the next FillBuffer() call in incompressible mode reseeds
 * test->incompressibleSeed and then sets this flag to FALSE. */
unsigned int reseed_incompressible_prng = TRUE;
/*
 * Fill a transfer-sized buffer with the data pattern selected by
 * test->dataPacketType.
 *
 * incompressible: the buffer is filled by FillIncompressibleBuffer(); the
 * seed is reset first whenever reseed_incompressible_prng is TRUE.
 *
 * otherwise: even-numbered 8-byte words hold fillrank in the upper 32
 * bits combined with the time stamp signature; odd-numbered words hold
 * "offset" plus the byte position of the word inside the buffer.
 */
static void
FillBuffer(void *buffer,
           IOR_param_t * test, unsigned long long offset, int fillrank)
{
        unsigned long long *words = (unsigned long long *)buffer;
        unsigned long long signature;
        size_t idx;

        if (test->dataPacketType == incompressible) {
                /* In order for write checks to work, the pseudo random
                   sequence has to be restarted */
                if (reseed_incompressible_prng == TRUE) {
                        /* the rank is added so that tasks differ */
                        test->incompressibleSeed =
                                test->setTimeStampSignature + rank;
                        reseed_incompressible_prng = FALSE;
                }
                FillIncompressibleBuffer(buffer, test);
                return;
        }

        signature = (((unsigned long long)fillrank) << 32)
                    | (unsigned long long)test->timeStampSignatureValue;

        for (idx = 0; idx < test->transferSize / sizeof(unsigned long long); idx++) {
                words[idx] = ((idx % 2) == 0)
                        ? signature
                        : offset + (idx * sizeof(unsigned long long));
        }
}
2011-06-17 23:20:43 +04:00
/*
* Return string describing machine name and type.
*/
void GetPlatformName(char *platformName)
2011-06-17 23:20:43 +04:00
{
char nodeName[MAX_STR], *p, *start, sysName[MAX_STR];
struct utsname name;
if (uname(&name) != 0) {
2011-12-15 01:40:25 +04:00
EWARN("cannot get platform name");
sprintf(sysName, "%s", "Unknown");
sprintf(nodeName, "%s", "Unknown");
2011-06-17 23:20:43 +04:00
} else {
sprintf(sysName, "%s", name.sysname);
sprintf(nodeName, "%s", name.nodename);
2011-06-17 23:20:43 +04:00
}
start = nodeName;
if (strlen(nodeName) == 0) {
p = start;
} else {
/* point to one character back from '\0' */
p = start + strlen(nodeName) - 1;
}
/*
* to cut off trailing node number, search backwards
* for the first non-numeric character
*/
while (p != start) {
if (*p < '0' || *p > '9') {
*(p + 1) = '\0';
break;
} else {
p--;
}
}
2011-06-17 23:20:43 +04:00
sprintf(platformName, "%s(%s)", nodeName, sysName);
2011-11-12 03:11:28 +04:00
}
2011-06-17 23:20:43 +04:00
/*
* Parse file name.
*/
static char **ParseFileName(char *name, int *count)
2011-06-17 23:20:43 +04:00
{
char **fileNames, *tmp, *token;
char delimiterString[3] = { FILENAME_DELIMITER, '\n', '\0' };
int i = 0;
*count = 0;
tmp = name;
/* pass one */
/* if something there, count the first item */
if (*tmp != '\0') {
(*count)++;
}
/* count the rest of the filenames */
while (*tmp != '\0') {
if (*tmp == FILENAME_DELIMITER) {
(*count)++;
}
tmp++;
}
2011-06-17 23:20:43 +04:00
fileNames = (char **)malloc((*count) * sizeof(char **));
if (fileNames == NULL)
ERR("out of memory");
/* pass two */
token = strtok(name, delimiterString);
while (token != NULL) {
fileNames[i] = token;
token = strtok(NULL, delimiterString);
i++;
}
return (fileNames);
2011-11-12 03:11:28 +04:00
}
2011-06-17 23:20:43 +04:00
2018-07-08 15:38:05 +03:00
2011-06-17 23:20:43 +04:00
/*
2018-07-08 15:38:05 +03:00
* Return test file name to access.
* for single shared file, fileNames[0] is returned in testFileName
2011-06-17 23:20:43 +04:00
*/
2018-07-08 15:38:05 +03:00
void GetTestFileName(char *testFileName, IOR_param_t * test)
2011-06-17 23:20:43 +04:00
{
2018-07-08 15:38:05 +03:00
char **fileNames;
char initialTestFileName[MAX_PATHLEN];
char testFileNameRoot[MAX_STR];
char tmpString[MAX_STR];
int count;
2018-07-08 15:38:05 +03:00
/* parse filename for multiple file systems */
strcpy(initialTestFileName, test->testFileName);
fileNames = ParseFileName(initialTestFileName, &count);
if (count > 1 && test->uniqueDir == TRUE)
ERR("cannot use multiple file names with unique directories");
if (test->filePerProc) {
strcpy(testFileNameRoot,
fileNames[((rank +
rankOffset) % test->numTasks) % count]);
} else {
strcpy(testFileNameRoot, fileNames[0]);
2011-06-17 23:20:43 +04:00
}
2018-07-08 15:38:05 +03:00
/* give unique name if using multiple files */
if (test->filePerProc) {
/*
* prepend rank subdirectory before filename
* e.g., /dir/file => /dir/<rank>/file
*/
if (test->uniqueDir == TRUE) {
strcpy(testFileNameRoot,
PrependDir(test, testFileNameRoot));
}
sprintf(testFileName, "%s.%08d", testFileNameRoot,
(rank + rankOffset) % test->numTasks);
} else {
strcpy(testFileName, testFileNameRoot);
}
2018-07-08 15:38:05 +03:00
/* add suffix for multiple files */
if (test->repCounter > -1) {
sprintf(tmpString, ".%d", test->repCounter);
strcat(testFileName, tmpString);
}
free (fileNames);
2011-11-12 03:11:28 +04:00
}
2011-06-17 23:20:43 +04:00
/*
 * From absolute directory, insert rank as subdirectory.  Allows each task
 * to write to its own directory.  E.g., /dir/file => /dir/<rank>/file.
 *
 * The rank directory is created with mode S_IRWXU if it does not exist;
 * an existing directory must be readable, writable and searchable.
 * Returns a malloc()ed string that the caller has to free().
 *
 * The result buffer is sized from rootDir, and the rank is appended in
 * place instead of printing "dir" into itself.
 *
 * NOTE(review): as in the original code, a '/' at index 0 is not treated
 * as a separator, so "/file" and "file" both give "<rootDir><rank>/<rootDir>"
 * -- confirm whether that is intended.
 */
static char *PrependDir(IOR_param_t * test, char *rootDir)
{
        const size_t rootLen = strlen(rootDir);
        /* room for directory part + rank digits + '/' + file part + NUL */
        const size_t dirCap = 2 * rootLen + 32;
        const char *fname = rootDir;
        char *dir;
        size_t used;
        int i;

        dir = (char *)malloc(dirCap);
        if (dir == NULL)
                ERR("out of memory");

        /* cut dir after the last '/', remember where the file name starts */
        strcpy(dir, rootDir);
        for (i = (int)rootLen - 1; i > 0; i--) {
                if (dir[i] == '/') {
                        dir[i + 1] = '\0';
                        fname = rootDir + i + 1;
                        break;
                }
        }

        /* create directory with rank as subdirectory */
        used = strlen(dir);
        snprintf(dir + used, dirCap - used, "%d",
                 (rank + rankOffset) % test->numTasks);

        /* dir doesn't exist, so create */
        if (access(dir, F_OK) != 0) {
                if (mkdir(dir, S_IRWXU) < 0) {
                        ERR("cannot create directory");
                }

                /* check if correct permissions */
        } else if (access(dir, R_OK) != 0 || access(dir, W_OK) != 0 ||
                   access(dir, X_OK) != 0) {
                ERR("invalid directory permissions");
        }

        /* concatenate dir and file names */
        used = strlen(dir);
        snprintf(dir + used, dirCap - used, "/%s", fname);

        return dir;
}
2011-06-17 23:20:43 +04:00
/******************************************************************************/
/*
 * Reduce test results, and show if verbose set.
 *
 * The twelve timers of repetition "rep" are reduced onto rank 0.  Rank 0
 * stores the total write or read time in the results and, unless verbose
 * is below VERBOSE_0, hands bandwidth and elapsed times to
 * PrintReducedResult().  All other ranks return after the reduction.
 */
static void ReduceIterResults(IOR_test_t *test, double **timer, int rep,
                              int access)
{
        double reducedTimer[12] = { 0 };
        double elapsed[6];
        double *phaseElapsed;
        double wallTime;
        double bandwidth;
        int slot;
        MPI_Op reduceOp;

        assert(access == WRITE || access == READ);

        /* Find the minimum start time of the even numbered timers, and the
           maximum finish time for the odd numbered timers */
        for (slot = 0; slot < 12; slot++) {
                if (slot % 2)
                        reduceOp = MPI_MAX;
                else
                        reduceOp = MPI_MIN;
                MPI_CHECK(MPI_Reduce(&timer[slot][rep], &reducedTimer[slot],
                                     1, MPI_DOUBLE, reduceOp, 0, testComm),
                          "MPI_Reduce()");
        }

        /* Only rank 0 tallies and prints the results. */
        if (rank != 0)
                return;

        /* Calculate elapsed times and throughput numbers */
        for (slot = 0; slot < 6; slot++)
                elapsed[slot] = reducedTimer[2 * slot + 1]
                                - reducedTimer[2 * slot];

        if (access == WRITE) {
                wallTime = reducedTimer[5] - reducedTimer[0];
                test->results->writeTime[rep] = wallTime;
                phaseElapsed = &elapsed[0];
        } else {                /* READ */
                wallTime = reducedTimer[11] - reducedTimer[6];
                test->results->readTime[rep] = wallTime;
                phaseElapsed = &elapsed[3];
        }

        if (verbose < VERBOSE_0)
                return;

        bandwidth = (double)test->results->aggFileSizeForBW[rep] / wallTime;

        PrintReducedResult(test, access, bandwidth, phaseElapsed, wallTime,
                           rep);
}
2011-06-17 23:20:43 +04:00
/*
 * Check for file(s), then remove all files if file-per-proc, else the single
 * shared file.
 *
 * With a shared file only rank 0 deletes.  With file-per-proc every task
 * deletes the file named by testFileName; when random task reordering is
 * enabled the name is first regenerated with rankOffset forced to 0 so each
 * task removes its own file, then regenerated again with the saved offset.
 */
static void RemoveFile(char *testFileName, int filePerProc, IOR_param_t * test)
{
        int savedOffset = 0;

        if (!filePerProc) {
                if ((rank == 0)
                    && (backend->access(testFileName, F_OK, test) == 0)) {
                        backend->delete(testFileName, test);
                }
                return;
        }

        /* in random tasks, delete own file */
        if (test->reorderTasksRandom == TRUE) {
                savedOffset = rankOffset;
                rankOffset = 0;
                GetTestFileName(testFileName, test);
        }

        if (backend->access(testFileName, F_OK, test) == 0) {
                backend->delete(testFileName, test);
        }

        /* put the global offset and the file name back as they were */
        if (test->reorderTasksRandom == TRUE) {
                rankOffset = savedOffset;
                GetTestFileName(testFileName, test);
        }
}
2011-06-17 23:20:43 +04:00
/*
* Setup tests by parsing commandline and creating test script.
* Perform a sanity-check on the configured parameters.
2011-06-17 23:20:43 +04:00
*/
static IOR_test_t *SetupTests(int argc, char **argv)
2011-06-17 23:20:43 +04:00
{
IOR_test_t *tests, *testsHead;
2011-06-17 23:20:43 +04:00
/* count the tasks per node */
tasksPerNode = CountTasksPerNode(mpi_comm_world);
2011-06-17 23:20:43 +04:00
testsHead = tests = ParseCommandLine(argc, argv);
/*
* Since there is no guarantee that anyone other than
* task 0 has the environment settings for the hints, pass
2018-07-07 13:42:21 +03:00
* the hint=value pair to everyone else in mpi_comm_world
*/
DistributeHints();
2011-06-17 23:20:43 +04:00
/* check validity of tests and create test queue */
while (tests != NULL) {
ValidateTests(&tests->params);
tests = tests->next;
}
2011-06-17 23:20:43 +04:00
2018-07-07 13:42:21 +03:00
init_clock();
2011-06-17 23:20:43 +04:00
/* seed random number generator */
2018-07-07 13:42:21 +03:00
SeedRandGen(mpi_comm_world);
2011-06-17 23:20:43 +04:00
return (testsHead);
2011-11-12 03:11:28 +04:00
}
2011-06-17 23:20:43 +04:00
2011-11-12 03:11:28 +04:00
/*
 * Setup transfer buffers, creating them as needed.
 *
 * The main transfer buffer is always allocated.  The two check buffers are
 * allocated only when a write check or read check is requested; otherwise
 * they are set to NULL so that code copying the pointers never reads an
 * indeterminate value.
 *
 * pretendRank is accepted for interface compatibility and is not used here.
 */
static void XferBuffersSetup(IOR_io_buffers* ioBuffers, IOR_param_t* test,
                             int pretendRank)
{
        (void)pretendRank;

        ioBuffers->buffer = aligned_buffer_alloc(test->transferSize);
        ioBuffers->checkBuffer = NULL;
        ioBuffers->readCheckBuffer = NULL;

        if (test->checkWrite || test->checkRead) {
                ioBuffers->checkBuffer = aligned_buffer_alloc(test->transferSize);
                ioBuffers->readCheckBuffer = aligned_buffer_alloc(test->transferSize);
        }
}
/*
 * Free transfer buffers.
 *
 * The conditions mirror XferBuffersSetup(): both check buffers are allocated
 * whenever checkWrite or checkRead is set, so both are released under that
 * same condition (readCheckBuffer used to leak in write-check-only runs).
 */
static void XferBuffersFree(IOR_io_buffers* ioBuffers, IOR_param_t* test)
{
        aligned_buffer_free(ioBuffers->buffer);

        if (test->checkWrite || test->checkRead) {
                aligned_buffer_free(ioBuffers->checkBuffer);
                aligned_buffer_free(ioBuffers->readCheckBuffer);
        }
}
2011-06-17 23:20:43 +04:00
/*
 * malloc a buffer, touching every page in an attempt to defeat lazy allocation.
 *
 * Writes the byte value 1 at the start of the buffer and then at every
 * page-size stride.  Returns NULL when size is 0 or the allocation fails;
 * otherwise the caller owns the buffer and releases it with free().
 */
static void *malloc_and_touch(size_t size)
{
        long page_size;
        size_t step;
        size_t off;
        char *buf;

        if (size == 0)
                return NULL;

        /* sysconf() returns -1 on failure; fall back to a common page size
           rather than converting -1 into a huge unsigned stride */
        page_size = sysconf(_SC_PAGESIZE);
        step = (page_size > 0) ? (size_t)page_size : (size_t)4096;

        buf = (char *)malloc(size);
        if (buf == NULL)
                return NULL;

        /* index-based walk: never forms a pointer past the end of buf */
        for (off = 0; off < size; off += step) {
                buf[off] = (char)1;
                if (step >= size - off)
                        break;  /* next stride would leave the buffer */
        }

        return (void *)buf;
}
/*
 * Print, on rank 0, a histogram of how many tasks target each file under the
 * current rankOffset values: entry i of the printed list is the number of
 * files that are hit by exactly i tasks.
 *
 * Collective over params->testComm: every participating task must call it.
 * The gather uses the test communicator because the receive buffer holds
 * params->numTasks entries and only test participants reach this call.
 */
static void file_hits_histogram(IOR_param_t *params)
{
        int *rankoffs = NULL;
        int *filecont = NULL;
        int *filehits = NULL;
        int ifile;
        int jfile;

        if (rank == 0) {
                rankoffs = (int *)malloc(params->numTasks * sizeof(int));
                /* calloc: both counters must start at zero */
                filecont = (int *)calloc(params->numTasks, sizeof(int));
                filehits = (int *)calloc(params->numTasks, sizeof(int));
                if (rankoffs == NULL || filecont == NULL || filehits == NULL)
                        ERR("malloc failed");
        }

        MPI_CHECK(MPI_Gather(&rankOffset, 1, MPI_INT, rankoffs,
                             1, MPI_INT, 0, params->testComm),
                  "MPI_Gather error");

        if (rank != 0)
                return;

        /* filecont[f] = number of tasks whose shifted rank selects file f */
        for (ifile = 0; ifile < params->numTasks; ifile++) {
                filecont[(ifile + rankoffs[ifile]) % params->numTasks]++;
        }

        /* filehits[h] = number of files selected by exactly h tasks */
        for (ifile = 0; ifile < params->numTasks; ifile++)
                for (jfile = 0; jfile < params->numTasks; jfile++) {
                        if (ifile == filecont[jfile])
                                filehits[ifile]++;
                }

        fprintf(out_logfile, "#File Hits Dist:");
        jfile = 0;
        ifile = 0;
        /* stop once the printed buckets account for every file */
        while (jfile < params->numTasks && ifile < params->numTasks) {
                fprintf(out_logfile, " %d", filehits[ifile]);
                jfile += filehits[ifile], ifile++;
        }
        fprintf(out_logfile, "\n");

        free(rankoffs);
        free(filecont);
        free(filehits);
}
2012-01-09 00:30:05 +04:00
/*
 * Report whether the time budget of a test has been used up.
 *
 * Returns 0 when params->maxTimeDuration is 0 (no limit).  Otherwise returns
 * nonzero once GetTimeStamp() has reached startTime plus maxTimeDuration
 * multiplied by 60.
 */
int test_time_elapsed(IOR_param_t *params, double startTime)
{
        double deadline;

        if (params->maxTimeDuration != 0) {
                deadline = startTime + (params->maxTimeDuration * 60);
                return GetTimeStamp() >= deadline;
        }

        return 0;
}
2012-01-09 06:41:30 +04:00
/*
 * hog some memory as a rough simulation of a real application's memory use
 *
 * The amount is params->memoryPerTask when nonzero, otherwise
 * params->memoryPerNode divided by params->tasksPerNode when memoryPerNode
 * is nonzero.  Returns NULL when neither is set; otherwise returns the
 * touched buffer, which the caller releases with free().
 */
static void *HogMemory(IOR_param_t *params)
{
        size_t size;
        void *buf;

        if (params->memoryPerTask != 0) {
                size = params->memoryPerTask;
        } else if (params->memoryPerNode != 0) {
                if (verbose >= VERBOSE_3)
                        /* explicit cast so the argument matches the format */
                        fprintf(out_logfile, "This node hogging %lld bytes of memory\n",
                                (long long)params->memoryPerNode);
                size = params->memoryPerNode / params->tasksPerNode;
        } else {
                return NULL;
        }

        if (verbose >= VERBOSE_3)
                /* size is a size_t: %zu, not %ld */
                fprintf(out_logfile, "This task hogging %zu bytes of memory\n", size);

        buf = malloc_and_touch(size);
        if (buf == NULL)
                ERR("malloc of simulated applciation buffer failed");

        return buf;
}
2011-06-17 23:20:43 +04:00
/*
 * Using the test parameters, run iteration(s) of single test.
 *
 * Outline:
 *   1. Build testComm from the first params->numTasks ranks of
 *      mpi_comm_world; ranks left out only wait at the final barrier.
 *   2. Allocate the 12 per-repetition timers, bind the backend, allocate the
 *      optional memory hog and the transfer buffers.
 *   3. For every repetition run, as configured, the write phase, the write
 *      check, the read (or read check) phase and the file removal.
 *   4. Print the summary, release all resources and synchronise with the
 *      non-participating ranks.
 *
 * Timer slots (each an array indexed by repetition):
 *   0/1 create start/finish, 2/3 write transfer, 4/5 close   (write phase)
 *   6/7 open start/finish,   8/9 read transfer, 10/11 close  (read phase)
 *
 * NOTE(review): test_time_elapsed() is evaluated independently on every
 * rank; whether all ranks are guaranteed to take the same branch before the
 * barriers inside each phase cannot be told from this function alone.
 */
static void TestIoSys(IOR_test_t *test)
{
        IOR_param_t *params = &test->params;
        IOR_results_t *results = test->results;
        char testFileName[MAX_STR];
        double *timer[12];
        double startTime;
        int pretendRank;
        int i, rep;
        void *fd;
        MPI_Group orig_group, new_group;
        int range[3];
        IOR_offset_t dataMoved;         /* for data rate calculation */
        void *hog_buf;
        IOR_io_buffers ioBuffers;

        /* set up communicator for test */
        if (params->numTasks > numTasksWorld) {
                if (rank == 0) {
                        fprintf(out_logfile,
                                "WARNING: More tasks requested (%d) than available (%d),",
                                params->numTasks, numTasksWorld);
                        fprintf(out_logfile, " running on %d tasks.\n",
                                numTasksWorld);
                }
                params->numTasks = numTasksWorld;
        }
        MPI_CHECK(MPI_Comm_group(mpi_comm_world, &orig_group),
                  "MPI_Comm_group() error");
        range[0] = 0;                     /* first rank */
        range[1] = params->numTasks - 1;  /* last rank */
        range[2] = 1;                     /* stride */
        MPI_CHECK(MPI_Group_range_incl(orig_group, 1, &range, &new_group),
                  "MPI_Group_range_incl() error");
        MPI_CHECK(MPI_Comm_create(mpi_comm_world, new_group, &testComm),
                  "MPI_Comm_create() error");
        MPI_CHECK(MPI_Group_free(&orig_group), "MPI_Group_Free() error");
        MPI_CHECK(MPI_Group_free(&new_group), "MPI_Group_Free() error");
        params->testComm = testComm;
        if (testComm == MPI_COMM_NULL) {
                /* tasks not in the group do not participate in this test */
                MPI_CHECK(MPI_Barrier(mpi_comm_world), "barrier error");
                return;
        }
        if (rank == 0 && verbose >= VERBOSE_1) {
                fprintf(out_logfile, "Participating tasks: %d\n", params->numTasks);
                fflush(out_logfile);
        }
        if (rank == 0 && params->reorderTasks == TRUE && verbose >= VERBOSE_1) {
                fprintf(out_logfile,
                        "Using reorderTasks '-C' (expecting block, not cyclic, task assignment)\n");
                fflush(out_logfile);
        }
        params->tasksPerNode = CountTasksPerNode(testComm);

        /* setup timers */
        for (i = 0; i < 12; i++) {
                timer[i] = (double *)malloc(params->repetitions * sizeof(double));
                if (timer[i] == NULL)
                        ERR("malloc failed");
        }

        /* bind I/O calls to specific API */
        AioriBind(params->api, params);

        /* show test setup */
        if (rank == 0 && verbose >= VERBOSE_0)
                ShowSetup(params);

        hog_buf = HogMemory(params);

        pretendRank = (rank + rankOffset) % params->numTasks;

        /* IO Buffer Setup */

        if (params->setTimeStampSignature) { // initialize the buffer properly
                params->timeStampSignatureValue = (unsigned int)params->setTimeStampSignature;
        }
        XferBuffersSetup(&ioBuffers, params, pretendRank);
        reseed_incompressible_prng = TRUE; // reset pseudo random generator, necessary to guarantee the next call to FillBuffer produces the same value as it is right now

        /* Initial time stamp */
        startTime = GetTimeStamp();

        /* loop over test iterations */
        for (rep = 0; rep < params->repetitions; rep++) {
                /* Get iteration start time in seconds in task 0 and broadcast to
                   all tasks */
                if (rank == 0) {
                        if (! params->setTimeStampSignature) {
                                time_t currentTime;
                                if ((currentTime = time(NULL)) == -1) {
                                        ERR("cannot get current time");
                                }
                                params->timeStampSignatureValue =
                                        (unsigned int)currentTime;
                                if (verbose >= VERBOSE_2) {
                                        fprintf(out_logfile,
                                                "Using Time Stamp %u (0x%x) for Data Signature\n",
                                                params->timeStampSignatureValue,
                                                params->timeStampSignatureValue);
                                }
                        }
                        if (rep == 0 && verbose >= VERBOSE_0) {
                                PrintTableHeader();
                        }
                }
                MPI_CHECK(MPI_Bcast
                          (&params->timeStampSignatureValue, 1, MPI_UNSIGNED, 0,
                           testComm), "cannot broadcast start time value");

                FillBuffer(ioBuffers.buffer, params, 0, pretendRank);
                /* use repetition count for number of multiple files */
                if (params->multiFile)
                        params->repCounter = rep;

                /*
                 * write the file(s), getting timing between I/O calls
                 */

                if (params->writeFile && !test_time_elapsed(params, startTime)) {
                        GetTestFileName(testFileName, params);
                        if (verbose >= VERBOSE_3) {
                                fprintf(out_logfile, "task %d writing %s\n", rank,
                                        testFileName);
                        }
                        DelaySecs(params->interTestDelay);
                        if (params->useExistingTestFile == FALSE) {
                                RemoveFile(testFileName, params->filePerProc,
                                           params);
                        }
                        MPI_CHECK(MPI_Barrier(testComm), "barrier error");
                        params->open = WRITE;
                        timer[0][rep] = GetTimeStamp();
                        fd = backend->create(testFileName, params);
                        timer[1][rep] = GetTimeStamp();
                        if (params->intraTestBarriers)
                                MPI_CHECK(MPI_Barrier(testComm),
                                          "barrier error");
                        if (rank == 0 && verbose >= VERBOSE_1) {
                                fprintf(out_logfile,
                                        "Commencing write performance test: %s",
                                        CurrentTimeString());
                        }
                        timer[2][rep] = GetTimeStamp();
                        dataMoved = WriteOrRead(params, results, fd, WRITE, &ioBuffers);
                        if (params->verbose >= VERBOSE_4) {
                                /* cast so the argument matches the format;
                                   the file casts offsets to long long
                                   elsewhere when printing */
                                fprintf(out_logfile, "* data moved = %lld\n",
                                        (long long)dataMoved);
                                fflush(out_logfile);
                        }
                        timer[3][rep] = GetTimeStamp();
                        if (params->intraTestBarriers)
                                MPI_CHECK(MPI_Barrier(testComm),
                                          "barrier error");
                        timer[4][rep] = GetTimeStamp();
                        backend->close(fd, params);

                        timer[5][rep] = GetTimeStamp();
                        MPI_CHECK(MPI_Barrier(testComm), "barrier error");

                        /* get the size of the file just written */
                        results->aggFileSizeFromStat[rep] =
                                backend->get_file_size(params, testComm, testFileName);

                        /* check if stat() of file doesn't equal expected file size,
                           use actual amount of byte moved */
                        CheckFileSize(test, dataMoved, rep);

                        if (verbose >= VERBOSE_3)
                                WriteTimes(params, timer, rep, WRITE);
                        ReduceIterResults(test, timer, rep, WRITE);
                        if (params->outlierThreshold) {
                                CheckForOutliers(params, timer, rep, WRITE);
                        }
                }

                /*
                 * perform a check of data, reading back data and comparing
                 * against what was expected to be written
                 */
                if (params->checkWrite && !test_time_elapsed(params, startTime)) {
                        MPI_CHECK(MPI_Barrier(testComm), "barrier error");
                        if (rank == 0 && verbose >= VERBOSE_1) {
                                fprintf(out_logfile,
                                        "Verifying contents of the file(s) just written.\n");
                                fprintf(out_logfile, "%s\n", CurrentTimeString());
                        }
                        if (params->reorderTasks) {
                                /* move two nodes away from writing node */
                                rankOffset =
                                        (2 * params->tasksPerNode) % params->numTasks;
                        }

                        // update the check buffer
                        FillBuffer(ioBuffers.readCheckBuffer, params, 0, (rank + rankOffset) % params->numTasks);

                        reseed_incompressible_prng = TRUE; /* Re-Seed the PRNG to get same sequence back, if random */

                        GetTestFileName(testFileName, params);
                        params->open = WRITECHECK;
                        fd = backend->open(testFileName, params);
                        dataMoved = WriteOrRead(params, results, fd, WRITECHECK, &ioBuffers);
                        backend->close(fd, params);
                        rankOffset = 0;
                }
                /*
                 * read the file(s), getting timing between I/O calls
                 */
                if ((params->readFile || params->checkRead ) && !test_time_elapsed(params, startTime)) {
                        int operation_flag = READ;
                        if ( params->checkRead ){
                                // actually read and then compare the buffer
                                operation_flag = READCHECK;
                        }
                        /* Get rankOffset [file offset] for this process to read, based on -C,-Z,-Q,-X options */
                        /* Constant process offset reading */
                        if (params->reorderTasks) {
                                /* move taskPerNodeOffset nodes[1==default] away from writing node */
                                rankOffset =
                                        (params->taskPerNodeOffset *
                                         params->tasksPerNode) % params->numTasks;
                        }
                        /* random process offset reading */
                        if (params->reorderTasksRandom) {
                                /* this should not intefere with randomOffset within a file because GetOffsetArrayRandom */
                                /* seeds every random() call  */
                                int nodeoffset;
                                unsigned int iseed0;
                                nodeoffset = params->taskPerNodeOffset;
                                nodeoffset = (nodeoffset < params->nodes) ? nodeoffset : params->nodes - 1;
                                if (params->reorderTasksRandomSeed < 0)
                                        iseed0 = -1 * params->reorderTasksRandomSeed + rep;
                                else
                                        iseed0 = params->reorderTasksRandomSeed;
                                srand(rank + iseed0);
                                {
                                        rankOffset = rand() % params->numTasks;
                                }
                                /* redraw until the offset is at least nodeoffset nodes away */
                                while (rankOffset <
                                       (nodeoffset * params->tasksPerNode)) {
                                        rankOffset = rand() % params->numTasks;
                                }
                                /* Get more detailed stats if requested by verbose level */
                                if (verbose >= VERBOSE_2) {
                                        file_hits_histogram(params);
                                }
                        }
                        if(operation_flag == READCHECK){
                                FillBuffer(ioBuffers.readCheckBuffer, params, 0, (rank + rankOffset) % params->numTasks);
                        }

                        /* Using globally passed rankOffset, following function generates testFileName to read */
                        GetTestFileName(testFileName, params);

                        if (verbose >= VERBOSE_3) {
                                fprintf(out_logfile, "task %d reading %s\n", rank,
                                        testFileName);
                        }
                        DelaySecs(params->interTestDelay);
                        MPI_CHECK(MPI_Barrier(testComm), "barrier error");
                        params->open = READ;
                        timer[6][rep] = GetTimeStamp();
                        fd = backend->open(testFileName, params);
                        timer[7][rep] = GetTimeStamp();
                        if (params->intraTestBarriers)
                                MPI_CHECK(MPI_Barrier(testComm),
                                          "barrier error");
                        if (rank == 0 && verbose >= VERBOSE_1) {
                                fprintf(out_logfile,
                                        "Commencing read performance test: %s",
                                        CurrentTimeString());
                        }
                        timer[8][rep] = GetTimeStamp();
                        dataMoved = WriteOrRead(params, results, fd, operation_flag, &ioBuffers);
                        timer[9][rep] = GetTimeStamp();
                        if (params->intraTestBarriers)
                                MPI_CHECK(MPI_Barrier(testComm),
                                          "barrier error");
                        timer[10][rep] = GetTimeStamp();
                        backend->close(fd, params);
                        timer[11][rep] = GetTimeStamp();

                        /* get the size of the file just read */
                        results->aggFileSizeFromStat[rep] =
                                backend->get_file_size(params, testComm,
                                                       testFileName);

                        /* check if stat() of file doesn't equal expected file size,
                           use actual amount of byte moved */
                        CheckFileSize(test, dataMoved, rep);

                        if (verbose >= VERBOSE_3)
                                WriteTimes(params, timer, rep, READ);
                        ReduceIterResults(test, timer, rep, READ);
                        if (params->outlierThreshold) {
                                CheckForOutliers(params, timer, rep, READ);
                        }
                }

                /* remove the file(s) unless asked to keep them */
                if (!params->keepFile
                    && !(params->errorFound && params->keepFileWithError)) {
                        double start, finish;
                        start = GetTimeStamp();
                        MPI_CHECK(MPI_Barrier(testComm), "barrier error");
                        RemoveFile(testFileName, params->filePerProc, params);
                        MPI_CHECK(MPI_Barrier(testComm), "barrier error");
                        finish = GetTimeStamp();
                        PrintRemoveTiming(start, finish, rep);
                } else {
                        MPI_CHECK(MPI_Barrier(testComm), "barrier error");
                }
                params->errorFound = FALSE;
                rankOffset = 0;

        }

        MPI_CHECK(MPI_Comm_free(&testComm), "MPI_Comm_free() error");

        if (params->summary_every_test) {
                PrintLongSummaryHeader();
                PrintLongSummaryOneTest(test);
        } else {
                PrintShortSummary(test);
        }

        XferBuffersFree(&ioBuffers, params);

        /* free(NULL) is a no-op, so no guard is needed */
        free(hog_buf);
        for (i = 0; i < 12; i++) {
                free(timer[i]);
        }

        /* Sync with the tasks that did not participate in this test */
        MPI_CHECK(MPI_Barrier(mpi_comm_world), "barrier error");

}
2011-06-17 23:20:43 +04:00
/*
 * Determine if valid tests from parameters.
 *
 * Binds the backend named by test->api, lets it record its version, then
 * checks the parameter set.  Invalid values that have a usable default are
 * reset with a warning (WARN_RESET); contradictory or unusable settings are
 * reported through ERR().
 */
static void ValidateTests(IOR_param_t * test)
{
        IOR_param_t defaults;
        init_IOR_Param_t(&defaults);

        /* get the version of the tests */
        AioriBind(test->api, test);
        backend->set_version(test);

        if (test->repetitions <= 0)
                WARN_RESET("too few test repetitions",
                           test, &defaults, repetitions);
        if (test->numTasks <= 0)
                ERR("too few tasks for testing");
        if (test->interTestDelay < 0)
                WARN_RESET("inter-test delay must be nonnegative value",
                           test, &defaults, interTestDelay);
        if (test->readFile != TRUE && test->writeFile != TRUE
            && test->checkRead != TRUE && test->checkWrite != TRUE)
                ERR("test must write, read, or check read/write file");
        if(! test->setTimeStampSignature && test->writeFile != TRUE && test->checkRead == TRUE)
                ERR("using readCheck only requires to write a timeStampSignature -- use -G");
        if (test->segmentCount < 0)
                ERR("segment count must be positive value");
        if ((test->blockSize % sizeof(IOR_size_t)) != 0)
                ERR("block size must be a multiple of access size");
        if (test->blockSize < 0)
                ERR("block size must be non-negative integer");
        if ((test->transferSize % sizeof(IOR_size_t)) != 0)
                ERR("transfer size must be a multiple of access size");
        if (test->setAlignment < 0)
                ERR("alignment must be non-negative integer");
        if (test->transferSize < 0)
                ERR("transfer size must be non-negative integer");
        if (test->transferSize == 0) {
                ERR("test will not complete with zero transfer size");
        } else {
                if ((test->blockSize % test->transferSize) != 0)
                        ERR("block size must be a multiple of transfer size");
        }
        if (test->blockSize < test->transferSize)
                ERR("block size must not be smaller than transfer size");

        /* specific APIs */
        if ((strcasecmp(test->api, "MPIIO") == 0)
            && (test->blockSize < sizeof(IOR_size_t)
                || test->transferSize < sizeof(IOR_size_t)))
                ERR("block/transfer size may not be smaller than IOR_size_t for MPIIO");
        if ((strcasecmp(test->api, "HDF5") == 0)
            && (test->blockSize < sizeof(IOR_size_t)
                || test->transferSize < sizeof(IOR_size_t)))
                ERR("block/transfer size may not be smaller than IOR_size_t for HDF5");
        if ((strcasecmp(test->api, "NCMPI") == 0)
            && (test->blockSize < sizeof(IOR_size_t)
                || test->transferSize < sizeof(IOR_size_t)))
                ERR("block/transfer size may not be smaller than IOR_size_t for NCMPI");
        if ((test->useFileView == TRUE)
            && (sizeof(MPI_Aint) < 8)   /* used for 64-bit datatypes */
            &&((test->numTasks * test->blockSize) >
               (2 * (IOR_offset_t) GIBIBYTE)))
                ERR("segment size must be < 2GiB");
        if ((strcasecmp(test->api, "POSIX") != 0) && test->singleXferAttempt)
                WARN_RESET("retry only available in POSIX",
                           test, &defaults, singleXferAttempt);
        /* the message names every API this condition lets through */
        if ((strcasecmp(test->api, "POSIX") != 0) &&
            (strcasecmp(test->api, "MMAP") != 0) &&
            (strcasecmp(test->api, "MPIIO") != 0)
            && test->fsync)
                WARN_RESET("fsync() only available in POSIX/MMAP/MPIIO",
                           test, &defaults, fsync);
        if ((strcasecmp(test->api, "MPIIO") != 0) && test->preallocate)
                WARN_RESET("preallocation only available in MPIIO",
                           test, &defaults, preallocate);
        if ((strcasecmp(test->api, "MPIIO") != 0) && test->useFileView)
                WARN_RESET("file view only available in MPIIO",
                           test, &defaults, useFileView);
        if ((strcasecmp(test->api, "MPIIO") != 0) && test->useSharedFilePointer)
                WARN_RESET("shared file pointer only available in MPIIO",
                           test, &defaults, useSharedFilePointer);
        if ((strcasecmp(test->api, "MPIIO") == 0) && test->useSharedFilePointer)
                WARN_RESET("shared file pointer not implemented",
                           test, &defaults, useSharedFilePointer);
        if ((strcasecmp(test->api, "MPIIO") != 0) && test->useStridedDatatype)
                WARN_RESET("strided datatype only available in MPIIO",
                           test, &defaults, useStridedDatatype);
        if ((strcasecmp(test->api, "MPIIO") == 0) && test->useStridedDatatype)
                WARN_RESET("strided datatype not implemented",
                           test, &defaults, useStridedDatatype);
        if ((strcasecmp(test->api, "MPIIO") == 0)
            && test->useStridedDatatype && (test->blockSize < sizeof(IOR_size_t)
                                            || test->transferSize <
                                            sizeof(IOR_size_t)))
                ERR("need larger file size for strided datatype in MPIIO");
        if ((strcasecmp(test->api, "POSIX") == 0) && test->showHints)
                WARN_RESET("hints not available in POSIX",
                           test, &defaults, showHints);
        if ((strcasecmp(test->api, "POSIX") == 0) && test->collective)
                WARN_RESET("collective not available in POSIX",
                           test, &defaults, collective);
        if ((strcasecmp(test->api, "MMAP") == 0) && test->fsyncPerWrite
            && (test->transferSize & (sysconf(_SC_PAGESIZE) - 1)))
                ERR("transfer size must be aligned with PAGESIZE for MMAP with fsyncPerWrite");

        /* parameter consitency */
        if (test->reorderTasks == TRUE && test->reorderTasksRandom == TRUE)
                ERR("Both Constant and Random task re-ordering specified. Choose one and resubmit");
        if (test->randomOffset && test->reorderTasksRandom
            && test->filePerProc == FALSE)
                ERR("random offset and random reorder tasks specified with single-shared-file. Choose one and resubmit");
        if (test->randomOffset && test->reorderTasks
            && test->filePerProc == FALSE)
                ERR("random offset and constant reorder tasks specified with single-shared-file. Choose one and resubmit");
        if (test->randomOffset && test->checkRead)
                ERR("random offset not available with read check option (use write check)");
        if (test->randomOffset && test->storeFileOffset)
                ERR("random offset not available with store file offset option)");
        if ((strcasecmp(test->api, "MPIIO") == 0) && test->randomOffset
            && test->collective)
                ERR("random offset not available with collective MPIIO");
        if ((strcasecmp(test->api, "MPIIO") == 0) && test->randomOffset
            && test->useFileView)
                ERR("random offset not available with MPIIO fileviews");
        if ((strcasecmp(test->api, "HDF5") == 0) && test->randomOffset)
                ERR("random offset not available with HDF5");
        if ((strcasecmp(test->api, "NCMPI") == 0) && test->randomOffset)
                ERR("random offset not available with NCMPI");
        if ((strcasecmp(test->api, "HDF5") != 0) && test->individualDataSets)
                WARN_RESET("individual datasets only available in HDF5",
                           test, &defaults, individualDataSets);
        if ((strcasecmp(test->api, "HDF5") == 0) && test->individualDataSets)
                WARN_RESET("individual data sets not implemented",
                           test, &defaults, individualDataSets);
        if ((strcasecmp(test->api, "NCMPI") == 0) && test->filePerProc)
                ERR("file-per-proc not available in current NCMPI");
        if (test->noFill) {
                if (strcasecmp(test->api, "HDF5") != 0) {
                        ERR("'no fill' option only available in HDF5");
                } else {
                        /* check if hdf5 available */
#if defined (H5_VERS_MAJOR) && defined (H5_VERS_MINOR)
                        /* no-fill option not available until hdf5-1.6.x;
                           compare major and minor together so that x.0 - x.5
                           releases of later major versions are accepted */
#if (H5_VERS_MAJOR > 1 || (H5_VERS_MAJOR == 1 && H5_VERS_MINOR > 5))
                        ;
#else
                        char errorString[MAX_STR];
                        snprintf(errorString, sizeof(errorString),
                                 "'no fill' option not available in %s",
                                 test->apiVersion);
                        ERR(errorString);
#endif
#else
                        WARN("unable to determine HDF5 version for 'no fill' usage");
#endif
                }
        }
        if (test->useExistingTestFile && test->lustre_set_striping)
                ERR("Lustre stripe options are incompatible with useExistingTestFile");

        /* N:1 and N:N */
        IOR_offset_t  NtoN = test->filePerProc;
        IOR_offset_t  Nto1 = ! NtoN;
        IOR_offset_t  s    = test->segmentCount;
        IOR_offset_t  t    = test->transferSize;
        IOR_offset_t  b    = test->blockSize;

        /* a shared file with several segments must use one transfer per block */
        if (Nto1 && (s != 1) && (b != t)) {
                ERR("N:1 (strided) requires xfer-size == block-size");
        }
}
/**
 * Returns a precomputed array of IOR_offset_t for the inner benchmark loop.
 * The offsets are sequential and the last element is set to -1 as end marker.
 * @param test IOR_param_t supplying transferSize, blockSize, segmentCount,
 *             numTasks and filePerProc
 * @param pretendRank int pretended rank used to shift the offsets correctly
 *                    within a shared file
 * @return IOR_offset_t* malloc()ed array owned by the caller
 */
static IOR_offset_t *GetOffsetArraySequential(IOR_param_t * test,
                                              int pretendRank)
{
        IOR_offset_t seg, xfer;
        IOR_offset_t next = 0;
        IOR_offset_t xfersPerBlock;
        IOR_offset_t total;
        IOR_offset_t *offsetArray;

        /* count needed offsets */
        xfersPerBlock = test->blockSize / test->transferSize;
        total = xfersPerBlock * test->segmentCount;

        /* setup empty array, one extra slot for the end marker */
        offsetArray =
                (IOR_offset_t *) malloc((total + 1) * sizeof(IOR_offset_t));
        if (offsetArray == NULL)
                ERR("malloc() failed");
        offsetArray[total] = -1;

        /* fill with offsets, segment by segment */
        for (seg = 0; seg < test->segmentCount; seg++) {
                IOR_offset_t blockStart;

                if (test->filePerProc) {
                        /* own file: blocks follow each other directly */
                        blockStart = seg * test->blockSize;
                } else {
                        /* shared file: skip the other tasks' blocks */
                        blockStart = (seg * test->numTasks * test->blockSize)
                                + (pretendRank * test->blockSize);
                }

                for (xfer = 0; xfer < xfersPerBlock; xfer++) {
                        offsetArray[next] = xfer * test->transferSize + blockStart;
                        next++;
                }
        }

        return (offsetArray);
}
/**
 * Returns a precomputed array of IOR_offset_t for the inner benchmark loop.
 * The offsets are created sequentially and shuffled at the end. The last
 * array element is set to -1 as end marker.
 * Because the seeds are synchronised across all processes, every process
 * computes the same random order when used with filePerProc.
 * For a shared file every transfer is randomly assigned to a rank, so the
 * processes can end up with different numbers of transfers. This may lead to
 * a larger spread in accesses than with filePerProc; this is expected and
 * should be kept in mind.
 * @param test IOR_param_t supplying transferSize, blockSize, segmentCount,
 *             numTasks and filePerProc; randomSeed is stored here on
 *             WRITE/READ and reused for any other access
 * @param pretendRank int pretended rank that selects this task's transfers
 *                    in a shared file
 * @param access int access mode
 * @return IOR_offset_t* malloc()ed array owned by the caller
 */
static IOR_offset_t *GetOffsetArrayRandom(IOR_param_t * test, int pretendRank,
                                          int access)
{
        int seed;
        IOR_offset_t pos, pick, held;
        IOR_offset_t count = 0;
        IOR_offset_t filled = 0;
        IOR_offset_t span;
        IOR_offset_t *offsetArray;

        /* set up seed for random(): draw a new one when writing or reading,
           replay the stored one otherwise */
        if (access == WRITE || access == READ) {
                seed = random();
                test->randomSeed = seed;
        } else {
                seed = test->randomSeed;
        }
        srandom(seed);

        span = test->blockSize * test->segmentCount;
        if (test->filePerProc == FALSE)
                span *= test->numTasks;

        /* count needed offsets (pass 1); in a shared file one random draw per
           transfer decides which task gets it */
        for (pos = 0; pos < span; pos += test->transferSize) {
                if (test->filePerProc != FALSE
                    || (random() % test->numTasks) == pretendRank) {
                        count++;
                }
        }

        /* setup empty array, one extra slot for the end marker */
        offsetArray =
                (IOR_offset_t *) malloc((count + 1) * sizeof(IOR_offset_t));
        if (offsetArray == NULL)
                ERR("malloc() failed");
        offsetArray[count] = -1;

        if (test->filePerProc) {
                /* fill array */
                for (pos = 0; pos < count; pos++)
                        offsetArray[pos] = pos * test->transferSize;
        } else {
                /* fill with offsets (pass 2); same seed so the same transfers
                   are selected as were counted in pass 1 */
                srandom(seed);
                for (pos = 0; pos < span; pos += test->transferSize) {
                        if ((random() % test->numTasks) == pretendRank) {
                                offsetArray[filled] = pos;
                                filled++;
                        }
                }
        }

        /* reorder array: swap every slot with a randomly picked one */
        for (pos = 0; pos < count; pos++) {
                pick = random() % count;
                held = offsetArray[pick];
                offsetArray[pick] = offsetArray[pos];
                offsetArray[pos] = held;
        }

        SeedRandGen(test->testComm);    /* synchronize seeds across tasks */

        return (offsetArray);
}
2011-06-17 23:20:43 +04:00
/*
 * Perform one transfer of test->transferSize bytes at offsetArray[pairCnt]
 * through backend->xfer and return the amount transferred.
 *
 * WRITE / READ  - transfer ioBuffers->buffer; on WRITE the buffer is
 *                 refilled per offset when storeFileOffset is set.
 * WRITECHECK    - transfer into checkBuffer, bump *transferCount and add the
 *                 CompareBuffers() result against readCheckBuffer to *errors.
 * READCHECK     - transfer into buffer and add the CompareBuffers() result
 *                 against readCheckBuffer to *errors.
 * Any other access value transfers nothing and returns 0.  A short transfer
 * is reported through ERR().
 */
static IOR_offset_t WriteOrReadSingle(IOR_offset_t pairCnt, IOR_offset_t *offsetArray, int pretendRank,
                                      IOR_offset_t * transferCount, int * errors, IOR_param_t * test, int * fd, IOR_io_buffers* ioBuffers, int access){
        IOR_offset_t moved = 0;
        IOR_offset_t wanted;
        void *data = ioBuffers->buffer;
        void *readBack = ioBuffers->checkBuffer;
        void *expected = ioBuffers->readCheckBuffer;

        wanted = test->transferSize;
        test->offset = offsetArray[pairCnt];

        if (access == WRITE) {
                /* fills each transfer with a unique pattern
                 * containing the offset into the file */
                if (test->storeFileOffset == TRUE) {
                        FillBuffer(data, test, test->offset, pretendRank);
                }
                moved = backend->xfer(access, fd, data, wanted, test);
                if (moved != wanted)
                        ERR("cannot write to file");
        } else if (access == READ) {
                moved = backend->xfer(access, fd, data, wanted, test);
                if (moved != wanted)
                        ERR("cannot read from file");
        } else if (access == WRITECHECK) {
                /* preset the target so stale contents cannot pass the compare */
                memset(readBack, 'a', wanted);

                if (test->storeFileOffset == TRUE) {
                        FillBuffer(expected, test, test->offset, pretendRank);
                }

                moved = backend->xfer(access, fd, readBack, wanted, test);
                if (moved != wanted)
                        ERR("cannot read from file write check");
                (*transferCount)++;
                *errors += CompareBuffers(expected, readBack, wanted,
                                          *transferCount, test,
                                          WRITECHECK);
        } else if (access == READCHECK) {
                moved = backend->xfer(access, fd, data, wanted, test);
                if (moved != wanted){
                        ERR("cannot read from file");
                }
                if (test->storeFileOffset == TRUE) {
                        FillBuffer(expected, test, test->offset, pretendRank);
                }
                *errors += CompareBuffers(expected, data, wanted, *transferCount, test, READCHECK);
        }

        return moved;
}
2011-06-17 23:20:43 +04:00
/*
 * Write or Read data to file(s).  This loops over the offsets assigned to
 * this task, issuing one transfer per offset via WriteOrReadSingle(), until
 * the offset list is exhausted (terminated by -1) or a stonewall condition
 * is hit.
 *
 * Parameters:
 *   test       - test parameters; stoneWallingWearOutIterations is updated
 *                from the stonewalling status file when access == READ and
 *                a status file name is set.
 *   results    - receives pairs_accessed and, in wear-out mode, the
 *                stonewall statistics.
 *   fd         - backend file handle, passed through to the backend.
 *   access     - WRITE, READ, WRITECHECK or READCHECK.
 *   ioBuffers  - I/O buffers handed to WriteOrReadSingle().
 *
 * Returns the sum of the amounts reported by WriteOrReadSingle().
 *
 * Side effects: adds to the file-scope totalErrorCount, frees the offset
 * array it obtained, and calls the backend fsync after a WRITE when
 * test->fsync is TRUE.
 */
static IOR_offset_t WriteOrRead(IOR_param_t * test, IOR_results_t * results, void *fd, int access, IOR_io_buffers* ioBuffers)
{
        int errors = 0;
        IOR_offset_t transferCount = 0;
        uint64_t pairCnt = 0;
        IOR_offset_t *offsetArray;
        int pretendRank;
        IOR_offset_t dataMoved = 0;     /* for data rate calculation */
        double startForStonewall;
        int hitStonewall;

        /* initialize values */
        pretendRank = (rank + rankOffset) % test->numTasks;

        if (test->randomOffset) {
                offsetArray = GetOffsetArrayRandom(test, pretendRank, access);
        } else {
                offsetArray = GetOffsetArraySequential(test, pretendRank);
        }

        /* check for stonewall */
        startForStonewall = GetTimeStamp();
        hitStonewall = ((test->deadlineForStonewalling != 0)
                        && ((GetTimeStamp() - startForStonewall)
                            > test->deadlineForStonewalling));

        /* on read, pick up the iteration count stored in the status file */
        if (access == READ && test->stoneWallingStatusFile[0]) {
                test->stoneWallingWearOutIterations = ReadStoneWallingIterations(test->stoneWallingStatusFile);
                if (test->stoneWallingWearOutIterations == -1) {
                        ERR("Could not read back the stonewalling status from the file!");
                }
        }

        /* loop over offsets to access */
        while ((offsetArray[pairCnt] != -1) && !hitStonewall) {
                dataMoved += WriteOrReadSingle(pairCnt, offsetArray, pretendRank, &transferCount, &errors, test, fd, ioBuffers, access);
                pairCnt++;

                /* stop on the time deadline or on the fixed iteration count */
                hitStonewall = ((test->deadlineForStonewalling != 0)
                                && ((GetTimeStamp() - startForStonewall)
                                    > test->deadlineForStonewalling))
                               || (test->stoneWallingWearOutIterations != 0
                                   && pairCnt == test->stoneWallingWearOutIterations);
        }

        if (test->stoneWallingWearOut) {
                if (verbose >= VERBOSE_1) {
                        fprintf(out_logfile, "%d: stonewalling pairs accessed: %lld\n", rank, (long long) pairCnt);
                }

                /*
                 * Reduce through long long temporaries so that the buffers
                 * really have the type announced by MPI_LONG_LONG_INT; the
                 * counter itself is uint64_t and the result field is printed
                 * as %zu below.
                 */
                long long data_moved_ll = (long long) dataMoved;
                long long pairs_accessed_ll = (long long) pairCnt;
                long long pairs_accessed_max = 0;
                long long pairs_accessed_min = 0;

                MPI_CHECK(MPI_Allreduce(&pairs_accessed_ll, &pairs_accessed_max,
                                        1, MPI_LONG_LONG_INT, MPI_MAX, testComm), "cannot reduce pairs moved");
                results->pairs_accessed = pairs_accessed_max;

                double stonewall_runtime = GetTimeStamp() - startForStonewall;
                results->stonewall_time = stonewall_runtime;

                /* the following reductions deliver their result on rank 0 only */
                MPI_CHECK(MPI_Reduce(&pairs_accessed_ll, &pairs_accessed_min,
                                     1, MPI_LONG_LONG_INT, MPI_MIN, 0, testComm), "cannot reduce pairs moved");
                MPI_CHECK(MPI_Reduce(&data_moved_ll, &results->stonewall_min_data_accessed,
                                     1, MPI_LONG_LONG_INT, MPI_MIN, 0, testComm), "cannot reduce pairs moved");
                MPI_CHECK(MPI_Reduce(&data_moved_ll, &results->stonewall_avg_data_accessed,
                                     1, MPI_LONG_LONG_INT, MPI_SUM, 0, testComm), "cannot reduce pairs moved");

                if (rank == 0) {
                        fprintf(out_logfile, "stonewalling pairs accessed min: %lld max: %zu -- min data: %.1f GiB mean data: %.1f GiB time: %.1fs\n",
                                pairs_accessed_min, results->pairs_accessed,
                                results->stonewall_min_data_accessed / 1024.0 / 1024 / 1024,
                                results->stonewall_avg_data_accessed / 1024.0 / 1024 / 1024 / test->numTasks,
                                results->stonewall_time);
                        results->stonewall_min_data_accessed *= test->numTasks;
                }

                /*
                 * NOTE(review): pairs_accessed_min is only reduced onto
                 * rank 0; on every other rank it still holds its initial 0,
                 * so there this test is true only when pairCnt == 0.
                 * Behavior kept as-is -- confirm the intent.
                 */
                if (pairs_accessed_min == pairs_accessed_ll) {
                        results->stonewall_min_data_accessed = 0;
                        results->stonewall_avg_data_accessed = 0;
                }

                /*
                 * Some work still needs to be done: catch up to the slowest
                 * rank's... i.e. the maximum count over all ranks.  Stop at
                 * the -1 sentinel so that a rank whose offset list is shorter
                 * than that maximum never reads past the end of offsetArray.
                 */
                for (; pairCnt < (uint64_t) pairs_accessed_max
                       && offsetArray[pairCnt] != -1; pairCnt++) {
                        dataMoved += WriteOrReadSingle(pairCnt, offsetArray, pretendRank, &transferCount, &errors, test, fd, ioBuffers, access);
                }
        } else {
                results->pairs_accessed = pairCnt;
        }

        totalErrorCount += CountErrors(test, access, errors);

        free(offsetArray);

        if (access == WRITE && test->fsync == TRUE) {
                backend->fsync(fd, test);       /*fsync after all accesses */
        }
        return (dataMoved);
}
2011-06-17 23:20:43 +04:00
/*
 * Write times taken during each iteration of the test.
 *
 * Prints one line per timer slot to out_logfile for the given iteration.
 * Slots 0-5 of `timer` are reported for WRITE and slots 6-11 for READ;
 * each slot is indexed as timer[slot][iteration].
 *
 * Parameters:
 *   test        - test parameters; only test->id is read.
 *   timer       - table of timestamps, indexed [slot][iteration].
 *   iteration   - iteration whose timestamps are printed.
 *   writeOrRead - WRITE or READ; any other value is reported through ERR().
 */
static void
WriteTimes(IOR_param_t * test, double **timer, int iteration, int writeOrRead)
{
        /* label for each timer slot, in slot order */
        static const char *const timerNames[] = {
                "write open start",
                "write open stop",
                "write start",
                "write stop",
                "write close start",
                "write close stop",
                "read open start",
                "read open stop",
                "read start",
                "read stop",
                "read close start",
                "read close stop",
        };
        int i, start = 0, stop = 0;

        if (writeOrRead == WRITE) {
                start = 0;
                stop = 6;
        } else if (writeOrRead == READ) {
                start = 6;
                stop = 12;
        } else {
                ERR("incorrect WRITE/READ option");
        }

        /* start/stop always lie within [0, 12], so every i has a label */
        for (i = start; i < stop; i++) {
                fprintf(out_logfile, "Test %d: Iter=%d, Task=%d, Time=%f, %s\n",
                        test->id, iteration, (int)rank, timer[i][iteration],
                        timerNames[i]);
        }
}