Add the DAOS ior driver.

Signed-off-by: Mohamad Chaarawi <mohamad.chaarawi@intel.com>
master
Mohamad Chaarawi 2018-08-27 17:22:38 +00:00
parent c9bb4bb7d2
commit f4b03efd72
13 changed files with 1631 additions and 20 deletions

9
.gitignore vendored
View File

@ -1,3 +1,4 @@
tags
Makefile
Makefile.in
aclocal.m4
@ -9,10 +10,12 @@ config/config.sub
config/depcomp
config/install-sh
config/missing
config/test-driver
configure
contrib/.deps/
contrib/Makefile
contrib/Makefile.in
contrib/cbif
doc/Makefile
doc/Makefile.in
src/.deps/
@ -28,7 +31,13 @@ contrib/cbif.o
src/*.o
src/*.i
src/*.s
src/*.a
src/ior
src/mdtest
src/testlib
src/test/.deps/
src/test/.dirstamp
src/test/lib.o
doc/doxygen/build
doc/sphinx/_*/

48
README_DAOS Normal file
View File

@ -0,0 +1,48 @@
Building with DAOS API
----------------------
At step 1 above, one must specify "--with-daos". If the DAOS
headers and libraries are not installed at respective system
default locations, then one may also need to set CPPFLAGS and
LDFLAGS accordingly.
Running with DAOS API
---------------------
One must specify an existing pool using "-O
daospool=<pool_uuid>". IOR must be launched in a way that
attaches the IOR process group to the DAOS server process group.
One must also specify a container UUID using "-o
<container_uuid>". If the "-E" option is given, then this UUID
shall denote an existing container created by a "matching" IOR
run. Otherwise, IOR will create a new container with this UUID.
In the latter case, one may use uuidgen(1) to generate the UUID
of the new container.
When benchmarking write performance, one likely does not want
"-W", which causes the write phase to do one additional memory
copy for every I/O. This is due to IOR's assumption that when a
DAOS_Xfer() call returns the buffer may be released. Therefore,
random data is written when "-W" is absent, while data is copied
from IOR buffers when "-W" is present.
See doc/USER_GUIDE for all options and directives. Note that not
all combinations of options are supported.
Examples that should work include:
- "ior -a DAOS -w -W -o <container_uuid> -O
daospool=<pool_uuid>,daospoolsvc=<svc_ranks>" writes into a new container
and verifies the data, using default daosRecordSize, transferSize,
daosStripeSize, blockSize, daosAios, etc.
- "ior -a DAOS -w -W -r -R -o <container_uuid> -b 1g -t 4m -C -O
daospool=<pool_uuid>,daospoolsvc=<svc_ranks>,daosrecordsize=1m,
daosstripesize=4m,daosstripecount=256,daosaios=8" does all IOR tests and
shifts ranks during checkWrite and checkRead.
- "ior -a DAOS -w -r -o <container_uuid> -b 8g -t 1m -C -O
daospool=<pool_uuid>,daospoolsvc=<svc_ranks>,daosrecordsize=1m,daosstripesize=4m,
daosstripecount=256,daosaios=8" may be a base to be tuned for performance
benchmarking.

View File

@ -166,6 +166,16 @@ AM_COND_IF([USE_RADOS_AIORI],[
AC_DEFINE([USE_RADOS_AIORI], [], [Build RADOS backend AIORI])
])
# DAOS support
AC_ARG_WITH([daos],
[AS_HELP_STRING([--with-daos],
[support IO with DAOS backend @<:@default=no@:>@])],
[],
[with_daos=no])
AM_CONDITIONAL([USE_DAOS_AIORI], [test x$with_daos = xyes])
AM_COND_IF([USE_DAOS_AIORI],[
AC_DEFINE([USE_DAOS_AIORI], [], [Build DAOS backend AIORI])
])
# aws4c is needed for the S3 backend (see --with-S3, below).

View File

@ -367,6 +367,43 @@ BeeGFS-SPECIFIC (POSIX only):
* beegfsChunkSize - set the striping chunk size. Must be a power of two,
and greater than 64kiB, (e.g.: 256k, 1M, ...)
DAOS-ONLY:
==========
* daosGroup - group name [NULL]
* daosPool - UUID of the pool []
* daosPoolSvc - pool service replica ranks (e.g., 1:2:3:4:5) []
* daosRecordSize - size (in bytes) of an akey record [256k]
NOTE: must divide transferSize
* daosStripeSize - size (in bytes) of a chunk in a stripe [512k]
NOTE: must be a multiple of transferSize
* daosStripeCount - number of stripes [64 * number of targets]
NOTE: i.e., number of dkeys
* daosStripeMax - max length of each stripe [0]
NOTE: must be a multiple of daosStripeSize
NOTE: for write testing with small storage
NOTE: offsets in a stripe larger than daosStripeMax
are mapped to offset % daosStripeMax
* daosAios - max number of asynchronous I/Os [1]
* daosWriteOnly - skip flushing and committing [0=FALSE]
* daosEpoch - epoch to read or write [0]
NOTE: 0 denotes reading GHCE or writing GHCE + 1
* daosWait - epoch to wait when opening the container [0]
* daosKill - kill a target in the middle of the test [0]
NOTE: must also specify daosObjectClass=repl
* daosObjectClass - object class (tiny, small, large, repl, repl_max)
[large]
***********************
* 5. VERBOSITY LEVELS *

View File

@ -70,6 +70,11 @@ extraSOURCES += aiori-RADOS.c
extraLDADD += -lrados
endif
if USE_DAOS_AIORI
extraSOURCES += aiori-DAOS.c list.h
extraLDADD += -ldaos -ldaos_common -luuid
endif
if USE_S3_AIORI
extraSOURCES += aiori-S3.c
if AWS4C_DIR

889
src/aiori-DAOS.c Normal file
View File

