From 348754c87a99222bb7ab20921cc6eb465dd2ae8c Mon Sep 17 00:00:00 2001 From: "Julian M. Kunkel" Date: Mon, 2 Nov 2020 18:35:01 +0000 Subject: [PATCH] md-workbench code ported. --- src/md-workbench.c | 229 ++++++++++++++++++++++----------------------- src/md-workbench.h | 2 - 2 files changed, 112 insertions(+), 119 deletions(-) diff --git a/src/md-workbench.c b/src/md-workbench.c index ddf250d..5b39c45 100644 --- a/src/md-workbench.c +++ b/src/md-workbench.c @@ -18,6 +18,8 @@ This is the modified version md-workbench-fs that can utilize AIORI. It follows the hierarchical file system semantics in contrast to the md-workbench (without -fs) which has dataset and object semantics. */ +#define FILEMODE S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH +#define DIRMODE S_IRUSR|S_IWUSR|S_IXUSR|S_IRGRP|S_IWGRP|S_IXGRP|S_IROTH|S_IXOTH #define CHECK_MPI_RET(ret) if (ret != MPI_SUCCESS){ printf("Unexpected error in MPI on Line %d\n", __LINE__);} #define LLU (long long unsigned) @@ -57,6 +59,7 @@ struct benchmark_options{ int quiet_output; char * run_info_file; + char * prefix; // directory to work on int ignore_precreate_errors; int rank; @@ -68,20 +71,29 @@ struct benchmark_options{ uint64_t start_item_number; }; -static int global_iteration = 0; +static int global_iteration; struct benchmark_options o; +static void def_dset_name(char * out_name, int n, int d){ + sprintf(out_name, "%s/%d_%d", o.prefix, n, d); +} + +static void def_obj_name(char * out_name, char * dset, int n, int d, int i){ + sprintf(out_name, "%s/%d_%d/file-%d", dset, n, d, i); +} + void init_options(){ memset(& o, 0, sizeof(o)); o.interface = "POSIX"; + o.prefix = "./out"; o.num = 1000; o.precreate = 3000; o.dset_count = 10; o.offset = 1; o.iterations = 3; o.file_size = 3901; - o.run_info_file = "mdtest.status"; + o.run_info_file = "md-workbench.status"; } static void wait(double runtime){ @@ -130,7 +142,7 @@ static void print_detailed_stat_header(){ } static int sum_err(phase_stat_t * p){ - return p->dset_name.err + p->dset_create.err + p->dset_delete.err + p->obj_name.err + p->obj_create.err + p->obj_read.err + p->obj_stat.err + p->obj_delete.err; + return p->dset_create.err + p->dset_delete.err + p->obj_create.err + p->obj_read.err + p->obj_stat.err + p->obj_delete.err; } static double statistics_mean(int count, double * arr){ @@ -178,10 +190,10 @@ static void print_p_stat(char * buff, const char * name, phase_stat_t * p, doubl } if (o.print_detailed_stats){ - sprintf(buff, "%s \t%d\t%d\t%d\t%d\t%d\t%d\t%d\t%d\t%.3fs\t%.3fs\t%.2f MiB/s %.4e", name, p->dset_name.suc, p->dset_create.suc, p->dset_delete.suc, p->obj_name.suc, p->obj_create.suc, p->obj_read.suc, p->obj_stat.suc, p->obj_delete.suc, p->t, t, tp, p->max_op_time); + sprintf(buff, "%s \t%d\t%d\t%d\t%d\t%d\t%d\t%.3fs\t%.3fs\t%.2f MiB/s %.4e", name, p->dset_create.suc, p->dset_delete.suc, p->obj_create.suc, p->obj_read.suc, p->obj_stat.suc, p->obj_delete.suc, p->t, t, tp, p->max_op_time); if (errs > 0){ - sprintf(buff, "%s err\t%d\t%d\t%d\t%d\t%d\t%d\t%d\t%d", name, p->dset_name.err, p->dset_create.err, p->dset_delete.err, p->obj_name.err, p->obj_create.err, p->obj_read.err, p->obj_stat.err, p->obj_delete.err); + sprintf(buff, "%s err\t%d\t%d\t%d\t%d\t%d\t%d", name, p->dset_create.err, p->dset_delete.err, p->obj_create.err, p->obj_read.err, p->obj_stat.err, p->obj_delete.err); } }else{ int pos = 0; @@ -327,7 +339,7 @@ static void compute_histogram(const char * name, time_result_t * times, time_sta static void end_phase(const char * name, phase_stat_t * p){ int ret; - char buff[4096]; + char buff[MAX_PATHLEN]; //char * limit_memory_P = NULL; MPI_Barrier(MPI_COMM_WORLD); @@ -348,7 +360,7 @@ static void end_phase(const char * name, phase_stat_t * p){ } ret = MPI_Gather(& p->t, 1, MPI_DOUBLE, g_stat.t_all, 1, MPI_DOUBLE, 0, MPI_COMM_WORLD); CHECK_MPI_RET(ret) - ret = MPI_Reduce(& p->dset_name, & g_stat.dset_name, 2*(3+5), MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD); + ret = MPI_Reduce(& p->dset_create, & g_stat.dset_create, 2*(2+4), MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD); CHECK_MPI_RET(ret) ret = MPI_Reduce(& p->max_op_time, & g_stat.max_op_time, 1, MPI_DOUBLE, MPI_MAX, 0, MPI_COMM_WORLD); CHECK_MPI_RET(ret) @@ -410,12 +422,12 @@ static void end_phase(const char * name, phase_stat_t * p){ print_p_stat(buff, name, p, p->t, 0); printf("0: %s\n", buff); for(int i=1; i < o.size; i++){ - MPI_Recv(buff, 4096, MPI_CHAR, i, 4711, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + MPI_Recv(buff, MAX_PATHLEN, MPI_CHAR, i, 4711, MPI_COMM_WORLD, MPI_STATUS_IGNORE); printf("%d: %s\n", i, buff); } }else{ print_p_stat(buff, name, p, p->t, 0); - MPI_Send(buff, 4096, MPI_CHAR, 0, 4711, MPI_COMM_WORLD); + MPI_Send(buff, MAX_PATHLEN, MPI_CHAR, 0, 4711, MPI_COMM_WORLD); } } @@ -444,22 +456,14 @@ static void end_phase(const char * name, phase_stat_t * p){ } void run_precreate(phase_stat_t * s, int current_index){ - char dset[4096]; - char obj_name[4096]; + char dset[MAX_PATHLEN]; + char obj_name[MAX_PATHLEN]; int ret; for(int i=0; i < o.dset_count; i++){ - ret = o.backend->def_dset_name(dset, o.rank, i); - if (ret != 0){ - if (! o.ignore_precreate_errors){ - printf("Error defining the dataset name\n"); - MPI_Abort(MPI_COMM_WORLD, 1); - } - s->dset_name.err++; - continue; - } - s->dset_name.suc++; - ret = o.backend->create_dset(dset); + def_dset_name(dset, o.rank, i); + + ret = o.backend->mkdir(dset, DIRMODE, o.backend_options); if (ret == 0){ s->dset_create.suc++; }else{ @@ -480,38 +484,32 @@ void run_precreate(phase_stat_t * s, int current_index){ // create the obj for(int f=current_index; f < o.precreate; f++){ for(int d=0; d < o.dset_count; d++){ - ret = o.backend->def_dset_name(dset, o.rank, d); + def_dset_name(dset, o.rank, d); pos++; - ret = o.backend->def_obj_name(obj_name, o.rank, d, f); - if (ret != 0){ - s->dset_name.err++; - if (! o.ignore_precreate_errors){ - printf("%d: Error while creating the obj name\n", o.rank); - fflush(stdout); - MPI_Abort(MPI_COMM_WORLD, 1); - } - s->obj_name.err++; - continue; - } + def_obj_name(obj_name, dset, o.rank, d, f); op_timer = GetTimeStamp(); - ret = o.backend->write_obj(dset, obj_name, buf, o.file_size); - add_timed_result(op_timer, s->phase_start_timer, s->time_create, pos, & s->max_op_time, & op_time); - - if (o.verbosity >= 2){ - printf("%d: write %s:%s (%d)\n", o.rank, dset, obj_name, ret); + aiori_fd_t * aiori_fh = o.backend->open(obj_name, IOR_WRONLY | IOR_CREAT, o.backend_options); + if (NULL == aiori_fh){ + FAIL("unable to open file %s", obj_name); } - - if (ret == 0){ + if ( o.file_size == (int) o.backend->xfer(WRITE, aiori_fh, (IOR_size_t *) buf, o.file_size, 0, o.backend_options)) { s->obj_create.suc++; }else{ s->obj_create.err++; if (! o.ignore_precreate_errors){ - printf("%d: Error while creating the obj: %s\n", o.rank, obj_name); - fflush(stdout); - MPI_Abort(MPI_COMM_WORLD, 1); + printf("%d: Error while creating the obj: %s\n", o.rank, obj_name); + fflush(stdout); + MPI_Abort(MPI_COMM_WORLD, 1); } } + o.backend->close(aiori_fh, o.backend_options); + + add_timed_result(op_timer, s->phase_start_timer, s->time_create, pos, & s->max_op_time, & op_time); + + if (o.verbosity >= 2){ + printf("%d: write %s:%s (%d)\n", o.rank, dset, obj_name, ret); + } } } free(buf); @@ -519,8 +517,8 @@ void run_precreate(phase_stat_t * s, int current_index){ /* FIFO: create a new file, write to it. Then read from the first created file, delete it... */ void run_benchmark(phase_stat_t * s, int * current_index_p){ - char dset[4096]; - char obj_name[4096]; + char dset[MAX_PATHLEN]; + char obj_name[MAX_PATHLEN]; int ret; char * buf = malloc(o.file_size); memset(buf, o.rank % 256, o.file_size); @@ -531,25 +529,26 @@ void run_benchmark(phase_stat_t * s, int * current_index_p){ int armed_stone_wall = (o.stonewall_timer > 0); int f; double phase_allreduce_time = 0; + aiori_fd_t * aiori_fh; for(f=0; f < total_num; f++){ float bench_runtime = 0; // the time since start for(int d=0; d < o.dset_count; d++){ double op_time; + struct stat stat_buf; const int prevFile = f + start_index; pos++; int readRank = (o.rank - o.offset * (d+1)) % o.size; readRank = readRank < 0 ? readRank + o.size : readRank; - ret = o.backend->def_obj_name(obj_name, readRank, d, prevFile); - if (ret != 0){ - s->obj_name.err++; - continue; - } - ret = o.backend->def_dset_name(dset, readRank, d); + def_dset_name(dset, readRank, d); + def_obj_name(obj_name, dset, readRank, d, prevFile); op_timer = GetTimeStamp(); - ret = o.backend->stat_obj(dset, obj_name, o.file_size); + + ret = o.backend->stat(obj_name, & stat_buf, o.backend_options); + // TODO potentially check return value must be identical to o.file_size + bench_runtime = add_timed_result(op_timer, s->phase_start_timer, s->time_stat, pos, & s->max_op_time, & op_time); if(o.relative_waiting_factor > 1e-9) { wait(op_time); @@ -572,51 +571,61 @@ void run_benchmark(phase_stat_t * s, int * current_index_p){ } op_timer = GetTimeStamp(); - ret = o.backend->read_obj(dset, obj_name, buf, o.file_size); + aiori_fh = o.backend->open(obj_name, IOR_WRONLY | IOR_CREAT, o.backend_options); + if (NULL == aiori_fh){ + FAIL("unable to open file %s", obj_name); + } + if ( o.file_size == (int) o.backend->xfer(READ, aiori_fh, (IOR_size_t *) buf, o.file_size, 0, o.backend_options)) { + s->obj_read.suc++; + }else{ + s->obj_read.err++; + printf("%d: Error while reading the obj: %s\n", o.rank, obj_name); + fflush(stdout); + MPI_Abort(MPI_COMM_WORLD, 1); + } + o.backend->close(aiori_fh, o.backend_options); + bench_runtime = add_timed_result(op_timer, s->phase_start_timer, s->time_read, pos, & s->max_op_time, & op_time); if(o.relative_waiting_factor > 1e-9) { wait(op_time); } - - if (ret == 0){ - s->obj_read.suc++; - }else{ - printf("%d: Error while reading the file %s (%s)\n", o.rank, dset, strerror(errno)); - s->obj_read.err++; - } - if(o.read_only){ continue; } op_timer = GetTimeStamp(); - ret = o.backend->delete_obj(dset, obj_name); + o.backend->delete(obj_name, o.backend_options); bench_runtime = add_timed_result(op_timer, s->phase_start_timer, s->time_delete, pos, & s->max_op_time, & op_time); if(o.relative_waiting_factor > 1e-9) { wait(op_time); } if (o.verbosity >= 2){ - printf("%d: delete %s:%s (%d)\n", o.rank, dset, obj_name, ret); - } - - if (ret == 0){ - s->obj_delete.suc++; - }else{ - printf("%d: Error while deleting the object %s:%s\n", o.rank, dset, obj_name); - s->obj_delete.err++; + printf("%d: delete %s:%s\n", o.rank, dset, obj_name); } + s->obj_delete.suc++; int writeRank = (o.rank + o.offset * (d+1)) % o.size; - ret = o.backend->def_obj_name(obj_name, writeRank, d, o.precreate + prevFile); - if (ret != 0){ - s->obj_name.err++; - continue; - } - ret = o.backend->def_dset_name(dset, writeRank, d); + def_dset_name(dset, writeRank, d); + def_obj_name(obj_name, dset, writeRank, d, o.precreate + prevFile); op_timer = GetTimeStamp(); - ret = o.backend->write_obj(dset, obj_name, buf, o.file_size); + aiori_fh = o.backend->open(obj_name, IOR_WRONLY | IOR_CREAT, o.backend_options); + if (NULL == aiori_fh){ + FAIL("unable to open file %s", obj_name); + } + if ( o.file_size == (int) o.backend->xfer(WRITE, aiori_fh, (IOR_size_t *) buf, o.file_size, 0, o.backend_options)) { + s->obj_create.suc++; + }else{ + s->obj_create.err++; + if (! o.ignore_precreate_errors){ + printf("%d: Error while creating the obj: %s\n", o.rank, obj_name); + fflush(stdout); + MPI_Abort(MPI_COMM_WORLD, 1); + } + } + o.backend->close(aiori_fh, o.backend_options); + bench_runtime = add_timed_result(op_timer, s->phase_start_timer, s->time_create, pos, & s->max_op_time, & op_time); if(o.relative_waiting_factor > 1e-9) { wait(op_time); @@ -625,14 +634,6 @@ void run_benchmark(phase_stat_t * s, int * current_index_p){ if (o.verbosity >= 2){ printf("%d: write %s:%s (%d)\n", o.rank, dset, obj_name, ret); } - - if (ret == 0){ - s->obj_create.suc++; - }else{ - if (o.verbosity) - printf("%d: Error while writing the obj: %s\n", o.rank, dset); - s->obj_create.err++; - } } // end loop if(armed_stone_wall && bench_runtime >= o.stonewall_timer){ @@ -681,45 +682,36 @@ void run_benchmark(phase_stat_t * s, int * current_index_p){ } void run_cleanup(phase_stat_t * s, int start_index){ - char dset[4096]; - char obj_name[4096]; - int ret; + char dset[MAX_PATHLEN]; + char obj_name[MAX_PATHLEN]; double op_timer; // timer for individual operations size_t pos = -1; // position inside the individual measurement array for(int d=0; d < o.dset_count; d++){ - ret = o.backend->def_dset_name(dset, o.rank, d); + def_dset_name(dset, o.rank, d); for(int f=0; f < o.precreate; f++){ double op_time; pos++; - ret = o.backend->def_obj_name(obj_name, o.rank, d, f + start_index); + def_obj_name(obj_name, dset, o.rank, d, f + start_index); op_timer = GetTimeStamp(); - ret = o.backend->delete_obj(dset, obj_name); + o.backend->delete(obj_name, o.backend_options); add_timed_result(op_timer, s->phase_start_timer, s->time_delete, pos, & s->max_op_time, & op_time); if (o.verbosity >= 2){ - printf("%d: delete %s:%s (%d)\n", o.rank, dset, obj_name, ret); - } - - if (ret == 0){ - s->obj_delete.suc++; - }else{ - s->obj_delete.err++; + printf("%d: delete %s\n", o.rank, obj_name); } + s->obj_delete.suc++; } - ret = o.backend->rm_dset(dset); - - if (o.verbosity >= 2){ - printf("%d: delete dset %s (%d)\n", o.rank, dset, ret); - } - - if (ret == 0){ + if (o.backend->rmdir(dset, o.backend_options)) { s->dset_delete.suc++; }else{ - s->dset_delete.err++; + printf("unable to remove directory %s", dset); + } + if (o.verbosity >= 2){ + printf("%d: delete dset %s\n", o.rank, dset); } } } @@ -733,6 +725,7 @@ static option_help options [] = { {0, "latency-all", "Keep the latency files from all ranks.", OPTION_FLAG, 'd', & o.latency_keep_all}, {'P', "precreate-per-set", "Number of object to precreate per data set.", OPTION_OPTIONAL_ARGUMENT, 'd', & o.precreate}, {'D', "data-sets", "Number of data sets covered per process and iteration.", OPTION_OPTIONAL_ARGUMENT, 'd', & o.dset_count}, + {'o', NULL, "Output directory", OPTION_OPTIONAL_ARGUMENT, 's', & o.prefix}, {'q', "quiet", "Avoid irrelevant printing.", OPTION_FLAG, 'd', & o.quiet_output}, //{'m', "lim-free-mem", "Allocate memory until this limit (in MiB) is reached.", OPTION_OPTIONAL_ARGUMENT, 'd', & o.limit_memory}, // {'M', "lim-free-mem-phase", "Allocate memory until this limit (in MiB) is reached between the phases, but free it before starting the next phase; the time is NOT included for the phase.", OPTION_OPTIONAL_ARGUMENT, 'd', & o.limit_memory_between_phases}, @@ -799,6 +792,7 @@ int md_workbench(int argc, char ** argv){ int printhelp = 0; char * limit_memory_P = NULL; + global_iteration = 0; init_options(); MPI_Comm_rank(MPI_COMM_WORLD, & o.rank); @@ -816,10 +810,12 @@ int md_workbench(int argc, char ** argv){ options_all_t * global_options = airoi_create_all_module_options(options); int parsed = option_parse(argc, argv, global_options); o.backend = aiori_select(o.interface); - if (o.backend == NULL) + if (o.backend == NULL){ ERR("Unrecognized I/O API"); - if (! o.backend->enable_mdtest) + } + if (! o.backend->enable_mdtest){ ERR("Backend doesn't support MDWorbench"); + } o.backend_options = airoi_update_module_options(o.backend, global_options); if (!(o.phase_cleanup || o.phase_precreate || o.phase_benchmark)){ @@ -832,7 +828,9 @@ int md_workbench(int argc, char ** argv){ exit(1); } - o.backend->initialize(o.backend_options); + if (o.backend->initialize){ + o.backend->initialize(o.backend_options); + } if(o.backend->xfer_hints){ o.backend->xfer_hints(& o.hints); } @@ -884,10 +882,8 @@ int md_workbench(int argc, char ** argv){ if (o.phase_precreate){ if (o.rank == 0){ - ret = o.backend->prepare_global(); - if ( ret != 0 ){ - printf("Rank 0 could not prepare the run, aborting\n"); - MPI_Abort(MPI_COMM_WORLD, 1); + if (o.backend->mkdir(o.prefix, DIRMODE, o.backend_options) != 0) { + EWARNF("Unable to create test directory %s", o.prefix); } } init_stats(& phase_stats, o.precreate * o.dset_count); @@ -935,9 +931,8 @@ int md_workbench(int argc, char ** argv){ end_phase("cleanup", & phase_stats); if (o.rank == 0){ - ret = o.backend->purge_global(); - if (ret != 0){ - printf("Rank 0: Error purging the global environment\n"); + if (! o.backend->rmdir(o.prefix, o.backend_options)) { + FAIL("unable to remove directory %s", o.prefix); } } }else{ diff --git a/src/md-workbench.h b/src/md-workbench.h index 0be70b1..c556af8 100644 --- a/src/md-workbench.h +++ b/src/md-workbench.h @@ -30,11 +30,9 @@ typedef struct{ // NOTE: if this type is changed, adjust end_phase() !!! double t; // maximum time double * t_all; - op_stat_t dset_name; op_stat_t dset_create; op_stat_t dset_delete; - op_stat_t obj_name; op_stat_t obj_create; op_stat_t obj_read; op_stat_t obj_stat;