re-write the DAOS IOR driver to use the Array API

Signed-off-by: Mohamad Chaarawi <mohamad.chaarawi@intel.com>
master
Mohamad Chaarawi 2019-03-30 19:11:44 +00:00
parent 8b23c50389
commit 4a788a0c23
3 changed files with 322 additions and 587 deletions

View File

@ -4,14 +4,16 @@ Building
The DAOS library must be installed on the system.
./bootstrap
./configure --prefix=iorInstallDir --with-daos=DIR
./configure --prefix=iorInstallDir --with-daos=DIR --with-cart=DIR
One must specify "--with-daos=/path/to/daos/install". When that is specified
the DAOS and DFS driver will be built.
One must specify both "--with-daos=/path/to/daos/install" and
"--with-cart=/path/to/cart/install". When both are specified the DAOS and DFS
drivers will be built.
The DAOS driver uses the DAOS API to create a container (file) and update/fetch
an object to the container using the daos object API. The DAOS driver works with
IOR only (no mdtest support yet).
The DAOS driver uses the DAOS API to open a container (creating it first if it
does not exist yet), then creates an array object (the IOR "file") in that
container and reads/writes the array object using the DAOS Array API. The DAOS
driver works with IOR only (no mdtest support yet). The file name used by IOR
(passed with the -o option) is hashed to an object ID that is used as the array oid.
The DFS (DAOS File System) driver creates an encapsulated namespace and emulates
the POSIX driver using the DFS API directly on top of DAOS. The DFS driver works
@ -33,40 +35,22 @@ The DAOS options include:
Required Options:
--daos.pool <pool_uuid>: pool uuid to connect to (has to be created beforehand)
--daos.svcl <pool_svcl>: pool svcl list (: separated)
--daos.cont <cont_uuid>: container for the IOR files/objects (can use `uuidgen`)
Optional Options:
--daos.group <group_name>: group name of servers with the pool
--daos.recordSize <record_size>: object record size for IO
--daos.stripeSize <stripe_size>
--daos.stripeCount <stripe_count>
--daos.stripeMax <max_stripe_size>
--daos.aios <num>: number of concurrent async IOs
--daos.kill flag to kill a rank during IO
--daos.objectClass <object_class>: specific object class
When benchmarking write performance, one likely does not want "-W", which causes
the write phase to do one additional memory copy for every I/O. This is due to
IOR's assumption that when a DAOS_Xfer() call returns the buffer may be
released. Therefore, random data is written when "-W" is absent, while data is
copied from IOR buffers when "-W" is present.
See doc/USER_GUIDE for all options and directives. Note that not all
combinations of options are supported. For example specifying an epoch to access
and running ior with multiple iterations would cause all iterations other than
first one to fail because the epoch will be committed in the first iteration. In
that case, the epoch should not be specified and the DAOS driver would choose
the epoch to access automatically on each iteration.
--daos.chunk_size <chunk_size>: Chunk size of the array object controlling striping over DKEYs
--daos.destroy flag to destroy the container on finalize
--daos.oclass <object_class>: specific object class for array object
Examples that should work include:
- "ior -a DAOS -w -W -o <container_uuid> --daos.pool <pool_uuid> --daos.svcl <svc_ranks>"
writes into a new container and verifies the data, using default
daosRecordSize, transferSize, daosStripeSize, blockSize, daosAios, etc.
- "ior -a DAOS -w -W -o file_name --daos.pool <pool_uuid> --daos.svcl <svc_ranks>\
--daos.cont <cont_uuid>"
- "ior -a DAOS -w -W -r -R -o <container_uuid> -b 1g -t 4m -C \
--daos.pool <pool_uuid> --daos.svcl <svc_ranks> --daos.recordSize 1m --daos.stripeSize 4m\
--daos.stripeCount 256 --daos.aios 8
does all IOR tests and shifts ranks during checkWrite and checkRead.
- "ior -a DAOS -w -W -r -R -o file_name -b 1g -t 4m \
--daos.pool <pool_uuid> --daos.svcl <svc_ranks> --daos.cont <cont_uuid>\
--daos.chunk_size 1024 --daos.oclass R2"
Running with DFS API
---------------------

View File

@ -185,7 +185,20 @@ AM_COND_IF([USE_RADOS_AIORI],[
AC_DEFINE([USE_RADOS_AIORI], [], [Build RADOS backend AIORI])
])
# DAOS Backends (DAOS and DFS) IO support
# DAOS Backends (DAOS and DFS) IO support require DAOS and CART/GURT
# CART/GURT is a prerequisite of the DAOS backends.  --with-cart=DIR names the
# install prefix whose lib/ and include/ directories are added to the search
# paths below.  When the option is absent, with_cart itself defaults to "no" so
# that the probe is skipped and a user-supplied --with-daos value is not
# overwritten by this option's default.
AC_ARG_WITH([cart],
        [AS_HELP_STRING([--with-cart],
           [CART/GURT install needed by the DAOS backends @<:@default=no@:>@])],
        [],
        [with_cart=no])

