Merge branch 'feature-cleanup-aiori'

master
Julian M. Kunkel 2020-07-21 09:26:03 +01:00
commit 8ca388fc78
20 changed files with 893 additions and 312 deletions

View File

@ -308,19 +308,54 @@ AM_COND_IF([AWS4C_DIR],[
])
# Amazon S3 support [see also: --with-aws4c]
AC_ARG_WITH([S3],
[AS_HELP_STRING([--with-S3],
[support IO with Amazon S3 backend @<:@default=no@:>@])],
# Amazon S3 support using the libs3 API
AC_ARG_WITH([S3-libs3],
[AS_HELP_STRING([--with-S3-libs3],
[support IO with Amazon libS3 @<:@default=no@:>@])],
[],
[with_S3=no])
AM_CONDITIONAL([USE_S3_AIORI], [test x$with_S3 = xyes])
AM_COND_IF([USE_S3_AIORI],[
AC_DEFINE([USE_S3_AIORI], [], [Build Amazon-S3 backend AIORI])
[with_S3_libs3=no])
AM_CONDITIONAL([USE_S3_LIBS3_AIORI], [test x$with_S3_libs3 = xyes])
AM_COND_IF([USE_S3_LIBS3_AIORI],[
AC_DEFINE([USE_S3_LIBS3_AIORI], [], [Build Amazon-S3 backend AIORI using libs3])
])
err=0
AS_IF([test "x$with_S3" != xno], [
AS_IF([test "x$with_S3_libs3" != xno], [
AC_MSG_NOTICE([beginning of S3-related checks])
ORIG_CPPFLAGS=$CPPFLAGS
ORIG_LDFLAGS=$LDFLAGS
AC_CHECK_HEADERS([libs3.h], [], [err=1])
# Autotools thinks searching for a library means I want it added to LIBS
ORIG_LIBS=$LIBS
AC_CHECK_LIB([s3], [S3_initialize], [], [err=1])
LIBS=$ORIG_LIBS
AC_MSG_NOTICE([end of S3-related checks])
if test "$err" == 1; then
AC_MSG_FAILURE([S3 support is missing. dnl Make sure you have access to libs3. dnl])
fi
# restore user's values
CPPFLAGS=$ORIG_CPPFLAGS
LDFLAGS=$ORIG_LDFLAGS
])
# Amazon S3 support [see also: --with-aws4c]
AC_ARG_WITH([S3-4c],
[AS_HELP_STRING([--with-S3-4c],
[support IO with Amazon S3 backend @<:@default=no@:>@])],
[],
[with_S3_4c=no])
AM_CONDITIONAL([USE_S3_4C_AIORI], [test x$with_S3_4c = xyes])
AM_COND_IF([USE_S3_4C_AIORI],[
AC_DEFINE([USE_S3_4C_AIORI], [], [Build Amazon-S3 backend AIORI using lib4c])
])
err=0
AS_IF([test "x$with_S3_4c" != xno], [
AC_MSG_NOTICE([beginning of S3-related checks])
# save user's values, while we use AC_CHECK_HEADERS with $AWS4C_DIR

View File

@ -90,8 +90,8 @@ extraSOURCES += aiori-Gfarm.c
extraLDADD += -lgfarm
endif
if USE_S3_AIORI
extraSOURCES += aiori-S3.c
if USE_S3_4C_AIORI
extraSOURCES += aiori-S3-4c.c
if AWS4C_DIR
extraCPPFLAGS += $(AWS4C_CPPFLAGS)
extraLDFLAGS += $(AWS4C_LDFLAGS)
@ -102,6 +102,11 @@ extraLDADD += -laws4c
extraLDADD += -laws4c_extra
endif
if USE_S3_LIBS3_AIORI
extraSOURCES += aiori-S3-libs3.c
extraLDADD += -ls3
endif
if WITH_LUSTRE
extraLDADD += -llustreapi
endif
@ -128,3 +133,9 @@ MDTEST_CPPFLAGS = $(mdtest_CPPFLAGS)
libaiori_a_SOURCES += $(extraSOURCES)
libaiori_a_CPPFLAGS = $(extraCPPFLAGS)
# Generate config file with build flags to allow reuse of library
all-local: build.conf
build.conf:
@echo LDFLAGS=$(LDFLAGS) $(extraLDFLAGS) $(extraLDADD) > build.conf
@echo CFLAGS=$(CFLAGS) $(extraCPPFLAGS) >> build.conf

View File

@ -114,7 +114,7 @@ static void DFS_Delete(char *, aiori_mod_opt_t *);
static char* DFS_GetVersion();
static void DFS_Fsync(aiori_fd_t *, aiori_mod_opt_t *);
static void DFS_Sync(aiori_mod_opt_t *);
static IOR_offset_t DFS_GetFileSize(aiori_mod_opt_t *, MPI_Comm, char *);
static IOR_offset_t DFS_GetFileSize(aiori_mod_opt_t *, char *);
static int DFS_Statfs (const char *, ior_aiori_statfs_t *, aiori_mod_opt_t *);
static int DFS_Stat (const char *, struct stat *, aiori_mod_opt_t *);
static int DFS_Mkdir (const char *, mode_t, aiori_mod_opt_t *);
@ -774,7 +774,7 @@ static char* DFS_GetVersion()
* Use DFS stat() to return aggregate file size.
*/
static IOR_offset_t
DFS_GetFileSize(aiori_mod_opt_t * test, MPI_Comm comm, char *testFileName)
DFS_GetFileSize(aiori_mod_opt_t * test, char *testFileName)
{
dfs_obj_t *obj;
daos_size_t fsize, tmpMin, tmpMax, tmpSum;
@ -792,27 +792,6 @@ DFS_GetFileSize(aiori_mod_opt_t * test, MPI_Comm comm, char *testFileName)
dfs_release(obj);
if (hints->filePerProc == TRUE) {
MPI_CHECK(MPI_Allreduce(&fsize, &tmpSum, 1,
MPI_LONG_LONG_INT, MPI_SUM, comm),
"cannot total data moved");
fsize = tmpSum;
} else {
MPI_CHECK(MPI_Allreduce(&fsize, &tmpMin, 1,
MPI_LONG_LONG_INT, MPI_MIN, comm),
"cannot total data moved");
MPI_CHECK(MPI_Allreduce(&fsize, &tmpMax, 1,
MPI_LONG_LONG_INT, MPI_MAX, comm),
"cannot total data moved");
if (tmpMin != tmpMax) {
if (rank == 0) {
WARN("inconsistent file size by different tasks");
}
/* incorrect, but now consistent across tasks */
fsize = tmpMin;
}
}
return (fsize);
}

View File

@ -108,7 +108,7 @@ static char * DUMMY_getVersion()
return "0.5";
}
static IOR_offset_t DUMMY_GetFileSize(aiori_mod_opt_t * options, MPI_Comm testComm, char *testFileName)
static IOR_offset_t DUMMY_GetFileSize(aiori_mod_opt_t * options, char *testFileName)
{
if(verbose > 4){
fprintf(out_logfile, "DUMMY getFileSize: %s\n", testFileName);

View File

@ -91,7 +91,7 @@ static void HDF5_Close(aiori_fd_t *, aiori_mod_opt_t *);
static void HDF5_Delete(char *, aiori_mod_opt_t *);
static char* HDF5_GetVersion();
static void HDF5_Fsync(aiori_fd_t *, aiori_mod_opt_t *);
static IOR_offset_t HDF5_GetFileSize(aiori_mod_opt_t *, MPI_Comm, char *);
static IOR_offset_t HDF5_GetFileSize(aiori_mod_opt_t *, char *);
static int HDF5_Access(const char *, int, aiori_mod_opt_t *);
static void HDF5_init_xfer_options(aiori_xfer_hint_t * params);
static int HDF5_check_params(aiori_mod_opt_t * options);
@ -660,11 +660,11 @@ static void SetupDataSet(void *fd, int flags, aiori_mod_opt_t * param)
* Use MPIIO call to get file size.
*/
static IOR_offset_t
HDF5_GetFileSize(aiori_mod_opt_t * test, MPI_Comm testComm, char *testFileName)
HDF5_GetFileSize(aiori_mod_opt_t * test, char *testFileName)
{
if(hints->dryRun)
return 0;
return(MPIIO_GetFileSize(test, testComm, testFileName));
return(MPIIO_GetFileSize(test, testFileName));
}
/*

View File

@ -21,8 +21,8 @@
#include <stdio.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <errno.h> /* sys_errlist */
#include <fcntl.h> /* IO operations */
#include <errno.h> /* sys_errlist */
#include <fcntl.h> /* IO operations */
#include "ior.h"
#include "iordef.h"
@ -30,63 +30,70 @@
#include "utilities.h"
#include "ime_native.h"
#ifndef O_BINARY /* Required on Windows */
#define IME_UNUSED(x) (void)(x) /* Silence compiler warnings */
#ifndef O_BINARY /* Required on Windows */
# define O_BINARY 0
#endif
/**************************** P R O T O T Y P E S *****************************/
static void *IME_Create(char *, IOR_param_t *);
static void *IME_Open(char *, IOR_param_t *);
static void IME_Close(void *, IOR_param_t *);
static void IME_Delete(char *, IOR_param_t *);
static char *IME_GetVersion();
static void IME_Fsync(void *, IOR_param_t *);
static int IME_Access(const char *, int, IOR_param_t *);
static IOR_offset_t IME_GetFileSize(IOR_param_t *, MPI_Comm, char *);
static IOR_offset_t IME_Xfer(int, void *, IOR_size_t *,
IOR_offset_t, IOR_param_t *);
static int IME_StatFS(const char *, ior_aiori_statfs_t *,
IOR_param_t *);
static int IME_RmDir(const char *, IOR_param_t *);
static int IME_MkDir(const char *, mode_t, IOR_param_t *);
static int IME_Stat(const char *, struct stat *, IOR_param_t *);
aiori_fd_t *IME_Create(char *, int, aiori_mod_opt_t *);
aiori_fd_t *IME_Open(char *, int, aiori_mod_opt_t *);
void IME_Close(aiori_fd_t *, aiori_mod_opt_t *);
void IME_Delete(char *, aiori_mod_opt_t *);
char *IME_GetVersion();
void IME_Fsync(aiori_fd_t *, aiori_mod_opt_t *);
int IME_Access(const char *, int, aiori_mod_opt_t *);
IOR_offset_t IME_GetFileSize(aiori_mod_opt_t *, char *);
IOR_offset_t IME_Xfer(int, aiori_fd_t *, IOR_size_t *, IOR_offset_t,
IOR_offset_t, aiori_mod_opt_t *);
int IME_Statfs(const char *, ior_aiori_statfs_t *,
aiori_mod_opt_t *);
int IME_Rmdir(const char *, aiori_mod_opt_t *);
int IME_Mkdir(const char *, mode_t, aiori_mod_opt_t *);
int IME_Stat(const char *, struct stat *, aiori_mod_opt_t *);
void IME_Xferhints(aiori_xfer_hint_t *params);
#if (IME_NATIVE_API_VERSION >= 132)
static int IME_Mknod(char *);
static void IME_Sync(IOR_param_t *);
int IME_Mknod(char *);
void IME_Sync(aiori_mod_opt_t *param);
#endif
static void IME_Initialize();
static void IME_Finalize();
void IME_Initialize();
void IME_Finalize();
/************************** O P T I O N S *****************************/
/****************************** O P T I O N S *********************************/
typedef struct{
int direct_io;
int direct_io;
} ime_options_t;
option_help *IME_Options(aiori_mod_opt_t **init_backend_options,
aiori_mod_opt_t *init_values)
{
ime_options_t *o = malloc(sizeof(ime_options_t));
option_help * IME_options(void ** init_backend_options, void * init_values){
ime_options_t * o = malloc(sizeof(ime_options_t));
if (init_values != NULL)
memcpy(o, init_values, sizeof(ime_options_t));
else
o->direct_io = 0;
if (init_values != NULL){
memcpy(o, init_values, sizeof(ime_options_t));
}else{
o->direct_io = 0;
}
*init_backend_options = (aiori_mod_opt_t*)o;
*init_backend_options = o;
option_help h[] = {
{0, "ime.odirect", "Direct I/O Mode", OPTION_FLAG, 'd', & o->direct_io},
LAST_OPTION
};
option_help *help = malloc(sizeof(h));
memcpy(help, h, sizeof(h));
option_help h [] = {
{0, "ime.odirect", "Direct I/O Mode", OPTION_FLAG, 'd', & o->direct_io},
LAST_OPTION
};
option_help * help = malloc(sizeof(h));
memcpy(help, h, sizeof(h));
return help;
return help;
}
/************************** D E C L A R A T I O N S ***************************/
extern int rank;
@ -100,19 +107,20 @@ ior_aiori_t ime_aiori = {
.create = IME_Create,
.open = IME_Open,
.xfer = IME_Xfer,
.xfer_hints = IME_Xferhints,
.close = IME_Close,
.delete = IME_Delete,
.get_version = IME_GetVersion,
.fsync = IME_Fsync,
.get_file_size = IME_GetFileSize,
.access = IME_Access,
.statfs = IME_StatFS,
.rmdir = IME_RmDir,
.mkdir = IME_MkDir,
.statfs = IME_Statfs,
.rmdir = IME_Rmdir,
.mkdir = IME_Mkdir,
.stat = IME_Stat,
.initialize = IME_Initialize,
.finalize = IME_Finalize,
.get_options = IME_options,
.get_options = IME_Options,
#if (IME_NATIVE_API_VERSION >= 132)
.sync = IME_Sync,
.mknod = IME_Mknod,
@ -120,30 +128,48 @@ ior_aiori_t ime_aiori = {
.enable_mdtest = true,
};
static aiori_xfer_hint_t *hints = NULL;
static bool ime_initialized = false;
/***************************** F U N C T I O N S ******************************/
void IME_Xferhints(aiori_xfer_hint_t *params)
{
hints = params;
}
/*
* Initialize IME (before MPI is started).
*/
static void IME_Initialize()
void IME_Initialize()
{
if (ime_initialized)
return;
ime_native_init();
ime_initialized = true;
}
/*
* Finlize IME (after MPI is shutdown).
*/
static void IME_Finalize()
void IME_Finalize()
{
if (!ime_initialized)
return;
(void)ime_native_finalize();
ime_initialized = true;
}
/*
* Try to access a file through the IME interface.
*/
static int IME_Access(const char *path, int mode, IOR_param_t *param)
int IME_Access(const char *path, int mode, aiori_mod_opt_t *module_options)
{
(void)param;
IME_UNUSED(module_options);
return ime_native_access(path, mode);
}
@ -151,41 +177,43 @@ static int IME_Access(const char *path, int mode, IOR_param_t *param)
/*
* Create and open a file through the IME interface.
*/
static void *IME_Create(char *testFileName, IOR_param_t *param)
aiori_fd_t *IME_Create(char *testFileName, int flags, aiori_mod_opt_t *param)
{
return IME_Open(testFileName, param);
return IME_Open(testFileName, flags, param);
}
/*
* Open a file through the IME interface.
*/
static void *IME_Open(char *testFileName, IOR_param_t *param)
aiori_fd_t *IME_Open(char *testFileName, int flags, aiori_mod_opt_t *param)
{
int fd_oflag = O_BINARY;
int *fd;
if (hints->dryRun)
return NULL;
fd = (int *)malloc(sizeof(int));
if (fd == NULL)
ERR("Unable to malloc file descriptor");
ime_options_t * o = (ime_options_t*) param->backend_options;
if (o->direct_io == TRUE){
set_o_direct_flag(&fd_oflag);
}
ime_options_t *o = (ime_options_t*) param;
if (o->direct_io == TRUE)
set_o_direct_flag(&fd_oflag);
if (param->openFlags & IOR_RDONLY)
if (flags & IOR_RDONLY)
fd_oflag |= O_RDONLY;
if (param->openFlags & IOR_WRONLY)
if (flags & IOR_WRONLY)
fd_oflag |= O_WRONLY;
if (param->openFlags & IOR_RDWR)
if (flags & IOR_RDWR)
fd_oflag |= O_RDWR;
if (param->openFlags & IOR_APPEND)
if (flags & IOR_APPEND)
fd_oflag |= O_APPEND;
if (param->openFlags & IOR_CREAT)
if (flags & IOR_CREAT)
fd_oflag |= O_CREAT;
if (param->openFlags & IOR_EXCL)
if (flags & IOR_EXCL)
fd_oflag |= O_EXCL;
if (param->openFlags & IOR_TRUNC)
if (flags & IOR_TRUNC)
fd_oflag |= O_TRUNC;
*fd = ime_native_open(testFileName, fd_oflag, 0664);
@ -194,14 +222,14 @@ static void *IME_Open(char *testFileName, IOR_param_t *param)
ERR("cannot open file");
}
return((void *)fd);
return (aiori_fd_t*) fd;
}
/*
* Write or read access to file using the IM interface.
*/
static IOR_offset_t IME_Xfer(int access, void *file, IOR_size_t *buffer,
IOR_offset_t length, IOR_param_t *param)
IOR_offset_t IME_Xfer(int access, aiori_fd_t *file, IOR_size_t *buffer,
IOR_offset_t length, IOR_offset_t offset, aiori_mod_opt_t *param)
{
int xferRetries = 0;
long long remaining = (long long)length;
@ -209,25 +237,28 @@ static IOR_offset_t IME_Xfer(int access, void *file, IOR_size_t *buffer,
int fd = *(int *)file;
long long rc;
if (hints->dryRun)
return length;
while (remaining > 0) {
/* write/read file */
if (access == WRITE) { /* WRITE */
if (verbose >= VERBOSE_4) {
fprintf(stdout, "task %d writing to offset %lld\n",
rank, param->offset + length - remaining);
rank, offset + length - remaining);
}
rc = ime_native_pwrite(fd, ptr, remaining, param->offset);
rc = ime_native_pwrite(fd, ptr, remaining, offset);
if (param->fsyncPerWrite)
IME_Fsync(&fd, param);
if (hints->fsyncPerWrite)
IME_Fsync(file, param);
} else { /* READ or CHECK */
if (verbose >= VERBOSE_4) {
fprintf(stdout, "task %d reading from offset %lld\n",
rank, param->offset + length - remaining);
rank, offset + length - remaining);
}
rc = ime_native_pread(fd, ptr, remaining, param->offset);
rc = ime_native_pread(fd, ptr, remaining, offset);
if (rc == 0)
ERR("hit EOF prematurely");
else if (rc < 0)
@ -238,9 +269,9 @@ static IOR_offset_t IME_Xfer(int access, void *file, IOR_size_t *buffer,
fprintf(stdout, "WARNING: Task %d, partial %s, %lld of "
"%lld bytes at offset %lld\n",
rank, access == WRITE ? "write" : "read", rc,
remaining, param->offset + length - remaining );
remaining, offset + length - remaining );
if (param->singleXferAttempt) {
if (hints->singleXferAttempt) {
MPI_CHECK(MPI_Abort(MPI_COMM_WORLD, -1),
"barrier error");
}
@ -264,7 +295,7 @@ static IOR_offset_t IME_Xfer(int access, void *file, IOR_size_t *buffer,
/*
* Perform fsync().
*/
static void IME_Fsync(void *fd, IOR_param_t *param)
void IME_Fsync(aiori_fd_t *fd, aiori_mod_opt_t *param)
{
if (ime_native_fsync(*(int *)fd) != 0)
WARN("cannot perform fsync on file");
@ -273,33 +304,34 @@ static void IME_Fsync(void *fd, IOR_param_t *param)
/*
* Close a file through the IME interface.
*/
static void IME_Close(void *fd, IOR_param_t *param)
void IME_Close(aiori_fd_t *file, aiori_mod_opt_t *param)
{
if (ime_native_close(*(int *)fd) != 0)
{
free(fd);
ERR("cannot close file");
}
else
free(fd);
if (hints->dryRun)
return;
if (ime_native_close(*(int*)file) != 0)
ERRF("Cannot close file descriptor: %d", *(int*)file);
free(file);
}
/*
* Delete a file through the IME interface.
*/
static void IME_Delete(char *testFileName, IOR_param_t *param)
void IME_Delete(char *testFileName, aiori_mod_opt_t *param)
{
char errmsg[256];
sprintf(errmsg, "[RANK %03d]:cannot delete file %s\n",
rank, testFileName);
if (hints->dryRun)
return;
if (ime_native_unlink(testFileName) != 0)
WARN(errmsg);
EWARNF("[RANK %03d]: cannot delete file \"%s\"\n",
rank, testFileName);
}
/*
* Determine API version.
*/
static char *IME_GetVersion()
char *IME_GetVersion()
{
static char ver[1024] = {};
#if (IME_NATIVE_API_VERSION >= 120)
@ -310,18 +342,17 @@ static char *IME_GetVersion()
return ver;
}
static int IME_StatFS(const char *path, ior_aiori_statfs_t *stat_buf,
IOR_param_t *param)
int IME_Statfs(const char *path, ior_aiori_statfs_t *stat_buf,
aiori_mod_opt_t *module_options)
{
(void)param;
IME_UNUSED(module_options);
#if (IME_NATIVE_API_VERSION >= 130)
struct statvfs statfs_buf;
int ret = ime_native_statvfs(path, &statfs_buf);
if (ret)
return ret;
return ret;
stat_buf->f_bsize = statfs_buf.f_bsize;
stat_buf->f_blocks = statfs_buf.f_blocks;
stat_buf->f_bfree = statfs_buf.f_bfree;
@ -330,38 +361,37 @@ static int IME_StatFS(const char *path, ior_aiori_statfs_t *stat_buf,
return 0;
#else
(void)path;
(void)stat_buf;
IME_UNUSED(path);
IME_UNUSED(stat_buf);
WARN("statfs is currently not supported in IME backend!");
return -1;
#endif
}
static int IME_MkDir(const char *path, mode_t mode, IOR_param_t *param)
int IME_Mkdir(const char *path, mode_t mode, aiori_mod_opt_t * module_options)
{
(void)param;
IME_UNUSED(module_options);
#if (IME_NATIVE_API_VERSION >= 130)
return ime_native_mkdir(path, mode);
#else
(void)path;
(void)mode;
IME_UNUSED(path);
IME_UNUSED(mode);
WARN("mkdir not supported in IME backend!");
return -1;
#endif
}
static int IME_RmDir(const char *path, IOR_param_t *param)
int IME_Rmdir(const char *path, aiori_mod_opt_t *module_options)
{
(void)param;
IME_UNUSED(module_options);
#if (IME_NATIVE_API_VERSION >= 130)
return ime_native_rmdir(path);
#else
(void)path;
IME_UNUSED(path);
WARN("rmdir not supported in IME backend!");
return -1;
@ -371,9 +401,10 @@ static int IME_RmDir(const char *path, IOR_param_t *param)
/*
* Perform stat() through the IME interface.
*/
static int IME_Stat(const char *path, struct stat *buf, IOR_param_t *param)
int IME_Stat(const char *path, struct stat *buf,
aiori_mod_opt_t *module_options)
{
(void)param;
IME_UNUSED(module_options);
return ime_native_stat(path, buf);
}
@ -381,62 +412,40 @@ static int IME_Stat(const char *path, struct stat *buf, IOR_param_t *param)
/*
* Use IME stat() to return aggregate file size.
*/
static IOR_offset_t IME_GetFileSize(IOR_param_t *test, MPI_Comm testComm,
char *testFileName)
IOR_offset_t IME_GetFileSize(aiori_mod_opt_t *test, char *testFileName)
{
struct stat stat_buf;
IOR_offset_t aggFileSizeFromStat, tmpMin, tmpMax, tmpSum;
if (ime_native_stat(testFileName, &stat_buf) != 0) {
ERR("cannot get status of written file");
}
aggFileSizeFromStat = stat_buf.st_size;
if (hints->dryRun)
return 0;
if (test->filePerProc) {
MPI_CHECK(MPI_Allreduce(&aggFileSizeFromStat, &tmpSum, 1,
MPI_LONG_LONG_INT, MPI_SUM, testComm),
"cannot total data moved");
aggFileSizeFromStat = tmpSum;
} else {
MPI_CHECK(MPI_Allreduce(&aggFileSizeFromStat, &tmpMin, 1,
MPI_LONG_LONG_INT, MPI_MIN, testComm),
"cannot total data moved");
MPI_CHECK(MPI_Allreduce(&aggFileSizeFromStat, &tmpMax, 1,
MPI_LONG_LONG_INT, MPI_MAX, testComm),
"cannot total data moved");
if (ime_native_stat(testFileName, &stat_buf) != 0)
ERRF("cannot get status of written file %s",
testFileName);
if (tmpMin != tmpMax) {
if (rank == 0) {
WARN("inconsistent file size by different tasks");
}
/* incorrect, but now consistent across tasks */
aggFileSizeFromStat = tmpMin;
}
}
return(aggFileSizeFromStat);
return stat_buf.st_size;
}
#if (IME_NATIVE_API_VERSION >= 132)
/*
* Create a file through mknod interface.
*/
static int IME_Mknod(char *testFileName)
int IME_Mknod(char *testFileName)
{
int ret = ime_native_mknod(testFileName, S_IFREG | S_IRUSR, 0);
if (ret < 0)
ERR("mknod failed");
int ret = ime_native_mknod(testFileName, S_IFREG | S_IRUSR, 0);
if (ret < 0)
ERR("mknod failed");
return ret;
return ret;
}
/*
* Use IME sync to flush page cache of all opened files.
*/
static void IME_Sync(IOR_param_t * param)
void IME_Sync(aiori_mod_opt_t *param)
{
int ret = ime_native_sync(0);
if (ret != 0)
FAIL("Error executing the sync command.");
int ret = ime_native_sync(0);
if (ret != 0)
FAIL("Error executing the sync command.");
}
#endif

View File

@ -562,8 +562,7 @@ static IOR_offset_t SeekOffset(MPI_File fd, IOR_offset_t offset,
* Use MPI_File_get_size() to return aggregate file size.
* NOTE: This function is used by the HDF5 and NCMPI backends.
*/
IOR_offset_t MPIIO_GetFileSize(aiori_mod_opt_t * module_options, MPI_Comm testComm,
char *testFileName)
IOR_offset_t MPIIO_GetFileSize(aiori_mod_opt_t * module_options, char *testFileName)
{
mpiio_options_t * test = (mpiio_options_t*) module_options;
if(hints->dryRun)
@ -589,26 +588,5 @@ IOR_offset_t MPIIO_GetFileSize(aiori_mod_opt_t * module_options, MPI_Comm testCo
if (mpiHints != MPI_INFO_NULL)
MPI_CHECK(MPI_Info_free(&mpiHints), "MPI_Info_free failed");
if (hints->filePerProc == TRUE) {
MPI_CHECK(MPI_Allreduce(&aggFileSizeFromStat, &tmpSum, 1,
MPI_LONG_LONG_INT, MPI_SUM, testComm),
"cannot total data moved");
aggFileSizeFromStat = tmpSum;
} else {
MPI_CHECK(MPI_Allreduce(&aggFileSizeFromStat, &tmpMin, 1,
MPI_LONG_LONG_INT, MPI_MIN, testComm),
"cannot total data moved");
MPI_CHECK(MPI_Allreduce(&aggFileSizeFromStat, &tmpMax, 1,
MPI_LONG_LONG_INT, MPI_MAX, testComm),
"cannot total data moved");
if (tmpMin != tmpMax) {
if (rank == 0) {
WARN("inconsistent file size by different tasks");
}
/* incorrect, but now consistent across tasks */
aggFileSizeFromStat = tmpMin;
}
}
return (aggFileSizeFromStat);
}

View File

@ -28,14 +28,19 @@ static option_help options [] = {
/**************************** P R O T O T Y P E S *****************************/
static option_help * PMDK_options();
static void *PMDK_Create(char *, IOR_param_t *);
static void *PMDK_Open(char *, IOR_param_t *);
static IOR_offset_t PMDK_Xfer(int, void *, IOR_size_t *, IOR_offset_t, IOR_param_t *);
static void PMDK_Fsync(void *, IOR_param_t *);
static void PMDK_Close(void *, IOR_param_t *);
static void PMDK_Delete(char *, IOR_param_t *);
static IOR_offset_t PMDK_GetFileSize(IOR_param_t *, MPI_Comm, char *);
static aiori_fd_t *PMDK_Create(char *,int iorflags, aiori_mod_opt_t *);
static aiori_fd_t *PMDK_Open(char *, int iorflags, aiori_mod_opt_t *);
static IOR_offset_t PMDK_Xfer(int, aiori_fd_t *, IOR_size_t *, IOR_offset_t, IOR_offset_t, aiori_mod_opt_t *);
static void PMDK_Fsync(aiori_fd_t *, aiori_mod_opt_t *);
static void PMDK_Close(aiori_fd_t *, aiori_mod_opt_t *);
static void PMDK_Delete(char *, aiori_mod_opt_t *);
static IOR_offset_t PMDK_GetFileSize(aiori_mod_opt_t *, char *);
static aiori_xfer_hint_t * hints = NULL;
static void PMDK_xfer_hints(aiori_xfer_hint_t * params){
hints = params;
}
/************************** D E C L A R A T I O N S ***************************/
@ -55,6 +60,7 @@ ior_aiori_t pmdk_aiori = {
.delete = PMDK_Delete,
.get_version = aiori_get_version,
.fsync = PMDK_Fsync,
.xfer_hints = PMDK_xfer_hints,
.get_file_size = PMDK_GetFileSize,
.statfs = aiori_posix_statfs,
.mkdir = aiori_posix_mkdir,
@ -78,18 +84,18 @@ static option_help * PMDK_options(){
/*
* Create and open a memory space through the PMDK interface.
*/
static void *PMDK_Create(char * testFileName, IOR_param_t * param){
static aiori_fd_t *PMDK_Create(char * testFileName, int iorflags, aiori_mod_opt_t * param){
char *pmemaddr = NULL;
int is_pmem;
size_t mapped_len;
size_t open_length;
if(!param->filePerProc){
if(! hints->filePerProc){
fprintf(stdout, "\nPMDK functionality can only be used with filePerProc functionality\n");
MPI_CHECK(MPI_Abort(MPI_COMM_WORLD, -1), "MPI_Abort() error");
}
open_length = param->blockSize * param->segmentCount;
open_length = hints->blockSize * hints->segmentCount;
if((pmemaddr = pmem_map_file(testFileName, open_length,
PMEM_FILE_CREATE|PMEM_FILE_EXCL,
@ -98,7 +104,7 @@ static void *PMDK_Create(char * testFileName, IOR_param_t * param){
perror("pmem_map_file");
MPI_CHECK(MPI_Abort(MPI_COMM_WORLD, -1), "MPI_Abort() error");
}
if(!is_pmem){
fprintf(stdout, "\n is_pmem is %d\n",is_pmem);
fprintf(stdout, "\npmem_map_file thinks the hardware being used is not pmem\n");
@ -106,7 +112,7 @@ static void *PMDK_Create(char * testFileName, IOR_param_t * param){
}
return((void *)pmemaddr);
} /* PMDK_Create() */
@ -115,20 +121,19 @@ static void *PMDK_Create(char * testFileName, IOR_param_t * param){
/*
* Open a memory space through the PMDK interface.
*/
static void *PMDK_Open(char * testFileName, IOR_param_t * param){
static aiori_fd_t *PMDK_Open(char * testFileName,int iorflags, aiori_mod_opt_t * param){
char *pmemaddr = NULL;
int is_pmem;
size_t mapped_len;
size_t open_length;
if(!param->filePerProc){
if(!hints->filePerProc){
fprintf(stdout, "\nPMDK functionality can only be used with filePerProc functionality\n");
MPI_CHECK(MPI_Abort(MPI_COMM_WORLD, -1), "MPI_Abort() error");
}
open_length = param->blockSize * param->segmentCount;
open_length = hints->blockSize * hints->segmentCount;
if((pmemaddr = pmem_map_file(testFileName, 0,
PMEM_FILE_EXCL,
@ -138,12 +143,12 @@ static void *PMDK_Open(char * testFileName, IOR_param_t * param){
fprintf(stdout, "\n %ld %ld\n",open_length, mapped_len);
MPI_CHECK(MPI_Abort(MPI_COMM_WORLD, -1), "MPI_Abort() error");
}
if(!is_pmem){
fprintf(stdout, "pmem_map_file thinks the hardware being used is not pmem\n");
MPI_CHECK(MPI_Abort(MPI_COMM_WORLD, -1), "MPI_Abort() error");
}
return((void *)pmemaddr);
} /* PMDK_Open() */
@ -153,8 +158,8 @@ static void *PMDK_Open(char * testFileName, IOR_param_t * param){
* Write or read access to a memory space created with PMDK. Include drain/flush functionality.
*/
static IOR_offset_t PMDK_Xfer(int access, void *file, IOR_size_t * buffer,
IOR_offset_t length, IOR_param_t * param){
static IOR_offset_t PMDK_Xfer(int access, aiori_fd_t *file, IOR_size_t * buffer,
IOR_offset_t length, IOR_offset_t offset, aiori_mod_opt_t * param){
int xferRetries = 0;
long long remaining = (long long)length;
char * ptr = (char *)buffer;
@ -162,11 +167,11 @@ static IOR_offset_t PMDK_Xfer(int access, void *file, IOR_size_t * buffer,
long long i;
long long offset_size;
offset_size = param->offset;
offset_size = offset;
if(access == WRITE){
if(param->fsync){
pmem_memcpy_nodrain(&file[offset_size], ptr, length);
if(hints->fsyncPerWrite){
pmem_memcpy_nodrain(&file[offset_size], ptr, length);
}else{
pmem_memcpy_persist(&file[offset_size], ptr, length);
}
@ -183,7 +188,7 @@ static IOR_offset_t PMDK_Xfer(int access, void *file, IOR_size_t * buffer,
* Perform fsync().
*/
static void PMDK_Fsync(void *fd, IOR_param_t * param)
static void PMDK_Fsync(aiori_fd_t *fd, aiori_mod_opt_t * param)
{
pmem_drain();
} /* PMDK_Fsync() */
@ -194,11 +199,10 @@ static void PMDK_Fsync(void *fd, IOR_param_t * param)
* Stub for close functionality that is not required for PMDK
*/
static void PMDK_Close(void *fd, IOR_param_t * param){
static void PMDK_Close(aiori_fd_t *fd, aiori_mod_opt_t * param){
size_t open_length;
open_length = param->transferSize;
open_length = hints->transferSize;
pmem_unmap(fd, open_length);
} /* PMDK_Close() */
@ -207,38 +211,25 @@ static void PMDK_Close(void *fd, IOR_param_t * param){
* Delete the file backing a memory space through PMDK
*/
static void PMDK_Delete(char *testFileName, IOR_param_t * param)
static void PMDK_Delete(char *testFileName, aiori_mod_opt_t * param)
{
char errmsg[256];
sprintf(errmsg,"[RANK %03d]:cannot delete file %s\n",rank,testFileName);
if (unlink(testFileName) != 0) WARN(errmsg);
} /* PMDK_Delete() */
/******************************************************************************/
/*
* Determine api version.
*/
static void PMDK_SetVersion(IOR_param_t *test)
{
strcpy(test->apiVersion, test->api);
} /* PMDK_SetVersion() */
/******************************************************************************/
/*
* Use POSIX stat() to return aggregate file size.
*/
static IOR_offset_t PMDK_GetFileSize(IOR_param_t * test,
MPI_Comm testComm,
static IOR_offset_t PMDK_GetFileSize(aiori_mod_opt_t * test,
char * testFileName)
{
struct stat stat_buf;
IOR_offset_t aggFileSizeFromStat,
tmpMin, tmpMax, tmpSum;
if (test->filePerProc == FALSE) {
if (hints->filePerProc == FALSE) {
fprintf(stdout, "\nPMDK functionality can only be used with filePerProc functionality\n");
MPI_CHECK(MPI_Abort(MPI_COMM_WORLD, -1), "MPI_Abort() error");
}
@ -248,10 +239,5 @@ static IOR_offset_t PMDK_GetFileSize(IOR_param_t * test,
}
aggFileSizeFromStat = stat_buf.st_size;
MPI_CHECK(MPI_Allreduce(&aggFileSizeFromStat, &tmpSum, 1,
MPI_LONG_LONG_INT, MPI_SUM, testComm),
"cannot total data moved");
aggFileSizeFromStat = tmpSum;
return(aggFileSizeFromStat);
} /* PMDK_GetFileSize() */

View File

@ -676,8 +676,7 @@ void POSIX_Delete(char *testFileName, aiori_mod_opt_t * param)
/*
* Use POSIX stat() to return aggregate file size.
*/
IOR_offset_t POSIX_GetFileSize(aiori_mod_opt_t * test, MPI_Comm testComm,
char *testFileName)
IOR_offset_t POSIX_GetFileSize(aiori_mod_opt_t * test, char *testFileName)
{
if(hints->dryRun)
return 0;
@ -689,26 +688,5 @@ IOR_offset_t POSIX_GetFileSize(aiori_mod_opt_t * test, MPI_Comm testComm,
}
aggFileSizeFromStat = stat_buf.st_size;
if (hints->filePerProc == TRUE) {
MPI_CHECK(MPI_Allreduce(&aggFileSizeFromStat, &tmpSum, 1,
MPI_LONG_LONG_INT, MPI_SUM, testComm),
"cannot total data moved");
aggFileSizeFromStat = tmpSum;
} else {
MPI_CHECK(MPI_Allreduce(&aggFileSizeFromStat, &tmpMin, 1,
MPI_LONG_LONG_INT, MPI_MIN, testComm),
"cannot total data moved");
MPI_CHECK(MPI_Allreduce(&aggFileSizeFromStat, &tmpMax, 1,
MPI_LONG_LONG_INT, MPI_MAX, testComm),
"cannot total data moved");
if (tmpMin != tmpMax) {
if (rank == 0) {
WARN("inconsistent file size by different tasks");
}
/* incorrect, but now consistent across tasks */
aggFileSizeFromStat = tmpMin;
}
}
return (aggFileSizeFromStat);
}

View File

@ -130,7 +130,7 @@ const char* bucket_name = "ior";
# define IOR_CURL_NOCONTINUE 0x02
# define IOR_CURL_S3_EMC_EXT 0x04 /* allow EMC extensions to S3? */
#ifdef USE_S3_AIORI
#ifdef USE_S3_4C_AIORI
# include <curl/curl.h>
# include "aws4c.h"
#else
@ -167,8 +167,8 @@ static int S3_check_params(IOR_param_t *);
// "Pure S3"
// N:1 writes use multi-part upload
// N:N fails if "transfer-size" != "block-size" (because that requires "append")
ior_aiori_t s3_aiori = {
.name = "S3",
ior_aiori_t s3_4c_aiori = {
.name = "S3-4c",
.name_legacy = NULL,
.create = S3_Create,
.open = S3_Open,

540
src/aiori-S3-libs3.c Normal file
View File

@ -0,0 +1,540 @@
/*
* S3 implementation using the newer libs3
* https://github.com/bji/libs3
* Use one object per file chunk
*/
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <time.h>
#include <libs3.h>
#include "ior.h"
#include "aiori.h"
#include "aiori-debug.h"
#include "utilities.h"
static aiori_xfer_hint_t * hints = NULL;
static void s3_xfer_hints(aiori_xfer_hint_t * params){
hints = params;
}
/************************** O P T I O N S *****************************/
typedef struct {
int bucket_per_file;
char * access_key;
char * secret_key;
char * host;
char * bucket_prefix;
char * bucket_prefix_cur;
char * locationConstraint;
char * authRegion;
int timeout;
int dont_suffix;
int s3_compatible;
int use_ssl;
S3BucketContext bucket_context;
S3Protocol s3_protocol;
} s3_options_t;
static option_help * S3_options(aiori_mod_opt_t ** init_backend_options, aiori_mod_opt_t * init_values){
s3_options_t * o = malloc(sizeof(s3_options_t));
if (init_values != NULL){
memcpy(o, init_values, sizeof(s3_options_t));
}else{
memset(o, 0, sizeof(s3_options_t));
}
*init_backend_options = (aiori_mod_opt_t*) o;
o->bucket_prefix = "ior";
option_help h [] = {
{0, "S3.bucket-per-file", "Use one bucket to map one file/directory, otherwise one bucket is used to store all dirs/files.", OPTION_FLAG, 'd', & o->bucket_per_file},
{0, "S3.bucket-name-prefix", "The prefix of the bucket(s).", OPTION_OPTIONAL_ARGUMENT, 's', & o->bucket_prefix},
{0, "S3.dont-suffix-bucket", "By default a hash will be added to the bucket name to increase uniqueness, this disables the option.", OPTION_FLAG, 'd', & o->dont_suffix },
{0, "S3.s3-compatible", "to be selected when using S3 compatible storage", OPTION_FLAG, 'd', & o->s3_compatible },
{0, "S3.use-ssl", "used to specify that SSL is needed for the connection", OPTION_FLAG, 'd', & o->use_ssl },
{0, "S3.host", "The host optionally followed by:port.", OPTION_OPTIONAL_ARGUMENT, 's', & o->host},
{0, "S3.secret-key", "The secret key.", OPTION_OPTIONAL_ARGUMENT, 's', & o->secret_key},
{0, "S3.access-key", "The access key.", OPTION_OPTIONAL_ARGUMENT, 's', & o->access_key},
LAST_OPTION
};
option_help * help = malloc(sizeof(h));
memcpy(help, h, sizeof(h));
return help;
}
static void def_file_name(s3_options_t * o, char * out_name, char const * path){
if(o->bucket_per_file){
out_name += sprintf(out_name, "%s-", o->bucket_prefix_cur);
}
// duplicate path except "/"
while(*path != 0){
char c = *path;
if(((c >= '0' && c <= '9') || (c >= 'a' && c <= 'z') )){
*out_name = *path;
out_name++;
}else if(c >= 'A' && c <= 'Z'){
*out_name = *path + ('a' - 'A');
out_name++;
}
path++;
}
*out_name = '\0';
}
static void def_bucket_name(s3_options_t * o, char * out_name, char const * path){
// S3_MAX_BUCKET_NAME_SIZE
if(o->bucket_per_file){
out_name += sprintf(out_name, "%s-", o->bucket_prefix_cur);
}
// duplicate path except "/"
while(*path != 0){
char c = *path;
if(((c >= '0' && c <= '9') || (c >= 'a' && c <= 'z') )){
*out_name = *path;
out_name++;
}else if(c >= 'A' && c <= 'Z'){
*out_name = *path + ('a' - 'A');
out_name++;
}
path++;
}
*out_name = '\0';
// S3Status S3_validate_bucket_name(const char *bucketName, S3UriStyle uriStyle);
}
struct data_handling{
IOR_size_t * buf;
int64_t size;
};
static S3Status s3status = S3StatusInterrupted;
static S3ErrorDetails s3error = {NULL};
static S3Status responsePropertiesCallback(const S3ResponseProperties *properties, void *callbackData){
s3status = S3StatusOK;
return s3status;
}
static void responseCompleteCallback(S3Status status, const S3ErrorDetails *error, void *callbackData) {
s3status = status;
if (error == NULL){
s3error.message = NULL;
}else{
s3error = *error;
}
return;
}
#define CHECK_ERROR(p) \
if (s3status != S3StatusOK){ \
EWARNF("S3 %s:%d (path:%s) \"%s\": %s %s", __FUNCTION__, __LINE__, p, S3_get_status_name(s3status), s3error.message, s3error.furtherDetails ? s3error.furtherDetails : ""); \
}
static S3ResponseHandler responseHandler = { &responsePropertiesCallback, &responseCompleteCallback };
static char * S3_getVersion()
{
return "0.5";
}
static void S3_Fsync(aiori_fd_t *fd, aiori_mod_opt_t * options)
{
// Not needed
}
static void S3_Sync(aiori_mod_opt_t * options)
{
// Not needed
}
static S3Status S3ListResponseCallback(const char *ownerId, const char *ownerDisplayName, const char *bucketName, int64_t creationDateSeconds, void *callbackData){
uint64_t * count = (uint64_t*) callbackData;
*count++;
return S3StatusOK;
}
static S3ListServiceHandler listhandler = { { &responsePropertiesCallback, &responseCompleteCallback }, & S3ListResponseCallback};
static int S3_statfs (const char * path, ior_aiori_statfs_t * stat, aiori_mod_opt_t * options){
stat->f_bsize = 1;
stat->f_blocks = 1;
stat->f_bfree = 1;
stat->f_bavail = 1;
stat->f_ffree = 1;
s3_options_t * o = (s3_options_t*) options;
// use the number of bucket as files
uint64_t buckets = 0;
S3_list_service(o->s3_protocol, o->access_key, o->secret_key, NULL, o->host,
o->authRegion, NULL, o->timeout, & listhandler, & buckets);
stat->f_files = buckets;
CHECK_ERROR(o->authRegion);
return 0;
}
static S3Status S3multipart_handler(const char *upload_id, void *callbackData){
*((char const**)(callbackData)) = upload_id;
return S3StatusOK;
}
static S3MultipartInitialHandler multipart_handler = { {&responsePropertiesCallback, &responseCompleteCallback }, & S3multipart_handler};
typedef struct{
char * object;
} S3_fd_t;
static int putObjectDataCallback(int bufferSize, char *buffer, void *callbackData){
struct data_handling * dh = (struct data_handling *) callbackData;
const int64_t size = dh->size > bufferSize ? bufferSize : dh->size;
if(size == 0) return 0;
memcpy(buffer, dh->buf, size);
dh->buf = (IOR_size_t*) ((char*)(dh->buf) + size);
dh->size -= size;
return size;
}
static S3PutObjectHandler putObjectHandler = { { &responsePropertiesCallback, &responseCompleteCallback }, & putObjectDataCallback };
static aiori_fd_t *S3_Create(char *path, int iorflags, aiori_mod_opt_t * options)
{
char * upload_id;
s3_options_t * o = (s3_options_t*) options;
char p[FILENAME_MAX];
def_file_name(o, p, path);
if(iorflags & IOR_CREAT){
if(o->bucket_per_file){
S3_create_bucket(o->s3_protocol, o->access_key, o->secret_key, NULL, o->host, p, o->authRegion, S3CannedAclPrivate, o->locationConstraint, NULL, o->timeout, & responseHandler, NULL);
}else{
struct data_handling dh = { .buf = NULL, .size = 0 };
S3_put_object(& o->bucket_context, p, 0, NULL, NULL, o->timeout, &putObjectHandler, & dh);
}
if (s3status != S3StatusOK){
CHECK_ERROR(p);
return NULL;
}
}
S3_fd_t * fd = malloc(sizeof(S3_fd_t));
fd->object = strdup(p);
return (aiori_fd_t*) fd;
}
static S3Status statResponsePropertiesCallback(const S3ResponseProperties *properties, void *callbackData){
// check the size
struct stat *buf = (struct stat*) callbackData;
if(buf != NULL){
buf->st_size = properties->contentLength;
buf->st_mtime = properties->lastModified;
}
s3status = S3StatusOK;
return s3status;
}
static S3ResponseHandler statResponseHandler = { &statResponsePropertiesCallback, &responseCompleteCallback };
static aiori_fd_t *S3_Open(char *path, int flags, aiori_mod_opt_t * options)
{
if(flags & IOR_CREAT){
return S3_Create(path, flags, options);
}
if(flags & IOR_WRONLY){
WARN("S3 IOR_WRONLY is not supported");
}
if(flags & IOR_RDWR){
WARN("S3 IOR_RDWR is not supported");
}
s3_options_t * o = (s3_options_t*) options;
char p[FILENAME_MAX];
def_file_name(o, p, path);
if (o->bucket_per_file){
S3_test_bucket(o->s3_protocol, S3UriStylePath, o->access_key, o->secret_key,
NULL, o->host, p, o->authRegion, 0, NULL,
NULL, o->timeout, & responseHandler, NULL);
}else{
struct stat buf;
S3_head_object(& o->bucket_context, p, NULL, o->timeout, & statResponseHandler, & buf);
}
if (s3status != S3StatusOK){
CHECK_ERROR(p);
return NULL;
}
S3_fd_t * fd = malloc(sizeof(S3_fd_t));
fd->object = strdup(p);
return (aiori_fd_t*) fd;
}
static S3Status getObjectDataCallback(int bufferSize, const char *buffer, void *callbackData){
struct data_handling * dh = (struct data_handling *) callbackData;
const int64_t size = dh->size > bufferSize ? bufferSize : dh->size;
memcpy(dh->buf, buffer, size);
dh->buf = (IOR_size_t*) ((char*)(dh->buf) + size);
dh->size -= size;
return S3StatusOK;
}
static S3GetObjectHandler getObjectHandler = { { &responsePropertiesCallback, &responseCompleteCallback }, & getObjectDataCallback };
static IOR_offset_t S3_Xfer(int access, aiori_fd_t * afd, IOR_size_t * buffer, IOR_offset_t length, IOR_offset_t offset, aiori_mod_opt_t * options){
S3_fd_t * fd = (S3_fd_t *) afd;
struct data_handling dh = { .buf = buffer, .size = length };
s3_options_t * o = (s3_options_t*) options;
char p[FILENAME_MAX];
if(o->bucket_per_file){
o->bucket_context.bucketName = fd->object;
if(offset != 0){
sprintf(p, "%ld-%ld", (long) offset, (long) length);
}else{
sprintf(p, "0");
}
}else{
if(offset != 0){
sprintf(p, "%s-%ld-%ld", fd->object, (long) offset, (long) length);
}else{
sprintf(p, "%s", fd->object);
}
}
if(access == WRITE){
S3_put_object(& o->bucket_context, p, length, NULL, NULL, o->timeout, &putObjectHandler, & dh);
}else{
S3_get_object(& o->bucket_context, p, NULL, 0, length, NULL, o->timeout, &getObjectHandler, & dh);
}
if (! o->s3_compatible){
CHECK_ERROR(p);
}
return length;
}
static void S3_Close(aiori_fd_t * afd, aiori_mod_opt_t * options)
{
S3_fd_t * fd = (S3_fd_t *) afd;
free(fd->object);
free(afd);
}
typedef struct {
int status; // do not reorder!
s3_options_t * o;
int truncated;
char const *nextMarker;
} s3_delete_req;
S3Status list_delete_cb(int isTruncated, const char *nextMarker, int contentsCount, const S3ListBucketContent *contents, int commonPrefixesCount, const char **commonPrefixes, void *callbackData){
s3_delete_req * req = (s3_delete_req*) callbackData;
for(int i=0; i < contentsCount; i++){
S3_delete_object(& req->o->bucket_context, contents[i].key, NULL, req->o->timeout, & responseHandler, NULL);
}
req->truncated = isTruncated;
if(isTruncated){
req->nextMarker = nextMarker;
}
return S3StatusOK;
}
static S3ListBucketHandler list_delete_handler = {{&responsePropertiesCallback, &responseCompleteCallback }, list_delete_cb};
static void S3_Delete(char *path, aiori_mod_opt_t * options)
{
s3_options_t * o = (s3_options_t*) options;
char p[FILENAME_MAX];
def_file_name(o, p, path);
if(o->bucket_per_file){
o->bucket_context.bucketName = p;
s3_delete_req req = {0, o, 0, NULL};
do{
S3_list_bucket(& o->bucket_context, NULL, req.nextMarker, NULL, INT_MAX, NULL, o->timeout, & list_delete_handler, & req);
}while(req.truncated);
S3_delete_bucket(o->s3_protocol, S3UriStylePath, o->access_key, o->secret_key, NULL, o->host, p, o->authRegion, NULL, o->timeout, & responseHandler, NULL);
}else{
s3_delete_req req = {0, o, 0, NULL};
do{
S3_list_bucket(& o->bucket_context, p, req.nextMarker, NULL, INT_MAX, NULL, o->timeout, & list_delete_handler, & req);
}while(req.truncated);
S3_delete_object(& o->bucket_context, p, NULL, o->timeout, & responseHandler, NULL);
}
CHECK_ERROR(p);
}
static int S3_mkdir (const char *path, mode_t mode, aiori_mod_opt_t * options){
s3_options_t * o = (s3_options_t*) options;
char p[FILENAME_MAX];
def_bucket_name(o, p, path);
if (o->bucket_per_file){
S3_create_bucket(o->s3_protocol, o->access_key, o->secret_key, NULL, o->host, p, o->authRegion, S3CannedAclPrivate, o->locationConstraint, NULL, o->timeout, & responseHandler, NULL);
CHECK_ERROR(p);
return 0;
}else{
struct data_handling dh = { .buf = NULL, .size = 0 };
S3_put_object(& o->bucket_context, p, 0, NULL, NULL, o->timeout, & putObjectHandler, & dh);
if (! o->s3_compatible){
CHECK_ERROR(p);
}
return 0;
}
}
static int S3_rmdir (const char *path, aiori_mod_opt_t * options){
s3_options_t * o = (s3_options_t*) options;
char p[FILENAME_MAX];
def_bucket_name(o, p, path);
if (o->bucket_per_file){
S3_delete_bucket(o->s3_protocol, S3UriStylePath, o->access_key, o->secret_key, NULL, o->host, p, o->authRegion, NULL, o->timeout, & responseHandler, NULL);
CHECK_ERROR(p);
return 0;
}else{
S3_delete_object(& o->bucket_context, p, NULL, o->timeout, & responseHandler, NULL);
CHECK_ERROR(p);
return 0;
}
}
static int S3_stat(const char *path, struct stat *buf, aiori_mod_opt_t * options){
s3_options_t * o = (s3_options_t*) options;
char p[FILENAME_MAX];
def_file_name(o, p, path);
memset(buf, 0, sizeof(struct stat));
// TODO count the individual file fragment sizes together
if (o->bucket_per_file){
S3_test_bucket(o->s3_protocol, S3UriStylePath, o->access_key, o->secret_key,
NULL, o->host, p, o->authRegion, 0, NULL,
NULL, o->timeout, & responseHandler, NULL);
}else{
S3_head_object(& o->bucket_context, p, NULL, o->timeout, & statResponseHandler, buf);
}
if (s3status != S3StatusOK){
return -1;
}
return 0;
}
static int S3_access (const char *path, int mode, aiori_mod_opt_t * options){
struct stat buf;
return S3_stat(path, & buf, options);
}
static IOR_offset_t S3_GetFileSize(aiori_mod_opt_t * options, char *testFileName)
{
struct stat buf;
if(S3_stat(testFileName, & buf, options) != 0) return -1;
return buf.st_size;
}
static int S3_check_params(aiori_mod_opt_t * options){
if(hints->blockSize != hints->transferSize){
ERR("S3 Blocksize must be transferSize");
}
return 0;
}
static void S3_init(aiori_mod_opt_t * options){
s3_options_t * o = (s3_options_t*) options;
int ret = S3_initialize(NULL, S3_INIT_ALL, o->host);
if(ret != S3StatusOK)
FAIL("Could not initialize S3 library");
// create a bucket id based on access-key using a trivial checksumming
if(! o->dont_suffix){
uint64_t c = 0;
char * r = o->access_key;
for(uint64_t pos = 1; (*r) != '\0' ; r++, pos*=10) {
c += (*r) * pos;
}
int count = snprintf(NULL, 0, "%s%lu", o->bucket_prefix, c % 1000);
char * old_prefix = o->bucket_prefix;
o->bucket_prefix_cur = malloc(count + 1);
sprintf(o->bucket_prefix_cur, "%s%lu", old_prefix, c % 1000);
}else{
o->bucket_prefix_cur = o->bucket_prefix;
}
// init bucket context
memset(& o->bucket_context, 0, sizeof(o->bucket_context));
o->bucket_context.hostName = o->host;
o->bucket_context.bucketName = o->bucket_prefix_cur;
if (o->use_ssl){
o->s3_protocol = S3ProtocolHTTPS;
}else{
o->s3_protocol = S3ProtocolHTTP;
}
o->bucket_context.protocol = o->s3_protocol;
o->bucket_context.uriStyle = S3UriStylePath;
o->bucket_context.accessKeyId = o->access_key;
o->bucket_context.secretAccessKey = o->secret_key;
if (! o->bucket_per_file && rank == 0){
S3_create_bucket(o->s3_protocol, o->access_key, o->secret_key, NULL, o->host, o->bucket_context.bucketName, o->authRegion, S3CannedAclPrivate, o->locationConstraint, NULL, o->timeout, & responseHandler, NULL);
CHECK_ERROR(o->bucket_context.bucketName);
}
if ( ret != S3StatusOK ){
FAIL("S3 error %s", S3_get_status_name(ret));
}
}
static void S3_final(aiori_mod_opt_t * options){
s3_options_t * o = (s3_options_t*) options;
if (! o->bucket_per_file && rank == 0){
S3_delete_bucket(o->s3_protocol, S3UriStylePath, o->access_key, o->secret_key, NULL, o->host, o->bucket_context.bucketName, o->authRegion, NULL, o->timeout, & responseHandler, NULL);
CHECK_ERROR(o->bucket_context.bucketName);
}
S3_deinitialize();
}
ior_aiori_t S3_libS3_aiori = {
.name = "S3-libs3",
.name_legacy = NULL,
.create = S3_Create,
.open = S3_Open,
.xfer = S3_Xfer,
.close = S3_Close,
.delete = S3_Delete,
.get_version = S3_getVersion,
.fsync = S3_Fsync,
.xfer_hints = s3_xfer_hints,
.get_file_size = S3_GetFileSize,
.statfs = S3_statfs,
.mkdir = S3_mkdir,
.rmdir = S3_rmdir,
.access = S3_access,
.stat = S3_stat,
.initialize = S3_init,
.finalize = S3_final,
.get_options = S3_options,
.check_params = S3_check_params,
.sync = S3_Sync,
.enable_mdtest = true
};

View File

@ -4,6 +4,7 @@
/* This file contains only debug relevant helpers */
#include <stdio.h>
#include <mpi.h>
extern FILE * out_logfile;
extern int verbose; /* verbose output */

View File

@ -68,8 +68,11 @@ ior_aiori_t *available_aiori[] = {
#ifdef USE_MMAP_AIORI
&mmap_aiori,
#endif
#ifdef USE_S3_AIORI
&s3_aiori,
#ifdef USE_S3_LIBS3_AIORI
&S3_libS3_aiori,
#endif
#ifdef USE_S3_4C_AIORI
&s3_4c_aiori,
&s3_plus_aiori,
&s3_emc_aiori,
#endif
@ -100,6 +103,7 @@ void * airoi_update_module_options(const ior_aiori_t * backend, options_all_t *
}
options_all_t * airoi_create_all_module_options(option_help * global_options){
if(! out_logfile) out_logfile = stdout;
int airoi_c = aiori_count();
options_all_t * opt = malloc(sizeof(options_all_t));
opt->module_count = airoi_c + 1;

View File

@ -15,12 +15,6 @@
#ifndef _AIORI_H
#define _AIORI_H
#include <mpi.h>
#ifndef MPI_FILE_NULL
# include <mpio.h>
#endif /* not MPI_FILE_NULL */
#include <sys/stat.h>
#include <stdbool.h>
@ -101,12 +95,12 @@ typedef struct ior_aiori {
*/
void (*xfer_hints)(aiori_xfer_hint_t * params);
IOR_offset_t (*xfer)(int access, aiori_fd_t *, IOR_size_t *,
IOR_offset_t size, IOR_offset_t offset, aiori_mod_opt_t *);
void (*close)(aiori_fd_t *, aiori_mod_opt_t *);
void (*delete)(char *, aiori_mod_opt_t *);
IOR_offset_t size, IOR_offset_t offset, aiori_mod_opt_t * module_options);
void (*close)(aiori_fd_t *, aiori_mod_opt_t * module_options);
void (*delete)(char *, aiori_mod_opt_t * module_options);
char* (*get_version)(void);
void (*fsync)(aiori_fd_t *, aiori_mod_opt_t *);
IOR_offset_t (*get_file_size)(aiori_mod_opt_t * module_options, MPI_Comm, char *);
void (*fsync)(aiori_fd_t *, aiori_mod_opt_t * module_options);
IOR_offset_t (*get_file_size)(aiori_mod_opt_t * module_options, char * filename);
int (*statfs) (const char *, ior_aiori_statfs_t *, aiori_mod_opt_t * module_options);
int (*mkdir) (const char *path, mode_t mode, aiori_mod_opt_t * module_options);
int (*rmdir) (const char *path, aiori_mod_opt_t * module_options);
@ -136,7 +130,8 @@ extern ior_aiori_t ncmpi_aiori;
extern ior_aiori_t posix_aiori;
extern ior_aiori_t pmdk_aiori;
extern ior_aiori_t mmap_aiori;
extern ior_aiori_t s3_aiori;
extern ior_aiori_t S3_libS3_aiori;
extern ior_aiori_t s3_4c_aiori;
extern ior_aiori_t s3_plus_aiori;
extern ior_aiori_t s3_emc_aiori;
extern ior_aiori_t rados_aiori;
@ -164,7 +159,7 @@ void aiori_posix_xfer_hints(aiori_xfer_hint_t * params);
aiori_fd_t *POSIX_Create(char *testFileName, int flags, aiori_mod_opt_t * module_options);
int POSIX_Mknod(char *testFileName);
aiori_fd_t *POSIX_Open(char *testFileName, int flags, aiori_mod_opt_t * module_options);
IOR_offset_t POSIX_GetFileSize(aiori_mod_opt_t * test, MPI_Comm testComm, char *testFileName);
IOR_offset_t POSIX_GetFileSize(aiori_mod_opt_t * test, char *testFileName);
void POSIX_Delete(char *testFileName, aiori_mod_opt_t * module_options);
void POSIX_Close(aiori_fd_t *fd, aiori_mod_opt_t * module_options);
option_help * POSIX_options(aiori_mod_opt_t ** init_backend_options, aiori_mod_opt_t * init_values);
@ -172,7 +167,7 @@ option_help * POSIX_options(aiori_mod_opt_t ** init_backend_options, aiori_mod_o
/* NOTE: these 3 MPI-IO functions are exported for reuse by HDF5/PNetCDF */
void MPIIO_Delete(char *testFileName, aiori_mod_opt_t * module_options);
IOR_offset_t MPIIO_GetFileSize(aiori_mod_opt_t * options, MPI_Comm testComm, char *testFileName);
int MPIIO_Access(const char *, int, aiori_mod_opt_t *);
IOR_offset_t MPIIO_GetFileSize(aiori_mod_opt_t * options, char *testFileName);
int MPIIO_Access(const char *, int, aiori_mod_opt_t * module_options);
#endif /* not _AIORI_H */

View File

@ -313,14 +313,39 @@ CheckForOutliers(IOR_param_t *test, const double *timer, const int access)
* Check if actual file size equals expected size; if not use actual for
* calculating performance rate.
*/
static void CheckFileSize(IOR_test_t *test, IOR_offset_t dataMoved, int rep,
const int access)
static void CheckFileSize(IOR_test_t *test, char * testFilename, IOR_offset_t dataMoved, int rep, const int access)
{
IOR_param_t *params = &test->params;
IOR_results_t *results = test->results;
IOR_point_t *point = (access == WRITE) ? &results[rep].write :
&results[rep].read;
/* get the size of the file */
IOR_offset_t aggFileSizeFromStat, tmpMin, tmpMax, tmpSum;
aggFileSizeFromStat = backend->get_file_size(params->backend_options, testFilename);
if (params->hints.filePerProc == TRUE) {
MPI_CHECK(MPI_Allreduce(&aggFileSizeFromStat, &tmpSum, 1,
MPI_LONG_LONG_INT, MPI_SUM, testComm),
"cannot reduce total data moved");
aggFileSizeFromStat = tmpSum;
} else {
MPI_CHECK(MPI_Allreduce(&aggFileSizeFromStat, &tmpMin, 1,
MPI_LONG_LONG_INT, MPI_MIN, testComm),
"cannot reduce total data moved");
MPI_CHECK(MPI_Allreduce(&aggFileSizeFromStat, &tmpMax, 1,
MPI_LONG_LONG_INT, MPI_MAX, testComm),
"cannot reduce total data moved");
if (tmpMin != tmpMax) {
if (rank == 0) {
WARN("inconsistent file size by different tasks");
}
/* incorrect, but now consistent across tasks */
aggFileSizeFromStat = tmpMin;
}
}
point->aggFileSizeFromStat = aggFileSizeFromStat;
MPI_CHECK(MPI_Allreduce(&dataMoved, &point->aggFileSizeFromXfer,
1, MPI_LONG_LONG_INT, MPI_SUM, testComm),
"cannot total data moved");
@ -1354,6 +1379,7 @@ static void TestIoSys(IOR_test_t *test)
params->open = WRITE;
timer[0] = GetTimeStamp();
fd = backend->create(testFileName, IOR_WRONLY | IOR_CREAT | IOR_TRUNC, params->backend_options);
if(fd == NULL) FAIL("Cannot create file");
timer[1] = GetTimeStamp();
if (params->intraTestBarriers)
MPI_CHECK(MPI_Barrier(testComm),
@ -1379,13 +1405,9 @@ static void TestIoSys(IOR_test_t *test)
timer[5] = GetTimeStamp();
MPI_CHECK(MPI_Barrier(testComm), "barrier error");
/* get the size of the file just written */
results[rep].write.aggFileSizeFromStat =
backend->get_file_size(params->backend_options, testComm, testFileName);
/* check if stat() of file doesn't equal expected file size,
use actual amount of byte moved */
CheckFileSize(test, dataMoved, rep, WRITE);
CheckFileSize(test, testFileName, dataMoved, rep, WRITE);
if (verbose >= VERBOSE_3)
WriteTimes(params, timer, rep, WRITE);
@ -1428,6 +1450,7 @@ static void TestIoSys(IOR_test_t *test)
GetTestFileName(testFileName, params);
params->open = WRITECHECK;
fd = backend->open(testFileName, IOR_RDONLY, params->backend_options);
if(fd == NULL) FAIL("Cannot open file");
dataMoved = WriteOrRead(params, &results[rep], fd, WRITECHECK, &ioBuffers);
backend->close(fd, params->backend_options);
rankOffset = 0;
@ -1500,6 +1523,7 @@ static void TestIoSys(IOR_test_t *test)
params->open = READ;
timer[0] = GetTimeStamp();
fd = backend->open(testFileName, IOR_RDONLY, params->backend_options);
if(fd == NULL) FAIL("Cannot open file");
timer[1] = GetTimeStamp();
if (params->intraTestBarriers)
MPI_CHECK(MPI_Barrier(testComm),
@ -1519,14 +1543,9 @@ static void TestIoSys(IOR_test_t *test)
backend->close(fd, params->backend_options);
timer[5] = GetTimeStamp();
/* get the size of the file just read */
results[rep].read.aggFileSizeFromStat =
backend->get_file_size(params->backend_options, testComm,
testFileName);
/* check if stat() of file doesn't equal expected file size,
use actual amount of byte moved */
CheckFileSize(test, dataMoved, rep, READ);
CheckFileSize(test, testFileName, dataMoved, rep, READ);
if (verbose >= VERBOSE_3)
WriteTimes(params, timer, rep, READ);

View File

@ -39,6 +39,12 @@
#include "iordef.h"
#include "aiori.h"
#include <mpi.h>
#ifndef MPI_FILE_NULL
# include <mpio.h>
#endif /* not MPI_FILE_NULL */
#define ISPOWEROFTWO(x) ((x != 0) && !(x & (x - 1)))
/******************** DATA Packet Type ***************************************/
/* Holds the types of data packets: generic, offset, timestamp, incompressible */

View File

@ -354,8 +354,9 @@ void mdtest_verify_data(int item, char * buffer, size_t bytes){
for( ; i < bytes; i++){
if(buffer[i] != (char) (i + 1)){
VERBOSE(0, -1, "Error verifying byte %zu for item %d", i, item);
VERBOSE(5, -1, "Error verifying byte %zu for item %d", i, item);
verification_error++;
break;
}
}
}
@ -1931,6 +1932,7 @@ mdtest_results_t * mdtest_run(int argc, char **argv, MPI_Comm world_com, FILE *
int last = 0;
int stride = 1;
int iterations = 1;
int created_root_dir = 0; // was the root directory existing or newly created
verbose = 0;
int no_barriers = 0;
@ -2188,6 +2190,7 @@ mdtest_results_t * mdtest_run(int argc, char **argv, MPI_Comm world_com, FILE *
if (backend->mkdir(testdirpath, DIRMODE, backend_options) != 0) {
FAIL("Unable to create test directory path %s", testdirpath);
}
created_root_dir = 1;
}
/* display disk usage */
@ -2294,6 +2297,10 @@ mdtest_results_t * mdtest_run(int argc, char **argv, MPI_Comm world_com, FILE *
}
}
if (created_root_dir && backend->rmdir(testdirpath, backend_options) != 0) {
FAIL("Unable to remote test directory path %s", testdirpath);
}
if(verification_error){
VERBOSE(0, -1, "\nERROR: verifying the data read! Take the performance values with care!\n");
}

View File

@ -60,8 +60,8 @@ int rankOffset = 0;
int verbose = VERBOSE_0; /* verbose output */
MPI_Comm testComm;
MPI_Comm mpi_comm_world;
FILE * out_logfile;
FILE * out_resultfile;
FILE * out_logfile = NULL;
FILE * out_resultfile = NULL;
enum OutputFormat_t outputFormat;
/***************************** F U N C T I O N S ******************************/

33
testing/s3.sh Executable file
View File

@ -0,0 +1,33 @@
#!/bin/bash
# Test basic S3 behavior using minio.
ROOT="$(dirname ${BASH_SOURCE[0]})"
TYPE="basic"
if [[ ! -e $ROOT/minio ]] ; then
wget https://dl.min.io/server/minio/release/linux-amd64/minio
mv minio $ROOT
chmod +x $ROOT/minio
fi
export MINIO_ACCESS_KEY=accesskey
export MINIO_SECRET_KEY=secretkey
$ROOT/minio --quiet server /dev/shm &
export IOR_EXTRA="-o test"
export MDTEST_EXTRA="-d test"
source $ROOT/test-lib.sh
I=100 # Start with this ID
IOR 2 -a S3-libs3 --S3.host=localhost:9000 --S3.secret-key=secretkey --S3.access-key=accesskey -b $((10*1024*1024)) -t $((10*1024*1024))
MDTEST 2 -a S3-libs3 -L --S3.host=localhost:9000 --S3.secret-key=secretkey --S3.access-key=accesskey -n 10
MDTEST 2 -a S3-libs3 --S3.host=localhost:9000 --S3.secret-key=secretkey --S3.access-key=accesskey -n 5 -w 1024 -e 1024
IOR 1 -a S3-libs3 --S3.host=localhost:9000 --S3.secret-key=secretkey --S3.access-key=accesskey -b $((10*1024)) -t $((10*1024)) --S3.bucket-per-file
MDTEST 1 -a S3-libs3 -L --S3.host=localhost:9000 --S3.secret-key=secretkey --S3.access-key=accesskey --S3.bucket-per-file -n 5
MDTEST 1 -a S3-libs3 --S3.host=localhost:9000 --S3.secret-key=secretkey --S3.access-key=accesskey --S3.bucket-per-file -n 10 -w 1024 -e 1024
kill -9 %1

View File

@ -40,7 +40,7 @@ I=0
function IOR(){
RANKS=$1
shift
WHAT="${IOR_MPIRUN} $RANKS ${IOR_BIN_DIR}/ior ${@} ${IOR_EXTRA} -o ${IOR_TMP}/ior"
WHAT="${IOR_MPIRUN} $RANKS ${IOR_BIN_DIR}/ior ${@} -o ${IOR_TMP}/ior ${IOR_EXTRA}"
$WHAT 1>"${IOR_OUT}/test_out.$I" 2>&1
if [[ $? != 0 ]]; then
echo -n "ERR"
@ -56,7 +56,7 @@ function MDTEST(){
RANKS=$1
shift
rm -rf ${IOR_TMP}/mdest
WHAT="${IOR_MPIRUN} $RANKS ${IOR_BIN_DIR}/mdtest ${@} ${MDTEST_EXTRA} -d ${IOR_TMP}/mdest -V=4"
WHAT="${IOR_MPIRUN} $RANKS ${IOR_BIN_DIR}/mdtest ${@} -d ${IOR_TMP}/mdest ${MDTEST_EXTRA} -V=4"
$WHAT 1>"${IOR_OUT}/test_out.$I" 2>&1
if [[ $? != 0 ]]; then
echo -n "ERR"