From f275671cc94e35f61b15407e763b5fb035f36971 Mon Sep 17 00:00:00 2001 From: "Julian M. Kunkel" Date: Tue, 21 Jul 2020 16:16:13 +0100 Subject: [PATCH] AIORI POSIX AIO support. Collect ops until granularity is reached, then submit pending IOs. Synchronize latest on close. Doesn't work with data verification and reuses the existing buffer. The implementation shows the potential AIO may have. Extract also the POSIX header from AIORI to allow better reuse. #240 --- configure.ac | 13 +++ src/Makefile.am | 5 + src/aiori-MMAP.c | 3 +- src/aiori-POSIX.c | 41 ++------ src/aiori-POSIX.h | 42 ++++++++ src/aiori-aio.c | 255 ++++++++++++++++++++++++++++++++++++++++++++++ src/aiori.c | 3 + src/aiori.h | 10 +- src/mdtest.c | 17 +++- 9 files changed, 340 insertions(+), 49 deletions(-) create mode 100644 src/aiori-POSIX.h create mode 100644 src/aiori-aio.c diff --git a/configure.ac b/configure.ac index a7d5085..dc05ee7 100755 --- a/configure.ac +++ b/configure.ac @@ -200,6 +200,19 @@ AS_IF([test "x$with_pmdk" != xno], [ [AC_MSG_ERROR([Library containing pmdk symbols not found])]) ]) +# LINUX AIO support +AC_ARG_WITH([aio], + [AS_HELP_STRING([--with-aio], + [support Linux AIO @<:@default=no@:>@])], + [], + [with_aio=no]) +AM_CONDITIONAL([USE_AIO_AIORI], [test x$with_aio = xyes]) +AS_IF([test "x$with_aio" != xno], [ + AC_DEFINE([USE_AIO_AIORI], [], [Build AIO backend]) + AC_CHECK_HEADERS(libaio.h,, [unset AIO]) + AC_SEARCH_LIBS([io_setup], [aio], [], [AC_MSG_ERROR([Library containing AIO symbol io_setup not found])]) +]) + # RADOS support AC_ARG_WITH([rados], diff --git a/src/Makefile.am b/src/Makefile.am index 7f1be40..03148d2 100755 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -65,6 +65,11 @@ if USE_POSIX_AIORI extraSOURCES += aiori-POSIX.c endif +if USE_AIO_AIORI +extraSOURCES += aiori-aio.c +extraLDADD += -laio +endif + if USE_PMDK_AIORI extraSOURCES += aiori-PMDK.c extraLDADD += -lpmem diff --git a/src/aiori-MMAP.c b/src/aiori-MMAP.c index 2c0db42..5fa13f8 100644 --- 
a/src/aiori-MMAP.c +++ b/src/aiori-MMAP.c @@ -22,6 +22,7 @@ #include "ior.h" #include "aiori.h" +#include "aiori-POSIX.h" #include "iordef.h" #include "utilities.h" @@ -86,7 +87,7 @@ static aiori_xfer_hint_t * hints = NULL; static void MMAP_xfer_hints(aiori_xfer_hint_t * params){ hints = params; - aiori_posix_xfer_hints(params); + POSIX_xfer_hints(params); } static int MMAP_check_params(aiori_mod_opt_t * options){ diff --git a/src/aiori-POSIX.c b/src/aiori-POSIX.c index ec95625..c46c99b 100755 --- a/src/aiori-POSIX.c +++ b/src/aiori-POSIX.c @@ -55,6 +55,8 @@ #include "iordef.h" #include "utilities.h" +#include "aiori-POSIX.h" + #ifndef open64 /* necessary for TRU64 -- */ # define open64 open /* unlikely, but may pose */ #endif /* not open64 */ /* conflicting prototypes */ @@ -70,32 +72,6 @@ /**************************** P R O T O T Y P E S *****************************/ static IOR_offset_t POSIX_Xfer(int, aiori_fd_t *, IOR_size_t *, IOR_offset_t, IOR_offset_t, aiori_mod_opt_t *); -static void POSIX_Fsync(aiori_fd_t *, aiori_mod_opt_t *); -static void POSIX_Sync(aiori_mod_opt_t * ); -static int POSIX_check_params(aiori_mod_opt_t * options); - -/************************** O P T I O N S *****************************/ -typedef struct{ - /* in case of a change, please update depending MMAP module too */ - int direct_io; - - /* Lustre variables */ - int lustre_set_striping; /* flag that we need to set lustre striping */ - int lustre_stripe_count; - int lustre_stripe_size; - int lustre_start_ost; - int lustre_ignore_locks; - - /* gpfs variables */ - int gpfs_hint_access; /* use gpfs "access range" hint */ - int gpfs_release_token; /* immediately release GPFS tokens after - creating or opening a file */ - /* beegfs variables */ - int beegfs_numTargets; /* number storage targets to use */ - int beegfs_chunkSize; /* srtipe pattern for new files */ - -} posix_options_t; - option_help * POSIX_options(aiori_mod_opt_t ** init_backend_options, aiori_mod_opt_t * init_values){ 
posix_options_t * o = malloc(sizeof(posix_options_t)); @@ -149,7 +125,7 @@ ior_aiori_t posix_aiori = { .xfer = POSIX_Xfer, .close = POSIX_Close, .delete = POSIX_Delete, - .xfer_hints = aiori_posix_xfer_hints, + .xfer_hints = POSIX_xfer_hints, .get_version = aiori_get_version, .fsync = POSIX_Fsync, .get_file_size = POSIX_GetFileSize, @@ -168,11 +144,11 @@ ior_aiori_t posix_aiori = { static aiori_xfer_hint_t * hints = NULL; -void aiori_posix_xfer_hints(aiori_xfer_hint_t * params){ +void POSIX_xfer_hints(aiori_xfer_hint_t * params){ hints = params; } -static int POSIX_check_params(aiori_mod_opt_t * param){ +int POSIX_check_params(aiori_mod_opt_t * param){ posix_options_t * o = (posix_options_t*) param; if (o->beegfs_chunkSize != -1 && (!ISPOWEROFTWO(o->beegfs_chunkSize) || o->beegfs_chunkSize < (1<<16))) ERR("beegfsChunkSize must be a power of two and >64k"); @@ -630,17 +606,14 @@ static IOR_offset_t POSIX_Xfer(int access, aiori_fd_t *file, IOR_size_t * buffer return (length); } -/* - * Perform fsync(). 
- */ -static void POSIX_Fsync(aiori_fd_t *fd, aiori_mod_opt_t * param) +void POSIX_Fsync(aiori_fd_t *fd, aiori_mod_opt_t * param) { if (fsync(*(int *)fd) != 0) EWARNF("fsync(%d) failed", *(int *)fd); } -static void POSIX_Sync(aiori_mod_opt_t * param) +void POSIX_Sync(aiori_mod_opt_t * param) { int ret = system("sync"); if (ret != 0){ diff --git a/src/aiori-POSIX.h b/src/aiori-POSIX.h new file mode 100644 index 0000000..1780cf7 --- /dev/null +++ b/src/aiori-POSIX.h @@ -0,0 +1,42 @@ +#ifndef AIORI_POSIX_H +#define AIORI_POSIX_H + +#include "aiori.h" + +/************************** O P T I O N S *****************************/ +typedef struct{ + /* in case of a change, please update depending MMAP module too */ + int direct_io; + + /* Lustre variables */ + int lustre_set_striping; /* flag that we need to set lustre striping */ + int lustre_stripe_count; + int lustre_stripe_size; + int lustre_start_ost; + int lustre_ignore_locks; + + /* gpfs variables */ + int gpfs_hint_access; /* use gpfs "access range" hint */ + int gpfs_release_token; /* immediately release GPFS tokens after + creating or opening a file */ + /* beegfs variables */ + int beegfs_numTargets; /* number storage targets to use */ + int beegfs_chunkSize; /* srtipe pattern for new files */ + +} posix_options_t; + +void POSIX_Sync(aiori_mod_opt_t * param); +int POSIX_check_params(aiori_mod_opt_t * param); +void POSIX_Fsync(aiori_fd_t *, aiori_mod_opt_t *); +int POSIX_check_params(aiori_mod_opt_t * options); +aiori_fd_t *POSIX_Create(char *testFileName, int flags, aiori_mod_opt_t * module_options); +int POSIX_Mknod(char *testFileName); +aiori_fd_t *POSIX_Open(char *testFileName, int flags, aiori_mod_opt_t * module_options); +IOR_offset_t POSIX_GetFileSize(aiori_mod_opt_t * test, char *testFileName); +void POSIX_Delete(char *testFileName, aiori_mod_opt_t * module_options); +void POSIX_Close(aiori_fd_t *fd, aiori_mod_opt_t * module_options); +option_help * POSIX_options(aiori_mod_opt_t ** init_backend_options, 
aiori_mod_opt_t * init_values); +void POSIX_xfer_hints(aiori_xfer_hint_t * params); + + +#endif diff --git a/src/aiori-aio.c b/src/aiori-aio.c new file mode 100644 index 0000000..3e21f64 --- /dev/null +++ b/src/aiori-aio.c @@ -0,0 +1,255 @@ +/* + This backend uses linux-aio + Requires: libaio-dev + */ + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "ior.h" +#include "aiori.h" +#include "iordef.h" +#include "utilities.h" + +#include "aiori-POSIX.h" + +/************************** O P T I O N S *****************************/ +typedef struct{ + aiori_mod_opt_t * p; // posix options + int max_pending; + int granularity; // how frequent to submit, submit ever granularity elements + + // runtime data + io_context_t ioctx; // one context per fs + struct iocb ** iocbs; + int iocbs_pos; // how many are pending in iocbs + + int in_flight; // total pending ops + IOR_offset_t pending_bytes; // track pending IO volume for error checking +} aio_options_t; + +option_help * aio_options(aiori_mod_opt_t ** init_backend_options, aiori_mod_opt_t * init_values){ + aio_options_t * o = malloc(sizeof(aio_options_t)); + + if (init_values != NULL){ + memcpy(o, init_values, sizeof(aio_options_t)); + }else{ + memset(o, 0, sizeof(aio_options_t)); + o->max_pending = 128; + o->granularity = 16; + } + option_help * p_help = POSIX_options((aiori_mod_opt_t**)& o->p, init_values == NULL ? 
NULL : (aiori_mod_opt_t*) ((aio_options_t*)init_values)->p); + *init_backend_options = (aiori_mod_opt_t*) o; + + option_help h [] = { + {0, "aio.max-pending", "Max number of pending ops", OPTION_OPTIONAL_ARGUMENT, 'd', & o->max_pending}, + {0, "aio.granularity", "How frequent to submit pending IOs, submit every *granularity* elements", OPTION_OPTIONAL_ARGUMENT, 'd', & o->granularity}, + LAST_OPTION + }; + option_help * help = option_merge(h, p_help); + free(p_help); + return help; +} + + +/************************** D E C L A R A T I O N S ***************************/ + +typedef struct{ + aiori_fd_t * pfd; // the underlying POSIX fd +} aio_fd_t; + +/***************************** F U N C T I O N S ******************************/ + +static aiori_xfer_hint_t * hints = NULL; + +static void aio_xfer_hints(aiori_xfer_hint_t * params){ + hints = params; + POSIX_xfer_hints(params); +} + +static void aio_initialize(aiori_mod_opt_t * param){ + aio_options_t * o = (aio_options_t*) param; + int ret = io_setup(o->max_pending, & o->ioctx); /* libaio reports failure as a negative errno return value; errno is not set */ + if(ret != 0){ + ERRF("Couldn't initialize io context %s", strerror(-ret)); + } + + o->iocbs = malloc(sizeof(struct iocb *) * o->granularity); /* one slot per op queued between two submits */ + o->iocbs_pos = 0; + o->in_flight = 0; +} + +static void aio_finalize(aiori_mod_opt_t * param){ + aio_options_t * o = (aio_options_t*) param; + io_destroy(o->ioctx); +} + +static int aio_check_params(aiori_mod_opt_t * param){ + aio_options_t * o = (aio_options_t*) param; + POSIX_check_params((aiori_mod_opt_t*) o->p); + if(o->max_pending < 8){ ERRF("max-pending = %d < 8", o->max_pending); } + /* iocbs has granularity slots and queued-but-unsubmitted ops count as in flight: a granularity outside [1, max-pending] overruns the array or waits for events that were never submitted */ + if(o->granularity < 1 || o->granularity > o->max_pending){ ERRF("granularity = %d is not within 1..max-pending (%d)", o->granularity, o->max_pending); } + return 0; +} + +static aiori_fd_t *aio_Open(char *testFileName, int flags, aiori_mod_opt_t * param){ + aio_options_t * o = (aio_options_t*) param; + aio_fd_t * fd = malloc(sizeof(aio_fd_t)); + fd->pfd = POSIX_Open(testFileName, flags, o->p); + return (aiori_fd_t*) fd; +} + +static aiori_fd_t *aio_create(char *testFileName, int flags, aiori_mod_opt_t * param){ + aio_options_t * o = 
(aio_options_t*) param; + aio_fd_t * fd = malloc(sizeof(aio_fd_t)); + fd->pfd = POSIX_Create(testFileName, flags, o->p); + return (aiori_fd_t*) fd; +} + +/* called whenever the granularity is met; passes all queued iocbs to io_submit() */ +static void submit_pending(aio_options_t * o){ + if(o->iocbs_pos == 0){ + return; + } + int res; + res = io_submit(o->ioctx, o->iocbs_pos, o->iocbs); + /* libaio's io_submit() returns the number of submitted iocbs or a negative errno value; it does not set errno */ + if(res != o->iocbs_pos){ + if(res == -EAGAIN){ + ERR("AIO: errno == EAGAIN; this should't happen"); + } + ERRF("AIO: submitted %d, error: \"%s\" ; this should't happen", res, res < 0 ? strerror(-res) : "partial submission"); + } + o->iocbs_pos = 0; +} + +/* complete all pending ops: submit what is queued, then wait for every op in flight */ +static void complete_all(aio_options_t * o){ + submit_pending(o); + + struct io_event events[o->in_flight > 0 ? o->in_flight : 1]; /* a zero-length VLA is undefined behavior */ + int num_events = io_getevents(o->ioctx, o->in_flight, o->in_flight, events, NULL); + if(num_events < 0){ ERRF("AIO, io_getevents() failed: \"%s\"", strerror(-num_events)); } + for (int i = 0; i < num_events; i++) { + struct io_event event = events[i]; + if((long) event.res < 0){ /* res is the transferred byte count or a negative errno value */ + ERR("AIO, error in io_getevents(), IO incomplete!"); + }else{ + o->pending_bytes -= event.res; + } + free(event.obj); + } + if(o->pending_bytes != 0){ + ERRF("AIO, error in flushing data, pending bytes: %lld", o->pending_bytes); + } + o->in_flight = 0; +} + +/* called if we must make *some* progress */ +static void process_some(aio_options_t * o){ + if(o->in_flight == 0){ + return; + } + struct io_event events[o->in_flight]; + int num_events; + int mn = o->in_flight < o->granularity ? 
o->in_flight : o->granularity; + num_events = io_getevents(o->ioctx, mn, o->in_flight, events, NULL); + if(num_events < 0){ ERRF("AIO, io_getevents() failed: \"%s\"", strerror(-num_events)); } /* a negative errno must not be subtracted from in_flight below */ + for (int i = 0; i < num_events; i++) { + struct io_event event = events[i]; + if((long) event.res < 0){ /* res is the transferred byte count or a negative errno value */ + ERR("AIO, error in io_getevents(), IO incomplete!"); + }else{ + o->pending_bytes -= event.res; + } + free(event.obj); + } + o->in_flight -= num_events; +} + +static IOR_offset_t aio_Xfer(int access, aiori_fd_t *fd, IOR_size_t * buffer, + IOR_offset_t length, IOR_offset_t offset, aiori_mod_opt_t * param){ + aio_options_t * o = (aio_options_t*) param; + aio_fd_t * afd = (aio_fd_t*) fd; + + if(o->in_flight >= o->max_pending){ + process_some(o); + } + o->pending_bytes += length; + + struct iocb * iocb = malloc(sizeof(struct iocb)); + if(access == WRITE){ + io_prep_pwrite(iocb, *(int*)afd->pfd, buffer, length, offset); + }else{ + io_prep_pread(iocb, *(int*)afd->pfd, buffer, length, offset); + } + o->iocbs[o->iocbs_pos] = iocb; + o->iocbs_pos++; + o->in_flight++; + + if(o->iocbs_pos == o->granularity){ + submit_pending(o); + } + return length; +} + +static void aio_Close(aiori_fd_t *fd, aiori_mod_opt_t * param){ + aio_options_t * o = (aio_options_t*) param; + aio_fd_t * afd = (aio_fd_t*) fd; + complete_all(o); + POSIX_Close(afd->pfd, o->p); +} + +static void aio_Fsync(aiori_fd_t *fd, aiori_mod_opt_t * param){ + aio_options_t * o = (aio_options_t*) param; + complete_all(o); + aio_fd_t * afd = (aio_fd_t*) fd; + POSIX_Fsync(afd->pfd, o->p); +} + +static void aio_Sync(aiori_mod_opt_t * param){ + aio_options_t * o = (aio_options_t*) param; + complete_all(o); + POSIX_Sync((aiori_mod_opt_t*) o->p); +} + + + +ior_aiori_t aio_aiori = { + .name = "AIO", + .name_legacy = NULL, + .create = aio_create, + .get_options = aio_options, + .initialize = aio_initialize, + .finalize = aio_finalize, + .xfer_hints = aio_xfer_hints, + .get_options = aio_options, + .fsync = aio_Fsync, + .open = aio_Open, + .xfer = aio_Xfer, + .close = aio_Close, + 
.sync = aio_Sync, + .check_params = aio_check_params, + .delete = POSIX_Delete, + .get_version = aiori_get_version, + .get_file_size = POSIX_GetFileSize, + .statfs = aiori_posix_statfs, + .mkdir = aiori_posix_mkdir, + .rmdir = aiori_posix_rmdir, + .access = aiori_posix_access, + .stat = aiori_posix_stat, + .enable_mdtest = true +}; diff --git a/src/aiori.c b/src/aiori.c index 05e4935..2d8b6c8 100644 --- a/src/aiori.c +++ b/src/aiori.c @@ -42,6 +42,9 @@ ior_aiori_t *available_aiori[] = { #ifdef USE_POSIX_AIORI &posix_aiori, #endif +#ifdef USE_AIO_AIORI + &aio_aiori, +#endif #ifdef USE_PMDK_AIORI &pmdk_aiori, #endif diff --git a/src/aiori.h b/src/aiori.h index 6b185d7..a1adc6d 100755 --- a/src/aiori.h +++ b/src/aiori.h @@ -120,6 +120,7 @@ enum bench_type { }; extern ior_aiori_t dummy_aiori; +extern ior_aiori_t aio_aiori; extern ior_aiori_t daos_aiori; extern ior_aiori_t dfs_aiori; extern ior_aiori_t hdf5_aiori; @@ -154,15 +155,6 @@ int aiori_posix_mkdir (const char *path, mode_t mode, aiori_mod_opt_t * module_o int aiori_posix_rmdir (const char *path, aiori_mod_opt_t * module_options); int aiori_posix_access (const char *path, int mode, aiori_mod_opt_t * module_options); int aiori_posix_stat (const char *path, struct stat *buf, aiori_mod_opt_t * module_options); -void aiori_posix_xfer_hints(aiori_xfer_hint_t * params); - -aiori_fd_t *POSIX_Create(char *testFileName, int flags, aiori_mod_opt_t * module_options); -int POSIX_Mknod(char *testFileName); -aiori_fd_t *POSIX_Open(char *testFileName, int flags, aiori_mod_opt_t * module_options); -IOR_offset_t POSIX_GetFileSize(aiori_mod_opt_t * test, char *testFileName); -void POSIX_Delete(char *testFileName, aiori_mod_opt_t * module_options); -void POSIX_Close(aiori_fd_t *fd, aiori_mod_opt_t * module_options); -option_help * POSIX_options(aiori_mod_opt_t ** init_backend_options, aiori_mod_opt_t * init_values); /* NOTE: these 3 MPI-IO functions are exported for reuse by HDF5/PNetCDF */ diff --git a/src/mdtest.c b/src/mdtest.c 
index 083e2d4..c713796 100644 --- a/src/mdtest.c +++ b/src/mdtest.c @@ -850,6 +850,9 @@ void directory_test(const int iteration, const int ntasks, const char *path, ran /* create phase */ if(create_only) { + progress->stone_wall_timer_seconds = stone_wall_timer_seconds; + progress->items_done = 0; + progress->start_time = GetTimeStamp(); for (int dir_iter = 0; dir_iter < directory_loops; dir_iter ++){ prep_testdir(iteration, dir_iter); if (unique_dir_per_task) { @@ -873,6 +876,7 @@ void directory_test(const int iteration, const int ntasks, const char *path, ran create_remove_items(0, 1, 1, 0, temp_path, 0, progress); } } + progress->stone_wall_timer_seconds = 0; } phase_end(); @@ -1048,6 +1052,10 @@ void file_test(const int iteration, const int ntasks, const char *path, rank_pro /* create phase */ if (create_only ) { + progress->stone_wall_timer_seconds = stone_wall_timer_seconds; + progress->items_done = 0; + progress->start_time = GetTimeStamp(); + for (int dir_iter = 0; dir_iter < directory_loops; dir_iter ++){ prep_testdir(iteration, dir_iter); @@ -1061,8 +1069,6 @@ void file_test(const int iteration, const int ntasks, const char *path, rank_pro sprintf( temp_path, "%s/%s", testdir, path ); } - - VERBOSE(3,-1,"file_test: create path is '%s'", temp_path ); /* "touch" the files */ @@ -1663,8 +1669,7 @@ void create_remove_directory_tree(int create, static void mdtest_iteration(int i, int j, MPI_Group testgroup, mdtest_results_t * summary_table){ rank_progress_t progress_o; memset(& progress_o, 0 , sizeof(progress_o)); - progress_o.start_time = GetTimeStamp(); - progress_o.stone_wall_timer_seconds = stone_wall_timer_seconds; + progress_o.stone_wall_timer_seconds = 0; progress_o.items_per_dir = items_per_dir; rank_progress_t * progress = & progress_o; @@ -1748,6 +1753,7 @@ static void mdtest_iteration(int i, int j, MPI_Group testgroup, mdtest_results_t summary_table->stonewall_last_item[8] = num_dirs_in_tree; VERBOSE(1,-1,"V-1: main: Tree creation : %14.3f sec, 
%14.3f ops/sec", (endCreate - startCreate), summary_table->rate[8]); } + sprintf(unique_mk_dir, "%s.0", base_tree_name); sprintf(unique_chdir_dir, "%s.0", base_tree_name); sprintf(unique_stat_dir, "%s.0", base_tree_name); @@ -1790,6 +1796,7 @@ static void mdtest_iteration(int i, int j, MPI_Group testgroup, mdtest_results_t DelaySecs(pre_delay); } VERBOSE(3,5,"will file_test on %s", unique_mk_dir); + file_test(j, i, unique_mk_dir, progress); } } @@ -1980,7 +1987,7 @@ mdtest_results_t * mdtest_run(int argc, char **argv, MPI_Comm world_com, FILE * {'v', NULL, "verbosity (each instance of option increments by one)", OPTION_FLAG, 'd', & verbose}, {'V', NULL, "verbosity value", OPTION_OPTIONAL_ARGUMENT, 'd', & verbose}, {'w', NULL, "bytes to write to each file after it is created", OPTION_OPTIONAL_ARGUMENT, 'l', & write_bytes}, - {'W', NULL, "number in seconds; stonewall timer, write as many seconds and ensure all processes did the same number of operations (currently only stops during create phase)", OPTION_OPTIONAL_ARGUMENT, 'd', & stone_wall_timer_seconds}, + {'W', NULL, "number in seconds; stonewall timer, write as many seconds and ensure all processes did the same number of operations (currently only stops during create phase and files)", OPTION_OPTIONAL_ARGUMENT, 'd', & stone_wall_timer_seconds}, {'x', NULL, "StoneWallingStatusFile; contains the number of iterations of the creation phase, can be used to split phases across runs", OPTION_OPTIONAL_ARGUMENT, 's', & stoneWallingStatusFile}, {'X', "verify-read", "Verify the data read", OPTION_FLAG, 'd', & verify_read}, {0, "verify-write", "Verify the data after a write by reading it back immediately", OPTION_FLAG, 'd', & verify_write},