Fixed bug in aiori-HDFS.c where HDFS_Close() was calling "free(fd);" on the hdfsFile handle it had just closed, which is bad.

Along the way, added a bunch of diagnostic output in the HDFS calls, which
only shows up at verbosity >= 4.  I'll probably remove this stuff before
merging with master.  Also, there's an #ifdef'ed-out sleep() in place,
which I used to attach gdb to a running MPI task.  I'll get rid of that
later, too.

Also, added another hdfs-related parameter to the IOR_param_t structure;
hdfs_user_name gets the value of the USER environment-variable as the
default HDFS user for connections.  Does this cause portability problems?
master
Jeff Inman 2014-08-28 15:39:44 -06:00
parent 0743eaf3fd
commit 28c94f8875
4 changed files with 171 additions and 20 deletions

9
.gitignore vendored
View File

@ -25,10 +25,7 @@ src/stamp-h1
NOTES.txt
autom4te.cache
contrib/cbif.o
src/aiori-MPIIO.o
src/aiori-PLFS.c
src/aiori-POSIX.o
src/*.o
src/*.i
src/*.s
src/ior
src/ior.o
src/parse_options.o
src/utilities.o

View File

@ -51,6 +51,11 @@
*
* Implementation of the abstract I/O interface for HDFS.
*
* HDFS has the added concept of a "File System Handle" which has to be
* connected before files are opened. We store this in the IOR_param_t
* object that is always passed to our functions. The thing that callers
* think of as the "fd" is an hdfsFile, (a pointer).
*
\******************************************************************************/
#ifdef HAVE_CONFIG_H
@ -154,45 +159,70 @@ void hdfs_set_o_direct_flag(int *fd)
* filesystem is connected.
*/
/*
 * Connect to HDFS unless param->hdfs_fs already holds a handle.
 *
 * Reads param->hdfs_name_node, param->hdfs_name_node_port and
 * param->hdfs_user, and stores the resulting handle in param->hdfs_fs.
 * Builder-creation and connection failures are reported via ERR_SIMPLE().
 * Entry/exit tracing is printed when param->verbose >= VERBOSE_4.
 */
static void hdfs_connect( IOR_param_t* param ) {
    if (param->verbose >= VERBOSE_4) {
        printf("-> hdfs_connect [nn:\"%s\", port:%d, user:%s]\n",
               param->hdfs_name_node,
               param->hdfs_name_node_port,
               param->hdfs_user );
    }

    /* a handle is already stored: nothing to set up */
    if ( param->hdfs_fs ) {
        if (param->verbose >= VERBOSE_4) {
            printf("<- hdfs_connect [nothing to do]\n"); /* DEBUGGING */
        }
        return;
    }

    /* initialize a builder, holding parameters for hdfsBuilderConnect() */
    struct hdfsBuilder* builder = hdfsNewBuilder();
    if ( ! builder ) {
        ERR_SIMPLE("couldn't create an hdfsBuilder");
    }

    hdfsBuilderSetForceNewInstance ( builder ); /* don't use cached instance */
    hdfsBuilderSetNameNode ( builder, param->hdfs_name_node );
    hdfsBuilderSetNameNodePort( builder, param->hdfs_name_node_port );
    hdfsBuilderSetUserName ( builder, param->hdfs_user );

    /* NOTE: hdfsBuilderConnect() frees the builder */
    param->hdfs_fs = hdfsBuilderConnect( builder );
    if ( ! param->hdfs_fs ) {
        ERR_SIMPLE("hdfsBuilderConnect failed");
    }

    if (param->verbose >= VERBOSE_4) {
        printf("<- hdfs_connect [success]\n");
    }
}
/*
 * Drop the HDFS connection stored in param->hdfs_fs, if there is one,
 * and clear the stored handle. Entry/exit tracing is printed when
 * param->verbose >= VERBOSE_4.
 */
