Initial commit of the new aiori-HDFS module.

This provides an HDFS back-end, allowing IOR to exercise a Hadoop
Distributed File-System, plus corresponding changes throughout, to
integrate the new module into the build.  The commit compiles at LANL, but
hasn't been run yet.  We're currently waiting for some configuration on
machines that will eventually provide HDFS.  By default, configure ignores
the HDFS module.  You have to explicitly add --with-hdfs.
master
Jeff Inman 2014-08-13 16:53:24 -06:00
parent b7fcafc9ec
commit 0be8973c0e
9 changed files with 926 additions and 310 deletions

View File

@ -20,7 +20,7 @@ AX_PROG_CC_MPI
# Checks for libraries.
# Checks for header files.
AC_CHECK_HEADERS([fcntl.h libintl.h stdlib.h string.h strings.h sys/ioctl.h sys/param.h sys/statfs.h sys/statvfs.h sys/time.h unistd.h wchar.h gpfs.h gpfs_fcntl.h])
AC_CHECK_HEADERS([fcntl.h libintl.h stdlib.h string.h strings.h sys/ioctl.h sys/param.h sys/statfs.h sys/statvfs.h sys/time.h unistd.h wchar.h gpfs.h gpfs_fcntl.h plfs.h hdfs.h])
# Checks for typedefs, structures, and compiler characteristics.
AC_TYPE_SIZE_T
@ -46,15 +46,26 @@ AS_IF([test "x$with_lustre" != xno], [
])
])
# POSIX IO support
AC_ARG_WITH([posix],
[AS_HELP_STRING([--with-posix],
[support IO with POSIX backend @<:@default=yes@:>@])],
# HDF5 support
AC_ARG_WITH([hdf5],
[AS_HELP_STRING([--with-hdf5],
[support IO with HDF5 backend @<:@default=no@:>@])],
[],
[with_posix=yes])
AM_CONDITIONAL([USE_POSIX_AIORI], [test x$with_posix = xyes])
AM_COND_IF([USE_POSIX_AIORI],[
AC_DEFINE([USE_POSIX_AIORI], [], [Build POSIX backend AIORI])
[with_hdf5=no])
AM_CONDITIONAL([USE_HDF5_AIORI], [test x$with_hdf5 = xyes])
AM_COND_IF([USE_HDF5_AIORI],[
AC_DEFINE([USE_HDF5_AIORI], [], [Build HDF5 backend AIORI])
])
# HDFS support
AC_ARG_WITH([hdfs],
[AS_HELP_STRING([--with-hdfs],
[support IO with HDFS backend @<:@default=no@:>@])],
[],
[with_hdfs=no])
AM_CONDITIONAL([USE_HDFS_AIORI], [test x$with_hdfs = xyes])
AM_COND_IF([USE_HDFS_AIORI],[
AC_DEFINE([USE_HDFS_AIORI], [], [Build HDFS backend AIORI])
])
# MPIIO support
@ -68,17 +79,6 @@ AM_COND_IF([USE_MPIIO_AIORI],[
AC_DEFINE([USE_MPIIO_AIORI], [], [Build MPIIO backend AIORI])
])
# HDF5 support
AC_ARG_WITH([hdf5],
[AS_HELP_STRING([--with-hdf5],
[support IO with HDF5 backend @<:@default=no@:>@])],
[],
[with_hdf5=no])
AM_CONDITIONAL([USE_HDF5_AIORI], [test x$with_hdf5 = xyes])
AM_COND_IF([USE_HDF5_AIORI],[
AC_DEFINE([USE_HDF5_AIORI], [], [Build HDF5 backend AIORI])
])
# NCMPI (Parallel netcdf) support
AC_ARG_WITH([ncmpi],
[AS_HELP_STRING([--with-ncmpi],
@ -90,6 +90,31 @@ AM_COND_IF([USE_NCMPI_AIORI],[
AC_DEFINE([USE_NCMPI_AIORI], [], [Build NCMPI backend AIORI])
])
# PLFS IO support
AC_ARG_WITH([plfs],
[AS_HELP_STRING([--with-plfs],
[support IO with PLFS backend @<:@default=no@:>@])],
[],
[with_plfs=no])
AM_CONDITIONAL([USE_PLFS_AIORI], [test x$with_plfs = xyes])
AM_COND_IF([USE_PLFS_AIORI],[
AC_DEFINE([USE_PLFS_AIORI], [], [Build PLFS backend AIORI])
])
# POSIX IO support
AC_ARG_WITH([posix],
[AS_HELP_STRING([--with-posix],
[support IO with POSIX backend @<:@default=yes@:>@])],
[],
[with_posix=yes])
AM_CONDITIONAL([USE_POSIX_AIORI], [test x$with_posix = xyes])
AM_COND_IF([USE_POSIX_AIORI],[
AC_DEFINE([USE_POSIX_AIORI], [], [Build POSIX backend AIORI])
])
# Enable building "IOR", in all capitals
AC_ARG_ENABLE([caps],
[AS_HELP_STRING([--enable-caps],

View File

@ -3,24 +3,44 @@ if USE_CAPS
bin_PROGRAMS += IOR
endif
ior_SOURCES = ior.c utilities.c parse_options.c
ior_SOURCES += ior.h utilities.h parse_options.h aiori.h iordef.h
ior_LDADD =
ior_SOURCES =
ior_CPPFLAGS =
ior_LDFLAGS =
ior_LDADD =
if USE_POSIX_AIORI
ior_SOURCES += aiori-POSIX.c
ior_SOURCES += ior.c utilities.c parse_options.c
ior_SOURCES += ior.h utilities.h parse_options.h aiori.h iordef.h
if USE_HDFS_AIORI
# TBD: figure out how to find this from the corresponding bin/ dir in $PATH
# or pick an environment var to use (and set it in modulefiles)
# or provide a config-flag, to set a variable we use here
ior_SOURCES += aiori-HDFS.c
ior_CPPFLAGS += -I/opt/hadoop-2.2.0/include
ior_LDFLAGS += -L/opt/hadoop-2.2.0/lib/native
ior_LDADD += -lhdfs
endif
if USE_HDF5_AIORI
ior_SOURCES += aiori-HDF5.c
ior_LDADD += -lhdf5 -lz
endif
if USE_MPIIO_AIORI
ior_SOURCES += aiori-MPIIO.c
endif
if USE_HDF5_AIORI
ior_SOURCES += aiori-HDF5.c
ior_LDADD += -lhdf5 -lz
endif
if USE_NCMPI_AIORI
ior_SOURCES += aiori-NCMPI.c
ior_LDADD += -lpnetcdf
endif
if USE_PLFS_AIORI
ior_SOURCES += aiori-PLFS.c
endif
if USE_POSIX_AIORI
ior_SOURCES += aiori-POSIX.c
endif
IOR_SOURCES = $(ior_SOURCES)
IOR_LDADD = $(ior_LDADD)
IOT_CPPFLAGS = $(ior_CPPFLAGS)

527
src/aiori-HDFS.c Normal file
View File

@ -0,0 +1,527 @@
/* -*- mode: c; indent-tabs-mode: t; -*-
* vim:noexpandtab:
*
* Editing with tabs allows different users to pick their own indentation
* appearance without changing the file.
*/
/*
* Copyright (c) 2009, Los Alamos National Security, LLC All rights reserved.
* Copyright 2009. Los Alamos National Security, LLC. This software was produced
* under U.S. Government contract DE-AC52-06NA25396 for Los Alamos National
* Laboratory (LANL), which is operated by Los Alamos National Security, LLC for
* the U.S. Department of Energy. The U.S. Government has rights to use,
* reproduce, and distribute this software. NEITHER THE GOVERNMENT NOR LOS
* ALAMOS NATIONAL SECURITY, LLC MAKES ANY WARRANTY, EXPRESS OR IMPLIED, OR
* ASSUMES ANY LIABILITY FOR THE USE OF THIS SOFTWARE. If software is
* modified to produce derivative works, such modified software should be
* clearly marked, so as not to confuse it with the version available from
* LANL.
*
* Additionally, redistribution and use in source and binary forms, with or
* without modification, are permitted provided that the following conditions are
* met:
*
* Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
*
* Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* Neither the name of Los Alamos National Security, LLC, Los Alamos National
* Laboratory, LANL, the U.S. Government, nor the names of its contributors may be
* used to endorse or promote products derived from this software without specific
* prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY LOS ALAMOS NATIONAL SECURITY, LLC AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
* THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL LOS ALAMOS NATIONAL SECURITY, LLC OR
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
* OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
* IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
* OF SUCH DAMAGE.
*/
/******************************************************************************\
*
* Implement of abstract I/O interface for HDFS.
*
\******************************************************************************/
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#include <stdio.h>
#include <stdlib.h>
#ifdef __linux__
# include <sys/ioctl.h> /* necessary for: */
# define __USE_GNU /* O_DIRECT and */
# include <fcntl.h> /* IO operations */
# undef __USE_GNU
#endif /* __linux__ */
#include <errno.h>
#include <fcntl.h> /* IO operations */
#include <sys/stat.h>
#include <assert.h>
/*
#ifdef HAVE_LUSTRE_LUSTRE_USER_H
#include <lustre/lustre_user.h>
#endif
*/
#include "ior.h"
#include "aiori.h"
#include "iordef.h"
#ifndef open64 /* necessary for TRU64 -- */
# define open64 open /* unlikely, but may pose */
#endif /* not open64 */ /* conflicting prototypes */
#ifndef lseek64 /* necessary for TRU64 -- */
# define lseek64 lseek /* unlikely, but may pose */
#endif /* not lseek64 */ /* conflicting prototypes */
#ifndef O_BINARY /* Required on Windows */
# define O_BINARY 0
#endif
#include "hdfs.h"
/**************************** P R O T O T Y P E S *****************************/
static void *HDFS_Create(char *, IOR_param_t *);
static void *HDFS_Open(char *, IOR_param_t *);
static IOR_offset_t HDFS_Xfer(int, void *, IOR_size_t *,
IOR_offset_t, IOR_param_t *);
static void HDFS_Close(void *, IOR_param_t *);
static void HDFS_Delete(char *, IOR_param_t *);
static void HDFS_SetVersion(IOR_param_t *);
static void HDFS_Fsync(void *, IOR_param_t *);
static IOR_offset_t HDFS_GetFileSize(IOR_param_t *, MPI_Comm, char *);
/************************** D E C L A R A T I O N S ***************************/
ior_aiori_t hdfs_aiori = {
"HDFS",
HDFS_Create,
HDFS_Open,
HDFS_Xfer,
HDFS_Close,
HDFS_Delete,
HDFS_SetVersion,
HDFS_Fsync,
HDFS_GetFileSize
};
/***************************** F U N C T I O N S ******************************/
/* This is identical to the one in aiori-POSIX.c Doesn't seem like
* it would be appropriate in utilities.c.
*/
void hdfs_set_o_direct_flag(int *fd)
{
/* note that TRU64 needs O_DIRECTIO, SunOS uses directio(),
and everyone else needs O_DIRECT */
#ifndef O_DIRECT
# ifndef O_DIRECTIO
WARN("cannot use O_DIRECT");
# define O_DIRECT 000000
# else /* O_DIRECTIO */
# define O_DIRECT O_DIRECTIO
# endif /* not O_DIRECTIO */
#endif /* not O_DIRECT */
*fd |= O_DIRECT;
}
/*
* "Connect" to an HDFS file-system. HDFS requires this be done before and
* files are opened. It is easy for ior_aiori.open/create to assure that
* we connect, if we haven't already done so. However, there's not a
* simple way to assure that we disconnect after the last file-close. For
* now, we'll make a special call at the end of ior.c
*
* NOTE: It's okay to call this thing whenever you need to be sure the HDFS
* filesystem is connected.
*/
static void hdfs_connect( IOR_param_t* param ) {
if ( param->hdfs_fs )
return;
/* initialize a builder, holding parameters for hdfsBuilderConnect() */
struct hdfsBuilder* builder = hdfsNewBuilder();
if ( ! builder )
ERR_SIMPLE("couldn't create and hdsfsBuilder");
hdfsBuilderSetForceNewInstance ( builder ); /* don't use cached instance */
hdfsBuilderSetNameNode ( builder, param->hdfs_name_node );
hdfsBuilderSetNameNodePort( builder, param->hdfs_name_node_port );
/* NOTE: hdfsBuilderConnect() frees the builder */
param->hdfs_fs = hdfsBuilderConnect( builder );
if ( ! param->hdfs_fs )
ERR_SIMPLE("hdsfsBuilderConnect failed");
}
static void hdfs_disconnect( IOR_param_t* param ) {
if ( param->hdfs_fs ) {
hdfsDisconnect( param->hdfs_fs );
}
}
/*
* Create or open the file. Pass TRUE if creating and FALSE if opening an existing file.
*/
static void *HDFS_Create_Or_Open( char *testFileName, IOR_param_t *param, unsigned char createFile ) {
hdfsFile hdfs_file = NULL;
int fd_oflags = 0, hdfs_return;
/* initialize file-system handle, if needed */
if ( ! param->hdfs_fs ) {
hdfs_connect( param );
}
/*
* Check for unsupported flags.
*
* If they want RDWR, we don't know if they're going to try to do both, so we
* can't default to either O_RDONLY or O_WRONLY. Thus, we error and exit.
*
* The other two, we just note that they are not supported and don't do them.
*/
if ( param->openFlags & IOR_RDWR ) {
ERR( "Opening or creating a file in RDWR is not implemented in HDFS" );
}
if ( param->openFlags & IOR_EXCL ) {
fprintf( stdout, "Opening or creating a file in Exclusive mode is not implemented in HDFS\n" );
}
if ( param->openFlags & IOR_APPEND ) {
fprintf( stdout, "Opening or creating a file for appending is not implemented in HDFS\n" );
}
/*
* Setup the flags to be used.
*/
if ( createFile == TRUE ) {
fd_oflags = O_CREAT;
}
if ( param->openFlags & IOR_WRONLY ) {
if ( !param->filePerProc ) {
// in N-1 mode, only rank 0 truncates the file
if ( rank != 0 ) {
fd_oflags |= O_WRONLY;
} else {
fd_oflags |= O_TRUNC;
fd_oflags |= O_WRONLY;
}
} else {
// in N-N mode, everyone does truncate
fd_oflags |= O_TRUNC;
fd_oflags |= O_WRONLY;
}
} else {
fd_oflags |= O_RDONLY;
}
/*
* Now see if O_DIRECT is needed.
*/
if ( param->useO_DIRECT == TRUE ) {
hdfs_set_o_direct_flag( &fd_oflags );
}
/*
* For N-1 write, All other ranks wait for Rank 0 to open the file.
* this is bec 0 does truncate and rest don't
* it would be dangerous for all to do truncate as they might
* truncate each other's writes
*/
if (( param->openFlags & IOR_WRONLY ) &&
( !param->filePerProc ) &&
( rank != 0 )) {
MPI_CHECK(MPI_Barrier(testComm), "barrier error");
}
/*
* Now rank zero can open and truncate, if necessary.
*/
hdfs_file = hdfsOpenFile( param->hdfs_fs,
testFileName,
fd_oflags,
param->transferSize,
param->hdfs_replicas,
param->hdfs_block_size);
if ( ! hdfs_file ) {
ERR( "Failed to open the file" );
}
/*
* For N-1 write, Rank 0 waits for the other ranks to open the file after it has.
*/
if (( param->openFlags & IOR_WRONLY ) &&
( !param->filePerProc ) &&
( rank == 0 )) {
MPI_CHECK(MPI_Barrier(testComm), "barrier error");
}
return ((void *) hdfs_file );
}
/*
* Create and open a file through the HDFS interface.
*/
static void *HDFS_Create( char *testFileName, IOR_param_t * param ) {
return HDFS_Create_Or_Open( testFileName, param, TRUE );
}
/*
* Open a file through the HDFS interface.
*/
static void *HDFS_Open( char *testFileName, IOR_param_t * param ) {
if ( param->openFlags & IOR_CREAT ) {
return HDFS_Create_Or_Open( testFileName, param, TRUE );
}
else {
return HDFS_Create_Or_Open( testFileName, param, FALSE );
}
}
/*
* Write or read to file using the HDFS interface.
*/
static IOR_offset_t HDFS_Xfer(int access, void *file, IOR_size_t * buffer,
IOR_offset_t length, IOR_param_t * param) {
int xferRetries = 0;
long long remaining = (long long)length;
char* ptr = (char *)buffer;
long long rc;
off_t offset = param->offset;
hdfsFS hdfs_fs = param->hdfs_fs; /* (void*) */
hdfsFile hdfs_file = (hdfsFile)file; /* (void*) */
while ( remaining > 0 ) {
/* write/read file */
if (access == WRITE) { /* WRITE */
if (verbose >= VERBOSE_4) {
fprintf( stdout, "task %d writing to offset %lld\n",
rank,
param->offset + length - remaining);
}
rc = hdfsWrite( hdfs_fs, hdfs_file, ptr, remaining );
if ( rc < 0 ) {
ERR( "hdfsWrite() failed" );
}
offset += rc;
if ( param->fsyncPerWrite == TRUE ) {
HDFS_Fsync( hdfs_file, param );
}
}
else { /* READ or CHECK */
if (verbose >= VERBOSE_4) {
fprintf( stdout, "task %d reading from offset %lld\n",
rank,
param->offset + length - remaining );
}
rc = hdfsRead( hdfs_fs, hdfs_file, ptr, remaining );
if ( rc == 0 ) {
ERR( "hdfs_read() returned EOF prematurely" );
}
if ( rc < 0 ) {
ERR( "hdfs_read() failed" );
}
offset += rc;
}
if ( rc < remaining ) {
fprintf(stdout, "WARNING: Task %d, partial %s, %lld of %lld bytes at offset %lld\n",
rank,
access == WRITE ? "hdfsWrite()" : "hdfs_read()",
rc, remaining,
param->offset + length - remaining );
if ( param->singleXferAttempt == TRUE ) {
MPI_CHECK( MPI_Abort( MPI_COMM_WORLD, -1 ), "barrier error" );
}
if ( xferRetries > MAX_RETRY ) {
ERR( "too many retries -- aborting" );
}
}
assert( rc >= 0 );
assert( rc <= remaining );
remaining -= rc;
ptr += rc;
xferRetries++;
}
return ( length );
}
/*
* Perform hdfs_sync().
*/
static void HDFS_Fsync( void *fd, IOR_param_t * param ) {
hdfsFS hdfs_fs = param->hdfs_fs; /* (void *) */
hdfsFile hdfs_file = (hdfsFile)fd; /* (void *) */
if ( hdfsHSync( hdfs_fs, hdfs_file ) != 0 ) {
EWARN( "hdfs_sync() failed" );
}
}
/*
* Close a file through the HDFS interface.
*/
static void HDFS_Close( void *fd, IOR_param_t * param ) {
hdfsFS hdfs_fs = param->hdfs_fs; /* (void *) */
hdfsFile hdfs_file = (hdfsFile)fd; /* (void *) */
int open_flags;
if ( param->openFlags & IOR_WRONLY ) {
open_flags = O_CREAT | O_WRONLY;
} else {
open_flags = O_RDONLY;
}
if ( hdfsCloseFile( hdfs_fs, hdfs_file ) != 0 ) {
ERR( "hdfsCloseFile() failed" );
}
free( fd );
}
/*
* Delete a file through the HDFS interface.
*
* NOTE: The signature for ior_aiori.delete doesn't include a parameter to
* select recursive deletes. We'll assume that that is never needed.
*/
static void HDFS_Delete( char *testFileName, IOR_param_t * param ) {
char errmsg[256];
if ( ! param->hdfs_fs )
ERR_SIMPLE( "Can't delete a file without an HDFS connection" );
if ( hdfsDelete( param->hdfs_fs, testFileName, 0 ) != 0 ) {
sprintf(errmsg,
"[RANK %03d]: hdfsDelete() of file \"%s\" failed\n",
rank, testFileName);
EWARN( errmsg );
}
}
/*
* Determine api version.
*/
static void HDFS_SetVersion( IOR_param_t * param ) {
strcpy( param->apiVersion, param->api );
}
/*
* Use hdfsGetPathInfo() to get info about file?
* Is there an fstat we can use on hdfs?
* Should we just use POSIX fstat?
*/
static IOR_offset_t
HDFS_GetFileSize(IOR_param_t * param,
MPI_Comm testComm,
char * testFileName) {
IOR_offset_t aggFileSizeFromStat;
IOR_offset_t tmpMin, tmpMax, tmpSum;
/* make sure file-system is connected */
hdfs_connect( param );
/* file-info struct includes size in bytes */
hdfsFileInfo* info = hdfsGetPathInfo( param->hdfs_fs, testFileName );
if ( ! info )
ERR_SIMPLE( "hdfsGetPathInfo() failed" );
aggFileSizeFromStat = info->mSize;
if ( param->filePerProc == TRUE ) {
MPI_CHECK(
MPI_Allreduce(
&aggFileSizeFromStat, &tmpSum, 1, MPI_LONG_LONG_INT, MPI_SUM, testComm ),
"cannot total data moved" );
aggFileSizeFromStat = tmpSum;
}
else {
MPI_CHECK(
MPI_Allreduce(
&aggFileSizeFromStat, &tmpMin, 1, MPI_LONG_LONG_INT, MPI_MIN, testComm ),
"cannot total data moved" );
MPI_CHECK(
MPI_Allreduce(
&aggFileSizeFromStat, &tmpMax, 1, MPI_LONG_LONG_INT, MPI_MAX, testComm ),
"cannot total data moved" );
if ( tmpMin != tmpMax ) {
if ( rank == 0 ) {
WARN( "inconsistent file size by different tasks" );
}
/* incorrect, but now consistent across tasks */
aggFileSizeFromStat = tmpMin;
}
}
return ( aggFileSizeFromStat );
}

View File

@ -215,6 +215,11 @@ static void *MPIIO_Open(char *testFileName, IOR_param_t * param)
static IOR_offset_t MPIIO_Xfer(int access, void *fd, IOR_size_t * buffer,
IOR_offset_t length, IOR_param_t * param)
{
/* NOTE: The second arg is (void *) for reads, and (const void *)
for writes. Therefore, one of the two sets of assignments below
will get "assignment from incompatible pointer-type" warnings,
if we only use this one set of signatures. */
int (MPIAPI * Access) (MPI_File, void *, int,
MPI_Datatype, MPI_Status *);
int (MPIAPI * Access_at) (MPI_File, MPI_Offset, void *, int,

View File

@ -62,10 +62,11 @@ typedef struct ior_aiori {
IOR_offset_t (*get_file_size)(IOR_param_t *, MPI_Comm, char *);
} ior_aiori_t;
ior_aiori_t posix_aiori;
ior_aiori_t mpiio_aiori;
ior_aiori_t hdf5_aiori;
ior_aiori_t hdfs_aiori;
ior_aiori_t mpiio_aiori;
ior_aiori_t ncmpi_aiori;
ior_aiori_t posix_aiori;
IOR_offset_t MPIIO_GetFileSize(IOR_param_t * test, MPI_Comm testComm,
char *testFileName);

554
src/ior.c
View File

@ -51,17 +51,21 @@ double wall_clock_deviation;
ior_aiori_t *backend;
ior_aiori_t *available_aiori[] = {
#ifdef USE_POSIX_AIORI
&posix_aiori,
#ifdef USE_HDF5_AIORI
&hdf5_aiori,
#endif
#ifdef USE_HDFS_AIORI
&hdfs_aiori,
#endif
#ifdef USE_MPIIO_AIORI
&mpiio_aiori,
#endif
#ifdef USE_HDF5_AIORI
&hdf5_aiori,
#endif
#ifdef USE_NCMPI_AIORI
&ncmpi_aiori,
#endif
#ifdef USE_POSIX_AIORI
&posix_aiori,
#endif
NULL
};
@ -116,7 +120,8 @@ int main(int argc, char **argv)
/* Sanity check, we were compiled with SOME backend, right? */
if (available_aiori[0] == NULL) {
ERR("No IO backends compiled into ior. That should not have happened.");
ERR("No IO backends compiled into ior. "
"Run 'configure --with-<backend>', and recompile.");
}
/* setup tests before verifying test validity */
@ -129,10 +134,10 @@ int main(int argc, char **argv)
DisplayUsage(argv);
}
PrintHeader(argc, argv);
PrintHeader(argc, argv);
/* perform each test */
for (tptr = tests_head; tptr != NULL; tptr = tptr->next) {
for (tptr = tests_head; tptr != NULL; tptr = tptr->next) {
verbose = tptr->params.verbose;
if (rank == 0 && verbose >= VERBOSE_0) {
ShowTestInfo(&tptr->params);
@ -146,15 +151,15 @@ int main(int argc, char **argv)
if (verbose < 0)
/* always print final summary */
verbose = 0;
PrintLongSummaryAllTests(tests_head);
PrintLongSummaryAllTests(tests_head);
/* display finish time */
if (rank == 0 && verbose >= VERBOSE_0) {
fprintf(stdout, "\n");
fprintf(stdout, "Finished: %s", CurrentTimeString());
fprintf(stdout, "\n");
fprintf(stdout, "Finished: %s", CurrentTimeString());
}
DestroyTests(tests_head);
DestroyTests(tests_head);
MPI_CHECK(MPI_Finalize(), "cannot finalize MPI");
@ -168,14 +173,16 @@ int main(int argc, char **argv)
*/
void init_IOR_Param_t(IOR_param_t * p)
{
assert(available_aiori[0] != NULL);
memset(p, 0, sizeof(IOR_param_t));
p->mode = IOR_IRUSR | IOR_IWUSR | IOR_IRGRP | IOR_IWGRP;
p->openFlags = IOR_RDWR | IOR_CREAT;
assert(available_aiori[0] != NULL);
strncpy(p->api, available_aiori[0]->name, MAX_STR);
strncpy(p->platform, "HOST(OSTYPE)", MAX_STR);
strncpy(p->testFileName, "testFile", MAXPATHLEN);
p->nodes = 1;
p->tasksPerNode = 1;
p->repetitions = 1;
@ -189,6 +196,11 @@ void init_IOR_Param_t(IOR_param_t * p)
p->testComm = MPI_COMM_WORLD;
p->setAlignment = 1;
p->lustre_start_ost = -1;
p->hdfs_name_node = "default";
p->hdfs_name_node_port = 0; /* ??? */
p->hdfs_fs = NULL;
p->hdfs_replicas = 0;
p->hdfs_block_size = 0;
}
/*
@ -285,8 +297,8 @@ static void CheckForOutliers(IOR_param_t * test, double **timer, int rep,
*/
static void CheckFileSize(IOR_test_t *test, IOR_offset_t dataMoved, int rep)
{
IOR_param_t *params = &test->params;
IOR_results_t *results = test->results;
IOR_param_t *params = &test->params;
IOR_results_t *results = test->results;
MPI_CHECK(MPI_Allreduce(&dataMoved, &results->aggFileSizeFromXfer[rep],
1, MPI_LONG_LONG_INT, MPI_SUM, testComm),
@ -439,8 +451,8 @@ static int CountErrors(IOR_param_t * test, int access, int errors)
WARN("overflow in errors counted");
allErrors = -1;
}
fprintf(stdout, "WARNING: incorrect data on %s (%d errors found).\n",
access == WRITECHECK ? "write" : "read", allErrors);
fprintf(stdout, "WARNING: incorrect data on %s (%d errors found).\n",
access == WRITECHECK ? "write" : "read", allErrors);
fprintf(stdout,
"Used Time Stamp %u (0x%x) for Data Signature\n",
test->timeStampSignatureValue,
@ -475,7 +487,7 @@ static int CountTasksPerNode(int numTasks, MPI_Comm comm)
rc = gethostname(localhost, MAX_STR);
if (rc == -1) {
/* This node won't match task 0's hostname...expect in the
/* This node won't match task 0's hostname...except in the
case where ALL gethostname() calls fail, in which
case ALL nodes will appear to be on the same node.
We'll handle that later. */
@ -546,37 +558,37 @@ static void aligned_buffer_free(void *buf)
void AllocResults(IOR_test_t *test)
{
int reps;
if (test->results != NULL)
return;
int reps;
if (test->results != NULL)
return;
reps = test->params.repetitions;
test->results = (IOR_results_t *)malloc(sizeof(IOR_results_t));
if (test->results == NULL)
ERR("malloc of IOR_results_t failed");
reps = test->params.repetitions;
test->results = (IOR_results_t *)malloc(sizeof(IOR_results_t));
if (test->results == NULL)
ERR("malloc of IOR_results_t failed");
test->results->writeTime = (double *)malloc(reps * sizeof(double));
if (test->results->writeTime == NULL)
ERR("malloc of writeTime array failed");
memset(test->results->writeTime, 0, reps * sizeof(double));
test->results->writeTime = (double *)malloc(reps * sizeof(double));
if (test->results->writeTime == NULL)
ERR("malloc of writeTime array failed");
memset(test->results->writeTime, 0, reps * sizeof(double));
test->results->readTime = (double *)malloc(reps * sizeof(double));
if (test->results->readTime == NULL)
ERR("malloc of readTime array failed");
memset(test->results->readTime, 0, reps * sizeof(double));
test->results->readTime = (double *)malloc(reps * sizeof(double));
if (test->results->readTime == NULL)
ERR("malloc of readTime array failed");
memset(test->results->readTime, 0, reps * sizeof(double));
test->results->aggFileSizeFromStat =
(IOR_offset_t *)malloc(reps * sizeof(IOR_offset_t));
(IOR_offset_t *)malloc(reps * sizeof(IOR_offset_t));
if (test->results->aggFileSizeFromStat == NULL)
ERR("malloc of aggFileSizeFromStat failed");
test->results->aggFileSizeFromXfer =
(IOR_offset_t *)malloc(reps * sizeof(IOR_offset_t));
(IOR_offset_t *)malloc(reps * sizeof(IOR_offset_t));
if (test->results->aggFileSizeFromXfer == NULL)
ERR("malloc of aggFileSizeFromXfer failed");
test->results->aggFileSizeForBW =
(IOR_offset_t *)malloc(reps * sizeof(IOR_offset_t));
(IOR_offset_t *)malloc(reps * sizeof(IOR_offset_t));
if (test->results->aggFileSizeForBW == NULL)
ERR("malloc of aggFileSizeForBW failed");
@ -584,14 +596,14 @@ void AllocResults(IOR_test_t *test)
void FreeResults(IOR_test_t *test)
{
if (test->results != NULL) {
free(test->results->aggFileSizeFromStat);
free(test->results->aggFileSizeFromXfer);
free(test->results->aggFileSizeForBW);
free(test->results->readTime);
free(test->results->writeTime);
free(test->results);
}
if (test->results != NULL) {
free(test->results->aggFileSizeFromStat);
free(test->results->aggFileSizeFromXfer);
free(test->results->aggFileSizeForBW);
free(test->results->readTime);
free(test->results->writeTime);
free(test->results);
}
}
@ -611,24 +623,24 @@ IOR_test_t *CreateTest(IOR_param_t *init_params, int test_num)
newTest->params.tasksPerNode = tasksPerNode;
newTest->params.id = test_num;
newTest->next = NULL;
newTest->results = NULL;
newTest->results = NULL;
return newTest;
}
static void DestroyTest(IOR_test_t *test)
{
FreeResults(test);
free(test);
FreeResults(test);
free(test);
}
static void DestroyTests(IOR_test_t *tests_head)
{
IOR_test_t *tptr, *next;
IOR_test_t *tptr, *next;
for (tptr = tests_head; tptr != NULL; tptr = next) {
next = tptr->next;
DestroyTest(tptr);
}
for (tptr = tests_head; tptr != NULL; tptr = next) {
next = tptr->next;
DestroyTest(tptr);
}
}
/*
@ -1185,15 +1197,15 @@ static void ReduceIterResults(IOR_test_t *test, double **timer, int rep,
int access)
{
double reduced[12] = { 0 };
double diff[6];
double *diff_subset;
double totalTime;
double bw;
double diff[6];
double *diff_subset;
double totalTime;
double bw;
enum { RIGHT, LEFT };
int i;
MPI_Op op;
assert(access == WRITE || access == READ);
assert(access == WRITE || access == READ);
/* Find the minimum start time of the even numbered timers, and the
maximum finish time for the odd numbered timers */
@ -1204,46 +1216,46 @@ static void ReduceIterResults(IOR_test_t *test, double **timer, int rep,
}
if (rank != 0) {
/* Only rank 0 tallies and prints the results. */
return;
}
/* Only rank 0 tallies and prints the results. */
return;
}
/* Calculate elapsed times and throughput numbers */
for (i = 0; i < 6; i++) {
diff[i] = reduced[2 * i + 1] - reduced[2 * i];
}
if (access == WRITE) {
totalTime = reduced[5] - reduced[0];
test->results->writeTime[rep] = totalTime;
diff_subset = &diff[0];
} else { /* READ */
totalTime = reduced[11] - reduced[6];
test->results->readTime[rep] = totalTime;
diff_subset = &diff[3];
}
/* Calculate elapsed times and throughput numbers */
for (i = 0; i < 6; i++) {
diff[i] = reduced[2 * i + 1] - reduced[2 * i];
}
if (access == WRITE) {
totalTime = reduced[5] - reduced[0];
test->results->writeTime[rep] = totalTime;
diff_subset = &diff[0];
} else { /* READ */
totalTime = reduced[11] - reduced[6];
test->results->readTime[rep] = totalTime;
diff_subset = &diff[3];
}
if (verbose < VERBOSE_0) {
return;
}
return;
}
fprintf(stdout, "%-10s", access == WRITE ? "write" : "read");
bw = (double)test->results->aggFileSizeForBW[rep] / totalTime;
PPDouble(LEFT, bw / MEBIBYTE, " ");
PPDouble(LEFT, (double)test->params.blockSize / KIBIBYTE, " ");
PPDouble(LEFT, (double)test->params.transferSize / KIBIBYTE, " ");
PPDouble(LEFT, diff_subset[0], " ");
PPDouble(LEFT, diff_subset[1], " ");
PPDouble(LEFT, diff_subset[2], " ");
PPDouble(LEFT, totalTime, " ");
fprintf(stdout, "%-4d\n", rep);
fprintf(stdout, "%-10s", access == WRITE ? "write" : "read");
bw = (double)test->results->aggFileSizeForBW[rep] / totalTime;
PPDouble(LEFT, bw / MEBIBYTE, " ");
PPDouble(LEFT, (double)test->params.blockSize / KIBIBYTE, " ");
PPDouble(LEFT, (double)test->params.transferSize / KIBIBYTE, " ");
PPDouble(LEFT, diff_subset[0], " ");
PPDouble(LEFT, diff_subset[1], " ");
PPDouble(LEFT, diff_subset[2], " ");
PPDouble(LEFT, totalTime, " ");
fprintf(stdout, "%-4d\n", rep);
fflush(stdout);
fflush(stdout);
}
static void PrintRemoveTiming(double start, double finish, int rep)
{
if (rank != 0 || verbose < VERBOSE_0)
return;
return;
printf("remove - - - - - - ");
PPDouble(1, finish-start, " ");
@ -1381,21 +1393,21 @@ static void XferBuffersFree(void *buffer, void *checkBuffer,
*/
static void PrintEarlyHeader()
{
if (rank != 0)
return;
if (rank != 0)
return;
printf("IOR-" META_VERSION ": MPI Coordinated Test of Parallel I/O\n");
printf("\n");
fflush(stdout);
printf("IOR-" META_VERSION ": MPI Coordinated Test of Parallel I/O\n");
printf("\n");
fflush(stdout);
}
static void PrintHeader(int argc, char **argv)
{
struct utsname unamebuf;
int i;
int i;
if (rank != 0)
return;
if (rank != 0)
return;
fprintf(stdout, "Began: %s", CurrentTimeString());
fprintf(stdout, "Command line used:");
@ -1414,7 +1426,7 @@ static void PrintHeader(int argc, char **argv)
unamebuf.version, unamebuf.machine);
}
}
fprintf(stdout, "\n");
fprintf(stdout, "\n");
#ifdef _NO_MPI_TIMER
if (verbose >= VERBOSE_2)
fprintf(stdout, "Using unsynchronized POSIX timer\n");
@ -1438,7 +1450,7 @@ static void PrintHeader(int argc, char **argv)
}
fprintf(stdout, "ENDING ENVIRON LOOP\n");
}
fflush(stdout);
fflush(stdout);
}
/*
@ -1446,7 +1458,7 @@ static void PrintHeader(int argc, char **argv)
*/
static void ShowTestInfo(IOR_param_t *params)
{
fprintf(stdout, "\n");
fprintf(stdout, "\n");
fprintf(stdout, "Test %d started: %s", params->id, CurrentTimeString());
if (verbose >= VERBOSE_1) {
/* if pvfs2:, then skip */
@ -1471,7 +1483,7 @@ static void ShowSetup(IOR_param_t *params)
printf("\tapi = %s\n", params->apiVersion);
printf("\ttest filename = %s\n", params->testFileName);
printf("\taccess = ");
printf(params->filePerProc ? "file-per-process" : "single-shared-file");
printf(params->filePerProc ? "file-per-process" : "single-shared-file");
if (verbose >= VERBOSE_1 && strcmp(params->api, "POSIX") != 0) {
printf(params->collective == FALSE ? ", independent" : ", collective");
}
@ -1617,57 +1629,57 @@ static void ShowTest(IOR_param_t * test)
static double mean_of_array_of_doubles(double *values, int len)
{
double tot = 0.0;
int i;
double tot = 0.0;
int i;
for (i = 0; i < len; i++) {
tot += values[i];
}
return tot / len;
for (i = 0; i < len; i++) {
tot += values[i];
}
return tot / len;
}
struct results {
double min;
double max;
double mean;
double var;
double sd;
double sum;
double *val;
double min;
double max;
double mean;
double var;
double sd;
double sum;
double *val;
};
static struct results *bw_values(int reps, IOR_offset_t *agg_file_size, double *vals)
{
struct results *r;
int i;
struct results *r;
int i;
r = (struct results *)malloc(sizeof(struct results)
+ (reps * sizeof(double)));
if (r == NULL)
ERR("malloc failed");
r->val = (double *)&r[1];
r = (struct results *)malloc(sizeof(struct results)
+ (reps * sizeof(double)));
if (r == NULL)
ERR("malloc failed");
r->val = (double *)&r[1];
for (i = 0; i < reps; i++) {
r->val[i] = (double)agg_file_size[i] / vals[i];
if (i == 0) {
r->min = r->val[i];
r->max = r->val[i];
r->sum = 0.0;
}
r->min = MIN(r->min, r->val[i]);
r->max = MAX(r->max, r->val[i]);
r->sum += r->val[i];
}
r->mean = r->sum / reps;
r->var = 0.0;
for (i = 0; i < reps; i++) {
r->var += pow((r->mean - r->val[i]), 2);
}
r->var = r->var / reps;
r->sd = sqrt(r->var);
for (i = 0; i < reps; i++) {
r->val[i] = (double)agg_file_size[i] / vals[i];
if (i == 0) {
r->min = r->val[i];
r->max = r->val[i];
r->sum = 0.0;
}
r->min = MIN(r->min, r->val[i]);
r->max = MAX(r->max, r->val[i]);
r->sum += r->val[i];
}
r->mean = r->sum / reps;
r->var = 0.0;
for (i = 0; i < reps; i++) {
r->var += pow((r->mean - r->val[i]), 2);
}
r->var = r->var / reps;
r->sd = sqrt(r->var);
return r;
return r;
}
/*
@ -1677,17 +1689,17 @@ static struct results *bw_values(int reps, IOR_offset_t *agg_file_size, double *
*/
static void PrintLongSummaryOneOperation(IOR_test_t *test, double *times, char *operation)
{
IOR_param_t *params = &test->params;
IOR_results_t *results = test->results;
struct results *bw;
int reps;
if (rank != 0 || verbose < VERBOSE_0)
return;
IOR_param_t *params = &test->params;
IOR_results_t *results = test->results;
struct results *bw;
int reps;
if (rank != 0 || verbose < VERBOSE_0)
return;
reps = params->repetitions;
reps = params->repetitions;
bw = bw_values(reps, results->aggFileSizeForBW, times);
bw = bw_values(reps, results->aggFileSizeForBW, times);
fprintf(stdout, "%-9s ", operation);
fprintf(stdout, "%10.2f ", bw->max / MEBIBYTE);
@ -1712,15 +1724,15 @@ static void PrintLongSummaryOneOperation(IOR_test_t *test, double *times, char *
fprintf(stdout, "%s ", params->api);
fprintf(stdout, "%d", params->referenceNumber);
fprintf(stdout, "\n");
fflush(stdout);
fflush(stdout);
free(bw);
free(bw);
}
static void PrintLongSummaryOneTest(IOR_test_t *test)
{
IOR_param_t *params = &test->params;
IOR_results_t *results = test->results;
IOR_param_t *params = &test->params;
IOR_results_t *results = test->results;
if (params->writeFile)
PrintLongSummaryOneOperation(test, results->writeTime, "write");
@ -1730,13 +1742,13 @@ static void PrintLongSummaryOneTest(IOR_test_t *test)
static void PrintLongSummaryHeader()
{
if (rank != 0 || verbose < VERBOSE_0)
return;
if (rank != 0 || verbose < VERBOSE_0)
return;
fprintf(stdout, "\n");
fprintf(stdout, "%-9s %10s %10s %10s %10s %10s",
fprintf(stdout, "\n");
fprintf(stdout, "%-9s %10s %10s %10s %10s %10s",
"Operation", "Max(MiB)", "Min(MiB)", "Mean(MiB)", "StdDev",
"Mean(s)");
"Mean(s)");
fprintf(stdout, " Test# #Tasks tPN reps fPP reord reordoff reordrand seed"
" segcnt blksiz xsize aggsize API RefNum\n");
}
@ -1745,51 +1757,51 @@ static void PrintLongSummaryAllTests(IOR_test_t *tests_head)
{
IOR_test_t *tptr;
if (rank != 0 || verbose < VERBOSE_0)
return;
if (rank != 0 || verbose < VERBOSE_0)
return;
fprintf(stdout, "\n");
fprintf(stdout, "Summary of all tests:");
fprintf(stdout, "\n");
fprintf(stdout, "Summary of all tests:");
PrintLongSummaryHeader();
for (tptr = tests_head; tptr != NULL; tptr = tptr->next) {
for (tptr = tests_head; tptr != NULL; tptr = tptr->next) {
PrintLongSummaryOneTest(tptr);
}
}
}
static void PrintShortSummary(IOR_test_t * test)
{
IOR_param_t *params = &test->params;
IOR_results_t *results = test->results;
double max_write = 0.0;
double max_read = 0.0;
double bw;
int reps;
int i;
if (rank != 0 || verbose < VERBOSE_0)
return;
IOR_param_t *params = &test->params;
IOR_results_t *results = test->results;
double max_write = 0.0;
double max_read = 0.0;
double bw;
int reps;
int i;
if (rank != 0 || verbose < VERBOSE_0)
return;
reps = params->repetitions;
reps = params->repetitions;
max_write = results->writeTime[0];
max_read = results->readTime[0];
for (i = 0; i < reps; i++) {
bw = (double)results->aggFileSizeForBW[i]/results->writeTime[i];
max_write = MAX(bw, max_write);
bw = (double)results->aggFileSizeForBW[i]/results->readTime[i];
max_read = MAX(bw, max_read);
}
max_write = results->writeTime[0];
max_read = results->readTime[0];
for (i = 0; i < reps; i++) {
bw = (double)results->aggFileSizeForBW[i]/results->writeTime[i];
max_write = MAX(bw, max_write);
bw = (double)results->aggFileSizeForBW[i]/results->readTime[i];
max_read = MAX(bw, max_read);
}
fprintf(stdout, "\n");
if (params->writeFile) {
fprintf(stdout, "Max Write: %.2f MiB/sec (%.2f MB/sec)\n",
max_write/MEBIBYTE, max_write/MEGABYTE);
}
if (params->readFile) {
fprintf(stdout, "Max Read: %.2f MiB/sec (%.2f MB/sec)\n",
max_read/MEBIBYTE, max_read/MEGABYTE);
}
fprintf(stdout, "\n");
if (params->writeFile) {
fprintf(stdout, "Max Write: %.2f MiB/sec (%.2f MB/sec)\n",
max_write/MEBIBYTE, max_write/MEGABYTE);
}
if (params->readFile) {
fprintf(stdout, "Max Read: %.2f MiB/sec (%.2f MB/sec)\n",
max_read/MEBIBYTE, max_read/MEGABYTE);
}
}
/*
@ -1804,7 +1816,7 @@ static void *malloc_and_touch(size_t size)
if (size == 0)
return NULL;
page_size = sysconf(_SC_PAGESIZE);
page_size = sysconf(_SC_PAGESIZE);
buf = (char *)malloc(size);
if (buf == NULL)
@ -1819,59 +1831,59 @@ static void *malloc_and_touch(size_t size)
static void file_hits_histogram(IOR_param_t *params)
{
int *rankoffs;
int *filecont;
int *filehits;
int ifile;
int jfile;
int *rankoffs;
int *filecont;
int *filehits;
int ifile;
int jfile;
if (rank == 0) {
rankoffs = (int *)malloc(params->numTasks * sizeof(int));
filecont = (int *)malloc(params->numTasks * sizeof(int));
filehits = (int *)malloc(params->numTasks * sizeof(int));
}
if (rank == 0) {
rankoffs = (int *)malloc(params->numTasks * sizeof(int));
filecont = (int *)malloc(params->numTasks * sizeof(int));
filehits = (int *)malloc(params->numTasks * sizeof(int));
}
MPI_CHECK(MPI_Gather(&rankOffset, 1, MPI_INT, rankoffs,
1, MPI_INT, 0, MPI_COMM_WORLD),
"MPI_Gather error");
MPI_CHECK(MPI_Gather(&rankOffset, 1, MPI_INT, rankoffs,
1, MPI_INT, 0, MPI_COMM_WORLD),
"MPI_Gather error");
if (rank != 0)
return;
if (rank != 0)
return;
memset((void *)filecont, 0, params->numTasks * sizeof(int));
for (ifile = 0; ifile < params->numTasks; ifile++) {
filecont[(ifile + rankoffs[ifile]) % params->numTasks]++;
}
memset((void *)filehits, 0, params->numTasks * sizeof(int));
for (ifile = 0; ifile < params->numTasks; ifile++)
for (jfile = 0; jfile < params->numTasks; jfile++) {
if (ifile == filecont[jfile])
filehits[ifile]++;
}
fprintf(stdout, "#File Hits Dist:");
jfile = 0;
ifile = 0;
while (jfile < params->numTasks && ifile < params->numTasks) {
fprintf(stdout, " %d", filehits[ifile]);
jfile += filehits[ifile], ifile++;
}
fprintf(stdout, "\n");
free(rankoffs);
free(filecont);
free(filehits);
memset((void *)filecont, 0, params->numTasks * sizeof(int));
for (ifile = 0; ifile < params->numTasks; ifile++) {
filecont[(ifile + rankoffs[ifile]) % params->numTasks]++;
}
memset((void *)filehits, 0, params->numTasks * sizeof(int));
for (ifile = 0; ifile < params->numTasks; ifile++)
for (jfile = 0; jfile < params->numTasks; jfile++) {
if (ifile == filecont[jfile])
filehits[ifile]++;
}
fprintf(stdout, "#File Hits Dist:");
jfile = 0;
ifile = 0;
while (jfile < params->numTasks && ifile < params->numTasks) {
fprintf(stdout, " %d", filehits[ifile]);
jfile += filehits[ifile], ifile++;
}
fprintf(stdout, "\n");
free(rankoffs);
free(filecont);
free(filehits);
}
int test_time_elapsed(IOR_param_t *params, double startTime)
{
double endTime;
double endTime;
if (params->maxTimeDuration == 0)
return 0;
if (params->maxTimeDuration == 0)
return 0;
endTime = startTime + (params->maxTimeDuration * 60);
endTime = startTime + (params->maxTimeDuration * 60);
return GetTimeStamp() >= endTime;
return GetTimeStamp() >= endTime;
}
/*
@ -1908,8 +1920,8 @@ static void *HogMemory(IOR_param_t *params)
*/
static void TestIoSys(IOR_test_t *test)
{
IOR_param_t *params = &test->params;
IOR_results_t *results = test->results;
IOR_param_t *params = &test->params;
IOR_results_t *results = test->results;
char testFileName[MAX_STR];
double *timer[12];
double startTime;
@ -1933,9 +1945,9 @@ static void TestIoSys(IOR_test_t *test)
}
MPI_CHECK(MPI_Comm_group(MPI_COMM_WORLD, &orig_group),
"MPI_Comm_group() error");
range[0] = 0; /* first rank */
range[0] = 0; /* first rank */
range[1] = params->numTasks - 1; /* last rank */
range[2] = 1; /* stride */
range[2] = 1; /* stride */
MPI_CHECK(MPI_Group_range_incl(orig_group, 1, &range, &new_group),
"MPI_Group_range_incl() error");
MPI_CHECK(MPI_Comm_create(MPI_COMM_WORLD, new_group, &testComm),
@ -1983,14 +1995,14 @@ static void TestIoSys(IOR_test_t *test)
if (rank == 0) {
if (params->setTimeStampSignature) {
params->timeStampSignatureValue =
(unsigned int)params->setTimeStampSignature;
(unsigned int)params->setTimeStampSignature;
} else {
time_t currentTime;
if ((currentTime = time(NULL)) == -1) {
ERR("cannot get current time");
}
params->timeStampSignatureValue =
(unsigned int)currentTime;
(unsigned int)currentTime;
}
if (verbose >= VERBOSE_2) {
fprintf(stdout,
@ -1998,11 +2010,11 @@ static void TestIoSys(IOR_test_t *test)
params->timeStampSignatureValue,
params->timeStampSignatureValue);
}
if (rep == 0 && verbose >= VERBOSE_0) {
fprintf(stdout, "\n");
fprintf(stdout, "access bw(MiB/s) block(KiB) xfer(KiB) open(s) wr/rd(s) close(s) total(s) iter\n");
fprintf(stdout, "------ --------- ---------- --------- -------- -------- -------- -------- ----\n");
}
if (rep == 0 && verbose >= VERBOSE_0) {
fprintf(stdout, "\n");
fprintf(stdout, "access bw(MiB/s) block(KiB) xfer(KiB) open(s) wr/rd(s) close(s) total(s) iter\n");
fprintf(stdout, "------ --------- ---------- --------- -------- -------- -------- -------- ----\n");
}
}
MPI_CHECK(MPI_Bcast
(&params->timeStampSignatureValue, 1, MPI_UNSIGNED, 0,
@ -2037,7 +2049,7 @@ static void TestIoSys(IOR_test_t *test)
if (rank == 0 && verbose >= VERBOSE_1) {
fprintf(stderr,
"Commencing write performance test: %s",
CurrentTimeString());
CurrentTimeString());
}
timer[2][rep] = GetTimeStamp();
dataMoved = WriteOrRead(params, fd, WRITE);
@ -2051,13 +2063,13 @@ static void TestIoSys(IOR_test_t *test)
timer[5][rep] = GetTimeStamp();
MPI_CHECK(MPI_Barrier(testComm), "barrier error");
/* get the size of the file just written */
results->aggFileSizeFromStat[rep] =
backend->get_file_size(params, testComm, testFileName);
/* get the size of the file just written */
results->aggFileSizeFromStat[rep] =
backend->get_file_size(params, testComm, testFileName);
/* check if stat() of file doesn't equal expected file size,
use actual amount of byte moved */
CheckFileSize(test, dataMoved, rep);
/* check if stat() of file doesn't equal expected file size,
use actual amount of byte moved */
CheckFileSize(test, dataMoved, rep);
if (verbose >= VERBOSE_3)
WriteTimes(params, timer, rep, WRITE);
@ -2081,7 +2093,7 @@ static void TestIoSys(IOR_test_t *test)
if (params->reorderTasks) {
/* move two nodes away from writing node */
rankOffset =
(2 * params->tasksPerNode) % params->numTasks;
(2 * params->tasksPerNode) % params->numTasks;
}
GetTestFileName(testFileName, params);
params->open = WRITECHECK;
@ -2099,21 +2111,21 @@ static void TestIoSys(IOR_test_t *test)
if (params->reorderTasks) {
/* move taskPerNodeOffset nodes[1==default] away from writing node */
rankOffset =
(params->taskPerNodeOffset *
params->tasksPerNode) % params->numTasks;
(params->taskPerNodeOffset *
params->tasksPerNode) % params->numTasks;
}
/* random process offset reading */
if (params->reorderTasksRandom) {
/* this should not intefere with randomOffset within a file because GetOffsetArrayRandom */
/* seeds every random() call */
int nodeoffset;
int nodeoffset;
unsigned int iseed0;
nodeoffset = params->taskPerNodeOffset;
nodeoffset = (nodeoffset < params->nodes) ? nodeoffset : params->nodes - 1;
if (params->reorderTasksRandomSeed < 0)
iseed0 = -1 * params->reorderTasksRandomSeed + rep;
else
iseed0 = params->reorderTasksRandomSeed;
iseed0 = -1 * params->reorderTasksRandomSeed + rep;
else
iseed0 = params->reorderTasksRandomSeed;
srand(rank + iseed0);
{
rankOffset = rand() % params->numTasks;
@ -2124,7 +2136,7 @@ static void TestIoSys(IOR_test_t *test)
}
/* Get more detailed stats if requested by verbose level */
if (verbose >= VERBOSE_2) {
file_hits_histogram(params);
file_hits_histogram(params);
}
}
/* Using globally passed rankOffset, following function generates testFileName to read */
@ -2146,7 +2158,7 @@ static void TestIoSys(IOR_test_t *test)
if (rank == 0 && verbose >= VERBOSE_1) {
fprintf(stderr,
"Commencing read performance test: %s",
CurrentTimeString());
CurrentTimeString());
}
timer[8][rep] = GetTimeStamp();
dataMoved = WriteOrRead(params, fd, READ);
@ -2160,8 +2172,8 @@ static void TestIoSys(IOR_test_t *test)
/* get the size of the file just read */
results->aggFileSizeFromStat[rep] =
backend->get_file_size(params, testComm,
testFileName);
backend->get_file_size(params, testComm,
testFileName);
/* check if stat() of file doesn't equal expected file size,
use actual amount of byte moved */
@ -2239,8 +2251,8 @@ static void TestIoSys(IOR_test_t *test)
PrintShortSummary(test);
}
if (hog_buf != NULL)
free(hog_buf);
if (hog_buf != NULL)
free(hog_buf);
for (i = 0; i < 12; i++) {
free(timer[i]);
}
@ -2415,7 +2427,7 @@ static IOR_offset_t *GetOffsetArraySequential(IOR_param_t * test,
/* setup empty array */
offsetArray =
(IOR_offset_t *) malloc((offsets + 1) * sizeof(IOR_offset_t));
(IOR_offset_t *) malloc((offsets + 1) * sizeof(IOR_offset_t));
if (offsetArray == NULL)
ERR("malloc() failed");
offsetArray[offsets] = -1; /* set last offset with -1 */
@ -2428,8 +2440,8 @@ static IOR_offset_t *GetOffsetArraySequential(IOR_param_t * test,
offsetArray[k] += i * test->blockSize;
} else {
offsetArray[k] +=
(i * test->numTasks * test->blockSize)
+ (pretendRank * test->blockSize);
(i * test->numTasks * test->blockSize)
+ (pretendRank * test->blockSize);
}
k++;
}
@ -2474,7 +2486,7 @@ static IOR_offset_t *GetOffsetArrayRandom(IOR_param_t * test, int pretendRank,
/* setup empty array */
offsetArray =
(IOR_offset_t *) malloc((offsets + 1) * sizeof(IOR_offset_t));
(IOR_offset_t *) malloc((offsets + 1) * sizeof(IOR_offset_t));
if (offsetArray == NULL)
ERR("malloc() failed");
offsetArray[offsets] = -1; /* set last offset with -1 */
@ -2557,19 +2569,19 @@ static IOR_offset_t WriteOrRead(IOR_param_t * test, void *fd, int access)
transfer = test->transferSize;
if (access == WRITE) {
amtXferred =
backend->xfer(access, fd, buffer, transfer, test);
backend->xfer(access, fd, buffer, transfer, test);
if (amtXferred != transfer)
ERR("cannot write to file");
} else if (access == READ) {
amtXferred =
backend->xfer(access, fd, buffer, transfer, test);
backend->xfer(access, fd, buffer, transfer, test);
if (amtXferred != transfer)
ERR("cannot read from file");
} else if (access == WRITECHECK) {
memset(checkBuffer, 'a', transfer);
amtXferred =
backend->xfer(access, fd, checkBuffer, transfer,
test);
backend->xfer(access, fd, checkBuffer, transfer,
test);
if (amtXferred != transfer)
ERR("cannot read from file write check");
transferCount++;

View File

@ -12,7 +12,10 @@
#define _IOR_H
#ifdef HAVE_CONFIG_H
#include "config.h"
# include "config.h"
#endif
#ifdef USE_HDFS_AIORI
# include <hdfs.h> /* hdfsFS */
#endif
#include "iordef.h"
@ -98,7 +101,6 @@ typedef struct
void * fd_fppReadCheck; /* additional fd for fpp read check */
int randomSeed; /* random seed for write/read check */
int randomOffset; /* access is to random offsets */
MPI_Comm testComm; /* MPI communicator */
size_t memoryPerTask; /* additional memory used per task */
size_t memoryPerNode; /* additional memory used per node */
@ -108,6 +110,7 @@ typedef struct
int fsync; /* fsync() after write */
/* MPI variables */
MPI_Comm testComm; /* MPI communicator */
MPI_Datatype transferType; /* datatype for transfer */
MPI_Datatype fileType; /* filetype for file view */
@ -116,6 +119,14 @@ typedef struct
int noFill; /* no fill in file creation */
IOR_offset_t setAlignment; /* alignment in bytes */
/* HDFS variables */
const char* hdfs_name_node;
tPort hdfs_name_node_port; /* (uint16_t) */
hdfsFS hdfs_fs; /* file-system handle */
int hdfs_replicas; /* n block replicas. (0 gets default) */
int hdfs_block_size; /* internal blk-size. (0 gets default) */
/* NCMPI variables */
int var_id; /* variable id handle for data set */
@ -147,11 +158,12 @@ typedef struct {
/* define the queuing structure for the test parameters */
typedef struct IOR_test_t {
IOR_param_t params;
IOR_results_t *results;
struct IOR_test_t *next;
IOR_param_t params;
IOR_results_t *results;
struct IOR_test_t *next;
} IOR_test_t;
IOR_test_t *CreateTest(IOR_param_t *init_params, int test_num);
void AllocResults(IOR_test_t *test);
void GetPlatformName(char *);

View File

@ -119,6 +119,8 @@ extern int verbose; /* verbose output */
typedef long long int IOR_offset_t;
typedef long long int IOR_size_t;
#define IOR_format "%016llx"
/******************************** M A C R O S *********************************/
@ -168,6 +170,15 @@ typedef long long int IOR_size_t;
} while (0)
/* display a simple error message (i.e. errno is not set) and terminate execution */
#define ERR_SIMPLE(MSG) do { \
fprintf(stdout, "ior ERROR: %s, (%s:%d)\n", \
MSG, __FILE__, __LINE__); \
fflush(stdout); \
MPI_Abort(MPI_COMM_WORLD, -1); \
} while (0)
/******************************************************************************/
/*
* MPI_CHECK will display a custom error message as well as an error string

View File

@ -73,14 +73,17 @@ char *CurrentTimeString(void)
/*
* Dump transfer buffer.
*/
void DumpBuffer(void *buffer, size_t size)
void DumpBuffer(void *buffer,
size_t size) /* <size> in bytes */
{
size_t i, j;
unsigned long long *dumpBuf = (unsigned long long *)buffer;
IOR_size_t *dumpBuf = (IOR_size_t *)buffer;
/* Turns out, IOR_size_t is unsigned long long, but we don't want
to assume that it must always be */
for (i = 0; i < ((size / sizeof(IOR_size_t)) / 4); i++) {
for (j = 0; j < 4; j++) {
fprintf(stdout, "%016llx ", dumpBuf[4 * i + j]);
fprintf(stdout, IOR_format" ", dumpBuf[4 * i + j]);
}
fprintf(stdout, "\n");
}