@ -0,0 +1,889 @@
/*
* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
* vim:expandtab:shiftwidth=8:tabstop=8:
*/
/*
* SPECIAL LICENSE RIGHTS-OPEN SOURCE SOFTWARE
* The Government's rights to use, modify, reproduce, release, perform, display,
* or disclose this software are subject to the terms of Contract No. B599860,
* and the terms of the GNU General Public License version 2.
* Any reproduction of computer software, computer software documentation, or
* portions thereof marked with this legend must also reproduce the markings.
*/
/*
* Copyright (c) 2013, 2016 Intel Corporation.
*/
/*
* This file implements the abstract I/O interface for DAOS.
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include <stdint.h>
#include <assert.h>
#include <unistd.h>
#include <sys/types.h>
#include <libgen.h>
#include <stdbool.h>
#include <daos.h>
#include <daos_types.h>
#include "ior.h"
#include "aiori.h"
#include "iordef.h"
#include "list.h"
/**************************** P R O T O T Y P E S *****************************/
static void DAOS_Init(IOR_param_t *);
static void DAOS_Fini(IOR_param_t *);
static void *DAOS_Create(char *, IOR_param_t *);
static void *DAOS_Open(char *, IOR_param_t *);
static IOR_offset_t DAOS_Xfer(int, void *, IOR_size_t *,
IOR_offset_t, IOR_param_t *);
static void DAOS_Close(void *, IOR_param_t *);
static void DAOS_Delete(char *, IOR_param_t *);
static char* DAOS_GetVersion();
static void DAOS_Fsync(void *, IOR_param_t *);
static IOR_offset_t DAOS_GetFileSize(IOR_param_t *, MPI_Comm, char *);
/************************** D E C L A R A T I O N S ***************************/
ior_aiori_t daos_aiori = {
.name = "DAOS",
.create = DAOS_Create,
.open = DAOS_Open,
.xfer = DAOS_Xfer,
.close = DAOS_Close,
.delete = DAOS_Delete,
.get_version = DAOS_GetVersion,
.fsync = DAOS_Fsync,
.get_file_size = DAOS_GetFileSize,
.initialize = DAOS_Init,
.finalize = DAOS_Fini,
};
enum handleType {
POOL_HANDLE,
CONTAINER_HANDLE
};
struct fileDescriptor {
daos_handle_t container;
daos_cont_info_t containerInfo;
daos_handle_t object;
daos_epoch_t epoch;
};
struct aio {
cfs_list_t a_list;
char a_dkeyBuf[32];
daos_key_t a_dkey;
daos_recx_t a_recx;
unsigned char a_csumBuf[32];
daos_csum_buf_t a_csum;
daos_epoch_range_t a_epochRange;
daos_iod_t a_iod;
daos_iov_t a_iov;
daos_sg_list_t a_sgl;
struct daos_event a_event;
};
static daos_handle_t eventQueue;
static struct daos_event **events;
static unsigned char *buffers;
static int nAios;
static daos_handle_t pool;
static daos_pool_info_t poolInfo;
static daos_oclass_id_t objectClass = DAOS_OC_LARGE_RW;
static CFS_LIST_HEAD(aios);
static IOR_offset_t total_size;
/***************************** F U N C T I O N S ******************************/
/* For DAOS methods. */
#define DCHECK(rc, format, ...) \
do { \
int _rc = (rc); \
\
if (_rc < 0) { \
fprintf(stdout, "ior ERROR (%s:%d): %d: %d: " \
format"\n", __FILE__, __LINE__, rank, _rc, \
##__VA_ARGS__); \
fflush(stdout); \
MPI_Abort(MPI_COMM_WORLD, -1); \
} \
} while (0)
#define INFO(level, param, format, ...) \
do { \
if (param->verbose >= level) \
printf("[%d] "format"\n", rank, ##__VA_ARGS__); \
} while (0)
/* For generic errors like invalid command line options. */
#define GERR(format, ...) \
do { \
fprintf(stdout, format"\n", ##__VA_ARGS__); \
MPI_CHECK(MPI_Abort(MPI_COMM_WORLD, -1), "MPI_Abort() error"); \
} while (0)
/* Distribute process 0's pool or container handle to others. */
static void HandleDistribute(daos_handle_t *handle, enum handleType type,
IOR_param_t *param)
{
daos_iov_t global;
int rc;
assert(type == POOL_HANDLE || !daos_handle_is_inval(pool));
global.iov_buf = NULL;
global.iov_buf_len = 0;
global.iov_len = 0;
if (rank == 0) {
/* Get the global handle size. */
if (type == POOL_HANDLE)
rc = daos_pool_local2global(*handle, &global);
else
rc = daos_cont_local2global(*handle, &global);
DCHECK(rc, "Failed to get global handle size");
}
MPI_CHECK(MPI_Bcast(&global.iov_buf_len, 1, MPI_UINT64_T, 0,
param->testComm),
"Failed to bcast global handle buffer size");
global.iov_buf = malloc(global.iov_buf_len);
if (global.iov_buf == NULL)
ERR("Failed to allocate global handle buffer");
if (rank == 0) {
if (type == POOL_HANDLE)
rc = daos_pool_local2global(*handle, &global);
else
rc = daos_cont_local2global(*handle, &global);
DCHECK(rc, "Failed to create global handle");
}
MPI_CHECK(MPI_Bcast(global.iov_buf, global.iov_buf_len, MPI_BYTE, 0,
param->testComm),
"Failed to bcast global pool handle");
if (rank != 0) {
/* A larger-than-actual length works just fine. */
global.iov_len = global.iov_buf_len;
if (type == POOL_HANDLE)
rc = daos_pool_global2local(global, handle);
else
rc = daos_cont_global2local(pool, global, handle);
DCHECK(rc, "Failed to get local handle");
}
free(global.iov_buf);
}
static void ContainerOpen(char *testFileName, IOR_param_t *param,
daos_handle_t *container, daos_cont_info_t *info)
{
int rc;
if (rank == 0) {
uuid_t uuid;
unsigned int dFlags;
rc = uuid_parse(testFileName, uuid);
DCHECK(rc, "Failed to parse 'testFile': %s", testFileName);
if (param->open == WRITE &&
param->useExistingTestFile == FALSE) {
INFO(VERBOSE_2, param, "Creating container %s",
testFileName);
rc = daos_cont_create(pool, uuid, NULL /* ev */);
DCHECK(rc, "Failed to create container %s",
testFileName);
}
INFO(VERBOSE_2, param, "Openning container %s", testFileName);
if (param->open == WRITE)
dFlags = DAOS_COO_RW;
else
dFlags = DAOS_COO_RO;
rc = daos_cont_open(pool, uuid, dFlags, container, info,
NULL /* ev */);
DCHECK(rc, "Failed to open container %s", testFileName);
INFO(VERBOSE_2, param, "Container epoch state:");
INFO(VERBOSE_2, param, " HCE: %lu",
info->ci_epoch_state.es_hce);
INFO(VERBOSE_2, param, " LRE: %lu",
info->ci_epoch_state.es_lre);
INFO(VERBOSE_2, param, " LHE: %lu (%lx)",
info->ci_epoch_state.es_lhe, info->ci_epoch_state.es_lhe);
INFO(VERBOSE_2, param, " GHCE: %lu",
info->ci_epoch_state.es_ghce);
INFO(VERBOSE_2, param, " GLRE: %lu",
info->ci_epoch_state.es_glre);
INFO(VERBOSE_2, param, " GHPCE: %lu",
info->ci_epoch_state.es_ghpce);
#if 0
if (param->open != WRITE && param->daosWait != 0) {
daos_epoch_t e;
e = param->daosWait;
INFO(VERBOSE_2, param, "Waiting for epoch %lu", e);
rc = daos_epoch_wait(*container, &e,
NULL /* ignore HLE */,
NULL /* synchronous */);
DCHECK(rc, "Failed to wait for epoch %lu",
param->daosWait);
}
if (param->open == WRITE &&
param->useExistingTestFile == FALSE) {
daos_oclass_attr_t attr = {
.ca_schema = DAOS_OS_STRIPED,
.ca_resil_degree = 0,
.ca_resil = DAOS_RES_REPL,
.ca_grp_nr = 4,
.u.repl = {
.r_method = 0,
.r_num = 2
}
};
INFO(VERBOSE_2, param, "Registering object class");
rc = daos_oclass_register(container, objectClass, &attr,
NULL /* ev */);
DCHECK(rc, "Failed to register object class");
}
#endif
}
HandleDistribute(container, CONTAINER_HANDLE, param);
MPI_CHECK(MPI_Bcast(info, sizeof *info, MPI_BYTE, 0, param->testComm),
"Failed to broadcast container info");
}
static void ContainerClose(daos_handle_t container, IOR_param_t *param)
{
int rc;
if (rank != 0) {
rc = daos_cont_close(container, NULL /* ev */);
DCHECK(rc, "Failed to close container");
}
/* An MPI_Gather() call would probably be more efficient. */
MPI_CHECK(MPI_Barrier(param->testComm),
"Failed to synchronize processes");
if (rank == 0) {
rc = daos_cont_close(container, NULL /* ev */);
DCHECK(rc, "Failed to close container");
}
}
static void ObjectOpen(daos_handle_t container, daos_handle_t *object,
daos_epoch_t epoch, IOR_param_t *param)
{
daos_obj_id_t oid;
unsigned int flags;
int rc;
oid.hi = 0;
oid.lo = 1;
daos_obj_id_generate(&oid, 0, objectClass);
#if 0
/** declaring object not implemented commenting it */
if (rank == 0 && param->open == WRITE &&
param->useExistingTestFile == FALSE) {
INFO(VERBOSE_2, param, "Declaring object");
rc = daos_obj_declare(container, oid, epoch, NULL /* oa */,
NULL /* ev */);
DCHECK(rc, "Failed to declare object");
}
#endif
/* An MPI_Bcast() call would probably be more efficient. */
MPI_CHECK(MPI_Barrier(param->testComm),
"Failed to synchronize processes");
if (param->open == WRITE)
flags = DAOS_OO_RW;
else
flags = DAOS_OO_RO;
rc = daos_obj_open(container, oid, epoch, flags, object, NULL /* ev */);
DCHECK(rc, "Failed to open object");
}
static void ObjectClose(daos_handle_t object)
{
int rc;
rc = daos_obj_close(object, NULL /* ev */);
DCHECK(rc, "Failed to close object");
}
static void AIOInit(IOR_param_t *param)
{
struct aio *aio;
int i;
int rc;
rc = posix_memalign((void **) &buffers, sysconf(_SC_PAGESIZE),
param->transferSize * param->daosAios);
DCHECK(rc, "Failed to allocate buffer array");
for (i = 0; i < param->daosAios; i++) {
aio = malloc(sizeof *aio);
if (aio == NULL)
ERR("Failed to allocate aio array");
memset(aio, 0, sizeof *aio);
aio->a_dkey.iov_buf = aio->a_dkeyBuf;
aio->a_dkey.iov_buf_len = sizeof aio->a_dkeyBuf;
aio->a_recx.rx_nr = 1;
aio->a_csum.cs_csum = &aio->a_csumBuf;
aio->a_csum.cs_buf_len = sizeof aio->a_csumBuf;
aio->a_csum.cs_len = aio->a_csum.cs_buf_len;
aio->a_epochRange.epr_hi = DAOS_EPOCH_MAX;
aio->a_iod.iod_name.iov_buf = "data";
aio->a_iod.iod_name.iov_buf_len =
strlen(aio->a_iod.iod_name.iov_buf) + 1;
aio->a_iod.iod_name.iov_len = aio->a_iod.iod_name.iov_buf_len;
aio->a_iod.iod_nr = 1;
aio->a_iod.iod_type = DAOS_IOD_ARRAY;
aio->a_iod.iod_recxs = &aio->a_recx;
aio->a_iod.iod_csums = &aio->a_csum;
aio->a_iod.iod_eprs = &aio->a_epochRange;
aio->a_iod.iod_size = param->transferSize;
aio->a_iov.iov_buf = buffers + param->transferSize * i;
aio->a_iov.iov_buf_len = param->transferSize;
aio->a_iov.iov_len = aio->a_iov.iov_buf_len;
aio->a_sgl.sg_nr = 1;
aio->a_sgl.sg_iovs = &aio->a_iov;
rc = daos_event_init(&aio->a_event, eventQueue,
NULL /* parent */);
DCHECK(rc, "Failed to initialize event for aio[%d]", i);
cfs_list_add(&aio->a_list, &aios);
INFO(VERBOSE_3, param, "Allocated AIO %p: buffer %p", aio,
aio->a_iov.iov_buf);
}
nAios = param->daosAios;
events = malloc((sizeof *events) * param->daosAios);
if (events == NULL)
ERR("Failed to allocate events array");
}
static void AIOFini(IOR_param_t *param)
{
struct aio *aio;
struct aio *tmp;
free(events);
cfs_list_for_each_entry_safe(aio, tmp, &aios, a_list) {
INFO(VERBOSE_3, param, "Freeing AIO %p: buffer %p", aio,
aio->a_iov.iov_buf);
cfs_list_del_init(&aio->a_list);
daos_event_fini(&aio->a_event);
free(aio);
}
free(buffers);
}
static void AIOWait(IOR_param_t *param)
{
struct aio *aio;
int i;
int rc;
rc = daos_eq_poll(eventQueue, 0, DAOS_EQ_WAIT, param->daosAios,
events);
DCHECK(rc, "Failed to poll event queue");
assert(rc <= param->daosAios - nAios);
for (i = 0; i < rc; i++) {
int ret;
aio = (struct aio *)
((char *) events[i] -
(char *) (&((struct aio *) 0)->a_event));
DCHECK(aio->a_event.ev_error, "Failed to transfer (%lu, %lu)",
aio->a_iod.iod_recxs->rx_idx,
aio->a_iod.iod_recxs->rx_nr);
daos_event_fini(&aio->a_event);
ret = daos_event_init(&aio->a_event, eventQueue,
NULL /* parent */);
DCHECK(ret, "Failed to reinitialize event for AIO %p", aio);
cfs_list_move(&aio->a_list, &aios);
nAios++;
if (param->verbose >= VERBOSE_3)
INFO(VERBOSE_3, param, "Completed AIO %p: buffer %p", aio,
aio->a_iov.iov_buf);
}
INFO(VERBOSE_3, param, "Found %d completed AIOs (%d free %d busy)", rc,
nAios, param->daosAios - nAios);
}
static void ObjectClassParse(const char *string)
{
if (strcasecmp(string, "tiny") == 0)
objectClass = DAOS_OC_TINY_RW;
else if (strcasecmp(string, "small") == 0)
objectClass = DAOS_OC_SMALL_RW;
else if (strcasecmp(string, "large") == 0)
objectClass = DAOS_OC_LARGE_RW;
else if (strcasecmp(string, "echo") == 0)
objectClass = DAOS_OC_ECHO_RW;
else if (strcasecmp(string, "R2") == 0)
objectClass = DAOS_OC_R2_RW;
else if (strcasecmp(string, "R2S") == 0)
objectClass = DAOS_OC_R2S_RW;
else if (strcasecmp(string, "R3S") == 0)
objectClass = DAOS_OC_R3S_RW;
else if (strcasecmp(string, "R3") == 0)
objectClass = DAOS_OC_R3_RW;
else if (strcasecmp(string, "R4") == 0)
objectClass = DAOS_OC_R4_RW;
else if (strcasecmp(string, "R4S") == 0)
objectClass = DAOS_OC_R4S_RW;
else if (strcasecmp(string, "repl_max") == 0)
objectClass = DAOS_OC_REPL_MAX_RW;
else
GERR("Invalid 'daosObjectClass' argument: '%s'", string);
}
static const char *GetGroup(IOR_param_t *param)
{
if (strlen(param->daosGroup) == 0)
return NULL;
return param->daosGroup;
}
static void ParseService(IOR_param_t *param, int max, d_rank_list_t *ranks)
{
char *s;
s = strdup(param->daosPoolSvc);
if (s == NULL)
GERR("failed to duplicate argument");
ranks->rl_nr = 0;
while ((s = strtok(s, ":")) != NULL) {
if (ranks->rl_nr >= max) {
free(s);
GERR("at most %d pool service replicas supported", max);
}
ranks->rl_ranks[ranks->rl_nr] = atoi(s);
ranks->rl_nr++;
s = NULL;
}
free(s);
}
static void DAOS_Init(IOR_param_t *param)
{
int rc;
if (strlen(param->daosObjectClass) != 0)
ObjectClassParse(param->daosObjectClass);
if (param->filePerProc)
GERR("'filePerProc' not yet supported");
if (param->daosStripeMax % param->daosStripeSize != 0)
GERR("'daosStripeMax' must be a multiple of 'daosStripeSize'");
if (param->daosStripeSize % param->transferSize != 0)
GERR("'daosStripeSize' must be a multiple of 'transferSize'");
if (param->transferSize % param->daosRecordSize != 0)
GERR("'transferSize' must be a multiple of 'daosRecordSize'");
if (param->daosKill && ((objectClass != DAOS_OC_R2_RW) ||
(objectClass != DAOS_OC_R3_RW) ||
(objectClass != DAOS_OC_R4_RW) ||
(objectClass != DAOS_OC_R2S_RW) ||
(objectClass != DAOS_OC_R3S_RW) ||
(objectClass != DAOS_OC_R4S_RW) ||
(objectClass != DAOS_OC_REPL_MAX_RW)))
GERR("'daosKill' only makes sense with 'daosObjectClass=repl'");
if (rank == 0)
INFO(VERBOSE_0, param, "WARNING: USING daosStripeMax CAUSES READS TO RETURN INVALID DATA");
rc = daos_init();
DCHECK(rc, "Failed to initialize daos");
rc = daos_eq_create(&eventQueue);
DCHECK(rc, "Failed to create event queue");
if (rank == 0) {
uuid_t uuid;
d_rank_t rank[13];
d_rank_list_t ranks;
if (strlen(param->daosPool) == 0)
GERR("'daosPool' must be specified");
if (strlen(param->daosPoolSvc) == 0)
GERR("'daosPoolSvc' must be specified");
INFO(VERBOSE_2, param, "Connecting to pool %s %s",
param->daosPool, param->daosPoolSvc);
rc = uuid_parse(param->daosPool, uuid);
DCHECK(rc, "Failed to parse 'daosPool': %s", param->daosPool);
ranks.rl_ranks = rank;
ParseService(param, sizeof(rank) / sizeof(rank[0]), &ranks);
rc = daos_pool_connect(uuid, GetGroup(param), &ranks,
DAOS_PC_RW, &pool, &poolInfo,
NULL /* ev */);
DCHECK(rc, "Failed to connect to pool %s", param->daosPool);
}
HandleDistribute(&pool, POOL_HANDLE, param);
MPI_CHECK(MPI_Bcast(&poolInfo, sizeof poolInfo, MPI_BYTE, 0,
param->testComm),
"Failed to bcast pool info");
if (param->daosStripeCount == -1)
param->daosStripeCount = poolInfo.pi_ntargets * 64UL;
}
static void DAOS_Fini(IOR_param_t *param)
{
int rc;
rc = daos_pool_disconnect(pool, NULL /* ev */);
DCHECK(rc, "Failed to disconnect from pool %s", param->daosPool);
rc = daos_eq_destroy(eventQueue, 0 /* flags */);
DCHECK(rc, "Failed to destroy event queue");
rc = daos_fini();
DCHECK(rc, "Failed to finalize daos");
}
static void *DAOS_Create(char *testFileName, IOR_param_t *param)
{
return DAOS_Open(testFileName, param);
}
static void *DAOS_Open(char *testFileName, IOR_param_t *param)
{
struct fileDescriptor *fd;
daos_epoch_t ghce;
fd = malloc(sizeof *fd);
if (fd == NULL)
ERR("Failed to allocate fd");
ContainerOpen(testFileName, param, &fd->container, &fd->containerInfo);
ghce = fd->containerInfo.ci_epoch_state.es_ghce;
if (param->open == WRITE) {
if (param->daosEpoch == 0)
fd->epoch = ghce + 1;
else if (param->daosEpoch <= ghce)
GERR("Can't modify committed epoch\n");
else
fd->epoch = param->daosEpoch;
} else {
if (param->daosEpoch == 0) {
if (param->daosWait == 0)
fd->epoch = ghce;
else
fd->epoch = param->daosWait;
} else if (param->daosEpoch > ghce) {
GERR("Can't read uncommitted epoch\n");
} else {
fd->epoch = param->daosEpoch;
}
}
if (rank == 0)
INFO(VERBOSE_2, param, "Accessing epoch %lu", fd->epoch);
if (rank == 0 && param->open == WRITE) {
daos_epoch_t e = fd->epoch;
int rc;
INFO(VERBOSE_2, param, "Holding epoch %lu", fd->epoch);
rc = daos_epoch_hold(fd->container, &fd->epoch,
NULL /* state */, NULL /* ev */);
DCHECK(rc, "Failed to hold epoch");
assert(fd->epoch == e);
}
ObjectOpen(fd->container, &fd->object, fd->epoch, param);
AIOInit(param);
return fd;
}
static void
kill_daos_server(IOR_param_t *param)
{
daos_pool_info_t info;
d_rank_t rank, svc_ranks[13];
d_rank_list_t svc, targets;
uuid_t uuid;
char *s;
int rc;
rc = daos_pool_query(pool, NULL, &info, NULL);
DCHECK(rc, "Error in querying pool\n");
if (info.pi_ntargets - info.pi_ndisabled <= 1)
return;
/* choose the last alive one */
rank = info.pi_ntargets - 1 - info.pi_ndisabled;
rc = uuid_parse(param->daosPool, uuid);
DCHECK(rc, "Failed to parse 'daosPool': %s", param->daosPool);
if (rc != 0)
printf("Killing tgt rank: %d (total of %d of %d already disabled)\n",
rank, info.pi_ndisabled, info.pi_ntargets);
fflush(stdout);
rc = daos_mgmt_svc_rip(GetGroup(param), rank, true, NULL);
DCHECK(rc, "Error in killing server\n");
targets.rl_nr = 1;
targets.rl_ranks = &rank;
svc.rl_ranks = svc_ranks;
ParseService(param, sizeof(svc_ranks)/ sizeof(svc_ranks[0]), &svc);
rc = daos_pool_exclude(uuid, NULL, &svc, &targets, NULL);
DCHECK(rc, "Error in excluding pool from poolmap\n");
rc = daos_pool_query(pool, NULL, &info, NULL);
DCHECK(rc, "Error in querying pool\n");
printf("%d targets succesfully disabled\n",
info.pi_ndisabled);
}
static void
kill_and_sync(IOR_param_t *param)
{
double start, end;
start = MPI_Wtime();
if (rank == 0)
kill_daos_server(param);
if (rank == 0)
printf("Done killing and excluding\n");
MPI_CHECK(MPI_Barrier(param->testComm),
"Failed to synchronize processes");
end = MPI_Wtime();
if (rank == 0)
printf("Time spent inducing failure: %lf\n", (end - start));
}
static IOR_offset_t DAOS_Xfer(int access, void *file, IOR_size_t *buffer,
IOR_offset_t length, IOR_param_t *param)
{
struct fileDescriptor *fd = file;
struct aio *aio;
uint64_t stripe;
IOR_offset_t stripeOffset;
uint64_t round;
int rc;
assert(length == param->transferSize);
assert(param->offset % length == 0);
/**
* Currently killing only during writes
* Kills once when 1/2 of blocksize is
* written
**/
total_size += length;
if (param->daosKill && (access == WRITE) &&
((param->blockSize)/2) == total_size) {
/** More than half written lets kill */
if (rank == 0)
printf("Killing and Syncing\n", rank);
kill_and_sync(param);
param->daosKill = 0;
}
/*
* Find an available AIO descriptor. If none, wait for one.
*/
while (nAios == 0)
AIOWait(param);
aio = cfs_list_entry(aios.next, struct aio, a_list);
cfs_list_move_tail(&aio->a_list, &aios);
nAios--;
stripe = (param->offset / param->daosStripeSize) %
param->daosStripeCount;
rc = snprintf(aio->a_dkeyBuf, sizeof aio->a_dkeyBuf, "%lu", stripe);
assert(rc < sizeof aio->a_dkeyBuf);
aio->a_dkey.iov_len = strlen(aio->a_dkeyBuf) + 1;
round = param->offset / (param->daosStripeSize * param->daosStripeCount);
stripeOffset = param->daosStripeSize * round +
param->offset % param->daosStripeSize;
if (param->daosStripeMax != 0)
stripeOffset %= param->daosStripeMax;
aio->a_recx.rx_idx = stripeOffset / param->daosRecordSize;
aio->a_epochRange.epr_lo = fd->epoch;
/*
* If the data written will be checked later, we have to copy in valid
* data instead of writing random bytes. If the data being read is for
* checking purposes, poison the buffer first.
*/
if (access == WRITE && param->checkWrite)
memcpy(aio->a_iov.iov_buf, buffer, length);
else if (access == WRITECHECK || access == READCHECK)
memset(aio->a_iov.iov_buf, '#', length);
INFO(VERBOSE_3, param, "Starting AIO %p (%d free %d busy): access %d "
"dkey '%s' iod <%llu, %llu> sgl <%p, %lu>", aio, nAios,
param->daosAios - nAios, access, (char *) aio->a_dkey.iov_buf,
(unsigned long long) aio->a_iod.iod_recxs->rx_idx,
(unsigned long long) aio->a_iod.iod_recxs->rx_nr,
aio->a_sgl.sg_iovs->iov_buf,
(unsigned long long) aio->a_sgl.sg_iovs->iov_buf_len);
if (access == WRITE) {
rc = daos_obj_update(fd->object, fd->epoch, &aio->a_dkey,
1 /* nr */, &aio->a_iod, &aio->a_sgl,
&aio->a_event);
DCHECK(rc, "Failed to start update operation");
} else {
rc = daos_obj_fetch(fd->object, fd->epoch, &aio->a_dkey,
1 /* nr */, &aio->a_iod, &aio->a_sgl,
NULL /* maps */, &aio->a_event);
DCHECK(rc, "Failed to start fetch operation");
}
/*
* If this is a WRITECHECK or READCHECK, we are expected to fill data
* into the buffer before returning. Note that if this is a READ, we
* don't have to return valid data as WriteOrRead() doesn't care.
*/
if (access == WRITECHECK || access == READCHECK) {
while (param->daosAios - nAios > 0)
AIOWait(param);
memcpy(buffer, aio->a_sgl.sg_iovs->iov_buf, length);
}
return length;
}
static void DAOS_Close(void *file, IOR_param_t *param)
{
struct fileDescriptor *fd = file;
int rc;
while (param->daosAios - nAios > 0)
AIOWait(param);
AIOFini(param);
ObjectClose(fd->object);
if (param->open == WRITE && !param->daosWriteOnly) {
/* Wait for everybody for to complete the writes. */
MPI_CHECK(MPI_Barrier(param->testComm),
"Failed to synchronize processes");
if (rank == 0) {
INFO(VERBOSE_2, param, "Flushing epoch %lu", fd->epoch);
rc = daos_epoch_flush(fd->container, fd->epoch,
NULL /* state */, NULL /* ev */);
DCHECK(rc, "Failed to flush epoch");
INFO(VERBOSE_2, param, "Committing epoch %lu",
fd->epoch);
rc = daos_epoch_commit(fd->container, fd->epoch,
NULL /* state */, NULL /* ev */);
DCHECK(rc, "Failed to commit object write");
}
}
ContainerClose(fd->container, param);
free(fd);
}
static void DAOS_Delete(char *testFileName, IOR_param_t *param)
{
uuid_t uuid;
int rc;
INFO(VERBOSE_2, param, "Deleting container %s", testFileName);
rc = uuid_parse(testFileName, uuid);
DCHECK(rc, "Failed to parse 'testFile': %s", testFileName);
rc = daos_cont_destroy(pool, uuid, 1 /* force */, NULL /* ev */);
if (rc != -DER_NONEXIST)
DCHECK(rc, "Failed to destroy container %s", testFileName);
}
static char* DAOS_GetVersion()
{
static char ver[1024] = {};
sprintf(ver, "%s", "DAOS");
return ver;
}
static void DAOS_Fsync(void *file, IOR_param_t *param)
{
while (param->daosAios - nAios > 0)
AIOWait(param);
}
static IOR_offset_t DAOS_GetFileSize(IOR_param_t *test, MPI_Comm testComm,
char *testFileName)
{
/*
* Sizes are inapplicable to containers at the moment.
*/
return 0;
}

View File

@ -57,6 +57,9 @@ ior_aiori_t *available_aiori[] = {
#endif
#ifdef USE_RADOS_AIORI
&rados_aiori,
#endif
#ifdef USE_DAOS_AIORI
&daos_aiori,
#endif
NULL
};
@ -133,7 +136,7 @@ char* aiori_get_version()
static int is_initialized = FALSE;
void aiori_initialize(){
void aiori_initialize(IOR_test_t *tests_head){
if (is_initialized) return;
is_initialized = TRUE;
@ -145,18 +148,18 @@ void aiori_initialize(){
for (ior_aiori_t **tmp = available_aiori ; *tmp != NULL; ++tmp) {
if((*tmp)->initialize){
(*tmp)->initialize();
(*tmp)->initialize(tests_head ? &tests_head->params : NULL);
}
}
}
void aiori_finalize(){
void aiori_finalize(IOR_test_t *tests_head){
if (! is_initialized) return;
is_initialized = FALSE;
for (ior_aiori_t **tmp = available_aiori ; *tmp != NULL; ++tmp) {
if((*tmp)->finalize){
(*tmp)->finalize();
(*tmp)->finalize(tests_head ? &tests_head->params : NULL);
}
}
}

View File

@ -79,8 +79,8 @@ typedef struct ior_aiori {
int (*rmdir) (const char *path, IOR_param_t * param);
int (*access) (const char *path, int mode, IOR_param_t * param);
int (*stat) (const char *path, struct stat *buf, IOR_param_t * param);
void (*initialize)(); /* called once per program before MPI is started */
void (*finalize)(); /* called once per program after MPI is shutdown */
void (*initialize)(IOR_param_t *); /* called once per program before MPI is started */
void (*finalize)(IOR_param_t *); /* called once per program after MPI is shutdown */
option_help * (*get_options)();
} ior_aiori_t;
@ -96,9 +96,10 @@ extern ior_aiori_t s3_aiori;
extern ior_aiori_t s3_plus_aiori;
extern ior_aiori_t s3_emc_aiori;
extern ior_aiori_t rados_aiori;
extern ior_aiori_t daos_aiori;
void aiori_initialize();
void aiori_finalize();
void aiori_initialize(IOR_test_t *th);
void aiori_finalize(IOR_test_t *th);
const ior_aiori_t *aiori_select (const char *api);
int aiori_count (void);
void aiori_supported_apis(char * APIs);

View File

@ -98,8 +98,6 @@ int ior_main(int argc, char **argv)
out_logfile = stdout;
out_resultfile = stdout;
aiori_initialize();
/*
* check -h option from commandline without starting MPI;
*/
@ -125,6 +123,8 @@ int ior_main(int argc, char **argv)
PrintHeader(argc, argv);
aiori_initialize(tests_head);
/* perform each test */
for (tptr = tests_head; tptr != NULL; tptr = tptr->next) {
verbose = tptr->params.verbose;
@ -143,6 +143,8 @@ int ior_main(int argc, char **argv)
ShowTestEnd(tptr);
}
aiori_finalize(tests_head);
if (verbose < 0)
/* always print final summary */
verbose = 0;
@ -155,8 +157,6 @@ int ior_main(int argc, char **argv)
MPI_CHECK(MPI_Finalize(), "cannot finalize MPI");
aiori_finalize();
return totalErrorCount;
}
@ -199,6 +199,11 @@ void init_IOR_Param_t(IOR_param_t * p)
p->setAlignment = 1;
p->lustre_start_ost = -1;
p->daosRecordSize = 262144;
p->daosStripeSize = 524288;
p->daosStripeCount = -1;
p->daosAios = 1;
hdfs_user = getenv("USER");
if (!hdfs_user)
hdfs_user = "";
@ -297,7 +302,8 @@ static void CheckFileSize(IOR_test_t *test, IOR_offset_t dataMoved, int rep)
1, MPI_LONG_LONG_INT, MPI_SUM, testComm),
"cannot total data moved");
if (strcasecmp(params->api, "HDF5") != 0 && strcasecmp(params->api, "NCMPI") != 0) {
if (strcasecmp(params->api, "HDF5") != 0 && strcasecmp(params->api, "NCMPI") != 0 &&
strcasecmp(params->api, "DAOS") != 0) {
if (verbose >= VERBOSE_0 && rank == 0) {
if ((params->expectedAggFileSize
!= results[rep].aggFileSizeFromXfer)
@ -913,7 +919,8 @@ static void RemoveFile(char *testFileName, int filePerProc, IOR_param_t * test)
rankOffset = 0;
GetTestFileName(testFileName, test);
}
if (backend->access(testFileName, F_OK, test) == 0) {
if (backend->access(testFileName, F_OK, test) == 0 ||
strcasecmp(test->api, "DAOS") == 0) {
backend->delete(testFileName, test);
}
if (test->reorderTasksRandom == TRUE) {
@ -921,7 +928,8 @@ static void RemoveFile(char *testFileName, int filePerProc, IOR_param_t * test)
GetTestFileName(testFileName, test);
}
} else {
if ((rank == 0) && (backend->access(testFileName, F_OK, test) == 0)) {
if ((rank == 0) && (backend->access(testFileName, F_OK, test) == 0 ||
strcasecmp(test->api, "DAOS"))) {
backend->delete(testFileName, test);
}
}

View File

@ -190,6 +190,21 @@ typedef struct
int lustre_set_striping; /* flag that we need to set lustre striping */
int lustre_ignore_locks;
/* DAOS variables */
char daosGroup[MAX_STR]; /* group name */
char daosPool[37]; /* pool UUID */
char daosPoolSvc[MAX_STR]; /* pool service ranks */
int daosRecordSize; /* size of akey record (i.e., rx_rsize) */
int daosStripeSize;
unsigned long daosStripeCount;
unsigned long daosStripeMax; /* max length of a stripe */
int daosAios; /* max number of concurrent async I/Os */
int daosWriteOnly; /* write only, no flush and commit */
unsigned long daosEpoch; /* epoch to access */
unsigned long daosWait; /* epoch to wait for before reading */
int daosKill; /* kill a target while running IOR */
char daosObjectClass[MAX_STR]; /* object class */
/* gpfs variables */
int gpfs_hint_access; /* use gpfs "access range" hint */
int gpfs_release_token; /* immediately release GPFS tokens after

556
src/list.h Normal file
View File

@ -0,0 +1,556 @@
/**
* GPL HEADER START
*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 only,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License version 2 for more details (a copy is included
* in the LICENSE file that accompanied this code).
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*
* GPL HEADER END
*/
#ifndef __DAOS_LIST_H__
#define __DAOS_LIST_H__
/*
* Simple doubly linked list implementation.
*
* Some of the internal functions ("__xxx") are useful when
* manipulating whole lists rather than single entries, as
* sometimes we already know the next/prev entries and we can
* generate better code by using them directly rather than
* using the generic single-entry routines.
*/
/* No-op stub: evaluates and discards its argument; kept so kernel-derived
 * code that calls prefetch() compiles unchanged in userspace. */
#define prefetch(a) ((void)a)
/* Intrusive doubly linked list node: embed this struct inside the item to
 * be linked.  An empty list is a head whose next/prev point to itself. */
struct cfs_list_head {
struct cfs_list_head *next, *prev;
};
typedef struct cfs_list_head cfs_list_t;
/* Static initializer for an empty list head (next/prev point at itself). */
#define CFS_LIST_HEAD_INIT(name) { &(name), &(name) }
/* Define and statically initialize an empty list head in one declaration. */
#define CFS_LIST_HEAD(name) \
cfs_list_t name = CFS_LIST_HEAD_INIT(name)
/* Reset a list head to the empty state at runtime. */
#define CFS_INIT_LIST_HEAD(ptr) do { \
(ptr)->next = (ptr); (ptr)->prev = (ptr); \
} while (0)
/**
 * Splice a new node between two nodes known to be adjacent.
 *
 * Internal helper: callers must guarantee that \a prev and \a next are
 * already consecutive in the list.
 * \param new  node to link in
 * \param prev node that will precede \a new
 * \param next node that will follow \a new
 */
static inline void __cfs_list_add(cfs_list_t * new,
                                  cfs_list_t * prev,
                                  cfs_list_t * next)
{
	prev->next = new;
	new->prev = prev;
	new->next = next;
	next->prev = new;
}
/**
 * Push an entry onto the front of a list (LIFO/stack order).
 * \param new  entry to insert
 * \param head list head to insert after
 */
static inline void cfs_list_add(cfs_list_t *new,
                                cfs_list_t *head)
{
	cfs_list_t *first = head->next;

	__cfs_list_add(new, head, first);
}
/**
 * Append an entry at the back of a list (FIFO/queue order).
 * \param new  entry to insert
 * \param head list head to insert before
 */
static inline void cfs_list_add_tail(cfs_list_t *new,
                                     cfs_list_t *head)
{
	cfs_list_t *last = head->prev;

	__cfs_list_add(new, last, head);
}
/*
 * Unlink whatever lies between \a prev and \a next by pointing the two
 * at each other.  Internal helper: the caller already knows the
 * neighbours of the node being removed.
 */
static inline void __cfs_list_del(cfs_list_t *prev,
                                  cfs_list_t *next)
{
	prev->next = next;
	next->prev = prev;
}
/**
 * Unlink \a entry from whatever list contains it.
 * \param entry the entry to remove
 *
 * The entry's own next/prev pointers are left dangling; it is NOT empty
 * afterwards and must be re-initialized before reuse.
 */
static inline void cfs_list_del(cfs_list_t *entry)
{
	entry->prev->next = entry->next;
	entry->next->prev = entry->prev;
}
/**
 * Unlink \a entry from its list and reset it to a valid empty state.
 * \param entry the entry to remove
 */
static inline void cfs_list_del_init(cfs_list_t *entry)
{
	__cfs_list_del(entry->prev, entry->next);
	/* make the node a self-referencing (empty) list again */
	entry->next = entry;
	entry->prev = entry;
}
/**
 * Take \a list out of its current list and push it onto the front of
 * \a head.
 * \param list the entry to move
 * \param head the destination list
 */
static inline void cfs_list_move(cfs_list_t *list,
                                 cfs_list_t *head)
{
	cfs_list_del(list);
	cfs_list_add(list, head);
}
/**
 * Take \a list out of its current list and append it at the back of
 * \a head.
 * \param list the entry to move
 * \param head the destination list
 */
static inline void cfs_list_move_tail(cfs_list_t *list,
                                      cfs_list_t *head)
{
	cfs_list_del(list);
	cfs_list_add_tail(list, head);
}
/**
* Test whether a list is empty
* \param head the list to test.
*/
static inline int cfs_list_empty(cfs_list_t *head)
{
return head->next == head;
}
/**
 * Test whether a list is empty and not being modified.
 * \param head the list to test
 *
 * Reads head->next exactly once and checks it against both ends, so a
 * concurrent update of one member is noticed.
 *
 * NOTE: without external synchronization this is only safe if the sole
 * possible concurrent operation on the entry is cfs_list_del_init();
 * it cannot be used if another CPU could re-add the entry.
 */
static inline int cfs_list_empty_careful(const cfs_list_t *head)
{
	const cfs_list_t *first = head->next;

	return (first == head) && (head->prev == first);
}
/*
 * Internal splice: insert the (non-empty) chain of \a list between
 * \a head and its current first entry.  \a list itself is left untouched
 * and therefore inconsistent; callers must reinitialize or discard it.
 */
static inline void __cfs_list_splice(cfs_list_t *list,
                                     cfs_list_t *head)
{
	cfs_list_t *begin  = list->next;	/* first real entry of list */
	cfs_list_t *end    = list->prev;	/* last real entry of list */
	cfs_list_t *anchor = head->next;	/* old first entry of head */

	head->next = begin;
	begin->prev = head;
	end->next = anchor;
	anchor->prev = end;
}
/**
 * Join two lists.
 * \param list the list whose contents to add (left in an undefined state
 *             on return).
 * \param head the place in the first list to add them.
 *
 * The entries of \a list are inserted at the start of \a head.
 */
static inline void cfs_list_splice(cfs_list_t *list,
                                   cfs_list_t *head)
{
	if (cfs_list_empty(list))
		return;
	__cfs_list_splice(list, head);
}
/**
* Join two lists and reinitialise the emptied list.
* \param list the new list to add.
* \param head the place to add it in the first list.
*
* The contents of \a list are added at the start of \a head. \a list is empty
* on return.