md-workbench code ported.

master
Julian M. Kunkel 2020-11-02 18:35:01 +00:00
parent 9ace15cce3
commit 348754c87a
2 changed files with 112 additions and 119 deletions

View File

@ -18,6 +18,8 @@ This is the modified version md-workbench-fs that can utilize AIORI.
It follows the hierarchical file system semantics in contrast to the md-workbench (without -fs) which has dataset and object semantics.
*/
#define FILEMODE S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH
#define DIRMODE S_IRUSR|S_IWUSR|S_IXUSR|S_IRGRP|S_IWGRP|S_IXGRP|S_IROTH|S_IXOTH
#define CHECK_MPI_RET(ret) if (ret != MPI_SUCCESS){ printf("Unexpected error in MPI on Line %d\n", __LINE__);}
#define LLU (long long unsigned)
@ -57,6 +59,7 @@ struct benchmark_options{
int quiet_output;
char * run_info_file;
char * prefix; // directory to work on
int ignore_precreate_errors;
int rank;
@ -68,20 +71,29 @@ struct benchmark_options{
uint64_t start_item_number;
};
static int global_iteration = 0;
static int global_iteration;
struct benchmark_options o;
static void def_dset_name(char * out_name, int n, int d){
sprintf(out_name, "%s/%d_%d", o.prefix, n, d);
}
static void def_obj_name(char * out_name, char * dset, int n, int d, int i){
sprintf(out_name, "%s/%d_%d/file-%d", dset, n, d, i);
}
void init_options(){
memset(& o, 0, sizeof(o));
o.interface = "POSIX";
o.prefix = "./out";
o.num = 1000;
o.precreate = 3000;
o.dset_count = 10;
o.offset = 1;
o.iterations = 3;
o.file_size = 3901;
o.run_info_file = "mdtest.status";
o.run_info_file = "md-workbench.status";
}
static void wait(double runtime){
@ -130,7 +142,7 @@ static void print_detailed_stat_header(){
}
static int sum_err(phase_stat_t * p){
return p->dset_name.err + p->dset_create.err + p->dset_delete.err + p->obj_name.err + p->obj_create.err + p->obj_read.err + p->obj_stat.err + p->obj_delete.err;
return p->dset_create.err + p->dset_delete.err + p->obj_create.err + p->obj_read.err + p->obj_stat.err + p->obj_delete.err;
}
static double statistics_mean(int count, double * arr){
@ -178,10 +190,10 @@ static void print_p_stat(char * buff, const char * name, phase_stat_t * p, doubl
}
if (o.print_detailed_stats){
sprintf(buff, "%s \t%d\t%d\t%d\t%d\t%d\t%d\t%d\t%d\t%.3fs\t%.3fs\t%.2f MiB/s %.4e", name, p->dset_name.suc, p->dset_create.suc, p->dset_delete.suc, p->obj_name.suc, p->obj_create.suc, p->obj_read.suc, p->obj_stat.suc, p->obj_delete.suc, p->t, t, tp, p->max_op_time);
sprintf(buff, "%s \t%d\t%d\t%d\t%d\t%d\t%d\t%.3fs\t%.3fs\t%.2f MiB/s %.4e", name, p->dset_create.suc, p->dset_delete.suc, p->obj_create.suc, p->obj_read.suc, p->obj_stat.suc, p->obj_delete.suc, p->t, t, tp, p->max_op_time);
if (errs > 0){
sprintf(buff, "%s err\t%d\t%d\t%d\t%d\t%d\t%d\t%d\t%d", name, p->dset_name.err, p->dset_create.err, p->dset_delete.err, p->obj_name.err, p->obj_create.err, p->obj_read.err, p->obj_stat.err, p->obj_delete.err);
sprintf(buff, "%s err\t%d\t%d\t%d\t%d\t%d\t%d", name, p->dset_create.err, p->dset_delete.err, p->obj_create.err, p->obj_read.err, p->obj_stat.err, p->obj_delete.err);
}
}else{
int pos = 0;
@ -327,7 +339,7 @@ static void compute_histogram(const char * name, time_result_t * times, time_sta
static void end_phase(const char * name, phase_stat_t * p){
int ret;
char buff[4096];
char buff[MAX_PATHLEN];
//char * limit_memory_P = NULL;
MPI_Barrier(MPI_COMM_WORLD);
@ -348,7 +360,7 @@ static void end_phase(const char * name, phase_stat_t * p){
}
ret = MPI_Gather(& p->t, 1, MPI_DOUBLE, g_stat.t_all, 1, MPI_DOUBLE, 0, MPI_COMM_WORLD);
CHECK_MPI_RET(ret)
ret = MPI_Reduce(& p->dset_name, & g_stat.dset_name, 2*(3+5), MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD);
ret = MPI_Reduce(& p->dset_create, & g_stat.dset_create, 2*(2+4), MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD);
CHECK_MPI_RET(ret)
ret = MPI_Reduce(& p->max_op_time, & g_stat.max_op_time, 1, MPI_DOUBLE, MPI_MAX, 0, MPI_COMM_WORLD);
CHECK_MPI_RET(ret)
@ -410,12 +422,12 @@ static void end_phase(const char * name, phase_stat_t * p){
print_p_stat(buff, name, p, p->t, 0);
printf("0: %s\n", buff);
for(int i=1; i < o.size; i++){
MPI_Recv(buff, 4096, MPI_CHAR, i, 4711, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
MPI_Recv(buff, MAX_PATHLEN, MPI_CHAR, i, 4711, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
printf("%d: %s\n", i, buff);
}
}else{
print_p_stat(buff, name, p, p->t, 0);
MPI_Send(buff, 4096, MPI_CHAR, 0, 4711, MPI_COMM_WORLD);
MPI_Send(buff, MAX_PATHLEN, MPI_CHAR, 0, 4711, MPI_COMM_WORLD);
}
}
@ -444,22 +456,14 @@ static void end_phase(const char * name, phase_stat_t * p){
}
void run_precreate(phase_stat_t * s, int current_index){
char dset[4096];
char obj_name[4096];
char dset[MAX_PATHLEN];
char obj_name[MAX_PATHLEN];
int ret;
for(int i=0; i < o.dset_count; i++){
ret = o.backend->def_dset_name(dset, o.rank, i);
if (ret != 0){
if (! o.ignore_precreate_errors){
printf("Error defining the dataset name\n");
MPI_Abort(MPI_COMM_WORLD, 1);
}
s->dset_name.err++;
continue;
}
s->dset_name.suc++;
ret = o.backend->create_dset(dset);
def_dset_name(dset, o.rank, i);
ret = o.backend->mkdir(dset, DIRMODE, o.backend_options);
if (ret == 0){
s->dset_create.suc++;
}else{
@ -480,38 +484,32 @@ void run_precreate(phase_stat_t * s, int current_index){
// create the obj
for(int f=current_index; f < o.precreate; f++){
for(int d=0; d < o.dset_count; d++){
ret = o.backend->def_dset_name(dset, o.rank, d);
def_dset_name(dset, o.rank, d);
pos++;
ret = o.backend->def_obj_name(obj_name, o.rank, d, f);
if (ret != 0){
s->dset_name.err++;
if (! o.ignore_precreate_errors){
printf("%d: Error while creating the obj name\n", o.rank);
fflush(stdout);
MPI_Abort(MPI_COMM_WORLD, 1);
}
s->obj_name.err++;
continue;
}
def_obj_name(obj_name, dset, o.rank, d, f);
op_timer = GetTimeStamp();
ret = o.backend->write_obj(dset, obj_name, buf, o.file_size);
add_timed_result(op_timer, s->phase_start_timer, s->time_create, pos, & s->max_op_time, & op_time);
if (o.verbosity >= 2){
printf("%d: write %s:%s (%d)\n", o.rank, dset, obj_name, ret);
aiori_fd_t * aiori_fh = o.backend->open(obj_name, IOR_WRONLY | IOR_CREAT, o.backend_options);
if (NULL == aiori_fh){
FAIL("unable to open file %s", obj_name);
}
if (ret == 0){
if ( o.file_size == (int) o.backend->xfer(WRITE, aiori_fh, (IOR_size_t *) buf, o.file_size, 0, o.backend_options)) {
s->obj_create.suc++;
}else{
s->obj_create.err++;
if (! o.ignore_precreate_errors){
printf("%d: Error while creating the obj: %s\n", o.rank, obj_name);
fflush(stdout);
MPI_Abort(MPI_COMM_WORLD, 1);
printf("%d: Error while creating the obj: %s\n", o.rank, obj_name);
fflush(stdout);
MPI_Abort(MPI_COMM_WORLD, 1);
}
}
o.backend->close(aiori_fh, o.backend_options);
add_timed_result(op_timer, s->phase_start_timer, s->time_create, pos, & s->max_op_time, & op_time);
if (o.verbosity >= 2){
printf("%d: write %s:%s (%d)\n", o.rank, dset, obj_name, ret);
}
}
}
free(buf);
@ -519,8 +517,8 @@ void run_precreate(phase_stat_t * s, int current_index){
/* FIFO: create a new file, write to it. Then read from the first created file, delete it... */
void run_benchmark(phase_stat_t * s, int * current_index_p){
char dset[4096];
char obj_name[4096];
char dset[MAX_PATHLEN];
char obj_name[MAX_PATHLEN];
int ret;
char * buf = malloc(o.file_size);
memset(buf, o.rank % 256, o.file_size);
@ -531,25 +529,26 @@ void run_benchmark(phase_stat_t * s, int * current_index_p){
int armed_stone_wall = (o.stonewall_timer > 0);
int f;
double phase_allreduce_time = 0;
aiori_fd_t * aiori_fh;
for(f=0; f < total_num; f++){
float bench_runtime = 0; // the time since start
for(int d=0; d < o.dset_count; d++){
double op_time;
struct stat stat_buf;
const int prevFile = f + start_index;
pos++;
int readRank = (o.rank - o.offset * (d+1)) % o.size;
readRank = readRank < 0 ? readRank + o.size : readRank;
ret = o.backend->def_obj_name(obj_name, readRank, d, prevFile);
if (ret != 0){
s->obj_name.err++;
continue;
}
ret = o.backend->def_dset_name(dset, readRank, d);
def_dset_name(dset, readRank, d);
def_obj_name(obj_name, dset, readRank, d, prevFile);
op_timer = GetTimeStamp();
ret = o.backend->stat_obj(dset, obj_name, o.file_size);
ret = o.backend->stat(obj_name, & stat_buf, o.backend_options);
// TODO potentially check return value must be identical to o.file_size
bench_runtime = add_timed_result(op_timer, s->phase_start_timer, s->time_stat, pos, & s->max_op_time, & op_time);
if(o.relative_waiting_factor > 1e-9) {
wait(op_time);
@ -572,51 +571,61 @@ void run_benchmark(phase_stat_t * s, int * current_index_p){
}
op_timer = GetTimeStamp();
ret = o.backend->read_obj(dset, obj_name, buf, o.file_size);
aiori_fh = o.backend->open(obj_name, IOR_WRONLY | IOR_CREAT, o.backend_options);
if (NULL == aiori_fh){
FAIL("unable to open file %s", obj_name);
}
if ( o.file_size == (int) o.backend->xfer(READ, aiori_fh, (IOR_size_t *) buf, o.file_size, 0, o.backend_options)) {
s->obj_read.suc++;
}else{
s->obj_read.err++;
printf("%d: Error while reading the obj: %s\n", o.rank, obj_name);
fflush(stdout);
MPI_Abort(MPI_COMM_WORLD, 1);
}
o.backend->close(aiori_fh, o.backend_options);
bench_runtime = add_timed_result(op_timer, s->phase_start_timer, s->time_read, pos, & s->max_op_time, & op_time);
if(o.relative_waiting_factor > 1e-9) {
wait(op_time);
}
if (ret == 0){
s->obj_read.suc++;
}else{
printf("%d: Error while reading the file %s (%s)\n", o.rank, dset, strerror(errno));
s->obj_read.err++;
}
if(o.read_only){
continue;
}
op_timer = GetTimeStamp();
ret = o.backend->delete_obj(dset, obj_name);
o.backend->delete(obj_name, o.backend_options);
bench_runtime = add_timed_result(op_timer, s->phase_start_timer, s->time_delete, pos, & s->max_op_time, & op_time);
if(o.relative_waiting_factor > 1e-9) {
wait(op_time);
}
if (o.verbosity >= 2){
printf("%d: delete %s:%s (%d)\n", o.rank, dset, obj_name, ret);
}
if (ret == 0){
s->obj_delete.suc++;
}else{
printf("%d: Error while deleting the object %s:%s\n", o.rank, dset, obj_name);
s->obj_delete.err++;
printf("%d: delete %s:%s\n", o.rank, dset, obj_name);
}
s->obj_delete.suc++;
int writeRank = (o.rank + o.offset * (d+1)) % o.size;
ret = o.backend->def_obj_name(obj_name, writeRank, d, o.precreate + prevFile);
if (ret != 0){
s->obj_name.err++;
continue;
}
ret = o.backend->def_dset_name(dset, writeRank, d);
def_dset_name(dset, writeRank, d);
def_obj_name(obj_name, dset, writeRank, d, o.precreate + prevFile);
op_timer = GetTimeStamp();
ret = o.backend->write_obj(dset, obj_name, buf, o.file_size);
aiori_fh = o.backend->open(obj_name, IOR_WRONLY | IOR_CREAT, o.backend_options);
if (NULL == aiori_fh){
FAIL("unable to open file %s", obj_name);
}
if ( o.file_size == (int) o.backend->xfer(WRITE, aiori_fh, (IOR_size_t *) buf, o.file_size, 0, o.backend_options)) {
s->obj_create.suc++;
}else{
s->obj_create.err++;
if (! o.ignore_precreate_errors){
printf("%d: Error while creating the obj: %s\n", o.rank, obj_name);
fflush(stdout);
MPI_Abort(MPI_COMM_WORLD, 1);
}
}
o.backend->close(aiori_fh, o.backend_options);
bench_runtime = add_timed_result(op_timer, s->phase_start_timer, s->time_create, pos, & s->max_op_time, & op_time);
if(o.relative_waiting_factor > 1e-9) {
wait(op_time);
@ -625,14 +634,6 @@ void run_benchmark(phase_stat_t * s, int * current_index_p){
if (o.verbosity >= 2){
printf("%d: write %s:%s (%d)\n", o.rank, dset, obj_name, ret);
}
if (ret == 0){
s->obj_create.suc++;
}else{
if (o.verbosity)
printf("%d: Error while writing the obj: %s\n", o.rank, dset);
s->obj_create.err++;
}
} // end loop
if(armed_stone_wall && bench_runtime >= o.stonewall_timer){
@ -681,45 +682,36 @@ void run_benchmark(phase_stat_t * s, int * current_index_p){
}
void run_cleanup(phase_stat_t * s, int start_index){
char dset[4096];
char obj_name[4096];
int ret;
char dset[MAX_PATHLEN];
char obj_name[MAX_PATHLEN];
double op_timer; // timer for individual operations
size_t pos = -1; // position inside the individual measurement array
for(int d=0; d < o.dset_count; d++){
ret = o.backend->def_dset_name(dset, o.rank, d);
def_dset_name(dset, o.rank, d);
for(int f=0; f < o.precreate; f++){
double op_time;
pos++;
ret = o.backend->def_obj_name(obj_name, o.rank, d, f + start_index);
def_obj_name(obj_name, dset, o.rank, d, f + start_index);
op_timer = GetTimeStamp();
ret = o.backend->delete_obj(dset, obj_name);
o.backend->delete(obj_name, o.backend_options);
add_timed_result(op_timer, s->phase_start_timer, s->time_delete, pos, & s->max_op_time, & op_time);
if (o.verbosity >= 2){
printf("%d: delete %s:%s (%d)\n", o.rank, dset, obj_name, ret);
}
if (ret == 0){
s->obj_delete.suc++;
}else{
s->obj_delete.err++;
printf("%d: delete %s\n", o.rank, obj_name);
}
s->obj_delete.suc++;
}
ret = o.backend->rm_dset(dset);
if (o.verbosity >= 2){
printf("%d: delete dset %s (%d)\n", o.rank, dset, ret);
}
if (ret == 0){
if (o.backend->rmdir(dset, o.backend_options)) {
s->dset_delete.suc++;
}else{
s->dset_delete.err++;
printf("unable to remove directory %s", dset);
}
if (o.verbosity >= 2){
printf("%d: delete dset %s\n", o.rank, dset);
}
}
}
@ -733,6 +725,7 @@ static option_help options [] = {
{0, "latency-all", "Keep the latency files from all ranks.", OPTION_FLAG, 'd', & o.latency_keep_all},
{'P', "precreate-per-set", "Number of object to precreate per data set.", OPTION_OPTIONAL_ARGUMENT, 'd', & o.precreate},
{'D', "data-sets", "Number of data sets covered per process and iteration.", OPTION_OPTIONAL_ARGUMENT, 'd', & o.dset_count},
{'o', NULL, "Output directory", OPTION_OPTIONAL_ARGUMENT, 's', & o.prefix},
{'q', "quiet", "Avoid irrelevant printing.", OPTION_FLAG, 'd', & o.quiet_output},
//{'m', "lim-free-mem", "Allocate memory until this limit (in MiB) is reached.", OPTION_OPTIONAL_ARGUMENT, 'd', & o.limit_memory},
// {'M', "lim-free-mem-phase", "Allocate memory until this limit (in MiB) is reached between the phases, but free it before starting the next phase; the time is NOT included for the phase.", OPTION_OPTIONAL_ARGUMENT, 'd', & o.limit_memory_between_phases},
@ -799,6 +792,7 @@ int md_workbench(int argc, char ** argv){
int printhelp = 0;
char * limit_memory_P = NULL;
global_iteration = 0;
init_options();
MPI_Comm_rank(MPI_COMM_WORLD, & o.rank);
@ -816,10 +810,12 @@ int md_workbench(int argc, char ** argv){
options_all_t * global_options = airoi_create_all_module_options(options);
int parsed = option_parse(argc, argv, global_options);
o.backend = aiori_select(o.interface);
if (o.backend == NULL)
if (o.backend == NULL){
ERR("Unrecognized I/O API");
if (! o.backend->enable_mdtest)
}
if (! o.backend->enable_mdtest){
ERR("Backend doesn't support MDWorbench");
}
o.backend_options = airoi_update_module_options(o.backend, global_options);
if (!(o.phase_cleanup || o.phase_precreate || o.phase_benchmark)){
@ -832,7 +828,9 @@ int md_workbench(int argc, char ** argv){
exit(1);
}
o.backend->initialize(o.backend_options);
if (o.backend->initialize){
o.backend->initialize(o.backend_options);
}
if(o.backend->xfer_hints){
o.backend->xfer_hints(& o.hints);
}
@ -884,10 +882,8 @@ int md_workbench(int argc, char ** argv){
if (o.phase_precreate){
if (o.rank == 0){
ret = o.backend->prepare_global();
if ( ret != 0 ){
printf("Rank 0 could not prepare the run, aborting\n");
MPI_Abort(MPI_COMM_WORLD, 1);
if (o.backend->mkdir(o.prefix, DIRMODE, o.backend_options) != 0) {
EWARNF("Unable to create test directory %s", o.prefix);
}
}
init_stats(& phase_stats, o.precreate * o.dset_count);
@ -935,9 +931,8 @@ int md_workbench(int argc, char ** argv){
end_phase("cleanup", & phase_stats);
if (o.rank == 0){
ret = o.backend->purge_global();
if (ret != 0){
printf("Rank 0: Error purging the global environment\n");
if (! o.backend->rmdir(o.prefix, o.backend_options)) {
FAIL("unable to remove directory %s", o.prefix);
}
}
}else{

View File

@ -30,11 +30,9 @@ typedef struct{ // NOTE: if this type is changed, adjust end_phase() !!!
double t; // maximum time
double * t_all;
op_stat_t dset_name;
op_stat_t dset_create;
op_stat_t dset_delete;
op_stat_t obj_name;
op_stat_t obj_create;
op_stat_t obj_read;
op_stat_t obj_stat;