From 9649a0c520a732cdae17cc774672e181525ed526 Mon Sep 17 00:00:00 2001 From: Mark Nelson Date: Thu, 5 Mar 2020 01:40:05 +0000 Subject: [PATCH] src/aiori-CEPHFS: New libcephfs backend Signed-off-by: Mark Nelson --- configure.ac | 14 ++ src/Makefile.am | 6 + src/aiori-CEPHFS.c | 361 +++++++++++++++++++++++++++++++++++++++++++++ src/aiori.c | 3 + src/aiori.h | 1 + src/ior.c | 3 +- src/ior.h | 1 - 7 files changed, 387 insertions(+), 2 deletions(-) create mode 100755 src/aiori-CEPHFS.c diff --git a/configure.ac b/configure.ac index 5bdd78b..cc7ddba 100755 --- a/configure.ac +++ b/configure.ac @@ -197,6 +197,20 @@ AM_COND_IF([USE_RADOS_AIORI],[ AC_DEFINE([USE_RADOS_AIORI], [], [Build RADOS backend AIORI]) ]) +# CEPHFS support +AC_ARG_WITH([cephfs], + [AS_HELP_STRING([--with-cephfs], + [support IO with libcephfs backend @<:@default=no@:>@])], + [], + [with_cephfs=no]) +AS_IF([test "x$with_cephfs" != xno], [ + CPPFLAGS="$CPPFLAGS -D_FILE_OFFSET_BITS=64 -std=gnu11" +]) +AM_CONDITIONAL([USE_CEPHFS_AIORI], [test x$with_cephfs = xyes]) +AM_COND_IF([USE_CEPHFS_AIORI],[ + AC_DEFINE([USE_CEPHFS_AIORI], [], [Build CEPHFS backend AIORI]) +]) + # DAOS Backends (DAOS and DFS) IO support require DAOS and CART/GURT AC_ARG_WITH([cart], [AS_HELP_STRING([--with-cart], diff --git a/src/Makefile.am b/src/Makefile.am index 56b4279..1c37a6e 100755 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -70,6 +70,12 @@ extraSOURCES += aiori-RADOS.c extraLDADD += -lrados endif +if USE_CEPHFS_AIORI +extraSOURCES += aiori-CEPHFS.c +extraLDADD += -lcephfs +endif + + if USE_DAOS_AIORI extraSOURCES += aiori-DAOS.c aiori-DFS.c endif diff --git a/src/aiori-CEPHFS.c b/src/aiori-CEPHFS.c new file mode 100755 index 0000000..d5c6154 --- /dev/null +++ b/src/aiori-CEPHFS.c @@ -0,0 +1,361 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + */ +/******************************************************************************\ +* * +* (C) 2015 The University of Chicago * +* (C) 2020 Red Hat, Inc. * +* * +* See COPYRIGHT in top-level directory. * +* * +******************************************************************************** +* +* Implement abstract I/O interface for CEPHFS. +* +\******************************************************************************/ + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + +#include +#include +#include +#include + +#include "ior.h" +#include "iordef.h" +#include "aiori.h" +#include "utilities.h" + +#define CEPH_O_RDONLY 00000000 +#define CEPH_O_WRONLY 00000001 +#define CEPH_O_RDWR 00000002 +#define CEPH_O_CREAT 00000100 +#define CEPH_O_EXCL 00000200 +#define CEPH_O_TRUNC 00001000 +#define CEPH_O_LAZY 00020000 +#define CEPH_O_DIRECTORY 00200000 +#define CEPH_O_NOFOLLOW 00400000 + +/************************** O P T I O N S *****************************/ +struct cephfs_options{ + char * user; + char * conf; + char * prefix; +}; + +static struct cephfs_options o = { + .user = NULL, + .conf = NULL, + .prefix = NULL, +}; + +static option_help options [] = { + {0, "cephfs.user", "Username for the ceph cluster", OPTION_REQUIRED_ARGUMENT, 's', & o.user}, + {0, "cephfs.conf", "Config file for the ceph cluster", OPTION_REQUIRED_ARGUMENT, 's', & o.conf}, + {0, "cephfs.prefix", "mount prefix", OPTION_OPTIONAL_ARGUMENT, 's', & o.prefix}, + LAST_OPTION +}; + +static struct ceph_mount_info *cmount; + +/**************************** P R O T O T Y P E S *****************************/ +static void CEPHFS_Init(); +static void CEPHFS_Final(); +static void *CEPHFS_Create(char *, IOR_param_t *); +static void *CEPHFS_Open(char *, IOR_param_t *); +static IOR_offset_t CEPHFS_Xfer(int, void *, IOR_size_t *, + IOR_offset_t, IOR_param_t *); +static void CEPHFS_Close(void *, IOR_param_t *); +static void CEPHFS_Delete(char *, IOR_param_t *); +static void CEPHFS_Fsync(void *, IOR_param_t *); +static IOR_offset_t CEPHFS_GetFileSize(IOR_param_t *, MPI_Comm, char *); +static int CEPHFS_StatFS(const char *, ior_aiori_statfs_t *, IOR_param_t *); +static int CEPHFS_MkDir(const char *, mode_t, IOR_param_t *); +static int CEPHFS_RmDir(const char *, IOR_param_t *); +static int CEPHFS_Access(const char *, int, IOR_param_t *); +static int CEPHFS_Stat(const char *, struct stat *, IOR_param_t *); +static option_help * CEPHFS_options(); + +/************************** D E C L A R A T I O N S ***************************/ +ior_aiori_t cephfs_aiori = { + .name = "CEPHFS", + .name_legacy = NULL, + .initialize = CEPHFS_Init, + .finalize = CEPHFS_Final, + .create = CEPHFS_Create, + .open = CEPHFS_Open, + .xfer = CEPHFS_Xfer, + .close = CEPHFS_Close, + .delete = CEPHFS_Delete, + .get_version = aiori_get_version, + .fsync = CEPHFS_Fsync, + .get_file_size = CEPHFS_GetFileSize, + .statfs = CEPHFS_StatFS, + .mkdir = CEPHFS_MkDir, + .rmdir = CEPHFS_RmDir, + .access = CEPHFS_Access, + .stat = CEPHFS_Stat, + .get_options = CEPHFS_options, +}; + +#define CEPHFS_ERR(__err_str, __ret) do { \ + errno = -__ret; \ + ERR(__err_str); \ +} while(0) + +/***************************** F U N C T I O N S ******************************/ +static const char* pfix(const char* path) { + const char* npath = path; + const char* prefix = o.prefix; + while (*prefix) { + if(*prefix++ != *npath++) { + return path; + } + } + return npath; +} + +static option_help * CEPHFS_options(){ + return options; +} + +static void CEPHFS_Init() +{ + /* Short circuit if the mount handle already exists */ + if (cmount) { + return; + } + + int ret; + /* create CEPHFS mount handle */ + ret = ceph_create(&cmount, o.user); + if (ret) { + CEPHFS_ERR("unable to create CEPHFS mount handle", ret); + } + + /* set the handle using the Ceph config */ + ret = ceph_conf_read_file(cmount, o.conf); + if (ret) { + CEPHFS_ERR("unable to read ceph config file", ret); + } + + /* mount the handle */ + ret = ceph_mount(cmount, "/"); + if (ret) { + CEPHFS_ERR("unable to mount cephfs", ret); + ceph_shutdown(cmount); + + } + + Inode *root; + + /* try retrieving the root cephfs inode */ + ret = ceph_ll_lookup_root(cmount, &root); + if (ret) { + CEPHFS_ERR("uanble to retrieve root cephfs inode", ret); + ceph_shutdown(cmount); + + } + + return; +} + +static void CEPHFS_Final() +{ + /* shutdown */ + ceph_shutdown(cmount); +} + +static void *CEPHFS_Create(char *testFileName, IOR_param_t * param) +{ + return CEPHFS_Open(testFileName, param); +} + +static void *CEPHFS_Open(char *testFileName, IOR_param_t * param) +{ + const char *file = pfix(testFileName); + int* fd; + fd = (int *)malloc(sizeof(int)); + + mode_t mode = 0664; + int flags = (int) 0; + + /* set IOR file flags to CephFS flags */ + /* -- file open flags -- */ + if (param->openFlags & IOR_RDONLY) { + flags |= CEPH_O_RDONLY; + } + if (param->openFlags & IOR_WRONLY) { + flags |= CEPH_O_WRONLY; + } + if (param->openFlags & IOR_RDWR) { + flags |= CEPH_O_RDWR; + } + if (param->openFlags & IOR_APPEND) { + fprintf(stdout, "File append not implemented in CephFS\n"); + } + if (param->openFlags & IOR_CREAT) { + flags |= CEPH_O_CREAT; + } + if (param->openFlags & IOR_EXCL) { + flags |= CEPH_O_EXCL; + } + if (param->openFlags & IOR_TRUNC) { + flags |= CEPH_O_TRUNC; + } + if (param->openFlags & IOR_DIRECT) { + fprintf(stdout, "O_DIRECT not implemented in CephFS\n"); + } + *fd = ceph_open(cmount, file, flags, mode); + if (*fd < 0) { + CEPHFS_ERR("ceph_open failed", *fd); + } + return (void *) fd; +} + +static IOR_offset_t CEPHFS_Xfer(int access, void *file, IOR_size_t * buffer, + IOR_offset_t length, IOR_param_t * param) +{ + uint64_t size = (uint64_t) length; + char *buf = (char *) buffer; + int fd = *(int *) file; + int ret; + + if (access == WRITE) + { + ret = ceph_write(cmount, fd, buf, size, param->offset); + if (ret < 0) { + CEPHFS_ERR("unable to write file to CephFS", ret); + } else if (ret < size) { + CEPHFS_ERR("short write to CephFS", ret); + } + if (param->fsyncPerWrite == TRUE) { + CEPHFS_Fsync(&fd, param); + } + } + else /* READ */ + { + ret = ceph_read(cmount, fd, buf, size, param->offset); + if (ret < 0) { + CEPHFS_ERR("unable to read file from CephFS", ret); + } else if (ret < size) { + CEPHFS_ERR("short read from CephFS", ret); + } + + } + return length; +} + +static void CEPHFS_Fsync(void *file, IOR_param_t * param) +{ + int fd = *(int *) file; + int ret = ceph_fsync(cmount, fd, 0); + if (ret < 0) { + CEPHFS_ERR("ceph_fsync failed", ret); + } +} + +static void CEPHFS_Close(void *file, IOR_param_t * param) +{ + int fd = *(int *) file; + int ret = ceph_close(cmount, fd); + if (ret < 0) { + CEPHFS_ERR("ceph_close failed", ret); + } + free(file); + + return; +} + +static void CEPHFS_Delete(char *testFileName, IOR_param_t * param) +{ + int ret = ceph_unlink(cmount, pfix(testFileName)); + if (ret < 0) { + CEPHFS_ERR("ceph_unlink failed", ret); + } + return; +} + +static IOR_offset_t CEPHFS_GetFileSize(IOR_param_t * param, MPI_Comm testComm, + char *testFileName) +{ + struct stat stat_buf; + IOR_offset_t aggFileSizeFromStat, tmpMin, tmpMax, tmpSum; + + int ret = ceph_stat(cmount, pfix(testFileName), &stat_buf); + if (ret < 0) { + CEPHFS_ERR("ceph_stat failed", ret); + } + aggFileSizeFromStat = stat_buf.st_size; + + if (param->filePerProc == TRUE) { + MPI_CHECK(MPI_Allreduce(&aggFileSizeFromStat, &tmpSum, 1, + MPI_LONG_LONG_INT, MPI_SUM, testComm), + "cannot total data moved"); + aggFileSizeFromStat = tmpSum; + } else { + MPI_CHECK(MPI_Allreduce(&aggFileSizeFromStat, &tmpMin, 1, + MPI_LONG_LONG_INT, MPI_MIN, testComm), + "cannot total data moved"); + MPI_CHECK(MPI_Allreduce(&aggFileSizeFromStat, &tmpMax, 1, + MPI_LONG_LONG_INT, MPI_MAX, testComm), + "cannot total data moved"); + if (tmpMin != tmpMax) { + if (rank == 0) { + WARN("inconsistent file size by different tasks"); + } + /* incorrect, but now consistent across tasks */ + aggFileSizeFromStat = tmpMin; + } + } + + return (aggFileSizeFromStat); + +} + +static int CEPHFS_StatFS(const char *path, ior_aiori_statfs_t *stat_buf, + IOR_param_t *param) +{ +#if defined(HAVE_STATVFS) + struct statvfs statfs_buf; + int ret = ceph_statfs(cmount, pfix(path), &statfs_buf); + if (ret < 0) { + CEPHFS_ERR("ceph_statfs failed", ret); + return -1; + } + + stat_buf->f_bsize = statfs_buf.f_bsize; + stat_buf->f_blocks = statfs_buf.f_blocks; + stat_buf->f_bfree = statfs_buf.f_bfree; + stat_buf->f_files = statfs_buf.f_files; + stat_buf->f_ffree = statfs_buf.f_ffree; + + return 0; +#else + WARN("ceph_statfs requires statvfs!"); + return -1; +#endif +} + +static int CEPHFS_MkDir(const char *path, mode_t mode, IOR_param_t *param) +{ + return ceph_mkdir(cmount, pfix(path), mode); +} + +static int CEPHFS_RmDir(const char *path, IOR_param_t *param) +{ + return ceph_rmdir(cmount, pfix(path)); +} + +static int CEPHFS_Access(const char *testFileName, int mode, IOR_param_t *param) +{ + struct stat buf; + return ceph_stat(cmount, pfix(testFileName), &buf); +} + +static int CEPHFS_Stat(const char *testFileName, struct stat *buf, IOR_param_t *param) +{ + return ceph_stat(cmount, pfix(testFileName), buf); +} diff --git a/src/aiori.c b/src/aiori.c index a72180d..71f99d1 100644 --- a/src/aiori.c +++ b/src/aiori.c @@ -73,6 +73,9 @@ ior_aiori_t *available_aiori[] = { #ifdef USE_RADOS_AIORI &rados_aiori, #endif +#ifdef USE_CEPHFS_AIORI + &cephfs_aiori, +#endif #ifdef USE_GFARM_AIORI &gfarm_aiori, #endif diff --git a/src/aiori.h b/src/aiori.h index 4be7bd5..4d416c2 100755 --- a/src/aiori.h +++ b/src/aiori.h @@ -109,6 +109,7 @@ extern ior_aiori_t s3_aiori; extern ior_aiori_t s3_plus_aiori; extern ior_aiori_t s3_emc_aiori; extern ior_aiori_t rados_aiori; +extern ior_aiori_t cephfs_aiori; extern ior_aiori_t gfarm_aiori; void aiori_initialize(IOR_test_t * tests); diff --git a/src/ior.c b/src/ior.c index c7c2c27..13868a2 100755 --- a/src/ior.c +++ b/src/ior.c @@ -1618,7 +1618,8 @@ static void ValidateTests(IOR_param_t * test) && (strcasecmp(test->api, "MMAP") != 0) && (strcasecmp(test->api, "HDFS") != 0) && (strcasecmp(test->api, "Gfarm") != 0) - && (strcasecmp(test->api, "RADOS") != 0)) && test->fsync) + && (strcasecmp(test->api, "RADOS") != 0) + && (strcasecmp(test->api, "CEPHFS") != 0)) && test->fsync) WARN_RESET("fsync() not supported in selected backend", test, &defaults, fsync); if ((strcasecmp(test->api, "MPIIO") != 0) && test->preallocate) diff --git a/src/ior.h b/src/ior.h index 6b9d1d7..758b048 100755 --- a/src/ior.h +++ b/src/ior.h @@ -35,7 +35,6 @@ typedef void *rados_t; typedef void *rados_ioctx_t; #endif - #include "option.h" #include "iordef.h"