# Probe for the GURT header and library; CART is unset again when either one
# cannot be found.
AS_IF([test "x$with_cart" != xno],
        CART="yes"
        LDFLAGS="$LDFLAGS -L$with_cart/lib"
        CPPFLAGS="$CPPFLAGS -I$with_cart/include/"
        AC_CHECK_HEADERS(gurt/common.h,, [unset CART])
        AC_CHECK_LIB([gurt], [d_hash_murmur64],, [unset CART]))
AC_ARG_WITH([daos],
[AS_HELP_STRING([--with-daos],
[support IO with DAOS backends @<:@default=no@:>@])],

View File

@ -28,8 +28,10 @@
#include <sys/types.h>
#include <libgen.h>
#include <stdbool.h>
#include <gurt/common.h>
#include <daos.h>
#include <daos_types.h>
#include <daos_addons.h>
#include "ior.h"
#include "aiori.h"
@ -38,42 +40,33 @@
/************************** O P T I O N S *****************************/
struct daos_options{
char *daosPool;
char *daosPoolSvc;
char *daosGroup;
int daosRecordSize;
int daosStripeSize;
uint64_t daosStripeCount;
uint64_t daosStripeMax; /* max length of a stripe */
int daosAios; /* max number of concurrent async I/Os */
int daosKill; /* kill a target while running IOR */
char *daosObjectClass; /* object class */
char *pool;
char *svcl;
char *group;
char *cont;
int chunk_size;
int destroy;
char *oclass;
};
static struct daos_options o = {
.daosPool = NULL,
.daosPoolSvc = NULL,
.daosGroup = NULL,
.daosRecordSize = 262144,
.daosStripeSize = 524288,
.daosStripeCount = -1,
.daosStripeMax = 0,
.daosAios = 1,
.daosKill = 0,
.daosObjectClass = NULL,
.pool = NULL,
.svcl = NULL,
.group = NULL,
.cont = NULL,
.chunk_size = 1048576,
.destroy = 0,
.oclass = NULL,
};
static option_help options [] = {
{0, "daos.pool", "pool uuid", OPTION_REQUIRED_ARGUMENT, 's', &o.daosPool},
{0, "daos.svcl", "pool SVCL", OPTION_REQUIRED_ARGUMENT, 's', &o.daosPoolSvc},
{0, "daos.group", "server group", OPTION_OPTIONAL_ARGUMENT, 's', &o.daosGroup},
{0, "daos.recordSize", "Record Size", OPTION_OPTIONAL_ARGUMENT, 'd', &o.daosRecordSize},
{0, "daos.stripeSize", "Stripe Size", OPTION_OPTIONAL_ARGUMENT, 'd', &o.daosStripeSize},
{0, "daos.stripeCount", "Stripe Count", OPTION_OPTIONAL_ARGUMENT, 'u', &o.daosStripeCount},
{0, "daos.stripeMax", "Max Stripe",OPTION_OPTIONAL_ARGUMENT, 'u', &o.daosStripeMax},
{0, "daos.aios", "Concurrent Async IOs",OPTION_OPTIONAL_ARGUMENT, 'd', &o.daosAios},
{0, "daos.kill", "Kill target while running",OPTION_FLAG, 'd', &o.daosKill},
{0, "daos.objectClass", "object class", OPTION_OPTIONAL_ARGUMENT, 's', &o.daosObjectClass},
{0, "daos.pool", "pool uuid", OPTION_REQUIRED_ARGUMENT, 's', &o.pool},
{0, "daos.svcl", "pool SVCL", OPTION_REQUIRED_ARGUMENT, 's', &o.svcl},
{0, "daos.group", "server group", OPTION_OPTIONAL_ARGUMENT, 's', &o.group},
{0, "daos.cont", "container uuid", OPTION_REQUIRED_ARGUMENT, 's', &o.cont},
{0, "daos.chunk_size", "chunk size", OPTION_OPTIONAL_ARGUMENT, 'd', &o.chunk_size},
{0, "daos.destroy", "Destroy Container", OPTION_FLAG, 'd', &o.destroy},
{0, "daos.oclass", "object class", OPTION_OPTIONAL_ARGUMENT, 's', &o.oclass},
LAST_OPTION
};
@ -111,40 +104,19 @@ ior_aiori_t daos_aiori = {
.get_options = DAOS_options,
};
#define IOR_DAOS_MUR_SEED 0xDEAD10CC
enum handleType {
POOL_HANDLE,
CONTAINER_HANDLE
CONT_HANDLE,
ARRAY_HANDLE
};
struct fileDescriptor {
daos_handle_t container;
daos_cont_info_t containerInfo;
daos_handle_t object;
};
struct aio {
cfs_list_t a_list;
char a_dkeyBuf[32];
daos_key_t a_dkey;
daos_recx_t a_recx;
unsigned char a_csumBuf[32];
daos_csum_buf_t a_csum;
daos_iod_t a_iod;
daos_iov_t a_iov;
daos_sg_list_t a_sgl;
struct daos_event a_event;
};
static daos_handle_t eventQueue;
static struct daos_event **events;
static unsigned char *buffers;
static int nAios;
static daos_handle_t pool;
static daos_pool_info_t poolInfo;
static daos_oclass_id_t objectClass = DAOS_OC_LARGE_RW;
static CFS_LIST_HEAD(aios);
static IOR_offset_t total_size;
static bool daos_initialized = false;
static daos_handle_t poh;
static daos_handle_t coh;
static daos_handle_t aoh;
static daos_oclass_id_t objectClass = DAOS_OC_LARGE_RW;
static bool daos_initialized = false;
/***************************** F U N C T I O N S ******************************/
@ -176,13 +148,12 @@ do { \
} while (0)
/* Distribute process 0's pool, container, or array handle to others. */
static void HandleDistribute(daos_handle_t *handle, enum handleType type)
static void
HandleDistribute(daos_handle_t *handle, enum handleType type)
{
daos_iov_t global;
int rc;
assert(type == POOL_HANDLE || !daos_handle_is_inval(pool));
global.iov_buf = NULL;
global.iov_buf_len = 0;
global.iov_len = 0;
@ -191,8 +162,10 @@ static void HandleDistribute(daos_handle_t *handle, enum handleType type)
/* Get the global handle size. */
if (type == POOL_HANDLE)
rc = daos_pool_local2global(*handle, &global);
else
else if (type == CONT_HANDLE)
rc = daos_cont_local2global(*handle, &global);
else
rc = daos_array_local2global(*handle, &global);
DCHECK(rc, "Failed to get global handle size");
}
@ -207,8 +180,10 @@ static void HandleDistribute(daos_handle_t *handle, enum handleType type)
if (rank == 0) {
if (type == POOL_HANDLE)
rc = daos_pool_local2global(*handle, &global);
else
else if (type == CONT_HANDLE)
rc = daos_cont_local2global(*handle, &global);
else
rc = daos_array_local2global(*handle, &global);
DCHECK(rc, "Failed to create global handle");
}
@ -222,213 +197,18 @@ static void HandleDistribute(daos_handle_t *handle, enum handleType type)
if (type == POOL_HANDLE)
rc = daos_pool_global2local(global, handle);
else
rc = daos_cont_global2local(pool, global, handle);
else if (type == CONT_HANDLE)
rc = daos_cont_global2local(poh, global, handle);
else
rc = daos_array_global2local(coh, global, handle);
DCHECK(rc, "Failed to get local handle");
}
free(global.iov_buf);
}
static void ContainerOpen(char *testFileName, IOR_param_t *param,
daos_handle_t *container, daos_cont_info_t *info)
{
int rc;
if (rank == 0) {
uuid_t uuid;
unsigned int dFlags;
rc = uuid_parse(testFileName, uuid);
DCHECK(rc, "Failed to parse 'testFile': %s", testFileName);
if (param->open == WRITE &&
param->useExistingTestFile == FALSE) {
INFO(VERBOSE_2, "Creating container %s", testFileName);
rc = daos_cont_create(pool, uuid, NULL, NULL);
DCHECK(rc, "Failed to create container %s",
testFileName);
}
INFO(VERBOSE_2, "Opening container %s", testFileName);
if (param->open == WRITE)
dFlags = DAOS_COO_RW;
else
dFlags = DAOS_COO_RO;
rc = daos_cont_open(pool, uuid, dFlags, container, info,
NULL /* ev */);
DCHECK(rc, "Failed to open container %s", testFileName);
}
HandleDistribute(container, CONTAINER_HANDLE);
MPI_CHECK(MPI_Bcast(info, sizeof *info, MPI_BYTE, 0, param->testComm),
"Failed to broadcast container info");
}
static void ContainerClose(daos_handle_t container, IOR_param_t *param)
{
int rc;
if (rank != 0) {
rc = daos_cont_close(container, NULL /* ev */);
DCHECK(rc, "Failed to close container");
}
/* An MPI_Gather() call would probably be more efficient. */
MPI_CHECK(MPI_Barrier(param->testComm),
"Failed to synchronize processes");
if (rank == 0) {
rc = daos_cont_close(container, NULL /* ev */);
DCHECK(rc, "Failed to close container");
}
}
static void ObjectOpen(daos_handle_t container, daos_handle_t *object,
IOR_param_t *param)
{
daos_obj_id_t oid;
unsigned int flags;
int rc;
oid.hi = 0;
oid.lo = 1;
daos_obj_generate_id(&oid, 0, objectClass);
if (param->open == WRITE)
flags = DAOS_OO_RW;
else
flags = DAOS_OO_RO;
rc = daos_obj_open(container, oid, flags, object, NULL /* ev */);
DCHECK(rc, "Failed to open object");
}
static void ObjectClose(daos_handle_t object)
{
int rc;
rc = daos_obj_close(object, NULL /* ev */);
DCHECK(rc, "Failed to close object");
}
static void AIOInit(IOR_param_t *param)
{
struct aio *aio;
int i;
int rc;
rc = posix_memalign((void **) &buffers, sysconf(_SC_PAGESIZE),
param->transferSize * o.daosAios);
DCHECK(rc, "Failed to allocate buffer array");
for (i = 0; i < o.daosAios; i++) {
aio = malloc(sizeof *aio);
if (aio == NULL)
ERR("Failed to allocate aio array");
memset(aio, 0, sizeof *aio);
aio->a_dkey.iov_buf = aio->a_dkeyBuf;
aio->a_dkey.iov_buf_len = sizeof aio->a_dkeyBuf;
aio->a_recx.rx_nr = 1;
daos_iov_set(&aio->a_iod.iod_name, "data", strlen("data"));
daos_csum_set(&aio->a_iod.iod_kcsum, NULL, 0);
aio->a_iod.iod_nr = 1;
aio->a_iod.iod_type = DAOS_IOD_ARRAY;
aio->a_iod.iod_recxs = &aio->a_recx;
aio->a_iod.iod_csums = NULL;
aio->a_iod.iod_eprs = NULL;
aio->a_iod.iod_size = param->transferSize;
aio->a_iov.iov_buf = buffers + param->transferSize * i;
aio->a_iov.iov_buf_len = param->transferSize;
aio->a_iov.iov_len = aio->a_iov.iov_buf_len;
aio->a_sgl.sg_nr = 1;
aio->a_sgl.sg_iovs = &aio->a_iov;
rc = daos_event_init(&aio->a_event, eventQueue,
NULL /* parent */);
DCHECK(rc, "Failed to initialize event for aio[%d]", i);
cfs_list_add(&aio->a_list, &aios);
INFO(VERBOSE_3, "Allocated AIO %p: buffer %p", aio,
aio->a_iov.iov_buf);
}
nAios = o.daosAios;
events = malloc((sizeof *events) * o.daosAios);
if (events == NULL)
ERR("Failed to allocate events array");
}
static void AIOFini(IOR_param_t *param)
{
struct aio *aio;
struct aio *tmp;
free(events);
cfs_list_for_each_entry_safe(aio, tmp, &aios, a_list) {
INFO(VERBOSE_3, "Freeing AIO %p: buffer %p", aio,
aio->a_iov.iov_buf);
cfs_list_del_init(&aio->a_list);
daos_event_fini(&aio->a_event);
free(aio);
}
free(buffers);
}
static void AIOWait(IOR_param_t *param)
{
struct aio *aio;
int i;
int rc;
rc = daos_eq_poll(eventQueue, 0, DAOS_EQ_WAIT, o.daosAios,
events);
DCHECK(rc, "Failed to poll event queue");
assert(rc <= o.daosAios - nAios);
for (i = 0; i < rc; i++) {
int ret;
aio = (struct aio *)
((char *) events[i] -
(char *) (&((struct aio *) 0)->a_event));
DCHECK(aio->a_event.ev_error, "Failed to transfer (%lu, %lu)",
aio->a_iod.iod_recxs->rx_idx,
aio->a_iod.iod_recxs->rx_nr);
daos_event_fini(&aio->a_event);
ret = daos_event_init(&aio->a_event, eventQueue,
NULL /* parent */);
DCHECK(ret, "Failed to reinitialize event for AIO %p", aio);
cfs_list_move(&aio->a_list, &aios);
nAios++;
if (param->verbose >= VERBOSE_3)
INFO(VERBOSE_3, "Completed AIO %p: buffer %p", aio,
aio->a_iov.iov_buf);
}
INFO(VERBOSE_3, "Found %d completed AIOs (%d free %d busy)", rc,
nAios, o.daosAios - nAios);
}
static void ObjectClassParse(const char *string)
static void
ObjectClassParse(const char *string)
{
if (strcasecmp(string, "tiny") == 0)
objectClass = DAOS_OC_TINY_RW;
@ -459,105 +239,107 @@ static void ObjectClassParse(const char *string)
else if (strcasecmp(string, "repl_max") == 0)
objectClass = DAOS_OC_REPL_MAX_RW;
else
GERR("Invalid 'daosObjectClass' argument: '%s'", string);
GERR("Invalid 'oclass' argument: '%s'", string);
}
static void ParseService(int max, d_rank_list_t *ranks)
static option_help *
DAOS_options()
{
char *s;
s = strdup(o.daosPoolSvc);
if (s == NULL)
GERR("failed to duplicate argument");
ranks->rl_nr = 0;
while ((s = strtok(s, ":")) != NULL) {
if (ranks->rl_nr >= max) {
free(s);
GERR("at most %d pool service replicas supported", max);
}
ranks->rl_ranks[ranks->rl_nr] = atoi(s);
ranks->rl_nr++;
s = NULL;
}
free(s);
}
static option_help * DAOS_options(){
return options;
}
static void DAOS_Init()
static void
DAOS_Init()
{
int rc;
if (daos_initialized)
return;
if (o.daosPool == NULL || o.daosPoolSvc == NULL)
return;
if (o.daosObjectClass)
ObjectClassParse(o.daosObjectClass);
if (o.daosStripeMax % o.daosStripeSize != 0)
GERR("'daosStripeMax' must be a multiple of 'daosStripeSize'");
if (o.daosKill && ((objectClass != DAOS_OC_R2_RW) ||
(objectClass != DAOS_OC_R3_RW) ||
(objectClass != DAOS_OC_R4_RW) ||
(objectClass != DAOS_OC_R2S_RW) ||
(objectClass != DAOS_OC_R3S_RW) ||
(objectClass != DAOS_OC_R4S_RW) ||
(objectClass != DAOS_OC_REPL_MAX_RW)))
GERR("'daosKill' only makes sense with 'daosObjectClass=repl'");
if (rank == 0)
INFO(VERBOSE_0, "WARNING: USING daosStripeMax CAUSES READS TO RETURN INVALID DATA");
if (o.pool == NULL || o.svcl == NULL || o.cont == NULL)
return;
if (o.oclass)
ObjectClassParse(o.oclass);
rc = daos_init();
if (rc != -DER_ALREADY)
if (rc)
DCHECK(rc, "Failed to initialize daos");
rc = daos_eq_create(&eventQueue);
DCHECK(rc, "Failed to create event queue");
if (rank == 0) {
uuid_t uuid;
d_rank_t d_rank[13];
d_rank_list_t ranks;
uuid_t uuid;
d_rank_list_t *svcl = NULL;
d_rank_list_t ranks;
static daos_pool_info_t po_info;
static daos_cont_info_t co_info;
INFO(VERBOSE_2, "Connecting to pool %s %s", o.daosPool, o.daosPoolSvc);
INFO(VERBOSE_1, "Connecting to pool %s", o.pool);
rc = uuid_parse(o.daosPool, uuid);
DCHECK(rc, "Failed to parse 'daosPool': %s", o.daosPool);
ranks.rl_ranks = d_rank;
ParseService(sizeof(d_rank) / sizeof(d_rank[0]), &ranks);
rc = uuid_parse(o.pool, uuid);
DCHECK(rc, "Failed to parse 'pool': %s", o.pool);
rc = daos_pool_connect(uuid, o.daosGroup, &ranks,
DAOS_PC_RW, &pool, &poolInfo,
NULL /* ev */);
DCHECK(rc, "Failed to connect to pool %s", o.daosPool);
svcl = daos_rank_list_parse(o.svcl, ":");
if (svcl == NULL)
ERR("Failed to allocate svcl");
rc = daos_pool_connect(uuid, o.group, svcl, DAOS_PC_RW,
&poh, &po_info, NULL);
daos_rank_list_free(svcl);
DCHECK(rc, "Failed to connect to pool %s", o.pool);
INFO(VERBOSE_1, "Create/Open Container %s", o.cont);
uuid_clear(uuid);
rc = uuid_parse(o.cont, uuid);
DCHECK(rc, "Failed to parse 'cont': %s", o.cont);
rc = daos_cont_open(poh, uuid, DAOS_COO_RW, &coh, &co_info,
NULL);
/* If NOEXIST we create it */
if (rc == -DER_NONEXIST) {
INFO(VERBOSE_2, "Creating DAOS Container...\n");
rc = daos_cont_create(poh, uuid, NULL, NULL);
if (rc == 0)
rc = daos_cont_open(poh, uuid, DAOS_COO_RW,
&coh, &co_info, NULL);
}
DCHECK(rc, "Failed to create container");
}
HandleDistribute(&pool, POOL_HANDLE);
MPI_CHECK(MPI_Bcast(&poolInfo, sizeof poolInfo, MPI_BYTE, 0, MPI_COMM_WORLD),
"Failed to bcast pool info");
if (o.daosStripeCount == -1)
o.daosStripeCount = poolInfo.pi_ntargets * 64UL;
HandleDistribute(&poh, POOL_HANDLE);
HandleDistribute(&coh, CONT_HANDLE);
aoh.cookie = 0;
daos_initialized = true;
}
static void DAOS_Fini()
static void
DAOS_Fini()
{
int rc;
if (!daos_initialized)
return;
rc = daos_pool_disconnect(pool, NULL /* ev */);
DCHECK(rc, "Failed to disconnect from pool %s", o.daosPool);
rc = daos_cont_close(coh, NULL);
DCHECK(rc, "Failed to close container\n");
rc = daos_eq_destroy(eventQueue, 0 /* flags */);
DCHECK(rc, "Failed to destroy event queue");
if (o.destroy) {
if (rank == 0) {
uuid_t uuid;
INFO(VERBOSE_1, "Destroying Container %s", o.cont);
uuid_parse(o.cont, uuid);
rc = daos_cont_destroy(poh, o.cont, 1, NULL);
}
MPI_Bcast(&rc, 1, MPI_INT, 0, MPI_COMM_WORLD);
if (rc)
DCHECK(rc, "Failed to destroy container %s (%d)",
o.cont, rc);
}
rc = daos_pool_disconnect(poh, NULL);
DCHECK(rc, "Failed to disconnect from pool %s", o.pool);
rc = daos_fini();
DCHECK(rc, "Failed to finalize daos");
@ -565,253 +347,180 @@ static void DAOS_Fini()
daos_initialized = false;
}
static void *DAOS_Create(char *testFileName, IOR_param_t *param)
static void
gen_oid(const char *name, daos_obj_id_t *oid)
{
return DAOS_Open(testFileName, param);
daos_ofeat_t feat = 0;
oid->lo = d_hash_murmur64(name, strlen(name), IOR_DAOS_MUR_SEED);
oid->hi = 0;
feat = DAOS_OF_DKEY_UINT64 | DAOS_OF_AKEY_HASHED;
daos_obj_generate_id(oid, feat, objectClass);
}
/*
 * Create the DAOS array object that backs an IOR test file.
 *
 * The file name is hashed to an object ID by gen_oid().  In file-per-process
 * mode every rank creates its own array; otherwise only rank 0 creates it and
 * the resulting handle is shared with the other ranks via HandleDistribute().
 * The array is created with the literal 1 and o.chunk_size as its size
 * arguments (presumably cell size and chunk size -- the open paths in this
 * file reject arrays whose cell size is not 1).
 *
 * testFileName: name passed by IOR, used only to derive the object ID.
 * param:        IOR test parameters; only filePerProc is read here.
 *
 * Returns a pointer to the file-scope array handle `aoh`, so the caller
 * receives a well-defined, non-NULL file descriptor.  (The function is
 * declared to return `void *`; falling off the end without a return value
 * handed an indeterminate pointer to the caller.)
 */
