Merge pull request #243 from hpc/feature-aio

Feature aio
master
Julian Kunkel 2020-08-03 10:06:07 +01:00 committed by GitHub
commit 47b63a8054
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 362 additions and 50 deletions

View File

@ -200,6 +200,19 @@ AS_IF([test "x$with_pmdk" != xno], [
[AC_MSG_ERROR([Library containing pmdk symbols not found])])
])
# LINUX AIO support
AC_ARG_WITH([aio],
[AS_HELP_STRING([--with-aio],
[support Linux AIO @<:@default=no@:>@])],
[],
[with_aio=no])
AM_CONDITIONAL([USE_AIO_AIORI], [test x$with_aio = xyes])
AS_IF([test "x$with_aio" != xno], [
AC_DEFINE([USE_AIO_AIORI], [], [Build AIO backend])
AC_CHECK_HEADERS(libaio.h,, [unset AIO])
AC_SEARCH_LIBS([aio], [io_setup], [AC_MSG_ERROR([Library containing AIO symbol io_setup not found])])
])
# RADOS support
AC_ARG_WITH([rados],

View File

@ -5,7 +5,7 @@ if USE_CAPS
bin_PROGRAMS += IOR MDTEST
endif
noinst_HEADERS = ior.h utilities.h parse_options.h aiori.h iordef.h ior-internal.h option.h mdtest.h aiori-debug.h
noinst_HEADERS = ior.h utilities.h parse_options.h aiori.h iordef.h ior-internal.h option.h mdtest.h aiori-debug.h aiori-POSIX.h
lib_LIBRARIES = libaiori.a
libaiori_a_SOURCES = ior.c mdtest.c utilities.c parse_options.c ior-output.c option.c
@ -65,6 +65,11 @@ if USE_POSIX_AIORI
extraSOURCES += aiori-POSIX.c
endif
if USE_AIO_AIORI
extraSOURCES += aiori-aio.c
extraLDADD += -laio
endif
if USE_PMDK_AIORI
extraSOURCES += aiori-PMDK.c
extraLDADD += -lpmem

View File

@ -22,6 +22,7 @@
#include "ior.h"
#include "aiori.h"
#include "aiori-POSIX.h"
#include "iordef.h"
#include "utilities.h"
@ -86,7 +87,7 @@ static aiori_xfer_hint_t * hints = NULL;
static void MMAP_xfer_hints(aiori_xfer_hint_t * params){
hints = params;
aiori_posix_xfer_hints(params);
POSIX_xfer_hints(params);
}
static int MMAP_check_params(aiori_mod_opt_t * options){

View File

@ -55,6 +55,8 @@
#include "iordef.h"
#include "utilities.h"
#include "aiori-POSIX.h"
#ifndef open64 /* necessary for TRU64 -- */
# define open64 open /* unlikely, but may pose */
#endif /* not open64 */ /* conflicting prototypes */
@ -70,32 +72,6 @@
/**************************** P R O T O T Y P E S *****************************/
static IOR_offset_t POSIX_Xfer(int, aiori_fd_t *, IOR_size_t *,
IOR_offset_t, IOR_offset_t, aiori_mod_opt_t *);
static void POSIX_Fsync(aiori_fd_t *, aiori_mod_opt_t *);
static void POSIX_Sync(aiori_mod_opt_t * );
static int POSIX_check_params(aiori_mod_opt_t * options);
/************************** O P T I O N S *****************************/
typedef struct{
/* in case of a change, please update depending MMAP module too */
int direct_io;
/* Lustre variables */
int lustre_set_striping; /* flag that we need to set lustre striping */
int lustre_stripe_count;
int lustre_stripe_size;
int lustre_start_ost;
int lustre_ignore_locks;
/* gpfs variables */
int gpfs_hint_access; /* use gpfs "access range" hint */
int gpfs_release_token; /* immediately release GPFS tokens after
creating or opening a file */
/* beegfs variables */
int beegfs_numTargets; /* number storage targets to use */
int beegfs_chunkSize; /* srtipe pattern for new files */
} posix_options_t;
option_help * POSIX_options(aiori_mod_opt_t ** init_backend_options, aiori_mod_opt_t * init_values){
posix_options_t * o = malloc(sizeof(posix_options_t));
@ -149,7 +125,7 @@ ior_aiori_t posix_aiori = {
.xfer = POSIX_Xfer,
.close = POSIX_Close,
.delete = POSIX_Delete,
.xfer_hints = aiori_posix_xfer_hints,
.xfer_hints = POSIX_xfer_hints,
.get_version = aiori_get_version,
.fsync = POSIX_Fsync,
.get_file_size = POSIX_GetFileSize,
@ -168,11 +144,11 @@ ior_aiori_t posix_aiori = {
static aiori_xfer_hint_t * hints = NULL;
void aiori_posix_xfer_hints(aiori_xfer_hint_t * params){
void POSIX_xfer_hints(aiori_xfer_hint_t * params){
hints = params;
}
static int POSIX_check_params(aiori_mod_opt_t * param){
int POSIX_check_params(aiori_mod_opt_t * param){
posix_options_t * o = (posix_options_t*) param;
if (o->beegfs_chunkSize != -1 && (!ISPOWEROFTWO(o->beegfs_chunkSize) || o->beegfs_chunkSize < (1<<16)))
ERR("beegfsChunkSize must be a power of two and >64k");
@ -630,17 +606,14 @@ static IOR_offset_t POSIX_Xfer(int access, aiori_fd_t *file, IOR_size_t * buffer
return (length);
}
/*
* Perform fsync().
*/
static void POSIX_Fsync(aiori_fd_t *fd, aiori_mod_opt_t * param)
void POSIX_Fsync(aiori_fd_t *fd, aiori_mod_opt_t * param)
{
if (fsync(*(int *)fd) != 0)
EWARNF("fsync(%d) failed", *(int *)fd);
}
static void POSIX_Sync(aiori_mod_opt_t * param)
void POSIX_Sync(aiori_mod_opt_t * param)
{
int ret = system("sync");
if (ret != 0){

42
src/aiori-POSIX.h Normal file
View File

@ -0,0 +1,42 @@
#ifndef AIORI_POSIX_H
#define AIORI_POSIX_H
#include "aiori.h"
/************************** O P T I O N S *****************************/
typedef struct{
/* in case of a change, please update depending MMAP module too */
int direct_io;
/* Lustre variables */
int lustre_set_striping; /* flag that we need to set lustre striping */
int lustre_stripe_count;
int lustre_stripe_size;
int lustre_start_ost;
int lustre_ignore_locks;
/* gpfs variables */
int gpfs_hint_access; /* use gpfs "access range" hint */
int gpfs_release_token; /* immediately release GPFS tokens after
creating or opening a file */
/* beegfs variables */
int beegfs_numTargets; /* number storage targets to use */
int beegfs_chunkSize; /* srtipe pattern for new files */
} posix_options_t;
void POSIX_Sync(aiori_mod_opt_t * param);
int POSIX_check_params(aiori_mod_opt_t * param);
void POSIX_Fsync(aiori_fd_t *, aiori_mod_opt_t *);
int POSIX_check_params(aiori_mod_opt_t * options);
aiori_fd_t *POSIX_Create(char *testFileName, int flags, aiori_mod_opt_t * module_options);
int POSIX_Mknod(char *testFileName);
aiori_fd_t *POSIX_Open(char *testFileName, int flags, aiori_mod_opt_t * module_options);
IOR_offset_t POSIX_GetFileSize(aiori_mod_opt_t * test, char *testFileName);
void POSIX_Delete(char *testFileName, aiori_mod_opt_t * module_options);
void POSIX_Close(aiori_fd_t *fd, aiori_mod_opt_t * module_options);
option_help * POSIX_options(aiori_mod_opt_t ** init_backend_options, aiori_mod_opt_t * init_values);
void POSIX_xfer_hints(aiori_xfer_hint_t * params);
#endif

258
src/aiori-aio.c Normal file
View File

@ -0,0 +1,258 @@
/*
This backend uses linux-aio
Requires: libaio-dev
*/
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#include <libaio.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/ioctl.h>
#include <fcntl.h>
#include <errno.h>
#include <sys/stat.h>
#include <assert.h>
#include <unistd.h>
#include "ior.h"
#include "aiori.h"
#include "iordef.h"
#include "utilities.h"
#include "aiori-POSIX.h"
/************************** O P T I O N S *****************************/
typedef struct{
aiori_mod_opt_t * p; // posix options
int max_pending;
int granularity; // how frequent to submit, submit ever granularity elements
// runtime data
io_context_t ioctx; // one context per fs
struct iocb ** iocbs;
int iocbs_pos; // how many are pending in iocbs
int in_flight; // total pending ops
IOR_offset_t pending_bytes; // track pending IO volume for error checking
} aio_options_t;
option_help * aio_options(aiori_mod_opt_t ** init_backend_options, aiori_mod_opt_t * init_values){
aio_options_t * o = malloc(sizeof(aio_options_t));
if (init_values != NULL){
memcpy(o, init_values, sizeof(aio_options_t));
}else{
memset(o, 0, sizeof(aio_options_t));
o->max_pending = 128;
o->granularity = 16;
}
option_help * p_help = POSIX_options((aiori_mod_opt_t**)& o->p, init_values == NULL ? NULL : (aiori_mod_opt_t*) ((aio_options_t*)init_values)->p);
*init_backend_options = (aiori_mod_opt_t*) o;
option_help h [] = {
{0, "aio.max-pending", "Max number of pending ops", OPTION_OPTIONAL_ARGUMENT, 'd', & o->max_pending},
{0, "aio.granularity", "How frequent to submit pending IOs, submit every *granularity* elements", OPTION_OPTIONAL_ARGUMENT, 'd', & o->granularity},
LAST_OPTION
};
option_help * help = option_merge(h, p_help);
free(p_help);
return help;
}
/************************** D E C L A R A T I O N S ***************************/
typedef struct{
aiori_fd_t * pfd; // the underlying POSIX fd
} aio_fd_t;
/***************************** F U N C T I O N S ******************************/
static aiori_xfer_hint_t * hints = NULL;
static void aio_xfer_hints(aiori_xfer_hint_t * params){
hints = params;
POSIX_xfer_hints(params);
}
static void aio_initialize(aiori_mod_opt_t * param){
aio_options_t * o = (aio_options_t*) param;
if(io_setup(o->max_pending, & o->ioctx) != 0){
ERRF("Couldn't initialize io context %s", strerror(errno));
}
printf("%d\n", (o->max_pending));
o->iocbs = malloc(sizeof(struct iocb *) * o->granularity);
o->iocbs_pos = 0;
o->in_flight = 0;
}
static void aio_finalize(aiori_mod_opt_t * param){
aio_options_t * o = (aio_options_t*) param;
io_destroy(o->ioctx);
}
static int aio_check_params(aiori_mod_opt_t * param){
aio_options_t * o = (aio_options_t*) param;
POSIX_check_params((aiori_mod_opt_t*) o->p);
if(o->max_pending < 8){
ERRF("AIO max-pending = %d < 8", o->max_pending);
}
if(o->granularity > o->max_pending){
ERRF("AIO granularity must be < max-pending, is %d > %d", o->granularity, o->max_pending);
}
return 0;
}
static aiori_fd_t *aio_Open(char *testFileName, int flags, aiori_mod_opt_t * param){
aio_options_t * o = (aio_options_t*) param;
aio_fd_t * fd = malloc(sizeof(aio_fd_t));
fd->pfd = POSIX_Open(testFileName, flags, o->p);
return (aiori_fd_t*) fd;
}
static aiori_fd_t *aio_create(char *testFileName, int flags, aiori_mod_opt_t * param){
aio_options_t * o = (aio_options_t*) param;
aio_fd_t * fd = malloc(sizeof(aio_fd_t));
fd->pfd = POSIX_Create(testFileName, flags, o->p);
return (aiori_fd_t*) fd;
}
/* called whenever the granularity is met */
static void submit_pending(aio_options_t * o){
if(o->iocbs_pos == 0){
return;
}
int res;
res = io_submit(o->ioctx, o->iocbs_pos, o->iocbs);
//printf("AIO submit %d jobs\n", o->iocbs_pos);
if(res != o->iocbs_pos){
if(errno == EAGAIN){
ERR("AIO: errno == EAGAIN; this should't happen");
}
ERRF("AIO: submitted %d, error: \"%s\" ; this should't happen", res, strerror(errno));
}
o->iocbs_pos = 0;
}
/* complete all pending ops */
static void complete_all(aio_options_t * o){
submit_pending(o);
struct io_event events[o->in_flight];
int num_events;
num_events = io_getevents(o->ioctx, o->in_flight, o->in_flight, events, NULL);
for (int i = 0; i < num_events; i++) {
struct io_event event = events[i];
if(event.res == -1){
ERR("AIO, error in io_getevents(), IO incomplete!");
}else{
o->pending_bytes -= event.res;
}
free(event.obj);
}
if(o->pending_bytes != 0){
ERRF("AIO, error in flushing data, pending bytes: %lld", o->pending_bytes);
}
o->in_flight = 0;
}
/* called if we must make *some* progress */
static void process_some(aio_options_t * o){
if(o->in_flight == 0){
return;
}
struct io_event events[o->in_flight];
int num_events;
int mn = o->in_flight < o->granularity ? o->in_flight : o->granularity;
num_events = io_getevents(o->ioctx, mn, o->in_flight, events, NULL);
//printf("Completed: %d\n", num_events);
for (int i = 0; i < num_events; i++) {
struct io_event event = events[i];
if(event.res == -1){
ERR("AIO, error in io_getevents(), IO incomplete!");
}else{
o->pending_bytes -= event.res;
}
free(event.obj);
}
o->in_flight -= num_events;
}
static IOR_offset_t aio_Xfer(int access, aiori_fd_t *fd, IOR_size_t * buffer,
IOR_offset_t length, IOR_offset_t offset, aiori_mod_opt_t * param){
aio_options_t * o = (aio_options_t*) param;
aio_fd_t * afd = (aio_fd_t*) fd;
if(o->in_flight >= o->max_pending){
process_some(o);
}
o->pending_bytes += length;
struct iocb * iocb = malloc(sizeof(struct iocb));
if(access == WRITE){
io_prep_pwrite(iocb, *(int*)afd->pfd, buffer, length, offset);
}else{
io_prep_pread(iocb, *(int*)afd->pfd, buffer, length, offset);
}
o->iocbs[o->iocbs_pos] = iocb;
o->iocbs_pos++;
o->in_flight++;
if(o->iocbs_pos == o->granularity){
submit_pending(o);
}
return length;
}
static void aio_Close(aiori_fd_t *fd, aiori_mod_opt_t * param){
aio_options_t * o = (aio_options_t*) param;
aio_fd_t * afd = (aio_fd_t*) fd;
complete_all(o);
POSIX_Close(afd->pfd, o->p);
}
static void aio_Fsync(aiori_fd_t *fd, aiori_mod_opt_t * param){
aio_options_t * o = (aio_options_t*) param;
complete_all(o);
aio_fd_t * afd = (aio_fd_t*) fd;
POSIX_Fsync(afd->pfd, o->p);
}
static void aio_Sync(aiori_mod_opt_t * param){
aio_options_t * o = (aio_options_t*) param;
complete_all(o);
POSIX_Sync((aiori_mod_opt_t*) o->p);
}
ior_aiori_t aio_aiori = {
.name = "AIO",
.name_legacy = NULL,
.create = aio_create,
.get_options = aio_options,
.initialize = aio_initialize,
.finalize = aio_finalize,
.xfer_hints = aio_xfer_hints,
.get_options = aio_options,
.fsync = aio_Fsync,
.open = aio_Open,
.xfer = aio_Xfer,
.close = aio_Close,
.sync = aio_Sync,
.check_params = aio_check_params,
.delete = POSIX_Delete,
.get_version = aiori_get_version,
.get_file_size = POSIX_GetFileSize,
.statfs = aiori_posix_statfs,
.mkdir = aiori_posix_mkdir,
.rmdir = aiori_posix_rmdir,
.access = aiori_posix_access,
.stat = aiori_posix_stat,
.enable_mdtest = true
};

View File

@ -42,6 +42,9 @@ ior_aiori_t *available_aiori[] = {
#ifdef USE_POSIX_AIORI
&posix_aiori,
#endif
#ifdef USE_AIO_AIORI
&aio_aiori,
#endif
#ifdef USE_PMDK_AIORI
&pmdk_aiori,
#endif

View File

@ -120,6 +120,7 @@ enum bench_type {
};
extern ior_aiori_t dummy_aiori;
extern ior_aiori_t aio_aiori;
extern ior_aiori_t daos_aiori;
extern ior_aiori_t dfs_aiori;
extern ior_aiori_t hdf5_aiori;
@ -154,15 +155,6 @@ int aiori_posix_mkdir (const char *path, mode_t mode, aiori_mod_opt_t * module_o
int aiori_posix_rmdir (const char *path, aiori_mod_opt_t * module_options);
int aiori_posix_access (const char *path, int mode, aiori_mod_opt_t * module_options);
int aiori_posix_stat (const char *path, struct stat *buf, aiori_mod_opt_t * module_options);
void aiori_posix_xfer_hints(aiori_xfer_hint_t * params);
aiori_fd_t *POSIX_Create(char *testFileName, int flags, aiori_mod_opt_t * module_options);
int POSIX_Mknod(char *testFileName);
aiori_fd_t *POSIX_Open(char *testFileName, int flags, aiori_mod_opt_t * module_options);
IOR_offset_t POSIX_GetFileSize(aiori_mod_opt_t * test, char *testFileName);
void POSIX_Delete(char *testFileName, aiori_mod_opt_t * module_options);
void POSIX_Close(aiori_fd_t *fd, aiori_mod_opt_t * module_options);
option_help * POSIX_options(aiori_mod_opt_t ** init_backend_options, aiori_mod_opt_t * init_values);
/* NOTE: these 3 MPI-IO functions are exported for reuse by HDF5/PNetCDF */

View File

@ -850,6 +850,9 @@ void directory_test(const int iteration, const int ntasks, const char *path, ran
/* create phase */
if(create_only) {
progress->stone_wall_timer_seconds = stone_wall_timer_seconds;
progress->items_done = 0;
progress->start_time = GetTimeStamp();
for (int dir_iter = 0; dir_iter < directory_loops; dir_iter ++){
prep_testdir(iteration, dir_iter);
if (unique_dir_per_task) {
@ -873,6 +876,7 @@ void directory_test(const int iteration, const int ntasks, const char *path, ran
create_remove_items(0, 1, 1, 0, temp_path, 0, progress);
}
}
progress->stone_wall_timer_seconds = 0;
}
phase_end();
@ -1048,6 +1052,10 @@ void file_test(const int iteration, const int ntasks, const char *path, rank_pro
/* create phase */
if (create_only ) {
progress->stone_wall_timer_seconds = stone_wall_timer_seconds;
progress->items_done = 0;
progress->start_time = GetTimeStamp();
for (int dir_iter = 0; dir_iter < directory_loops; dir_iter ++){
prep_testdir(iteration, dir_iter);
@ -1061,8 +1069,6 @@ void file_test(const int iteration, const int ntasks, const char *path, rank_pro
sprintf( temp_path, "%s/%s", testdir, path );
}
VERBOSE(3,-1,"file_test: create path is '%s'", temp_path );
/* "touch" the files */
@ -1663,8 +1669,7 @@ void create_remove_directory_tree(int create,
static void mdtest_iteration(int i, int j, MPI_Group testgroup, mdtest_results_t * summary_table){
rank_progress_t progress_o;
memset(& progress_o, 0 , sizeof(progress_o));
progress_o.start_time = GetTimeStamp();
progress_o.stone_wall_timer_seconds = stone_wall_timer_seconds;
progress_o.stone_wall_timer_seconds = 0;
progress_o.items_per_dir = items_per_dir;
rank_progress_t * progress = & progress_o;
@ -1748,6 +1753,7 @@ static void mdtest_iteration(int i, int j, MPI_Group testgroup, mdtest_results_t
summary_table->stonewall_last_item[8] = num_dirs_in_tree;
VERBOSE(1,-1,"V-1: main: Tree creation : %14.3f sec, %14.3f ops/sec", (endCreate - startCreate), summary_table->rate[8]);
}
sprintf(unique_mk_dir, "%s.0", base_tree_name);
sprintf(unique_chdir_dir, "%s.0", base_tree_name);
sprintf(unique_stat_dir, "%s.0", base_tree_name);
@ -1790,6 +1796,7 @@ static void mdtest_iteration(int i, int j, MPI_Group testgroup, mdtest_results_t
DelaySecs(pre_delay);
}
VERBOSE(3,5,"will file_test on %s", unique_mk_dir);
file_test(j, i, unique_mk_dir, progress);
}
}
@ -1980,7 +1987,7 @@ mdtest_results_t * mdtest_run(int argc, char **argv, MPI_Comm world_com, FILE *
{'v', NULL, "verbosity (each instance of option increments by one)", OPTION_FLAG, 'd', & verbose},
{'V', NULL, "verbosity value", OPTION_OPTIONAL_ARGUMENT, 'd', & verbose},
{'w', NULL, "bytes to write to each file after it is created", OPTION_OPTIONAL_ARGUMENT, 'l', & write_bytes},
{'W', NULL, "number in seconds; stonewall timer, write as many seconds and ensure all processes did the same number of operations (currently only stops during create phase)", OPTION_OPTIONAL_ARGUMENT, 'd', & stone_wall_timer_seconds},
{'W', NULL, "number in seconds; stonewall timer, write as many seconds and ensure all processes did the same number of operations (currently only stops during create phase and files)", OPTION_OPTIONAL_ARGUMENT, 'd', & stone_wall_timer_seconds},
{'x', NULL, "StoneWallingStatusFile; contains the number of iterations of the creation phase, can be used to split phases across runs", OPTION_OPTIONAL_ARGUMENT, 's', & stoneWallingStatusFile},
{'X', "verify-read", "Verify the data read", OPTION_FLAG, 'd', & verify_read},
{0, "verify-write", "Verify the data after a write by reading it back immediately", OPTION_FLAG, 'd', & verify_write},

View File

@ -7,6 +7,23 @@
#include <option.h>
/* merge two option lists and return the total size */
option_help * option_merge(option_help * a, option_help * b){
int count_a = 0;
for(option_help * i = a; i->type != 0; i++){
count_a++;
}
int count = count_a + 1; // LAST_OPTION is one
for(option_help * i = b; i->type != 0; i++){
count++;
}
option_help * h = malloc(sizeof(option_help) * count);
memcpy(h, a, sizeof(option_help) * count_a);
memcpy(h + count_a, b, sizeof(option_help) * (count - count_a));
return h;
}
/*
* Takes a string of the form 64, 8m, 128k, 4g, etc. and converts to bytes.
*/

View File

@ -43,6 +43,7 @@ void option_print_current(option_help * args);
//@return the number of parsed arguments
int option_parse(int argc, char ** argv, options_all_t * args);
int option_parse_str(char*val, options_all_t * opt_all);
option_help * option_merge(option_help * a, option_help * b);
/* Parse a single line */
int option_parse_key_value(char * key, char * value, options_all_t * opt_all);