static void hdfs_disconnect( IOR_param_t* param ) {
    const int tracing = ( param->verbose >= VERBOSE_4 );

    if ( tracing ) {
        printf("-> hdfs_disconnect\n");
    }

    if ( param->hdfs_fs != NULL ) {
        hdfsDisconnect( param->hdfs_fs );
        param->hdfs_fs = NULL;   /* the handle must not be reused */
    }

    if ( tracing ) {
        printf("<- hdfs_disconnect\n");
    }
}
/*
* Create or open the file. Pass TRUE if creating and FALSE if opening an existing file.
* Return an hdfsFile.
*/
static void *HDFS_Create_Or_Open( char *testFileName, IOR_param_t *param, unsigned char createFile ) {
if (param->verbose >= VERBOSE_4) {
printf("-> HDFS_Create_Or_Open\n");
}
hdfsFile hdfs_file = NULL;
int fd_oflags = 0, hdfs_return;
/* initialize file-system handle, if needed */
if ( ! param->hdfs_fs ) {
hdfs_connect( param );
}
hdfs_connect( param );
/*
* Check for unsupported flags.
@ -270,13 +300,21 @@ static void *HDFS_Create_Or_Open( char *testFileName, IOR_param_t *param, unsign
* Now rank zero can open and truncate, if necessary.
*/
if (param->verbose >= VERBOSE_4) {
printf("\thdfsOpenFile(0x%llx, %s, 0%o, %d, %d, %d)\n",
param->hdfs_fs,
testFileName,
fd_oflags, /* shown in octal to compare w/ <bits/fcntl.h> */
param->transferSize,
param->hdfs_replicas,
param->hdfs_block_size);
}
hdfs_file = hdfsOpenFile( param->hdfs_fs,
testFileName,
fd_oflags,
param->transferSize,
param->hdfs_replicas,
param->hdfs_block_size);
if ( ! hdfs_file ) {
ERR( "Failed to open the file" );
}
@ -292,6 +330,9 @@ static void *HDFS_Create_Or_Open( char *testFileName, IOR_param_t *param, unsign
MPI_CHECK(MPI_Barrier(testComm), "barrier error");
}
if (param->verbose >= VERBOSE_4) {
printf("<- HDFS_Create_Or_Open\n");
}
return ((void *) hdfs_file );
}
@ -300,7 +341,13 @@ static void *HDFS_Create_Or_Open( char *testFileName, IOR_param_t *param, unsign
*/
/*
 * Create a file through the HDFS interface by delegating to
 * HDFS_Create_Or_Open() with createFile == TRUE.
 *
 * Returns whatever HDFS_Create_Or_Open() returns. Entry/exit tracing is
 * printed when param->verbose >= VERBOSE_4; the exit marker is printed
 * only after the delegated call has returned, so the trace nests.
 */
static void *HDFS_Create( char *testFileName, IOR_param_t * param ) {
    if (param->verbose >= VERBOSE_4) {
        printf("-> HDFS_Create\n");
    }

    void *fd = HDFS_Create_Or_Open( testFileName, param, TRUE );

    if (param->verbose >= VERBOSE_4) {
        printf("<- HDFS_Create\n");
    }
    return fd;
}
@ -308,11 +355,20 @@ static void *HDFS_Create( char *testFileName, IOR_param_t * param ) {
* Open a file through the HDFS interface.
*/
/*
 * Open a file by delegating to HDFS_Create_Or_Open(). The file is
 * created (createFile == TRUE) when param->openFlags has IOR_CREAT set,
 * otherwise an existing file is opened (createFile == FALSE).
 *
 * Returns whatever HDFS_Create_Or_Open() returns. Entry/exit tracing is
 * printed when param->verbose >= VERBOSE_4; the exit marker names the
 * branch taken and is printed only after the delegated call returns.
 */
static void *HDFS_Open( char *testFileName, IOR_param_t * param ) {
    if (param->verbose >= VERBOSE_4) {
        printf("-> HDFS_Open\n");
    }

    /* decide once, before the call, so the trace reports the branch taken */
    const unsigned char create = ( param->openFlags & IOR_CREAT ) ? TRUE : FALSE;

    void *fd = HDFS_Create_Or_Open( testFileName, param, create );

    if (param->verbose >= VERBOSE_4) {
        if ( create ) {
            printf("<- HDFS_Open( ... TRUE)\n");
        }
        else {
            printf("<- HDFS_Open( ... FALSE)\n");
        }
    }
    return fd;
}
@ -323,6 +379,10 @@ static void *HDFS_Open( char *testFileName, IOR_param_t * param ) {
static IOR_offset_t HDFS_Xfer(int access, void *file, IOR_size_t * buffer,
IOR_offset_t length, IOR_param_t * param) {
if (param->verbose >= VERBOSE_4) {
printf("-> HDFS_Xfer(acc:%d, file:0x%llx, buf:0x%llx, len:%llu, 0x%llx)\n",
access, file, buffer, length, param);
}
int xferRetries = 0;
long long remaining = (long long)length;
@ -343,6 +403,10 @@ static IOR_offset_t HDFS_Xfer(int access, void *file, IOR_size_t * buffer,
param->offset + length - remaining);
}
if (param->verbose >= VERBOSE_4) {
printf("\thdfsWrite( 0x%llx, 0x%llx, 0x%llx, %lld)\n",
hdfs_fs, hdfs_file, ptr, remaining ); /* DEBUGGING */
}
rc = hdfsWrite( hdfs_fs, hdfs_file, ptr, remaining );
if ( rc < 0 ) {
ERR( "hdfsWrite() failed" );
@ -361,6 +425,10 @@ static IOR_offset_t HDFS_Xfer(int access, void *file, IOR_size_t * buffer,
param->offset + length - remaining );
}
if (param->verbose >= VERBOSE_4) {
printf("\thdfsRead( 0x%llx, 0x%llx, 0x%llx, %lld)\n",
hdfs_fs, hdfs_file, ptr, remaining ); /* DEBUGGING */
}
rc = hdfsRead( hdfs_fs, hdfs_file, ptr, remaining );
if ( rc == 0 ) {
@ -398,6 +466,9 @@ static IOR_offset_t HDFS_Xfer(int access, void *file, IOR_size_t * buffer,
xferRetries++;
}
if (param->verbose >= VERBOSE_4) {
printf("<- HDFS_Xfer\n");
}
return ( length );
}
@ -406,12 +477,37 @@ static IOR_offset_t HDFS_Xfer(int access, void *file, IOR_size_t * buffer,
*/
/*
 * Flush pending data for an open HDFS file.
 *
 * 'fd' is the hdfsFile handed out by the create/open path; the
 * file-system handle is taken from param->hdfs_fs. Only the hdfsFlush()
 * variant is compiled; the hdfsHSync() and hdfsHFlush() variants are
 * kept disabled as alternatives. A failing call is reported with
 * EWARN() rather than treated as fatal. Tracing is printed when
 * param->verbose >= VERBOSE_4.
 */
static void HDFS_Fsync( void *fd, IOR_param_t * param ) {
    if (param->verbose >= VERBOSE_4) {
        printf("-> HDFS_Fsync\n");
    }

    hdfsFS   hdfs_fs   = param->hdfs_fs;   /* (void *) */
    hdfsFile hdfs_file = (hdfsFile)fd;     /* (void *) */

    /* handles are pointers: print them with %p, not an integer format */
#if 0
    if (param->verbose >= VERBOSE_4) {
        printf("\thdfsHSync(%p, %p)\n", (void *)hdfs_fs, (void *)hdfs_file);
    }
    if ( hdfsHSync( hdfs_fs, hdfs_file ) != 0 ) {
        EWARN( "hdfsHSync() failed" );
    }
#elif 0
    if (param->verbose >= VERBOSE_4) {
        printf("\thdfsHFlush(%p, %p)\n", (void *)hdfs_fs, (void *)hdfs_file);
    }
    if ( hdfsHFlush( hdfs_fs, hdfs_file ) != 0 ) {
        EWARN( "hdfsHFlush() failed" );
    }
#else
    if (param->verbose >= VERBOSE_4) {
        printf("\thdfsFlush(%p, %p)\n", (void *)hdfs_fs, (void *)hdfs_file);
    }
    if ( hdfsFlush( hdfs_fs, hdfs_file ) != 0 ) {
        EWARN( "hdfsFlush() failed" );
    }
#endif

    if (param->verbose >= VERBOSE_4) {
        printf("<- HDFS_Fsync\n");
    }
}
@ -420,6 +516,9 @@ static void HDFS_Fsync( void *fd, IOR_param_t * param ) {
*/
static void HDFS_Close( void *fd, IOR_param_t * param ) {
if (param->verbose >= VERBOSE_4) {
printf("-> HDFS_Close\n");
}
hdfsFS hdfs_fs = param->hdfs_fs; /* (void *) */
hdfsFile hdfs_file = (hdfsFile)fd; /* (void *) */
@ -436,7 +535,9 @@ static void HDFS_Close( void *fd, IOR_param_t * param ) {
ERR( "hdfsCloseFile() failed" );
}
free( fd );
if (param->verbose >= VERBOSE_4) {
printf("<- HDFS_Close\n");
}
}
/*
@ -446,9 +547,15 @@ static void HDFS_Close( void *fd, IOR_param_t * param ) {
* select recursive deletes. We'll assume that that is never needed.
*/
static void HDFS_Delete( char *testFileName, IOR_param_t * param ) {
if (param->verbose >= VERBOSE_4) {
printf("-> HDFS_Delete\n");
}
char errmsg[256];
/* initialize file-system handle, if needed */
hdfs_connect( param );
if ( ! param->hdfs_fs )
ERR_SIMPLE( "Can't delete a file without an HDFS connection" );
@ -459,6 +566,9 @@ static void HDFS_Delete( char *testFileName, IOR_param_t * param ) {
EWARN( errmsg );
}
if (param->verbose >= VERBOSE_4) {
printf("<- HDFS_Delete\n");
}
}
/*
@ -466,8 +576,14 @@ static void HDFS_Delete( char *testFileName, IOR_param_t * param ) {
*/
/*
 * Record the API "version" for this backend by copying param->api into
 * param->apiVersion. Entry/exit tracing is printed when
 * param->verbose >= VERBOSE_4.
 */
static void HDFS_SetVersion( IOR_param_t * param ) {
    const int tracing = ( param->verbose >= VERBOSE_4 );

    if ( tracing ) {
        printf("-> HDFS_SetVersion\n");
    }

    /* NOTE(review): assumes apiVersion is at least as large as api -- confirm */
    strcpy( param->apiVersion, param->api );

    if ( tracing ) {
        printf("<- HDFS_SetVersion\n");
    }
}
/*
@ -480,6 +596,9 @@ static IOR_offset_t
HDFS_GetFileSize(IOR_param_t * param,
MPI_Comm testComm,
char * testFileName) {
if (param->verbose >= VERBOSE_4) {
printf("-> HDFS_GetFileSize(%s)\n", testFileName);
}
IOR_offset_t aggFileSizeFromStat;
IOR_offset_t tmpMin, tmpMax, tmpSum;
@ -488,13 +607,23 @@ HDFS_GetFileSize(IOR_param_t * param,
hdfs_connect( param );
/* file-info struct includes size in bytes */
if (param->verbose >= VERBOSE_4) {
printf("\thdfsGetPathInfo(%s) ...", testFileName);fflush(stdout);
}
hdfsFileInfo* info = hdfsGetPathInfo( param->hdfs_fs, testFileName );
if ( ! info )
ERR_SIMPLE( "hdfsGetPathInfo() failed" );
if (param->verbose >= VERBOSE_4) {
printf("done.\n");fflush(stdout);
}
aggFileSizeFromStat = info->mSize;
if ( param->filePerProc == TRUE ) {
if (param->verbose >= VERBOSE_4) {
printf("\tall-reduce (1)\n");
}
MPI_CHECK(
MPI_Allreduce(
&aggFileSizeFromStat, &tmpSum, 1, MPI_LONG_LONG_INT, MPI_SUM, testComm ),
@ -503,11 +632,17 @@ HDFS_GetFileSize(IOR_param_t * param,
aggFileSizeFromStat = tmpSum;
}
else {
if (param->verbose >= VERBOSE_4) {
printf("\tall-reduce (2a)\n");
}
MPI_CHECK(
MPI_Allreduce(
&aggFileSizeFromStat, &tmpMin, 1, MPI_LONG_LONG_INT, MPI_MIN, testComm ),
"cannot total data moved" );
if (param->verbose >= VERBOSE_4) {
printf("\tall-reduce (2b)\n");
}
MPI_CHECK(
MPI_Allreduce(
&aggFileSizeFromStat, &tmpMax, 1, MPI_LONG_LONG_INT, MPI_MAX, testComm ),
@ -523,5 +658,8 @@ HDFS_GetFileSize(IOR_param_t * param,
}
}
if (param->verbose >= VERBOSE_4) {
printf("<- HDFS_GetFileSize [%llu]\n", aggFileSizeFromStat);
}
return ( aggFileSizeFromStat );
}

View File

@ -145,6 +145,14 @@ int main(int argc, char **argv)
if (rank == 0 && verbose >= VERBOSE_3) {
ShowTest(&tptr->params);
}
#if 0
// This is useful for trapping a running MPI process. While
// this is sleeping, run the script 'testing/hdfs/gdb.attach'
printf("\tsleeping ...");
fflush(stdout);
sleep(5);
printf("done.\n");
#endif
TestIoSys(tptr);
}
@ -196,10 +204,12 @@ void init_IOR_Param_t(IOR_param_t * p)
p->testComm = MPI_COMM_WORLD;
p->setAlignment = 1;
p->lustre_start_ost = -1;
strncpy(p->hdfs_user, getenv("USER"), MAX_STR);
p->hdfs_name_node = "default";
p->hdfs_name_node_port = 0; /* ??? */
p->hdfs_fs = NULL;
p->hdfs_replicas = 0;
p->hdfs_replicas = 0; /* invokes the default */
p->hdfs_block_size = 0;
}
@ -873,9 +883,10 @@ void GetPlatformName(char *platformName)
*/
static void GetTestFileName(char *testFileName, IOR_param_t * test)
{
char **fileNames,
initialTestFileName[MAXPATHLEN],
testFileNameRoot[MAX_STR], tmpString[MAX_STR];
char **fileNames;
char initialTestFileName[MAXPATHLEN];
char testFileNameRoot[MAX_STR];
char tmpString[MAX_STR];
int count;
/* parse filename for multiple file systems */
@ -1483,7 +1494,7 @@ static void ShowSetup(IOR_param_t *params)
printf("\tapi = %s\n", params->apiVersion);
printf("\ttest filename = %s\n", params->testFileName);
printf("\taccess = ");
printf(params->filePerProc ? "file-per-process" : "single-shared-file");
printf(params->filePerProc ? "file-per-process" : "single-shared-file");
if (verbose >= VERBOSE_1 && strcmp(params->api, "POSIX") != 0) {
printf(params->collective == FALSE ? ", independent" : ", collective");
}
@ -2053,6 +2064,10 @@ static void TestIoSys(IOR_test_t *test)
}
timer[2][rep] = GetTimeStamp();
dataMoved = WriteOrRead(params, fd, WRITE);
if (params->verbose >= VERBOSE_4) {
printf("* data moved = %llu\n", dataMoved);
fflush(stdout);
}
timer[3][rep] = GetTimeStamp();
if (params->intraTestBarriers)
MPI_CHECK(MPI_Barrier(testComm),

View File

@ -120,6 +120,7 @@ typedef struct
IOR_offset_t setAlignment; /* alignment in bytes */
/* HDFS variables */
char hdfs_user[MAX_STR]; /* copied from ENV, for now */
const char* hdfs_name_node;
tPort hdfs_name_node_port; /* (uint16_t) */