static void *
DAOS_Create(char *testFileName, IOR_param_t *param)
{
        daos_obj_id_t oid;
        int rc;

        /** Convert file name into object ID */
        gen_oid(testFileName, &oid);

        /** Create the array */
        if (param->filePerProc || rank == 0) {
                /* o.chunk_size is an int, so it must be printed with %d. */
                printf("Chunk size = %d\n", o.chunk_size);
                rc = daos_array_create(coh, oid, DAOS_TX_NONE, 1, o.chunk_size,
                                       &aoh, NULL);
                DCHECK(rc, "Failed to create array object\n");
        }

        /** Distribute the array handle if not FPP */
        if (!param->filePerProc)
                HandleDistribute(&aoh, ARRAY_HANDLE);

        return &aoh;
}
static int
DAOS_Access(const char *testFileName, int mode, IOR_param_t * param)
{
uuid_t uuid;
unsigned int dFlags;
daos_handle_t coh;
daos_cont_info_t info;
daos_obj_id_t oid;
int rc;
rc = uuid_parse(testFileName, uuid);
DCHECK(rc, "Failed to parse 'testFile': %s", testFileName);
/** Convert file name into object ID */
gen_oid(testFileName, &oid);
rc = daos_cont_open(pool, uuid, DAOS_COO_RO, &coh, &info, NULL);
if (rc)
return rc;
/** open the array to verify it exists */
if (param->filePerProc || rank == 0) {
daos_size_t cell_size, chunk_size;
rc = daos_array_open(coh, oid, DAOS_TX_NONE, DAOS_OO_RO,
&cell_size, &chunk_size, &aoh, NULL);
if (rc)
return rc;
if (cell_size != 1)
GERR("Invalid DAOS Array object.\n");
rc = daos_array_close(aoh, NULL);
aoh.cookie = 0;
}
if (!param->filePerProc)
MPI_Bcast(&rc, 1, MPI_INT, 0, MPI_COMM_WORLD);
rc = daos_cont_close(coh, NULL);
return rc;
}
static void *DAOS_Open(char *testFileName, IOR_param_t *param)
static void *
DAOS_Open(char *testFileName, IOR_param_t *param)
{
struct fileDescriptor *fd;
daos_obj_id_t oid;
fd = malloc(sizeof *fd);
if (fd == NULL)
ERR("Failed to allocate fd");
/** Convert file name into object ID */
gen_oid(testFileName, &oid);
ContainerOpen(testFileName, param, &fd->container, &fd->containerInfo);
ObjectOpen(fd->container, &fd->object, param);
AIOInit(param);
/** Open the array */
if (param->filePerProc || rank == 0) {
daos_size_t cell_size, chunk_size;
int rc;
return fd;
rc = daos_array_open(coh, oid, DAOS_TX_NONE, DAOS_OO_RW,
&cell_size, &chunk_size, &aoh, NULL);
DCHECK(rc, "Failed to create array object\n");
if (cell_size != 1)
GERR("Invalid DAOS Array object.\n");
}
/** Distribute the array handle if not FPP */
if (!param->filePerProc)
HandleDistribute(&aoh, ARRAY_HANDLE);
}
static void
kill_daos_server(IOR_param_t *param)
static IOR_offset_t
DAOS_Xfer(int access, void *file, IOR_size_t *buffer,
IOR_offset_t length, IOR_param_t *param)
{
daos_pool_info_t info;
d_rank_t d_rank, svc_ranks[13];
d_rank_list_t svc;
struct d_tgt_list targets;
int tgt_idx = -1;
uuid_t uuid;
char *s;
int rc;
daos_array_iod_t iod;
daos_range_t rg;
daos_sg_list_t sgl;
daos_iov_t iov;
int rc;
rc = daos_pool_query(pool, NULL, &info, NULL, NULL);
DCHECK(rc, "Error in querying pool\n");
/** set array location */
iod.arr_nr = 1;
rg.rg_len = length;
rg.rg_idx = param->offset;
iod.arr_rgs = &rg;
if (info.pi_ntargets - info.pi_ndisabled <= 1)
return;
/* choose the last alive one */
d_rank = info.pi_ntargets - 1 - info.pi_ndisabled;
rc = uuid_parse(o.daosPool, uuid);
DCHECK(rc, "Failed to parse 'daosPool': %s", o.daosPool);
if (rc != 0)
printf("Killing tgt rank: %d (total of %d of %d already disabled)\n",
d_rank, info.pi_ndisabled, info.pi_ntargets);
fflush(stdout);
rc = daos_mgmt_svc_rip(o.daosGroup, d_rank, true, NULL);
DCHECK(rc, "Error in killing server\n");
targets.tl_nr = 1;
targets.tl_ranks = &d_rank;
targets.tl_tgts = &tgt_idx;
svc.rl_ranks = svc_ranks;
ParseService(sizeof(svc_ranks)/ sizeof(svc_ranks[0]), &svc);
rc = daos_pool_tgt_exclude(uuid, NULL, &svc, &targets, NULL);
DCHECK(rc, "Error in excluding pool from poolmap\n");
rc = daos_pool_query(pool, NULL, &info, NULL, NULL);
DCHECK(rc, "Error in querying pool\n");
printf("%d targets succesfully disabled\n",
info.pi_ndisabled);
}
static void
kill_and_sync(IOR_param_t *param)
{
double start, end;
start = MPI_Wtime();
if (rank == 0)
kill_daos_server(param);
if (rank == 0)
printf("Done killing and excluding\n");
MPI_CHECK(MPI_Barrier(param->testComm),
"Failed to synchronize processes");
end = MPI_Wtime();
if (rank == 0)
printf("Time spent inducing failure: %lf\n", (end - start));
}
static IOR_offset_t DAOS_Xfer(int access, void *file, IOR_size_t *buffer,
IOR_offset_t length, IOR_param_t *param)
{
struct fileDescriptor *fd = file;
struct aio *aio;
uint64_t stripe;
IOR_offset_t stripeOffset;
uint64_t round;
int rc;
if (!daos_initialized)
GERR("DAOS is not initialized!");
if (param->filePerProc)
GERR("'filePerProc' not yet supported");
if (o.daosStripeSize % param->transferSize != 0)
GERR("'daosStripeSize' must be a multiple of 'transferSize'");
if (param->transferSize % o.daosRecordSize != 0)
GERR("'transferSize' must be a multiple of 'daosRecordSize'");
assert(length == param->transferSize);
assert(param->offset % length == 0);
/**
* Currently killing only during writes
* Kills once when 1/2 of blocksize is
* written
**/
total_size += length;
if (o.daosKill && (access == WRITE) &&
((param->blockSize)/2) == total_size) {
/** More than half written lets kill */
if (rank == 0)
printf("Killing and Syncing\n", rank);
kill_and_sync(param);
o.daosKill = 0;
}
/*
* Find an available AIO descriptor. If none, wait for one.
*/
while (nAios == 0)
AIOWait(param);
aio = cfs_list_entry(aios.next, struct aio, a_list);
cfs_list_move_tail(&aio->a_list, &aios);
nAios--;
stripe = (param->offset / o.daosStripeSize) %
o.daosStripeCount;
rc = snprintf(aio->a_dkeyBuf, sizeof aio->a_dkeyBuf, "%lu", stripe);
assert(rc < sizeof aio->a_dkeyBuf);
aio->a_dkey.iov_len = strlen(aio->a_dkeyBuf) + 1;
round = param->offset / (o.daosStripeSize * o.daosStripeCount);
stripeOffset = o.daosStripeSize * round +
param->offset % o.daosStripeSize;
if (o.daosStripeMax != 0)
stripeOffset %= o.daosStripeMax;
aio->a_recx.rx_idx = stripeOffset / o.daosRecordSize;
/*
* If the data written will be checked later, we have to copy in valid
* data instead of writing random bytes. If the data being read is for
* checking purposes, poison the buffer first.
*/
if (access == WRITE && param->checkWrite)
memcpy(aio->a_iov.iov_buf, buffer, length);
else if (access == WRITECHECK || access == READCHECK)
memset(aio->a_iov.iov_buf, '#', length);
INFO(VERBOSE_3, "Starting AIO %p (%d free %d busy): access %d "
"dkey '%s' iod <%llu, %llu> sgl <%p, %lu>", aio, nAios,
o.daosAios - nAios, access, (char *) aio->a_dkey.iov_buf,
(unsigned long long) aio->a_iod.iod_recxs->rx_idx,
(unsigned long long) aio->a_iod.iod_recxs->rx_nr,
aio->a_sgl.sg_iovs->iov_buf,
(unsigned long long) aio->a_sgl.sg_iovs->iov_buf_len);
/** set memory location */
sgl.sg_nr = 1;
daos_iov_set(&iov, buffer, length);
sgl.sg_iovs = &iov;
if (access == WRITE) {
rc = daos_obj_update(fd->object, DAOS_TX_NONE, &aio->a_dkey,
1 /* nr */, &aio->a_iod, &aio->a_sgl,
&aio->a_event);
DCHECK(rc, "Failed to start update operation");
} else {
rc = daos_obj_fetch(fd->object, DAOS_TX_NONE, &aio->a_dkey,
1 /* nr */, &aio->a_iod, &aio->a_sgl,
NULL /* maps */, &aio->a_event);
DCHECK(rc, "Failed to start fetch operation");
}
rc = daos_array_write(aoh, DAOS_TX_NONE, &iod, &sgl, NULL, NULL);
DCHECK(rc, "daos_array_write() failed (%d).", rc);
} else {
rc = daos_array_read(aoh, DAOS_TX_NONE, &iod, &sgl, NULL, NULL);
DCHECK(rc, "daos_array_read() failed (%d).", rc);
}
/*
* If this is a WRITECHECK or READCHECK, we are expected to fill data
* into the buffer before returning. Note that if this is a READ, we
* don't have to return valid data as WriteOrRead() doesn't care.
*/
if (access == WRITECHECK || access == READCHECK) {
while (o.daosAios - nAios > 0)
AIOWait(param);
memcpy(buffer, aio->a_sgl.sg_iovs->iov_buf, length);
}
return length;
return length;
}
static void DAOS_Close(void *file, IOR_param_t *param)
static void
DAOS_Close(void *file, IOR_param_t *param)
{
struct fileDescriptor *fd = file;
int rc;
if (!daos_initialized)
return;
while (o.daosAios - nAios > 0)
AIOWait(param);
AIOFini(param);
ObjectClose(fd->object);
ContainerClose(fd->container, param);
free(fd);
}
static void DAOS_Delete(char *testFileName, IOR_param_t *param)
{
uuid_t uuid;
int rc;
int rc;
if (!daos_initialized)
GERR("DAOS is not initialized!");
INFO(VERBOSE_2, "Deleting container %s", testFileName);
rc = daos_array_close(aoh, NULL);
DCHECK(rc, "daos_array_close() failed (%d).", rc);
rc = uuid_parse(testFileName, uuid);
DCHECK(rc, "Failed to parse 'testFile': %s", testFileName);
rc = daos_cont_destroy(pool, uuid, 1 /* force */, NULL /* ev */);
if (rc)
DCHECK(rc, "Failed to destroy container %s (%d)", testFileName, rc);
aoh.cookie = 0;
}
static char* DAOS_GetVersion()
/*
 * Delete the DAOS array object that backs an IOR test file.
 *
 * The object ID is derived from testFileName with gen_oid().  The array is
 * opened read/write, checked to have a cell size of 1, destroyed and closed
 * again, and the file-scope handle `aoh` is reset.  In file-per-process mode
 * every rank does this for its own file; in shared-file mode only rank 0
 * does, and its last return code is then broadcast to all ranks.
 *
 * testFileName: name passed by IOR, used only to derive the object ID.
 * param:        IOR test parameters; only filePerProc is read here.
 */
