Merge pull request #318 from hpc/fix-comm

Refactoring: remove global comm_world communicator from utilities.c
master
Julian Kunkel 2021-01-20 18:25:53 +00:00 committed by GitHub
commit d5f5cf974d
11 changed files with 71 additions and 75 deletions
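
The pattern is the same in every file below: helpers stop reaching for the file-scope mpi_comm_world global and instead receive the communicator they should operate on, either as an explicit MPI_Comm argument or through the test's parameter struct. A minimal before/after sketch of that refactoring, with illustrative names rather than the actual IOR code:

#include <mpi.h>
#include <stdio.h>

/* before: helpers consulted a file-scope global, e.g. "MPI_Comm mpi_comm_world;"
 * after: the communicator is an explicit parameter */
static void broadcast_config(int *value, MPI_Comm com)
{
    MPI_Bcast(value, 1, MPI_INT, 0, com); /* rank 0's value wins */
}

int main(int argc, char **argv)
{
    MPI_Init(&argc, &argv);
    int rank, value = 0;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    if (rank == 0)
        value = 42;
    /* the caller now chooses the communicator; a program embedding the
     * library could pass a sub-communicator here instead */
    broadcast_config(&value, MPI_COMM_WORLD);
    printf("rank %d sees value %d\n", rank, value);
    MPI_Finalize();
    return 0;
}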

View File

@@ -255,7 +255,7 @@ HandleDistribute(enum handleType type)
DCHECK(rc, "Failed to get global handle size");
}
-MPI_CHECK(MPI_Bcast(&global.iov_buf_len, 1, MPI_UINT64_T, 0, MPI_COMM_WORLD),
+MPI_CHECK(MPI_Bcast(&global.iov_buf_len, 1, MPI_UINT64_T, 0, testComm),
"Failed to bcast global handle buffer size");
global.iov_len = global.iov_buf_len;
@@ -273,7 +273,7 @@ HandleDistribute(enum handleType type)
DCHECK(rc, "Failed to create global handle");
}
-MPI_CHECK(MPI_Bcast(global.iov_buf, global.iov_buf_len, MPI_BYTE, 0, MPI_COMM_WORLD),
+MPI_CHECK(MPI_Bcast(global.iov_buf, global.iov_buf_len, MPI_BYTE, 0, testComm),
"Failed to bcast global pool handle");
if (rank != 0) {
@@ -555,16 +555,16 @@ DFS_Finalize(aiori_mod_opt_t *options)
DFS_options_t *o = (DFS_options_t *)options;
int rc;
-MPI_Barrier(MPI_COMM_WORLD);
+MPI_Barrier(testComm);
d_hash_table_destroy(dir_hash, true /* force */);
rc = dfs_umount(dfs);
DCHECK(rc, "Failed to umount DFS namespace");
-MPI_Barrier(MPI_COMM_WORLD);
+MPI_Barrier(testComm);
rc = daos_cont_close(coh, NULL);
DCHECK(rc, "Failed to close container %s (%d)", o->cont, rc);
-MPI_Barrier(MPI_COMM_WORLD);
+MPI_Barrier(testComm);
if (o->destroy) {
if (rank == 0) {
@@ -580,7 +580,7 @@ DFS_Finalize(aiori_mod_opt_t *options)
INFO(VERBOSE_1, "Container Destroy time = %f secs", t2-t1);
}
-MPI_Bcast(&rc, 1, MPI_INT, 0, MPI_COMM_WORLD);
+MPI_Bcast(&rc, 1, MPI_INT, 0, testComm);
if (rc) {
if (rank == 0)
DCHECK(rc, "Failed to destroy container %s (%d)", o->cont, rc);
@@ -594,7 +594,7 @@ DFS_Finalize(aiori_mod_opt_t *options)
rc = daos_pool_disconnect(poh, NULL);
DCHECK(rc, "Failed to disconnect from pool");
-MPI_CHECK(MPI_Barrier(MPI_COMM_WORLD), "barrier error");
+MPI_CHECK(MPI_Barrier(testComm), "barrier error");
if (rank == 0)
INFO(VERBOSE_1, "Finalizing DAOS..\n");

View File

@@ -1076,7 +1076,7 @@ static void S3_Close_internal(aiori_fd_t* fd, s3_options_t* param, int multi_pa
MPI_Abort(testComm, 1);
}
MPI_Gather(etag_data, etag_data_size, MPI_BYTE,
-etag_vec, etag_data_size, MPI_BYTE, 0, MPI_COMM_WORLD);
+etag_vec, etag_data_size, MPI_BYTE, 0, testComm);
// --- debugging: show the gathered etag data
// (This shows the raw concatenated etag-data from each node.)
@@ -1196,7 +1196,7 @@ static void S3_Close_internal(aiori_fd_t* fd, s3_options_t* param, int multi_pa
aws_iobuf_append_str(xml, "</CompleteMultipartUpload>\n");
} else {
MPI_Gather(etag_data, etag_data_size, MPI_BYTE,
-NULL, etag_data_size, MPI_BYTE, 0, MPI_COMM_WORLD);
+NULL, etag_data_size, MPI_BYTE, 0, testComm);
}
} else { /* N:N */
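
Both S3 hunks retarget an MPI_Gather whose receive buffer only matters on the root; as the second hunk shows, non-root ranks may legitimately pass NULL for recvbuf. A self-contained sketch of that root-only gather, assuming a fixed-size per-rank payload (ETAG_SIZE is illustrative, not from the original code):

#include <mpi.h>
#include <stdlib.h>
#include <string.h>

#define ETAG_SIZE 32 /* assumed fixed payload size per rank */

/* gather every rank's etag bytes to rank 0; recvbuf is ignored on
 * non-root ranks, so NULL is legal there */
static void gather_etags(const char *etag, MPI_Comm com)
{
    int rank, nprocs;
    char *etag_vec = NULL;

    MPI_Comm_rank(com, &rank);
    MPI_Comm_size(com, &nprocs);
    if (rank == 0)
        etag_vec = malloc((size_t)nprocs * ETAG_SIZE);

    MPI_Gather(etag, ETAG_SIZE, MPI_BYTE,
               etag_vec, ETAG_SIZE, MPI_BYTE, 0, com);

    if (rank == 0) {
        /* ... assemble the CompleteMultipartUpload body from etag_vec ... */
        free(etag_vec);
    }
}

int main(int argc, char **argv)
{
    MPI_Init(&argc, &argv);
    char etag[ETAG_SIZE];
    memset(etag, 'e', sizeof(etag));
    gather_etags(etag, MPI_COMM_WORLD);
    MPI_Finalize();
    return 0;
}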

View File

@@ -51,9 +51,9 @@ static const ior_aiori_t *backend;
static void DestroyTests(IOR_test_t *tests_head);
static char *PrependDir(IOR_param_t *, char *);
static char **ParseFileName(char *, int *);
-static void InitTests(IOR_test_t * , MPI_Comm);
+static void InitTests(IOR_test_t *);
static void TestIoSys(IOR_test_t *);
-static void ValidateTests(IOR_param_t *);
+static void ValidateTests(IOR_param_t * params, MPI_Comm com);
static IOR_offset_t WriteOrRead(IOR_param_t *test, IOR_results_t *results,
aiori_fd_t *fd, const int access,
IOR_io_buffers *ioBuffers);
@@ -107,13 +107,12 @@ IOR_test_t * ior_run(int argc, char **argv, MPI_Comm world_com, FILE * world_out
IOR_test_t *tptr;
out_logfile = world_out;
out_resultfile = world_out;
-mpi_comm_world = world_com;
-MPI_CHECK(MPI_Comm_rank(mpi_comm_world, &rank), "cannot get rank");
+MPI_CHECK(MPI_Comm_rank(world_com, &rank), "cannot get rank");
/* setup tests, and validate parameters */
-tests_head = ParseCommandLine(argc, argv);
-InitTests(tests_head, world_com);
+tests_head = ParseCommandLine(argc, argv, world_com);
+InitTests(tests_head);
PrintHeader(argc, argv);
@@ -147,20 +146,19 @@ int ior_main(int argc, char **argv)
/*
* check -h option from commandline without starting MPI;
*/
-tests_head = ParseCommandLine(argc, argv);
+tests_head = ParseCommandLine(argc, argv, MPI_COMM_WORLD);
/* start the MPI code */
MPI_CHECK(MPI_Init(&argc, &argv), "cannot initialize MPI");
-mpi_comm_world = MPI_COMM_WORLD;
-MPI_CHECK(MPI_Comm_rank(mpi_comm_world, &rank), "cannot get rank");
+MPI_CHECK(MPI_Comm_rank(MPI_COMM_WORLD, &rank), "cannot get rank");
/* set error-handling */
/*MPI_CHECK(MPI_Errhandler_set(mpi_comm_world, MPI_ERRORS_RETURN),
"cannot set errhandler"); */
/* setup tests, and validate parameters */
-InitTests(tests_head, mpi_comm_world);
+InitTests(tests_head);
PrintHeader(argc, argv);
@@ -201,7 +199,7 @@ int ior_main(int argc, char **argv)
/*
* Initialize an IOR_param_t structure to the defaults
*/
-void init_IOR_Param_t(IOR_param_t * p)
+void init_IOR_Param_t(IOR_param_t * p, MPI_Comm com)
{
const char *default_aiori = aiori_default ();
assert (NULL != default_aiori);
@@ -231,7 +229,8 @@ void init_IOR_Param_t(IOR_param_t * p)
p->transferSize = 262144;
p->randomSeed = -1;
p->incompressibleSeed = 573;
-p->testComm = mpi_comm_world;
+p->testComm = com; // this com might change for smaller tests
+p->mpi_comm_world = com;
p->URI = NULL;
}
@@ -567,7 +566,7 @@ static void DestroyTests(IOR_test_t *tests_head)
/*
* Distribute IOR_HINTs to all tasks' environments.
*/
-void DistributeHints(void)
+static void DistributeHints(MPI_Comm com)
{
char hint[MAX_HINTS][MAX_STR], fullHint[MAX_STR], hintVariable[MAX_STR];
int hintCount = 0, i;
@@ -589,11 +588,9 @@ void DistributeHints(void)
}
}
-MPI_CHECK(MPI_Bcast(&hintCount, sizeof(hintCount), MPI_BYTE,
-0, MPI_COMM_WORLD), "cannot broadcast hints");
+MPI_CHECK(MPI_Bcast(&hintCount, sizeof(hintCount), MPI_BYTE, 0, com), "cannot broadcast hints");
for (i = 0; i < hintCount; i++) {
-MPI_CHECK(MPI_Bcast(&hint[i], MAX_STR, MPI_BYTE,
-0, MPI_COMM_WORLD),
+MPI_CHECK(MPI_Bcast(&hint[i], MAX_STR, MPI_BYTE, 0, com),
"cannot broadcast hints");
strcpy(fullHint, hint[i]);
strcpy(hintVariable, strtok(fullHint, "="));
@@ -953,8 +950,12 @@ static void RemoveFile(char *testFileName, int filePerProc, IOR_param_t * test)
* Setup tests by parsing commandline and creating test script.
* Perform a sanity-check on the configured parameters.
*/
-static void InitTests(IOR_test_t *tests, MPI_Comm com)
+static void InitTests(IOR_test_t *tests)
{
+if(tests == NULL){
+return;
+}
+MPI_Comm com = tests->params.mpi_comm_world;
int mpiNumNodes = 0;
int mpiNumTasks = 0;
int mpiNumTasksOnNode0 = 0;
@@ -975,7 +976,7 @@ static void InitTests(IOR_test_t *tests, MPI_Comm com)
* task 0 has the environment settings for the hints, pass
* the hint=value pair to everyone else in mpi_comm_world
*/
-DistributeHints();
+DistributeHints(com);
/* check validity of tests and create test queue */
while (tests != NULL) {
@@ -1004,11 +1005,11 @@ static void InitTests(IOR_test_t *tests, MPI_Comm com)
params->expectedAggFileSize =
params->blockSize * params->segmentCount * params->numTasks;
-ValidateTests(&tests->params);
+ValidateTests(&tests->params, com);
tests = tests->next;
}
-init_clock();
+init_clock(com);
}
/*
@@ -1071,7 +1072,7 @@ static void file_hits_histogram(IOR_param_t *params)
}
MPI_CHECK(MPI_Gather(&rankOffset, 1, MPI_INT, rankoffs,
-1, MPI_INT, 0, mpi_comm_world),
+1, MPI_INT, 0, params->testComm),
"MPI_Gather error");
if (rank != 0)
@@ -1227,21 +1228,21 @@ static void TestIoSys(IOR_test_t *test)
IOR_io_buffers ioBuffers;
/* set up communicator for test */
-MPI_CHECK(MPI_Comm_group(mpi_comm_world, &orig_group),
+MPI_CHECK(MPI_Comm_group(params->mpi_comm_world, &orig_group),
"MPI_Comm_group() error");
range[0] = 0; /* first rank */
range[1] = params->numTasks - 1; /* last rank */
range[2] = 1; /* stride */
MPI_CHECK(MPI_Group_range_incl(orig_group, 1, &range, &new_group),
"MPI_Group_range_incl() error");
-MPI_CHECK(MPI_Comm_create(mpi_comm_world, new_group, &testComm),
+MPI_CHECK(MPI_Comm_create(params->mpi_comm_world, new_group, &testComm),
"MPI_Comm_create() error");
MPI_CHECK(MPI_Group_free(&orig_group), "MPI_Group_Free() error");
MPI_CHECK(MPI_Group_free(&new_group), "MPI_Group_Free() error");
params->testComm = testComm;
if (testComm == MPI_COMM_NULL) {
/* tasks not in the group do not participate in this test */
MPI_CHECK(MPI_Barrier(mpi_comm_world), "barrier error");
MPI_CHECK(MPI_Barrier(params->mpi_comm_world), "barrier error");
return;
}
if (rank == 0 && verbose >= VERBOSE_1) {
@@ -1415,7 +1416,7 @@ static void TestIoSys(IOR_test_t *test)
if ((params->readFile || params->checkRead ) && !test_time_elapsed(params, startTime)) {
/* check for stonewall */
if(params->stoneWallingStatusFile){
-params->stoneWallingWearOutIterations = ReadStoneWallingIterations(params->stoneWallingStatusFile);
+params->stoneWallingWearOutIterations = ReadStoneWallingIterations(params->stoneWallingStatusFile, params->testComm);
if(params->stoneWallingWearOutIterations == -1 && rank == 0){
WARN("Could not read back the stonewalling status from the file!");
params->stoneWallingWearOutIterations = 0;
@@ -1538,17 +1539,16 @@ static void TestIoSys(IOR_test_t *test)
free(hog_buf);
/* Sync with the tasks that did not participate in this test */
MPI_CHECK(MPI_Barrier(mpi_comm_world), "barrier error");
MPI_CHECK(MPI_Barrier(params->mpi_comm_world), "barrier error");
}
/*
* Determine if valid tests from parameters.
*/
-static void ValidateTests(IOR_param_t * test)
+static void ValidateTests(IOR_param_t * test, MPI_Comm com)
{
IOR_param_t defaults;
-init_IOR_Param_t(&defaults);
+init_IOR_Param_t(&defaults, com);
if (test->repetitions <= 0)
WARN_RESET("too few test repetitions",
@@ -1884,7 +1884,7 @@ static IOR_offset_t WriteOrRead(IOR_param_t *test, IOR_results_t *results,
if ( test->collective && test->deadlineForStonewalling ) {
// if collective-mode, you'll get a HANG, if some rank 'accidentally' leave this loop
// it absolutely must be an 'all or none':
-MPI_CHECK(MPI_Bcast(&hitStonewall, 1, MPI_INT, 0, MPI_COMM_WORLD), "hitStonewall broadcast failed");
+MPI_CHECK(MPI_Bcast(&hitStonewall, 1, MPI_INT, 0, testComm), "hitStonewall broadcast failed");
}
}
}
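
For context: TestIoSys carves a sub-communicator for ranks 0..numTasks-1 out of the communicator that is now carried in params->mpi_comm_world. Ranks left out receive MPI_COMM_NULL, skip the test, and only rejoin at the closing barrier on the parent communicator. A runnable sketch of that group-range construction (the test size here is illustrative):

#include <mpi.h>

int main(int argc, char **argv)
{
    MPI_Init(&argc, &argv);
    MPI_Comm world = MPI_COMM_WORLD; /* stand-in for params->mpi_comm_world */
    int size;
    MPI_Comm_size(world, &size);

    int num_tasks = size < 2 ? size : 2;    /* illustrative test size */
    int range[3] = { 0, num_tasks - 1, 1 }; /* first rank, last rank, stride */
    MPI_Group orig_group, new_group;
    MPI_Comm test_comm;

    MPI_Comm_group(world, &orig_group);
    MPI_Group_range_incl(orig_group, 1, &range, &new_group);
    MPI_Comm_create(world, new_group, &test_comm); /* collective over world */
    MPI_Group_free(&orig_group);
    MPI_Group_free(&new_group);

    if (test_comm == MPI_COMM_NULL) {
        /* excluded ranks do not run the test but must match the barrier */
        MPI_Barrier(world);
    } else {
        /* ... run the test collectively on test_comm ... */
        MPI_Comm_free(&test_comm);
        MPI_Barrier(world);
    }
    MPI_Finalize();
    return 0;
}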

View File

@@ -98,7 +98,8 @@ typedef struct
char * options; /* options string */
// intermediate options
int collective; /* collective I/O */
-MPI_Comm testComm; /* MPI communicator */
+MPI_Comm testComm; /* Current MPI communicator */
+MPI_Comm mpi_comm_world; /* The global MPI communicator */
int dryRun; /* do not perform any I/Os just run evtl. inputs print dummy output */
int dualMount; /* dual mount points */
int numTasks; /* number of tasks for test */
@@ -205,7 +206,7 @@ IOR_test_t *CreateTest(IOR_param_t *init_params, int test_num);
void AllocResults(IOR_test_t *test);
char * GetPlatformName(void);
-void init_IOR_Param_t(IOR_param_t *p);
+void init_IOR_Param_t(IOR_param_t *p, MPI_Comm global_com);
/*
* This function runs IOR given by command line, useful for testing

View File

@@ -853,8 +853,8 @@ mdworkbench_results_t* md_workbench_run(int argc, char ** argv, MPI_Comm world_c
int ret;
int printhelp = 0;
char * limit_memory_P = NULL;
init_options();
+init_clock(world_com);
o.com = world_com;
o.logfile = out_logfile;
@@ -935,8 +935,8 @@ mdworkbench_results_t* md_workbench_run(int argc, char ** argv, MPI_Comm world_c
// MPI_Abort(o.com, 1);
//}
-double bench_start;
-bench_start = GetTimeStamp();
+double t_bench_start;
+t_bench_start = GetTimeStamp();
phase_stat_t phase_stats;
size_t result_count = (2 + o.iterations) * (o.adaptive_waiting_mode ? 7 : 1);
o.results = malloc(sizeof(mdworkbench_results_t) + sizeof(mdworkbench_result_t) * result_count);
@@ -1006,7 +1006,7 @@ mdworkbench_results_t* md_workbench_run(int argc, char ** argv, MPI_Comm world_c
store_position(current_index);
}
-double t_all = GetTimeStamp();
+double t_all = GetTimeStamp() - t_bench_start;
if(o.backend->finalize){
o.backend->finalize(o.backend_options);
}
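
Independent of the communicator cleanup, the last hunk above fixes a reporting bug: t_all previously held an absolute timestamp rather than a duration, so subtracting the renamed t_bench_start yields the intended elapsed time. A minimal sketch of the idiom, using MPI_Wtime as a stand-in for IOR's GetTimeStamp:

#include <mpi.h>
#include <stdio.h>
#include <unistd.h>

int main(int argc, char **argv)
{
    MPI_Init(&argc, &argv);
    double t_bench_start = MPI_Wtime();         /* record the start once */
    sleep(1);                                   /* stand-in for the benchmark phases */
    double t_all = MPI_Wtime() - t_bench_start; /* elapsed time, not a timestamp */
    printf("benchmark took %.3f s\n", t_all);
    MPI_Finalize();
    return 0;
}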

View File

@@ -1118,7 +1118,7 @@ void file_test(const int iteration, const int ntasks, const char *path, rank_pro
if (o.stoneWallingStatusFile){
int64_t expected_items;
/* The number of items depends on the stonewalling file */
-expected_items = ReadStoneWallingIterations(o.stoneWallingStatusFile);
+expected_items = ReadStoneWallingIterations(o.stoneWallingStatusFile, testComm);
if(expected_items >= 0){
if(o.directory_loops > 1){
o.directory_loops = expected_items / o.items_per_dir;
@@ -1944,9 +1944,8 @@ mdtest_results_t * mdtest_run(int argc, char **argv, MPI_Comm world_com, FILE *
testComm = world_com;
out_logfile = world_out;
out_resultfile = world_out;
-mpi_comm_world = world_com;
-init_clock();
+init_clock(world_com);
mdtest_init_args();
int i, j;

View File

@@ -32,7 +32,7 @@
#include "option.h"
#include "aiori.h"
-IOR_param_t initialTestParams;
+static IOR_param_t initialTestParams;
option_help * createGlobalOptions(IOR_param_t * params);
@@ -451,9 +451,9 @@ option_help * createGlobalOptions(IOR_param_t * params){
/*
* Parse Commandline.
*/
-IOR_test_t *ParseCommandLine(int argc, char **argv)
+IOR_test_t *ParseCommandLine(int argc, char **argv, MPI_Comm com)
{
-init_IOR_Param_t(& initialTestParams);
+init_IOR_Param_t(& initialTestParams, com);
IOR_test_t *tests = NULL;

View File

@@ -13,8 +13,6 @@
#include "ior.h"
-extern IOR_param_t initialTestParams;
-IOR_test_t *ParseCommandLine(int argc, char **argv);
+IOR_test_t *ParseCommandLine(int argc, char **argv, MPI_Comm com);
#endif /* !_PARSE_OPTIONS_H */

View File

@@ -65,7 +65,6 @@ int rank = 0;
int rankOffset = 0;
int verbose = VERBOSE_0; /* verbose output */
MPI_Comm testComm;
-MPI_Comm mpi_comm_world;
FILE * out_logfile = NULL;
FILE * out_resultfile = NULL;
enum OutputFormat_t outputFormat;
@@ -706,34 +705,34 @@ double GetTimeStamp(void)
/*
* Determine any spread (range) between node times.
*/
-static double TimeDeviation(void)
+static double TimeDeviation(MPI_Comm com)
{
double timestamp;
double min = 0;
double max = 0;
double roottimestamp;
MPI_CHECK(MPI_Barrier(mpi_comm_world), "barrier error");
MPI_CHECK(MPI_Barrier(com), "barrier error");
timestamp = GetTimeStamp();
MPI_CHECK(MPI_Reduce(&timestamp, &min, 1, MPI_DOUBLE,
-MPI_MIN, 0, mpi_comm_world),
+MPI_MIN, 0, com),
"cannot reduce tasks' times");
MPI_CHECK(MPI_Reduce(&timestamp, &max, 1, MPI_DOUBLE,
-MPI_MAX, 0, mpi_comm_world),
+MPI_MAX, 0, com),
"cannot reduce tasks' times");
/* delta between individual nodes' time and root node's time */
roottimestamp = timestamp;
-MPI_CHECK(MPI_Bcast(&roottimestamp, 1, MPI_DOUBLE, 0, mpi_comm_world),
+MPI_CHECK(MPI_Bcast(&roottimestamp, 1, MPI_DOUBLE, 0, com),
"cannot broadcast root's time");
wall_clock_delta = timestamp - roottimestamp;
return max - min;
}
-void init_clock(){
+void init_clock(MPI_Comm com){
/* check for skew between tasks' start times */
-wall_clock_deviation = TimeDeviation();
+wall_clock_deviation = TimeDeviation(com);
}
char * PrintTimestamp() {
@@ -751,16 +750,16 @@ char * PrintTimestamp() {
return datestring;
}
-int64_t ReadStoneWallingIterations(char * const filename){
+int64_t ReadStoneWallingIterations(char * const filename, MPI_Comm com){
long long data;
if(rank != 0){
-MPI_Bcast( & data, 1, MPI_LONG_LONG_INT, 0, mpi_comm_world);
+MPI_Bcast( & data, 1, MPI_LONG_LONG_INT, 0, com);
return data;
}else{
FILE * out = fopen(filename, "r");
if (out == NULL){
data = -1;
-MPI_Bcast( & data, 1, MPI_LONG_LONG_INT, 0, mpi_comm_world);
+MPI_Bcast( & data, 1, MPI_LONG_LONG_INT, 0, com);
return data;
}
int ret = fscanf(out, "%lld", & data);
@@ -768,7 +767,7 @@ int64_t ReadStoneWallingIterations(char * const filename){
return -1;
}
fclose(out);
-MPI_Bcast( & data, 1, MPI_LONG_LONG_INT, 0, mpi_comm_world);
+MPI_Bcast( & data, 1, MPI_LONG_LONG_INT, 0, com);
return data;
}
}
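
ReadStoneWallingIterations preserves an invariant worth noting: every rank, on every code path, performs exactly one matching MPI_Bcast on the supplied communicator; a rank returning early without broadcasting would leave the other ranks deadlocked. A compact sketch of the same rank-0-reads-then-broadcasts pattern, restructured so the broadcast appears only once (the filename is illustrative):

#include <mpi.h>
#include <stdio.h>

static long long read_and_share(const char *filename, MPI_Comm com)
{
    int rank;
    long long data = -1; /* -1 signals "could not be read" */

    MPI_Comm_rank(com, &rank);
    if (rank == 0) {
        FILE *f = fopen(filename, "r");
        if (f != NULL) {
            if (fscanf(f, "%lld", &data) != 1)
                data = -1;
            fclose(f);
        }
    }
    /* exactly one broadcast on every rank and every path */
    MPI_Bcast(&data, 1, MPI_LONG_LONG_INT, 0, com);
    return data;
}

int main(int argc, char **argv)
{
    MPI_Init(&argc, &argv);
    long long v = read_and_share("stonewall.log", MPI_COMM_WORLD);
    printf("stonewall iterations: %lld\n", v);
    MPI_Finalize();
    return 0;
}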

View File

@@ -22,7 +22,6 @@ extern int rank;
extern int rankOffset;
extern int verbose;
extern MPI_Comm testComm;
-extern MPI_Comm mpi_comm_world;
extern FILE * out_resultfile;
extern enum OutputFormat_t outputFormat; /* format of the output */
@@ -52,10 +51,10 @@ void updateParsedOptions(IOR_param_t * options, options_all_t * global_options);
size_t NodeMemoryStringToBytes(char *size_str);
/* Returns -1, if cannot be read */
-int64_t ReadStoneWallingIterations(char * const filename);
+int64_t ReadStoneWallingIterations(char * const filename, MPI_Comm com);
void StoreStoneWallingIterations(char * const filename, int64_t count);
-void init_clock(void);
+void init_clock(MPI_Comm com);
double GetTimeStamp(void);
char * PrintTimestamp(); // TODO remove this function
unsigned long GetProcessorAndCore(int *chip, int *core);

View File

@@ -19,13 +19,13 @@ MDTEST 2 -I 20 -a DUMMY -x stonewall-md.log -T -v
MDTEST 2 -I 20 -a DUMMY -x stonewall-md.log -D -v
#shared tests
-IOR 2 -a POSIX -w -z -Y -e -i1 -m -t 100k -b 100k
-IOR 2 -a POSIX -w -k -e -i1 -m -t 100k -b 100k
-IOR 2 -a POSIX -r -z-k -e -i1 -m -t 100k -b 100k
+IOR 2 -a POSIX -w -z -Y -e -i1 -m -t 100k -b 200k
+IOR 2 -a POSIX -w -k -e -i1 -m -t 100k -b 200k
+IOR 2 -a POSIX -r -z-k -e -i1 -m -t 100k -b 200k
#test mutually exclusive options
-IOR 2 -a POSIX -w -z -k -e -i1 -m -t 100k -b 100k
-IOR 2 -a POSIX -w -z -k -e -i1 -m -t 100k -b 100k
+IOR 2 -a POSIX -w -z -k -e -i1 -m -t 100k -b 200k
+IOR 2 -a POSIX -w -z -k -e -i1 -m -t 100k -b 200k
IOR 2 -a POSIX -w -Z -i1 -m -t 100k -b 100k -d 0.1
# Now set the num tasks per node to 1: