Add a RADOS (librados) AIORI backend to IOR.

  * configure.ac, src/Makefile.am: new --with-rados option; when enabled,
    build aiori-RADOS.c and link with -lrados.
  * src/aiori-RADOS.c: new backend; each test file name is used as the id
    of an object in a hard-coded pool.
  * src/aiori.c, src/aiori.h, src/ior.h: register the backend and add the
    cluster / I/O-context handles used through IOR_param_t.
  * src/aiori-MPIIO.c: honor fsyncPerWrite and warn if MPI_File_sync fails.
  * src/ior.c: do not reset the fsync option for the HDFS backend.

NOTE(review): this patch was rebuilt from a copy whose newlines had been
flattened.  Leading whitespace of context/removed lines is a best guess, so
apply with "git apply --ignore-whitespace" (or regenerate from the tree).
The system header names in aiori-RADOS.c were lost in that copy and have
been restored from what the code uses -- confirm against the tree.

diff --git a/configure.ac b/configure.ac
index 00e4882..2e76042 100755
--- a/configure.ac
+++ b/configure.ac
@@ -142,6 +142,16 @@ AM_COND_IF([USE_POSIX_AIORI],[
         AC_DEFINE([USE_POSIX_AIORI], [], [Build POSIX backend AIORI])
 ])
 
+# RADOS support
+AC_ARG_WITH([rados],
+        [AS_HELP_STRING([--with-rados],
+           [support IO with librados backend @<:@default=no@:>@])],
+        [],
+        [with_rados=no])
+AM_CONDITIONAL([USE_RADOS_AIORI], [test x$with_rados = xyes])
+AM_COND_IF([USE_RADOS_AIORI],[
+        AC_DEFINE([USE_RADOS_AIORI], [], [Build RADOS backend AIORI])
+])
 
 
 
diff --git a/src/Makefile.am b/src/Makefile.am
index 4369c77..2bb9e96 100755
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -53,6 +53,10 @@ if USE_POSIX_AIORI
 extraSOURCES += aiori-POSIX.c
 endif
 
+if USE_RADOS_AIORI
+extraSOURCES += aiori-RADOS.c
+extraLDADD += -lrados
+endif
 
 if USE_S3_AIORI
 extraSOURCES += aiori-S3.c
diff --git a/src/aiori-MPIIO.c b/src/aiori-MPIIO.c
index 6251348..d8c8d5e 100755
--- a/src/aiori-MPIIO.c
+++ b/src/aiori-MPIIO.c
@@ -38,10 +38,9 @@ static void *MPIIO_Open(char *, IOR_param_t *);
 static IOR_offset_t MPIIO_Xfer(int, void *, IOR_size_t *,
                                    IOR_offset_t, IOR_param_t *);
 static void MPIIO_Close(void *, IOR_param_t *);
-void MPIIO_Delete(char *, IOR_param_t *);
 static void MPIIO_SetVersion(IOR_param_t *);
 static void MPIIO_Fsync(void *, IOR_param_t *);
-int MPIIO_Access(const char *, int, IOR_param_t *);
+
 
 /************************** D E C L A R A T I O N S ***************************/
 
@@ -368,6 +367,8 @@ static IOR_offset_t MPIIO_Xfer(int access, void *fd, IOR_size_t * buffer,
                         }
                 }
         }
+        if((access == WRITE) && (param->fsyncPerWrite == TRUE))
+                MPIIO_Fsync(fd, param);
         return (length);
 }
 
@@ -376,8 +377,8 @@ static IOR_offset_t MPIIO_Xfer(int access, void *fd, IOR_size_t * buffer,
  */
 static void MPIIO_Fsync(void *fdp, IOR_param_t * param)
 {
-    MPI_File * fd = (MPI_File*) fdp;
-    MPI_File_sync(*fd);
+        if (MPI_File_sync(*(MPI_File *)fdp) != MPI_SUCCESS)
+                EWARN("fsync() failed");
 }
 
 /*
diff --git a/src/aiori-RADOS.c b/src/aiori-RADOS.c
new file mode 100755
index 0000000..6d88a7c
--- /dev/null
+++ b/src/aiori-RADOS.c
@@ -0,0 +1,401 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ */
+/******************************************************************************\
+*                                                                              *
+*        (C) 2015 The University of Chicago                                    *
+*                                                                              *
+*        See COPYRIGHT in top-level directory.                                 *
+*                                                                              *
+********************************************************************************
+*
+* Implement abstract I/O interface for RADOS.
+*
+\******************************************************************************/
+
+#ifdef HAVE_CONFIG_H
+# include "config.h"
+#endif
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <sys/stat.h>
+#include <rados/librados.h>
+
+#include "ior.h"
+#include "iordef.h"
+#include "aiori.h"
+#include "utilities.h"
+
+/**************************** P R O T O T Y P E S *****************************/
+static void *RADOS_Create(char *, IOR_param_t *);
+static void *RADOS_Open(char *, IOR_param_t *);
+static IOR_offset_t RADOS_Xfer(int, void *, IOR_size_t *,
+                               IOR_offset_t, IOR_param_t *);
+static void RADOS_Close(void *, IOR_param_t *);
+static void RADOS_Delete(char *, IOR_param_t *);
+static void RADOS_SetVersion(IOR_param_t *);
+static void RADOS_Fsync(void *, IOR_param_t *);
+static IOR_offset_t RADOS_GetFileSize(IOR_param_t *, MPI_Comm, char *);
+static int RADOS_StatFS(const char *, ior_aiori_statfs_t *, IOR_param_t *);
+static int RADOS_MkDir(const char *, mode_t, IOR_param_t *);
+static int RADOS_RmDir(const char *, IOR_param_t *);
+static int RADOS_Access(const char *, int, IOR_param_t *);
+static int RADOS_Stat(const char *, struct stat *, IOR_param_t *);
+
+/************************** D E C L A R A T I O N S ***************************/
+
+ior_aiori_t rados_aiori = {
+        .name = "RADOS",
+        .create = RADOS_Create,
+        .open = RADOS_Open,
+        .xfer = RADOS_Xfer,
+        .close = RADOS_Close,
+        .delete = RADOS_Delete,
+        .set_version = RADOS_SetVersion,
+        .fsync = RADOS_Fsync,
+        .get_file_size = RADOS_GetFileSize,
+        .statfs = RADOS_StatFS,
+        .mkdir = RADOS_MkDir,
+        .rmdir = RADOS_RmDir,
+        .access = RADOS_Access,
+        .stat = RADOS_Stat,
+};
+
+/*
+ * Set errno to the negated return code, then report the failure via ERR().
+ */
+#define RADOS_ERR(__err_str, __ret) do { \
+        errno = -(__ret); \
+        ERR(__err_str); \
+} while(0)
+
+/***************************** F U N C T I O N S ******************************/
+
+/*
+ * Create the cluster handle, read the Ceph config, connect, and create an I/O
+ * context; results are stored in param->rados_cluster and param->rados_ioctx.
+ */
+static void RADOS_Cluster_Init(IOR_param_t * param)
+{
+        int ret;
+
+        /* create RADOS cluster handle */
+        /* XXX: HARDCODED RADOS USER NAME */
+        ret = rados_create(&param->rados_cluster, "admin");
+        if (ret)
+                RADOS_ERR("unable to create RADOS cluster handle", ret);
+
+        /* set the handle using the Ceph config */
+        /* XXX: HARDCODED RADOS CONF PATH */
+        ret = rados_conf_read_file(param->rados_cluster, "/etc/ceph/ceph.conf");
+        if (ret)
+                RADOS_ERR("unable to read RADOS config file", ret);
+
+        /* connect to the RADOS cluster */
+        ret = rados_connect(param->rados_cluster);
+        if (ret)
+                RADOS_ERR("unable to connect to the RADOS cluster", ret);
+
+        /* create an io context for the pool we are operating on */
+        /* XXX: HARDCODED RADOS POOL NAME */
+        ret = rados_ioctx_create(param->rados_cluster, "cephfs_data",
+                                 &param->rados_ioctx);
+        if (ret)
+                RADOS_ERR("unable to create an I/O context for the RADOS cluster", ret);
+
+        return;
+}
+
+/*
+ * Destroy the I/O context and shut down the cluster handle.
+ */
+static void RADOS_Cluster_Finalize(IOR_param_t * param)
+{
+        /* ioctx destroy */
+        rados_ioctx_destroy(param->rados_ioctx);
+
+        /* shutdown */
+        rados_shutdown(param->rados_cluster);
+}
+
+/*
+ * Connect to the cluster and return a heap copy of testFileName to serve as
+ * the object id; the object itself is created only when create_flag is set.
+ */
+static void *RADOS_Create_Or_Open(char *testFileName, IOR_param_t * param, int create_flag)
+{
+        int ret;
+        char *oid;
+
+        RADOS_Cluster_Init(param);
+
+        if (param->useO_DIRECT == TRUE)
+                WARN("direct I/O mode is not implemented in RADOS\n");
+
+        oid = strdup(testFileName);
+        if (!oid)
+                ERR("unable to allocate RADOS oid");
+
+        if (create_flag)
+        {
+                rados_write_op_t create_op;
+                int rados_create_flag;
+
+                if (param->openFlags & IOR_EXCL)
+                        rados_create_flag = LIBRADOS_CREATE_EXCLUSIVE;
+                else
+                        rados_create_flag = LIBRADOS_CREATE_IDEMPOTENT;
+
+                /* create a RADOS "write op" for creating the object */
+                create_op = rados_create_write_op();
+                rados_write_op_create(create_op, rados_create_flag, NULL);
+                ret = rados_write_op_operate(create_op, param->rados_ioctx, oid,
+                                             NULL, 0);
+                rados_release_write_op(create_op);
+                if (ret)
+                        RADOS_ERR("unable to create RADOS object", ret);
+        }
+        else
+        {
+                /* XXX actually, we should probably assert oid existence here? */
+        }
+
+        return (void *)oid;
+}
+
+/*
+ * Create and open an object through the RADOS interface.
+ */
+static void *RADOS_Create(char *testFileName, IOR_param_t * param)
+{
+        return RADOS_Create_Or_Open(testFileName, param, TRUE);
+}
+
+/*
+ * Open an object through the RADOS interface; create it if IOR_CREAT is set.
+ */
+static void *RADOS_Open(char *testFileName, IOR_param_t * param)
+{
+        if (param->openFlags & IOR_CREAT)
+                return RADOS_Create_Or_Open(testFileName, param, TRUE);
+        else
+                return RADOS_Create_Or_Open(testFileName, param, FALSE);
+}
+
+/*
+ * Write or read "length" bytes at param->offset of the object named by fd.
+ */
+static IOR_offset_t RADOS_Xfer(int access, void *fd, IOR_size_t * buffer,
+                               IOR_offset_t length, IOR_param_t * param)
+{
+        int ret;
+        char *oid = (char *)fd;
+
+        if (access == WRITE)
+        {
+                rados_write_op_t write_op;
+
+                write_op = rados_create_write_op();
+                rados_write_op_write(write_op, (const char *)buffer,
+                                     length, param->offset);
+                ret = rados_write_op_operate(write_op, param->rados_ioctx,
+                                             oid, NULL, 0);
+                rados_release_write_op(write_op);
+                if (ret)
+                        RADOS_ERR("unable to write RADOS object", ret);
+        }
+        else /* READ */
+        {
+                int read_ret;
+                size_t bytes_read;
+                rados_read_op_t read_op;
+
+                read_op = rados_create_read_op();
+                rados_read_op_read(read_op, param->offset, length, (char *)buffer,
+                                   &bytes_read, &read_ret);
+                ret = rados_read_op_operate(read_op, param->rados_ioctx, oid, 0);
+                rados_release_read_op(read_op);
+                if (ret || read_ret || ((IOR_offset_t)bytes_read != length))
+                        RADOS_ERR("unable to read RADOS object", ret ? ret : read_ret);
+        }
+
+        return length;
+}
+
+/*
+ * Perform fsync(): a no-op in this backend.
+ */
+static void RADOS_Fsync(void *fd, IOR_param_t * param)
+{
+        return;
+}
+
+/*
+ * Tear down the cluster connection and free the object id passed as fd.
+ */
+static void RADOS_Close(void *fd, IOR_param_t * param)
+{
+        char *oid = (char *)fd;
+
+        /* object does not need to be "closed", but we should tear the cluster down */
+        RADOS_Cluster_Finalize(param);
+        free(oid);
+
+        return;
+}
+
+/*
+ * Remove the object named testFileName.
+ */
+static void RADOS_Delete(char *testFileName, IOR_param_t * param)
+{
+        int ret;
+        char *oid = testFileName;
+        rados_write_op_t remove_op;
+
+        /* we have to reestablish cluster connection here... */
+        RADOS_Cluster_Init(param);
+
+        /* remove the object */
+        remove_op = rados_create_write_op();
+        rados_write_op_remove(remove_op);
+        ret = rados_write_op_operate(remove_op, param->rados_ioctx,
+                                     oid, NULL, 0);
+        rados_release_write_op(remove_op);
+        if (ret)
+                RADOS_ERR("unable to remove RADOS object", ret);
+
+        RADOS_Cluster_Finalize(param);
+
+        return;
+}
+
+/*
+ * Use the API name as the version string.
+ */
+static void RADOS_SetVersion(IOR_param_t * test)
+{
+        strcpy(test->apiVersion, test->api);
+        return;
+}
+
+/*
+ * Stat the object and combine the per-task sizes across testComm: the sum for
+ * file-per-process, otherwise the minimum if the tasks disagree.
+ */
+static IOR_offset_t RADOS_GetFileSize(IOR_param_t * test, MPI_Comm testComm,
+                                      char *testFileName)
+{
+        int ret;
+        char *oid = testFileName;
+        rados_read_op_t stat_op;
+        uint64_t oid_size;
+        int stat_ret;
+        IOR_offset_t aggSizeFromStat, tmpMin, tmpMax, tmpSum;
+
+        /* we have to reestablish cluster connection here... */
+        RADOS_Cluster_Init(test);
+
+        /* stat the object */
+        stat_op = rados_create_read_op();
+        rados_read_op_stat(stat_op, &oid_size, NULL, &stat_ret);
+        ret = rados_read_op_operate(stat_op, test->rados_ioctx, oid, 0);
+        rados_release_read_op(stat_op);
+        if (ret || stat_ret)
+                RADOS_ERR("unable to stat RADOS object", ret ? ret : stat_ret);
+        aggSizeFromStat = oid_size;
+
+        if (test->filePerProc == TRUE)
+        {
+                MPI_CHECK(MPI_Allreduce(&aggSizeFromStat, &tmpSum, 1,
+                                        MPI_LONG_LONG_INT, MPI_SUM, testComm),
+                          "cannot total data moved");
+                aggSizeFromStat = tmpSum;
+        }
+        else
+        {
+                MPI_CHECK(MPI_Allreduce(&aggSizeFromStat, &tmpMin, 1,
+                                        MPI_LONG_LONG_INT, MPI_MIN, testComm),
+                          "cannot total data moved");
+                MPI_CHECK(MPI_Allreduce(&aggSizeFromStat, &tmpMax, 1,
+                                        MPI_LONG_LONG_INT, MPI_MAX, testComm),
+                          "cannot total data moved");
+                if (tmpMin != tmpMax)
+                {
+                        if (rank == 0)
+                                WARN("inconsistent file size by different tasks");
+
+                        /* incorrect, but now consistent across tasks */
+                        aggSizeFromStat = tmpMin;
+                }
+        }
+
+        RADOS_Cluster_Finalize(test);
+
+        return aggSizeFromStat;
+}
+
+/*
+ * statfs is not supported by this backend: warn and return -1.
+ */
+static int RADOS_StatFS(const char *oid, ior_aiori_statfs_t *stat_buf,
+                        IOR_param_t *param)
+{
+        WARN("statfs not supported in RADOS backend!");
+        return -1;
+}
+
+/*
+ * mkdir is not supported by this backend: warn and return -1.
+ */
+static int RADOS_MkDir(const char *oid, mode_t mode, IOR_param_t *param)
+{
+        WARN("mkdir not supported in RADOS backend!");
+        return -1;
+}
+
+/*
+ * rmdir is not supported by this backend: warn and return -1.
+ */
+static int RADOS_RmDir(const char *oid, IOR_param_t *param)
+{
+        WARN("rmdir not supported in RADOS backend!");
+        return -1;
+}
+
+/*
+ * Return 0 if a stat of the object succeeds, -1 otherwise; mode is ignored.
+ */
+static int RADOS_Access(const char *oid, int mode, IOR_param_t *param)
+{
+        rados_read_op_t read_op;
+        int ret;
+        int prval;
+        uint64_t oid_size;
+
+        /* we have to reestablish cluster connection here... */
+        RADOS_Cluster_Init(param);
+
+        /* use read_op stat to check for oid existence */
+        read_op = rados_create_read_op();
+        rados_read_op_stat(read_op, &oid_size, NULL, &prval);
+        ret = rados_read_op_operate(read_op, param->rados_ioctx, oid, 0);
+        rados_release_read_op(read_op);
+
+        RADOS_Cluster_Finalize(param);
+
+        if (ret || prval)
+                return -1;
+        else
+                return 0;
+}
+
+/*
+ * stat is not supported by this backend: warn and return -1.
+ */
+static int RADOS_Stat(const char *oid, struct stat *buf, IOR_param_t *param)
+{
+        WARN("stat not supported in RADOS backend!");
+        return -1;
+}
diff --git a/src/aiori.c b/src/aiori.c
index a32a147..c36de4a 100644
--- a/src/aiori.c
+++ b/src/aiori.c
@@ -51,6 +51,9 @@ ior_aiori_t *available_aiori[] = {
         &s3_aiori,
         &s3_plus_aiori,
         &s3_emc_aiori,
+#endif
+#ifdef USE_RADOS_AIORI
+        &rados_aiori,
 #endif
         NULL
 };
diff --git a/src/aiori.h b/src/aiori.h
index a253aca..94c99e6 100755
--- a/src/aiori.h
+++ b/src/aiori.h
@@ -89,6 +89,7 @@ extern ior_aiori_t mmap_aiori;
 extern ior_aiori_t s3_aiori;
 extern ior_aiori_t s3_plus_aiori;
 extern ior_aiori_t s3_emc_aiori;
+extern ior_aiori_t rados_aiori;
 
 const ior_aiori_t *aiori_select (const char *api);
 int aiori_count (void);
@@ -101,18 +102,10 @@ int aiori_posix_rmdir (const char *path, IOR_param_t * param);
 int aiori_posix_access (const char *path, int mode, IOR_param_t * param);
 int aiori_posix_stat (const char *path, struct stat *buf, IOR_param_t * param);
 
-IOR_offset_t MPIIO_GetFileSize(IOR_param_t * test, MPI_Comm testComm,
-                               char *testFileName);
 /* NOTE: these 3 MPI-IO functions are exported for reuse by HDF5/PNetCDF */
 void MPIIO_Delete(char *testFileName, IOR_param_t * param);
+IOR_offset_t MPIIO_GetFileSize(IOR_param_t * test, MPI_Comm testComm,
+                               char *testFileName);
 int MPIIO_Access(const char *, int, IOR_param_t *);
 
-void *POSIX_Create(char *testFileName, IOR_param_t *test);
-void *POSIX_Open(char *testFileName, IOR_param_t *test);
-void POSIX_Close(void *fd, IOR_param_t *test);
-void POSIX_Delete(char *testFileName, IOR_param_t *test);
-void POSIX_SetVersion(IOR_param_t *test);
-IOR_offset_t POSIX_GetFileSize(IOR_param_t *test, MPI_Comm testComm,
-                               char *testFileName);
-
 #endif /* not _AIORI_H */
diff --git a/src/ior.c b/src/ior.c
index c09aeed..b53f867 100755
--- a/src/ior.c
+++ b/src/ior.c
@@ -1685,11 +1685,11 @@ static void ValidateTests(IOR_param_t * test)
         if ((strcasecmp(test->api, "POSIX") != 0) && test->singleXferAttempt)
                 WARN_RESET("retry only available in POSIX",
                            test, &defaults, singleXferAttempt);
-        if ((strcasecmp(test->api, "POSIX") != 0) &&
-            (strcasecmp(test->api, "MMAP") != 0) &&
-            (strcasecmp(test->api, "MPIIO") != 0)
-                        && test->fsync)
-                WARN_RESET("fsync() only available in POSIX/MMAP",
+        if (((strcasecmp(test->api, "POSIX") != 0)
+          && (strcasecmp(test->api, "MPIIO") != 0)
+          && (strcasecmp(test->api, "MMAP") != 0)
+          && (strcasecmp(test->api, "HDFS") != 0)) && test->fsync)
+                WARN_RESET("fsync() not supported in selected backend",
                            test, &defaults, fsync);
         if ((strcasecmp(test->api, "MPIIO") != 0) && test->preallocate)
                 WARN_RESET("preallocation only available in MPIIO",
diff --git a/src/ior.h b/src/ior.h
index bfda441..e8e465a 100755
--- a/src/ior.h
+++ b/src/ior.h
@@ -37,7 +37,12 @@
    typedef void  IOBuf;         /* unused, but needs a type */
 #endif
 
-
+#ifdef USE_RADOS_AIORI
+#  include <rados/librados.h>
+#else
+   typedef void *rados_t;
+   typedef void *rados_ioctx_t;
+#endif
 #include "iordef.h"
 
 /******************** DATA Packet Type ***************************************/
@@ -188,6 +193,10 @@ typedef struct
 #   define MAX_UPLOAD_ID_SIZE 256 /* seems to be 32, actually */
     char UploadId[MAX_UPLOAD_ID_SIZE +1]; /* key for multi-part-uploads */
 
+    /* RADOS variables */
+    rados_t rados_cluster;           /* RADOS cluster handle */
+    rados_ioctx_t rados_ioctx;       /* I/O context for our pool in the RADOS cluster */
+
     /* NCMPI variables */
     int var_id;                      /* variable id handle for data set */
 