static void
DAOS_Delete(char *testFileName, IOR_param_t *param)
{
        daos_obj_id_t array_oid;
        int shared_file;
        int rc;

        if (!daos_initialized)
                GERR("DAOS is not initialized!");

        /* The array object ID is a hash of the file name. */
        gen_oid(testFileName, &array_oid);

        shared_file = !param->filePerProc;

        /* Open first so a missing or foreign object is reported before destroy. */
        if (!shared_file || rank == 0) {
                daos_size_t elem_size;
                daos_size_t stripe_size;

                rc = daos_array_open(coh, array_oid, DAOS_TX_NONE, DAOS_OO_RW,
                                     &elem_size, &stripe_size, &aoh, NULL);
                DCHECK(rc, "daos_array_open() failed (%d).", rc);

                if (elem_size != 1)
                        GERR("Invalid DAOS Array object.\n");

                rc = daos_array_destroy(aoh, DAOS_TX_NONE, NULL);
                DCHECK(rc, "daos_array_destroy() failed (%d).", rc);

                rc = daos_array_close(aoh, NULL);
                DCHECK(rc, "daos_array_close() failed (%d).", rc);

                /* Mark the shared array handle as no longer open. */
                aoh.cookie = 0;
        }

        /* Shared-file mode: rank 0 publishes its result to the other ranks. */
        if (shared_file)
                MPI_Bcast(&rc, 1, MPI_INT, 0, MPI_COMM_WORLD);
}
static char *
DAOS_GetVersion()
{
static char ver[1024] = {};
@ -819,17 +528,46 @@ static char* DAOS_GetVersion()
return ver;
}
static void DAOS_Fsync(void *file, IOR_param_t *param)
static void
DAOS_Fsync(void *file, IOR_param_t *param)
{
while (o.daosAios - nAios > 0)
AIOWait(param);
return;
}
static IOR_offset_t DAOS_GetFileSize(IOR_param_t *test, MPI_Comm testComm,
char *testFileName)
static IOR_offset_t
DAOS_GetFileSize(IOR_param_t *param, MPI_Comm testComm, char *testFileName)
{
/*
* Sizes are inapplicable to containers at the moment.
*/
return 0;
daos_obj_id_t oid;
daos_size_t size;
int rc;
if (!daos_initialized)
GERR("DAOS is not initialized!");
/** Convert file name into object ID */
gen_oid(testFileName, &oid);
/** open the array to verify it exists */
if (param->filePerProc || rank == 0) {
daos_size_t cell_size, chunk_size;
rc = daos_array_open(coh, oid, DAOS_TX_NONE, DAOS_OO_RO,
&cell_size, &chunk_size, &aoh, NULL);
DCHECK(rc, "daos_array_open() failed (%d).", rc);
if (cell_size != 1)
GERR("Invalid DAOS Array object.\n");
rc = daos_array_get_size(aoh, DAOS_TX_NONE, &size, NULL);
DCHECK(rc, "daos_array_get_size() failed (%d).", rc);
rc = daos_array_close(aoh, NULL);
DCHECK(rc, "daos_array_close() failed (%d).", rc);
aoh.cookie = 0;
}
if (!param->filePerProc)
MPI_Bcast(&size, 1, MPI_LONG, 0, MPI_COMM_WORLD);
return size;
}