mdtest/src/aiori-CEPHFS.c

386 lines
12 KiB
C
Executable File

/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
* vim:expandtab:shiftwidth=8:tabstop=8:
*/
/******************************************************************************\
* *
* (C) 2015 The University of Chicago *
* (C) 2020 Red Hat, Inc. *
* *
* See COPYRIGHT in top-level directory. *
* *
********************************************************************************
*
* Implement abstract I/O interface for CEPHFS.
*
\******************************************************************************/
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#include <stdio.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <cephfs/libcephfs.h>
#include "ior.h"
#include "iordef.h"
#include "aiori.h"
#include "utilities.h"
#define CEPH_O_RDONLY 00000000
#define CEPH_O_WRONLY 00000001
#define CEPH_O_RDWR 00000002
#define CEPH_O_CREAT 00000100
#define CEPH_O_EXCL 00000200
#define CEPH_O_TRUNC 00001000
#define CEPH_O_LAZY 00020000
#define CEPH_O_DIRECTORY 00200000
#define CEPH_O_NOFOLLOW 00400000
/************************** O P T I O N S *****************************/
struct cephfs_options{
char * user;
char * conf;
char * prefix;
};
static struct cephfs_options o = {
.user = NULL,
.conf = NULL,
.prefix = NULL,
};
static option_help options [] = {
{0, "cephfs.user", "Username for the ceph cluster", OPTION_REQUIRED_ARGUMENT, 's', & o.user},
{0, "cephfs.conf", "Config file for the ceph cluster", OPTION_REQUIRED_ARGUMENT, 's', & o.conf},
{0, "cephfs.prefix", "mount prefix", OPTION_OPTIONAL_ARGUMENT, 's', & o.prefix},
LAST_OPTION
};
static struct ceph_mount_info *cmount;
/**************************** P R O T O T Y P E S *****************************/
static void CEPHFS_Init();
static void CEPHFS_Final();
static void *CEPHFS_Create(char *, IOR_param_t *);
static void *CEPHFS_Open(char *, IOR_param_t *);
static IOR_offset_t CEPHFS_Xfer(int, void *, IOR_size_t *,
IOR_offset_t, IOR_param_t *);
static void CEPHFS_Close(void *, IOR_param_t *);
static void CEPHFS_Delete(char *, IOR_param_t *);
static void CEPHFS_Fsync(void *, IOR_param_t *);
static IOR_offset_t CEPHFS_GetFileSize(IOR_param_t *, MPI_Comm, char *);
static int CEPHFS_StatFS(const char *, ior_aiori_statfs_t *, IOR_param_t *);
static int CEPHFS_MkDir(const char *, mode_t, IOR_param_t *);
static int CEPHFS_RmDir(const char *, IOR_param_t *);
static int CEPHFS_Access(const char *, int, IOR_param_t *);
static int CEPHFS_Stat(const char *, struct stat *, IOR_param_t *);
static void CEPHFS_Sync(IOR_param_t *);
static option_help * CEPHFS_options();
/************************** D E C L A R A T I O N S ***************************/
ior_aiori_t cephfs_aiori = {
.name = "CEPHFS",
.name_legacy = NULL,
.initialize = CEPHFS_Init,
.finalize = CEPHFS_Final,
.create = CEPHFS_Create,
.open = CEPHFS_Open,
.xfer = CEPHFS_Xfer,
.close = CEPHFS_Close,
.delete = CEPHFS_Delete,
.get_version = aiori_get_version,
.fsync = CEPHFS_Fsync,
.get_file_size = CEPHFS_GetFileSize,
.statfs = CEPHFS_StatFS,
.mkdir = CEPHFS_MkDir,
.rmdir = CEPHFS_RmDir,
.access = CEPHFS_Access,
.stat = CEPHFS_Stat,
.sync = CEPHFS_Sync,
.get_options = CEPHFS_options,
};
#define CEPHFS_ERR(__err_str, __ret) do { \
errno = -__ret; \
ERR(__err_str); \
} while(0)
/***************************** F U N C T I O N S ******************************/
static const char* pfix(const char* path) {
const char* npath = path;
const char* prefix = o.prefix;
while (*prefix) {
if(*prefix++ != *npath++) {
return path;
}
}
return npath;
}
static option_help * CEPHFS_options(){
return options;
}
static void CEPHFS_Init()
{
/* Short circuit if the options haven't been filled yet. */
if (!o.user || !o.conf || !o.prefix) {
WARN("CEPHFS_Init() called before options have been populated!");
return;
}
/* Short circuit if the mount handle already exists */
if (cmount) {
return;
}
int ret;
/* create CEPHFS mount handle */
ret = ceph_create(&cmount, o.user);
if (ret) {
CEPHFS_ERR("unable to create CEPHFS mount handle", ret);
}
/* set the handle using the Ceph config */
ret = ceph_conf_read_file(cmount, o.conf);
if (ret) {
CEPHFS_ERR("unable to read ceph config file", ret);
}
/* mount the handle */
ret = ceph_mount(cmount, "/");
if (ret) {
CEPHFS_ERR("unable to mount cephfs", ret);
ceph_shutdown(cmount);
}
Inode *root;
/* try retrieving the root cephfs inode */
ret = ceph_ll_lookup_root(cmount, &root);
if (ret) {
CEPHFS_ERR("uanble to retrieve root cephfs inode", ret);
ceph_shutdown(cmount);
}
return;
}
static void CEPHFS_Final()
{
/* shutdown */
int ret = ceph_unmount(cmount);
if (ret < 0) {
CEPHFS_ERR("ceph_umount failed", ret);
}
ret = ceph_release(cmount);
if (ret < 0) {
CEPHFS_ERR("ceph_release failed", ret);
}
cmount = NULL;
}
static void *CEPHFS_Create(char *testFileName, IOR_param_t * param)
{
return CEPHFS_Open(testFileName, param);
}
static void *CEPHFS_Open(char *testFileName, IOR_param_t * param)
{
const char *file = pfix(testFileName);
int* fd;
fd = (int *)malloc(sizeof(int));
mode_t mode = 0664;
int flags = (int) 0;
/* set IOR file flags to CephFS flags */
/* -- file open flags -- */
if (param->openFlags & IOR_RDONLY) {
flags |= CEPH_O_RDONLY;
}
if (param->openFlags & IOR_WRONLY) {
flags |= CEPH_O_WRONLY;
}
if (param->openFlags & IOR_RDWR) {
flags |= CEPH_O_RDWR;
}
if (param->openFlags & IOR_APPEND) {
fprintf(stdout, "File append not implemented in CephFS\n");
}
if (param->openFlags & IOR_CREAT) {
flags |= CEPH_O_CREAT;
}
if (param->openFlags & IOR_EXCL) {
flags |= CEPH_O_EXCL;
}
if (param->openFlags & IOR_TRUNC) {
flags |= CEPH_O_TRUNC;
}
if (param->openFlags & IOR_DIRECT) {
fprintf(stdout, "O_DIRECT not implemented in CephFS\n");
}
*fd = ceph_open(cmount, file, flags, mode);
if (*fd < 0) {
CEPHFS_ERR("ceph_open failed", *fd);
}
return (void *) fd;
}
static IOR_offset_t CEPHFS_Xfer(int access, void *file, IOR_size_t * buffer,
IOR_offset_t length, IOR_param_t * param)
{
uint64_t size = (uint64_t) length;
char *buf = (char *) buffer;
int fd = *(int *) file;
int ret;
if (access == WRITE)
{
ret = ceph_write(cmount, fd, buf, size, param->offset);
if (ret < 0) {
CEPHFS_ERR("unable to write file to CephFS", ret);
} else if (ret < size) {
CEPHFS_ERR("short write to CephFS", ret);
}
if (param->fsyncPerWrite == TRUE) {
CEPHFS_Fsync(&fd, param);
}
}
else /* READ */
{
ret = ceph_read(cmount, fd, buf, size, param->offset);
if (ret < 0) {
CEPHFS_ERR("unable to read file from CephFS", ret);
} else if (ret < size) {
CEPHFS_ERR("short read from CephFS", ret);
}
}
return length;
}
static void CEPHFS_Fsync(void *file, IOR_param_t * param)
{
int fd = *(int *) file;
int ret = ceph_fsync(cmount, fd, 0);
if (ret < 0) {
CEPHFS_ERR("ceph_fsync failed", ret);
}
}
static void CEPHFS_Close(void *file, IOR_param_t * param)
{
int fd = *(int *) file;
int ret = ceph_close(cmount, fd);
if (ret < 0) {
CEPHFS_ERR("ceph_close failed", ret);
}
free(file);
return;
}
static void CEPHFS_Delete(char *testFileName, IOR_param_t * param)
{
int ret = ceph_unlink(cmount, pfix(testFileName));
if (ret < 0) {
CEPHFS_ERR("ceph_unlink failed", ret);
}
return;
}
static IOR_offset_t CEPHFS_GetFileSize(IOR_param_t * param, MPI_Comm testComm,
char *testFileName)
{
struct stat stat_buf;
IOR_offset_t aggFileSizeFromStat, tmpMin, tmpMax, tmpSum;
int ret = ceph_stat(cmount, pfix(testFileName), &stat_buf);
if (ret < 0) {
CEPHFS_ERR("ceph_stat failed", ret);
}
aggFileSizeFromStat = stat_buf.st_size;
if (param->filePerProc == TRUE) {
MPI_CHECK(MPI_Allreduce(&aggFileSizeFromStat, &tmpSum, 1,
MPI_LONG_LONG_INT, MPI_SUM, testComm),
"cannot total data moved");
aggFileSizeFromStat = tmpSum;
} else {
MPI_CHECK(MPI_Allreduce(&aggFileSizeFromStat, &tmpMin, 1,
MPI_LONG_LONG_INT, MPI_MIN, testComm),
"cannot total data moved");
MPI_CHECK(MPI_Allreduce(&aggFileSizeFromStat, &tmpMax, 1,
MPI_LONG_LONG_INT, MPI_MAX, testComm),
"cannot total data moved");
if (tmpMin != tmpMax) {
if (rank == 0) {
WARN("inconsistent file size by different tasks");
}
/* incorrect, but now consistent across tasks */
aggFileSizeFromStat = tmpMin;
}
}
return (aggFileSizeFromStat);
}
static int CEPHFS_StatFS(const char *path, ior_aiori_statfs_t *stat_buf,
IOR_param_t *param)
{
#if defined(HAVE_STATVFS)
struct statvfs statfs_buf;
int ret = ceph_statfs(cmount, pfix(path), &statfs_buf);
if (ret < 0) {
CEPHFS_ERR("ceph_statfs failed", ret);
return -1;
}
stat_buf->f_bsize = statfs_buf.f_bsize;
stat_buf->f_blocks = statfs_buf.f_blocks;
stat_buf->f_bfree = statfs_buf.f_bfree;
stat_buf->f_files = statfs_buf.f_files;
stat_buf->f_ffree = statfs_buf.f_ffree;
return 0;
#else
WARN("ceph_statfs requires statvfs!");
return -1;
#endif
}
static int CEPHFS_MkDir(const char *path, mode_t mode, IOR_param_t *param)
{
return ceph_mkdir(cmount, pfix(path), mode);
}
static int CEPHFS_RmDir(const char *path, IOR_param_t *param)
{
return ceph_rmdir(cmount, pfix(path));
}
static int CEPHFS_Access(const char *testFileName, int mode, IOR_param_t *param)
{
struct stat buf;
return ceph_stat(cmount, pfix(testFileName), &buf);
}
static int CEPHFS_Stat(const char *testFileName, struct stat *buf, IOR_param_t *param)
{
return ceph_stat(cmount, pfix(testFileName), buf);
}
static void CEPHFS_Sync(IOR_param_t *param)
{
int ret = ceph_sync_fs(cmount);
if (ret < 0) {
CEPHFS_ERR("ceph_sync_fs failed", ret);
}
}