AIORI POSIX AIO support. Operations are collected until the granularity is reached, then the pending IOs are submitted. Synchronization happens at the latest on close. It doesn't work with data verification and reuses the existing buffer. The implementation shows the potential AIO may have.

Extract also the POSIX header from AIORI to allow better reuse. #240
master
Julian M. Kunkel 2020-07-21 16:16:13 +01:00
parent 5e465ac8bf
commit f275671cc9
9 changed files with 340 additions and 49 deletions

View File

@ -200,6 +200,19 @@ AS_IF([test "x$with_pmdk" != xno], [
[AC_MSG_ERROR([Library containing pmdk symbols not found])])
])
# Linux AIO (libaio) support
# Note: AC_SEARCH_LIBS expects the symbol first and the candidate libraries
# second; the third argument runs when the symbol is found and the fourth
# when it is not. The error therefore belongs in the fourth position.
AC_ARG_WITH([aio],
        [AS_HELP_STRING([--with-aio],
           [support Linux AIO @<:@default=no@:>@])],
        [],
        [with_aio=no])
AM_CONDITIONAL([USE_AIO_AIORI], [test x$with_aio = xyes])
AS_IF([test "x$with_aio" != xno], [
        AC_DEFINE([USE_AIO_AIORI], [], [Build AIO backend])
        AC_CHECK_HEADERS(libaio.h,, [unset AIO])
        AC_SEARCH_LIBS([io_setup], [aio], [],
                [AC_MSG_ERROR([Library containing AIO symbol io_setup not found])])
])
# RADOS support
AC_ARG_WITH([rados],

View File

@ -65,6 +65,11 @@ if USE_POSIX_AIORI
extraSOURCES += aiori-POSIX.c
endif
if USE_AIO_AIORI
extraSOURCES += aiori-aio.c
extraLDADD += -laio
endif
if USE_PMDK_AIORI
extraSOURCES += aiori-PMDK.c
extraLDADD += -lpmem

View File

@ -22,6 +22,7 @@
#include "ior.h"
#include "aiori.h"
#include "aiori-POSIX.h"
#include "iordef.h"
#include "utilities.h"
@ -86,7 +87,7 @@ static aiori_xfer_hint_t * hints = NULL;
static void MMAP_xfer_hints(aiori_xfer_hint_t * params){
hints = params;
aiori_posix_xfer_hints(params);
POSIX_xfer_hints(params);
}
static int MMAP_check_params(aiori_mod_opt_t * options){

View File

@ -55,6 +55,8 @@
#include "iordef.h"
#include "utilities.h"
#include "aiori-POSIX.h"
#ifndef open64 /* necessary for TRU64 -- */
# define open64 open /* unlikely, but may pose */
#endif /* not open64 */ /* conflicting prototypes */
@ -70,32 +72,6 @@
/**************************** P R O T O T Y P E S *****************************/
static IOR_offset_t POSIX_Xfer(int, aiori_fd_t *, IOR_size_t *,
IOR_offset_t, IOR_offset_t, aiori_mod_opt_t *);
static void POSIX_Fsync(aiori_fd_t *, aiori_mod_opt_t *);
static void POSIX_Sync(aiori_mod_opt_t * );
static int POSIX_check_params(aiori_mod_opt_t * options);
/************************** O P T I O N S *****************************/
typedef struct{
/* in case of a change, please update depending MMAP module too */
int direct_io;
/* Lustre variables */
int lustre_set_striping; /* flag that we need to set lustre striping */
int lustre_stripe_count;
int lustre_stripe_size;
int lustre_start_ost;
int lustre_ignore_locks;
/* gpfs variables */
int gpfs_hint_access; /* use gpfs "access range" hint */
int gpfs_release_token; /* immediately release GPFS tokens after
creating or opening a file */
/* beegfs variables */
int beegfs_numTargets; /* number storage targets to use */
int beegfs_chunkSize; /* srtipe pattern for new files */
} posix_options_t;
option_help * POSIX_options(aiori_mod_opt_t ** init_backend_options, aiori_mod_opt_t * init_values){
posix_options_t * o = malloc(sizeof(posix_options_t));
@ -149,7 +125,7 @@ ior_aiori_t posix_aiori = {
.xfer = POSIX_Xfer,
.close = POSIX_Close,
.delete = POSIX_Delete,
.xfer_hints = aiori_posix_xfer_hints,
.xfer_hints = POSIX_xfer_hints,
.get_version = aiori_get_version,
.fsync = POSIX_Fsync,
.get_file_size = POSIX_GetFileSize,
@ -168,11 +144,11 @@ ior_aiori_t posix_aiori = {
static aiori_xfer_hint_t * hints = NULL;
void aiori_posix_xfer_hints(aiori_xfer_hint_t * params){
void POSIX_xfer_hints(aiori_xfer_hint_t * params){
hints = params;
}
static int POSIX_check_params(aiori_mod_opt_t * param){
int POSIX_check_params(aiori_mod_opt_t * param){
posix_options_t * o = (posix_options_t*) param;
if (o->beegfs_chunkSize != -1 && (!ISPOWEROFTWO(o->beegfs_chunkSize) || o->beegfs_chunkSize < (1<<16)))
ERR("beegfsChunkSize must be a power of two and >64k");
@ -630,17 +606,14 @@ static IOR_offset_t POSIX_Xfer(int access, aiori_fd_t *file, IOR_size_t * buffer
return (length);
}
/*
* Perform fsync().
*/
static void POSIX_Fsync(aiori_fd_t *fd, aiori_mod_opt_t * param)
void POSIX_Fsync(aiori_fd_t *fd, aiori_mod_opt_t * param)
{
if (fsync(*(int *)fd) != 0)
EWARNF("fsync(%d) failed", *(int *)fd);
}
static void POSIX_Sync(aiori_mod_opt_t * param)
void POSIX_Sync(aiori_mod_opt_t * param)
{
int ret = system("sync");
if (ret != 0){

42
src/aiori-POSIX.h Normal file
View File

@ -0,0 +1,42 @@
#ifndef AIORI_POSIX_H
#define AIORI_POSIX_H
#include "aiori.h"
/************************** O P T I O N S *****************************/
/* Option block of the POSIX backend; exported so that backends layered on
   top of POSIX can embed and forward it. */
typedef struct{
/* in case of a change, please update depending MMAP module too */
int direct_io;
/* Lustre variables */
int lustre_set_striping; /* flag that we need to set lustre striping */
int lustre_stripe_count;
int lustre_stripe_size;
int lustre_start_ost;
int lustre_ignore_locks;
/* gpfs variables */
int gpfs_hint_access; /* use gpfs "access range" hint */
int gpfs_release_token; /* immediately release GPFS tokens after
creating or opening a file */
/* beegfs variables */
int beegfs_numTargets; /* number storage targets to use */
int beegfs_chunkSize; /* stripe pattern for new files */
} posix_options_t;
/*
 * Entry points of the POSIX backend that are exported for reuse by other
 * backends. Each function is declared exactly once.
 */

/* option handling */
option_help * POSIX_options(aiori_mod_opt_t ** init_backend_options, aiori_mod_opt_t * init_values);
int POSIX_check_params(aiori_mod_opt_t * param);
void POSIX_xfer_hints(aiori_xfer_hint_t * params);

/* file lifecycle */
aiori_fd_t *POSIX_Create(char *testFileName, int flags, aiori_mod_opt_t * module_options);
int POSIX_Mknod(char *testFileName);
aiori_fd_t *POSIX_Open(char *testFileName, int flags, aiori_mod_opt_t * module_options);
void POSIX_Close(aiori_fd_t *fd, aiori_mod_opt_t * module_options);
void POSIX_Delete(char *testFileName, aiori_mod_opt_t * module_options);
IOR_offset_t POSIX_GetFileSize(aiori_mod_opt_t * test, char *testFileName);

/* synchronization */
void POSIX_Fsync(aiori_fd_t *fd, aiori_mod_opt_t * param);
void POSIX_Sync(aiori_mod_opt_t * param);
#endif

255
src/aiori-aio.c Normal file
View File

@ -0,0 +1,255 @@
/*
This backend uses linux-aio
Requires: libaio-dev
*/
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#include <libaio.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/ioctl.h>
#include <fcntl.h>
#include <errno.h>
#include <sys/stat.h>
#include <assert.h>
#include <unistd.h>
#include "ior.h"
#include "aiori.h"
#include "iordef.h"
#include "utilities.h"
#include "aiori-POSIX.h"
/************************** O P T I O N S *****************************/
/* Options and runtime state of the AIO backend. */
typedef struct{
aiori_mod_opt_t * p; // options of the underlying POSIX backend
int max_pending; // limit for in_flight before completions are reaped; also passed to io_setup()
int granularity; // how frequently to submit: submit every granularity elements
// runtime data
io_context_t ioctx; // one context per fs
struct iocb ** iocbs; // ops collected since the last io_submit(), holds granularity entries
int iocbs_pos; // how many are pending in iocbs
int in_flight; // total pending ops (collected in iocbs plus already submitted)
IOR_offset_t pending_bytes; // track pending IO volume for error checking
} aio_options_t;
/*
 * Allocate the option block of the AIO backend and build its option table.
 *
 * init_backend_options: receives the newly allocated aio_options_t.
 * init_values: optional aio_options_t to copy the initial values from; when
 *              NULL the block is zeroed and the defaults max_pending = 128
 *              and granularity = 16 are used.
 *
 * Returns the table produced by option_merge() from the AIO specific options
 * and the table returned by POSIX_options(), or NULL if the option block
 * could not be allocated.
 */
option_help * aio_options(aiori_mod_opt_t ** init_backend_options, aiori_mod_opt_t * init_values){
  aio_options_t * o = malloc(sizeof(*o));
  if (o == NULL){
    ERR("AIO: could not allocate the backend options");
    return NULL; /* only reached if ERR returns */
  }

  if (init_values != NULL){
    memcpy(o, init_values, sizeof(*o));
  }else{
    memset(o, 0, sizeof(*o));
    o->max_pending = 128;
    o->granularity = 16;
  }

  /* the POSIX backend allocates its own option block, stored in o->p */
  option_help * p_help = POSIX_options((aiori_mod_opt_t**)& o->p, init_values == NULL ? NULL : (aiori_mod_opt_t*) ((aio_options_t*)init_values)->p);
  *init_backend_options = (aiori_mod_opt_t*) o;

  option_help h [] = {
    {0, "aio.max-pending", "Max number of pending ops", OPTION_OPTIONAL_ARGUMENT, 'd', & o->max_pending},
    {0, "aio.granularity", "How frequent to submit pending IOs, submit every *granularity* elements", OPTION_OPTIONAL_ARGUMENT, 'd', & o->granularity},
    LAST_OPTION
  };

  /* NOTE(review): option_merge() presumably copies the entries, since h is a
     local array and p_help is released right afterwards - confirm */
  option_help * help = option_merge(h, p_help);
  free(p_help);
  return help;
}
/************************** D E C L A R A T I O N S ***************************/
/* File handle of the AIO backend: a thin wrapper around the handle returned
   by POSIX_Open() / POSIX_Create(). */
typedef struct{
aiori_fd_t * pfd; // the underlying POSIX fd
} aio_fd_t;
/***************************** F U N C T I O N S ******************************/
/* Transfer hints most recently handed to this backend. */
static aiori_xfer_hint_t * hints = NULL;

/*
 * Store the transfer hints for this backend and hand the same hints to the
 * POSIX backend, which performs the open/close/sync work underneath.
 */
static void aio_xfer_hints(aiori_xfer_hint_t * params){
  POSIX_xfer_hints(params);
  hints = params;
}
/*
 * Set up the runtime state of the backend: create the kernel AIO context
 * sized for max_pending events and allocate the array that collects up to
 * granularity ops between two submissions.
 *
 * param: the aio_options_t created by aio_options().
 */
static void aio_initialize(aiori_mod_opt_t * param){
  aio_options_t * o = (aio_options_t*) param;

  /* io_setup() rejects a context handle that is not zero-initialized */
  memset(& o->ioctx, 0, sizeof(o->ioctx));

  /* libaio reports failures as a negated errno return value; errno itself
     is not set, so decode the return value */
  int ret = io_setup(o->max_pending, & o->ioctx);
  if(ret != 0){
    ERRF("Couldn't initialize io context %s", strerror(-ret));
  }

  o->iocbs = malloc(sizeof(struct iocb *) * o->granularity);
  if(o->iocbs == NULL){
    ERR("AIO: couldn't allocate the array of pending iocbs");
  }
  o->iocbs_pos = 0;
  o->in_flight = 0;
}
/*
 * Tear down the runtime state created by aio_initialize(): destroy the
 * kernel AIO context and release the array of collected iocbs.
 *
 * param: the aio_options_t created by aio_options().
 */
static void aio_finalize(aiori_mod_opt_t * param){
  aio_options_t * o = (aio_options_t*) param;
  io_destroy(o->ioctx);
  /* forget the stale handle; io_setup() needs a zeroed context */
  memset(& o->ioctx, 0, sizeof(o->ioctx));

  /* the array was allocated in aio_initialize() and is not released elsewhere */
  free(o->iocbs);
  o->iocbs = NULL;
  o->iocbs_pos = 0;
}
/*
 * Validate the options of the AIO backend and of the embedded POSIX backend.
 *
 * param: the aio_options_t created by aio_options().
 * Returns 0; invalid values are reported through ERRF.
 */
static int aio_check_params(aiori_mod_opt_t * param){
  aio_options_t * o = (aio_options_t*) param;
  POSIX_check_params((aiori_mod_opt_t*) o->p);
  if(o->max_pending < 8){
    ERRF("max-pending = %d < 8", o->max_pending);
  }
  /* granularity sizes the iocb array and is the submit threshold; a value
     below one leaves no room for even a single collected op */
  if(o->granularity < 1){
    ERRF("granularity = %d < 1", o->granularity);
  }
  return 0;
}
/*
 * Open an existing file through the POSIX backend and wrap the resulting
 * handle into an aio_fd_t.
 *
 * Returns the wrapper cast to aiori_fd_t*, or NULL if the wrapper could not
 * be allocated. The wrapper is heap allocated.
 */
static aiori_fd_t *aio_Open(char *testFileName, int flags, aiori_mod_opt_t * param){
  aio_options_t * o = (aio_options_t*) param;
  aio_fd_t * fd = malloc(sizeof(*fd));
  if(fd == NULL){
    ERR("AIO: couldn't allocate the file handle");
    return NULL; /* only reached if ERR returns */
  }
  fd->pfd = POSIX_Open(testFileName, flags, o->p);
  return (aiori_fd_t*) fd;
}
/*
 * Create a file through the POSIX backend and wrap the resulting handle
 * into an aio_fd_t.
 *
 * Returns the wrapper cast to aiori_fd_t*, or NULL if the wrapper could not
 * be allocated. The wrapper is heap allocated.
 */
static aiori_fd_t *aio_create(char *testFileName, int flags, aiori_mod_opt_t * param){
  aio_options_t * o = (aio_options_t*) param;
  aio_fd_t * fd = malloc(sizeof(*fd));
  if(fd == NULL){
    ERR("AIO: couldn't allocate the file handle");
    return NULL; /* only reached if ERR returns */
  }
  fd->pfd = POSIX_Create(testFileName, flags, o->p);
  return (aiori_fd_t*) fd;
}
/*
 * Called whenever the granularity is met (and before waiting for
 * completion): hand all ops collected in o->iocbs to the kernel with one
 * io_submit() call and mark the collection array as empty.
 * Does nothing when no op has been collected.
 */
static void submit_pending(aio_options_t * o){
  if(o->iocbs_pos == 0){
    return;
  }
  int res = io_submit(o->ioctx, o->iocbs_pos, o->iocbs);
  //printf("AIO submit %d jobs\n", o->iocbs_pos);
  if(res != o->iocbs_pos){
    /* libaio returns the negated errno value on failure and does not set
       errno, so the error has to be decoded from the return value */
    if(res == -EAGAIN){
      ERR("AIO: errno == EAGAIN; this should't happen");
    }
    ERRF("AIO: submitted %d, error: \"%s\" ; this should't happen", res, res < 0 ? strerror(-res) : "partial submission");
  }
  o->iocbs_pos = 0;
}
/*
 * Complete all pending ops: submit whatever is still collected, then wait
 * until every in-flight op has finished. The byte count of each finished op
 * is subtracted from o->pending_bytes and its iocb (allocated in aio_Xfer)
 * is freed. A non-zero pending_bytes afterwards is reported as an error.
 */
static void complete_all(aio_options_t * o){
  submit_pending(o);

  /* Guarding on in_flight > 0 avoids a zero-length VLA when nothing is
     pending; looping handles io_getevents() returning early. */
  while(o->in_flight > 0){
    struct io_event events[o->in_flight];
    int num_events = io_getevents(o->ioctx, o->in_flight, o->in_flight, events, NULL);
    if(num_events == -EINTR){
      continue; /* interrupted by a signal, wait again */
    }
    if(num_events <= 0){
      /* libaio returns the negated errno value on failure */
      ERRF("AIO, io_getevents() failed: %s", num_events < 0 ? strerror(-num_events) : "no events returned");
      break; /* only reached if ERRF returns */
    }
    for (int i = 0; i < num_events; i++) {
      /* res carries the transferred bytes, or a negated errno on failure;
         the cast makes any negative value detectable */
      long result = (long) events[i].res;
      if(result < 0){
        ERR("AIO, error in io_getevents(), IO incomplete!");
      }else{
        o->pending_bytes -= result;
      }
      free(events[i].obj);
    }
    o->in_flight -= num_events;
  }

  if(o->pending_bytes != 0){
    ERRF("AIO, error in flushing data, pending bytes: %lld", o->pending_bytes);
  }
  o->in_flight = 0;
}
/*
 * Called if we must make *some* progress: wait for at least
 * min(in_flight, granularity) completions and reap up to in_flight of them.
 * The byte count of each finished op is subtracted from o->pending_bytes,
 * its iocb is freed and in_flight is reduced accordingly.
 */
static void process_some(aio_options_t * o){
  if(o->in_flight == 0){
    return;
  }
  int mn = o->in_flight < o->granularity ? o->in_flight : o->granularity;
  if(mn < 1){
    mn = 1; /* always wait for at least one completion */
  }
  /* in_flight also counts ops that are still collected in o->iocbs. If
     fewer than mn ops have reached the kernel, waiting for mn completions
     would block forever (e.g. granularity > max_pending), so submit first. */
  if(o->in_flight - o->iocbs_pos < mn){
    submit_pending(o);
  }

  struct io_event events[o->in_flight];
  int num_events;
  do{
    num_events = io_getevents(o->ioctx, mn, o->in_flight, events, NULL);
  }while(num_events == -EINTR); /* interrupted by a signal, wait again */
  //printf("Completed: %d\n", num_events);
  if(num_events < 0){
    /* libaio returns the negated errno value on failure; do not let it
       corrupt the in_flight counter */
    ERRF("AIO, io_getevents() failed: %s", strerror(-num_events));
    return; /* only reached if ERRF returns */
  }

  for (int i = 0; i < num_events; i++) {
    /* res carries the transferred bytes, or a negated errno on failure */
    long result = (long) events[i].res;
    if(result < 0){
      ERR("AIO, error in io_getevents(), IO incomplete!");
    }else{
      o->pending_bytes -= result;
    }
    free(events[i].obj);
  }
  o->in_flight -= num_events;
}
/*
 * Queue one read or write of length bytes at offset. The op is only
 * collected here; it is submitted once granularity ops have been gathered
 * and completed later in process_some() or complete_all(). If max_pending
 * ops are already in flight, some completions are reaped first.
 *
 * Returns length right after queuing, i.e. before the I/O has completed, so
 * buffer is still referenced by the queued op when this function returns.
 * Returns -1 if the control block could not be allocated.
 */
static IOR_offset_t aio_Xfer(int access, aiori_fd_t *fd, IOR_size_t * buffer,
                             IOR_offset_t length, IOR_offset_t offset, aiori_mod_opt_t * param){
  aio_options_t * o = (aio_options_t*) param;
  aio_fd_t * afd = (aio_fd_t*) fd;

  if(o->in_flight >= o->max_pending){
    process_some(o);
  }

  /* freed when the completion event is reaped */
  struct iocb * iocb = malloc(sizeof(*iocb));
  if(iocb == NULL){
    ERR("AIO: couldn't allocate an iocb");
    return -1; /* only reached if ERR returns */
  }
  o->pending_bytes += length;

  if(access == WRITE){
    io_prep_pwrite(iocb, *(int*)afd->pfd, buffer, length, offset);
  }else{
    io_prep_pread(iocb, *(int*)afd->pfd, buffer, length, offset);
  }

  o->iocbs[o->iocbs_pos] = iocb;
  o->iocbs_pos++;
  o->in_flight++;

  if(o->iocbs_pos == o->granularity){
    submit_pending(o);
  }
  return length;
}
/*
 * Close a file: wait for all pending ops, close the underlying POSIX handle
 * and release the wrapper allocated by aio_Open() / aio_create().
 */
static void aio_Close(aiori_fd_t *fd, aiori_mod_opt_t * param){
  aio_options_t * o = (aio_options_t*) param;
  aio_fd_t * afd = (aio_fd_t*) fd;
  complete_all(o);
  /* NOTE(review): POSIX_Close() is expected to take care of pfd itself - confirm */
  POSIX_Close(afd->pfd, o->p);
  /* the wrapper is owned by this backend and is not released anywhere else */
  free(afd);
}
/*
 * Flush a file: first drain every pending asynchronous op, then let the
 * POSIX backend fsync the wrapped handle.
 */
static void aio_Fsync(aiori_fd_t *fd, aiori_mod_opt_t * param){
  aio_options_t * opts = (aio_options_t*) param;
  aio_fd_t * wrapper = (aio_fd_t*) fd;

  complete_all(opts);
  POSIX_Fsync(wrapper->pfd, opts->p);
}
/*
 * Global sync: drain every pending asynchronous op before delegating to the
 * sync implementation of the POSIX backend.
 */
static void aio_Sync(aiori_mod_opt_t * param){
  aio_options_t * opts = (aio_options_t*) param;

  complete_all(opts);
  POSIX_Sync((aiori_mod_opt_t*) opts->p);
}
ior_aiori_t aio_aiori = {
.name = "AIO",
.name_legacy = NULL,
.create = aio_create,
.get_options = aio_options,
.initialize = aio_initialize,
.finalize = aio_finalize,
.xfer_hints = aio_xfer_hints,
.get_options = aio_options,
.fsync = aio_Fsync,
.open = aio_Open,
.xfer = aio_Xfer,
.close = aio_Close,
.sync = aio_Sync,
.check_params = aio_check_params,
.delete = POSIX_Delete,
.get_version = aiori_get_version,
.get_file_size = POSIX_GetFileSize,
.statfs = aiori_posix_statfs,
.mkdir = aiori_posix_mkdir,
.rmdir = aiori_posix_rmdir,
.access = aiori_posix_access,
.stat = aiori_posix_stat,
.enable_mdtest = true
};

View File

@ -42,6 +42,9 @@ ior_aiori_t *available_aiori[] = {
#ifdef USE_POSIX_AIORI
&posix_aiori,
#endif
#ifdef USE_AIO_AIORI
&aio_aiori,
#endif
#ifdef USE_PMDK_AIORI
&pmdk_aiori,
#endif

View File

@ -120,6 +120,7 @@ enum bench_type {
};
extern ior_aiori_t dummy_aiori;
extern ior_aiori_t aio_aiori;
extern ior_aiori_t daos_aiori;
extern ior_aiori_t dfs_aiori;
extern ior_aiori_t hdf5_aiori;
@ -154,15 +155,6 @@ int aiori_posix_mkdir (const char *path, mode_t mode, aiori_mod_opt_t * module_o
int aiori_posix_rmdir (const char *path, aiori_mod_opt_t * module_options);
int aiori_posix_access (const char *path, int mode, aiori_mod_opt_t * module_options);
int aiori_posix_stat (const char *path, struct stat *buf, aiori_mod_opt_t * module_options);
void aiori_posix_xfer_hints(aiori_xfer_hint_t * params);
aiori_fd_t *POSIX_Create(char *testFileName, int flags, aiori_mod_opt_t * module_options);
int POSIX_Mknod(char *testFileName);
aiori_fd_t *POSIX_Open(char *testFileName, int flags, aiori_mod_opt_t * module_options);
IOR_offset_t POSIX_GetFileSize(aiori_mod_opt_t * test, char *testFileName);
void POSIX_Delete(char *testFileName, aiori_mod_opt_t * module_options);
void POSIX_Close(aiori_fd_t *fd, aiori_mod_opt_t * module_options);
option_help * POSIX_options(aiori_mod_opt_t ** init_backend_options, aiori_mod_opt_t * init_values);
/* NOTE: these 3 MPI-IO functions are exported for reuse by HDF5/PNetCDF */

View File

@ -850,6 +850,9 @@ void directory_test(const int iteration, const int ntasks, const char *path, ran
/* create phase */
if(create_only) {
progress->stone_wall_timer_seconds = stone_wall_timer_seconds;
progress->items_done = 0;
progress->start_time = GetTimeStamp();
for (int dir_iter = 0; dir_iter < directory_loops; dir_iter ++){
prep_testdir(iteration, dir_iter);
if (unique_dir_per_task) {
@ -873,6 +876,7 @@ void directory_test(const int iteration, const int ntasks, const char *path, ran
create_remove_items(0, 1, 1, 0, temp_path, 0, progress);
}
}
progress->stone_wall_timer_seconds = 0;
}
phase_end();
@ -1048,6 +1052,10 @@ void file_test(const int iteration, const int ntasks, const char *path, rank_pro
/* create phase */
if (create_only ) {
progress->stone_wall_timer_seconds = stone_wall_timer_seconds;
progress->items_done = 0;
progress->start_time = GetTimeStamp();
for (int dir_iter = 0; dir_iter < directory_loops; dir_iter ++){
prep_testdir(iteration, dir_iter);
@ -1061,8 +1069,6 @@ void file_test(const int iteration, const int ntasks, const char *path, rank_pro
sprintf( temp_path, "%s/%s", testdir, path );
}
VERBOSE(3,-1,"file_test: create path is '%s'", temp_path );
/* "touch" the files */
@ -1663,8 +1669,7 @@ void create_remove_directory_tree(int create,
static void mdtest_iteration(int i, int j, MPI_Group testgroup, mdtest_results_t * summary_table){
rank_progress_t progress_o;
memset(& progress_o, 0 , sizeof(progress_o));
progress_o.start_time = GetTimeStamp();
progress_o.stone_wall_timer_seconds = stone_wall_timer_seconds;
progress_o.stone_wall_timer_seconds = 0;
progress_o.items_per_dir = items_per_dir;
rank_progress_t * progress = & progress_o;
@ -1748,6 +1753,7 @@ static void mdtest_iteration(int i, int j, MPI_Group testgroup, mdtest_results_t
summary_table->stonewall_last_item[8] = num_dirs_in_tree;
VERBOSE(1,-1,"V-1: main: Tree creation : %14.3f sec, %14.3f ops/sec", (endCreate - startCreate), summary_table->rate[8]);
}
sprintf(unique_mk_dir, "%s.0", base_tree_name);
sprintf(unique_chdir_dir, "%s.0", base_tree_name);
sprintf(unique_stat_dir, "%s.0", base_tree_name);
@ -1790,6 +1796,7 @@ static void mdtest_iteration(int i, int j, MPI_Group testgroup, mdtest_results_t
DelaySecs(pre_delay);
}
VERBOSE(3,5,"will file_test on %s", unique_mk_dir);
file_test(j, i, unique_mk_dir, progress);
}
}
@ -1980,7 +1987,7 @@ mdtest_results_t * mdtest_run(int argc, char **argv, MPI_Comm world_com, FILE *
{'v', NULL, "verbosity (each instance of option increments by one)", OPTION_FLAG, 'd', & verbose},
{'V', NULL, "verbosity value", OPTION_OPTIONAL_ARGUMENT, 'd', & verbose},
{'w', NULL, "bytes to write to each file after it is created", OPTION_OPTIONAL_ARGUMENT, 'l', & write_bytes},
{'W', NULL, "number in seconds; stonewall timer, write as many seconds and ensure all processes did the same number of operations (currently only stops during create phase)", OPTION_OPTIONAL_ARGUMENT, 'd', & stone_wall_timer_seconds},
{'W', NULL, "number in seconds; stonewall timer, write as many seconds and ensure all processes did the same number of operations (currently only stops during create phase and files)", OPTION_OPTIONAL_ARGUMENT, 'd', & stone_wall_timer_seconds},
{'x', NULL, "StoneWallingStatusFile; contains the number of iterations of the creation phase, can be used to split phases across runs", OPTION_OPTIONAL_ARGUMENT, 's', & stoneWallingStatusFile},
{'X', "verify-read", "Verify the data read", OPTION_FLAG, 'd', & verify_read},
{0, "verify-write", "Verify the data after a write by reading it back immediately", OPTION_FLAG, 'd', & verify_write},