From 28c94f8875e568be33d8359ab781a48026983a63 Mon Sep 17 00:00:00 2001 From: Jeff Inman Date: Thu, 28 Aug 2014 15:39:44 -0600 Subject: [PATCH] Fixed bug in aiori-HDFS.c where it was calling "free(fd);", which is bad. Along the way, added a bunch of diagnostic output in the HDFS calls, which only shows up at verbosity >= 4. I'll probably remove this stuff before merging with master. Also, there's an #ifdef'ed-out sleep() in place, which I used to attach gdb to a running MPI task. I'll get rid of that later, too. Also, added another hdfs-related parameter to the IOR_param_t structure; hdfs_user_name gets the value of the USER environment-variable as the default HDFS user for connections. Does this cause portability problems? --- .gitignore | 9 +-- src/aiori-HDFS.c | 156 ++++++++++++++++++++++++++++++++++++++++++++--- src/ior.c | 25 ++++++-- src/ior.h | 1 + 4 files changed, 171 insertions(+), 20 deletions(-) diff --git a/.gitignore b/.gitignore index 6287534..bb976c0 100644 --- a/.gitignore +++ b/.gitignore @@ -25,10 +25,7 @@ src/stamp-h1 NOTES.txt autom4te.cache contrib/cbif.o -src/aiori-MPIIO.o -src/aiori-PLFS.c -src/aiori-POSIX.o +src/*.o +src/*.i +src/*.s src/ior -src/ior.o -src/parse_options.o -src/utilities.o diff --git a/src/aiori-HDFS.c b/src/aiori-HDFS.c index cc7f348..c0fd3da 100644 --- a/src/aiori-HDFS.c +++ b/src/aiori-HDFS.c @@ -51,6 +51,11 @@ * * Implement of abstract I/O interface for HDFS. * +* HDFS has the added concept of a "File System Handle" which has to be +* connected before files are opened. We store this in the IOR_param_t +* object that is always passed to our functions. The thing that callers +* think of as the "fd" is an hdfsFile, (a pointer). +* \******************************************************************************/ #ifdef HAVE_CONFIG_H @@ -154,45 +159,70 @@ void hdfs_set_o_direct_flag(int *fd) * filesystem is connected. */ static void hdfs_connect( IOR_param_t* param ) { + if (param->verbose >= VERBOSE_4) { + printf("-> hdfs_connect [nn:\"%s\", port:%d, user:%s]\n", + param->hdfs_name_node, + param->hdfs_name_node_port, + param->hdfs_user ); + } - if ( param->hdfs_fs ) + if ( param->hdfs_fs ) { + if (param->verbose >= VERBOSE_4) { + printf("<- hdfs_connect [nothing to do]\n"); /* DEBUGGING */ + } return; + } /* initialize a builder, holding parameters for hdfsBuilderConnect() */ struct hdfsBuilder* builder = hdfsNewBuilder(); if ( ! builder ) - ERR_SIMPLE("couldn't create and hdsfsBuilder"); + ERR_SIMPLE("couldn't create an hdfsBuilder"); + hdfsBuilderSetForceNewInstance ( builder ); /* don't use cached instance */ hdfsBuilderSetNameNode ( builder, param->hdfs_name_node ); hdfsBuilderSetNameNodePort( builder, param->hdfs_name_node_port ); + hdfsBuilderSetUserName ( builder, param->hdfs_user ); /* NOTE: hdfsBuilderConnect() frees the builder */ param->hdfs_fs = hdfsBuilderConnect( builder ); if ( ! param->hdfs_fs ) ERR_SIMPLE("hdsfsBuilderConnect failed"); + + if (param->verbose >= VERBOSE_4) { + printf("<- hdfs_connect [success]\n"); + } } static void hdfs_disconnect( IOR_param_t* param ) { + if (param->verbose >= VERBOSE_4) { + printf("-> hdfs_disconnect\n"); + } if ( param->hdfs_fs ) { hdfsDisconnect( param->hdfs_fs ); + param->hdfs_fs = NULL; + } + if (param->verbose >= VERBOSE_4) { + printf("<- hdfs_disconnect\n"); } } /* * Create or open the file. Pass TRUE if creating and FALSE if opening an existing file. + * Return an hdfsFile. */ static void *HDFS_Create_Or_Open( char *testFileName, IOR_param_t *param, unsigned char createFile ) { + if (param->verbose >= VERBOSE_4) { + printf("-> HDFS_Create_Or_Open\n"); + } hdfsFile hdfs_file = NULL; int fd_oflags = 0, hdfs_return; /* initialize file-system handle, if needed */ - if ( ! param->hdfs_fs ) { - hdfs_connect( param ); - } + hdfs_connect( param ); /* * Check for unsupported flags. @@ -270,13 +300,21 @@ static void *HDFS_Create_Or_Open( char *testFileName, IOR_param_t *param, unsign * Now rank zero can open and truncate, if necessary. */ + if (param->verbose >= VERBOSE_4) { + printf("\thdfsOpenFile(0x%llx, %s, 0%o, %d, %d, %d)\n", + param->hdfs_fs, + testFileName, + fd_oflags, /* shown in octal to compare w/ */ + param->transferSize, + param->hdfs_replicas, + param->hdfs_block_size); + } hdfs_file = hdfsOpenFile( param->hdfs_fs, testFileName, fd_oflags, param->transferSize, param->hdfs_replicas, param->hdfs_block_size); - if ( ! hdfs_file ) { ERR( "Failed to open the file" ); } @@ -292,6 +330,9 @@ static void *HDFS_Create_Or_Open( char *testFileName, IOR_param_t *param, unsign MPI_CHECK(MPI_Barrier(testComm), "barrier error"); } + if (param->verbose >= VERBOSE_4) { + printf("<- HDFS_Create_Or_Open\n"); + } return ((void *) hdfs_file ); } @@ -300,7 +341,13 @@ static void *HDFS_Create_Or_Open( char *testFileName, IOR_param_t *param, unsign */ static void *HDFS_Create( char *testFileName, IOR_param_t * param ) { + if (param->verbose >= VERBOSE_4) { + printf("-> HDFS_Create\n"); + } + if (param->verbose >= VERBOSE_4) { + printf("<- HDFS_Create\n"); + } return HDFS_Create_Or_Open( testFileName, param, TRUE ); } @@ -308,11 +355,20 @@ static void *HDFS_Create( char *testFileName, IOR_param_t * param ) { * Open a file through the HDFS interface. */ static void *HDFS_Open( char *testFileName, IOR_param_t * param ) { + if (param->verbose >= VERBOSE_4) { + printf("-> HDFS_Open\n"); + } if ( param->openFlags & IOR_CREAT ) { + if (param->verbose >= VERBOSE_4) { + printf("<- HDFS_Open( ... TRUE)\n"); + } return HDFS_Create_Or_Open( testFileName, param, TRUE ); } else { + if (param->verbose >= VERBOSE_4) { + printf("<- HDFS_Open( ... FALSE)\n"); + } return HDFS_Create_Or_Open( testFileName, param, FALSE ); } } @@ -323,6 +379,10 @@ static void *HDFS_Open( char *testFileName, IOR_param_t * param ) { static IOR_offset_t HDFS_Xfer(int access, void *file, IOR_size_t * buffer, IOR_offset_t length, IOR_param_t * param) { + if (param->verbose >= VERBOSE_4) { + printf("-> HDFS_Xfer(acc:%d, file:0x%llx, buf:0x%llx, len:%llu, 0x%llx)\n", + access, file, buffer, length, param); + } int xferRetries = 0; long long remaining = (long long)length; @@ -343,6 +403,10 @@ static IOR_offset_t HDFS_Xfer(int access, void *file, IOR_size_t * buffer, param->offset + length - remaining); } + if (param->verbose >= VERBOSE_4) { + printf("\thdfsWrite( 0x%llx, 0x%llx, 0x%llx, %lld)\n", + hdfs_fs, hdfs_file, ptr, remaining ); /* DEBUGGING */ + } rc = hdfsWrite( hdfs_fs, hdfs_file, ptr, remaining ); if ( rc < 0 ) { ERR( "hdfsWrite() failed" ); @@ -361,6 +425,10 @@ static IOR_offset_t HDFS_Xfer(int access, void *file, IOR_size_t * buffer, param->offset + length - remaining ); } + if (param->verbose >= VERBOSE_4) { + printf("\thdfsRead( 0x%llx, 0x%llx, 0x%llx, %lld)\n", + hdfs_fs, hdfs_file, ptr, remaining ); /* DEBUGGING */ + } rc = hdfsRead( hdfs_fs, hdfs_file, ptr, remaining ); if ( rc == 0 ) { @@ -398,6 +466,9 @@ static IOR_offset_t HDFS_Xfer(int access, void *file, IOR_size_t * buffer, xferRetries++; } + if (param->verbose >= VERBOSE_4) { + printf("<- HDFS_Xfer\n"); + } return ( length ); } @@ -406,12 +477,37 @@ static IOR_offset_t HDFS_Xfer(int access, void *file, IOR_size_t * buffer, */ static void HDFS_Fsync( void *fd, IOR_param_t * param ) { - + if (param->verbose >= VERBOSE_4) { + printf("-> HDFS_Fsync\n"); + } hdfsFS hdfs_fs = param->hdfs_fs; /* (void *) */ hdfsFile hdfs_file = (hdfsFile)fd; /* (void *) */ +#if 0 + if (param->verbose >= VERBOSE_4) { + printf("\thdfsHSync(0x%llx, 0x%llx)\n", hdfs_fs, hdfs_file); + } if ( hdfsHSync( hdfs_fs, hdfs_file ) != 0 ) { - EWARN( "hdfs_sync() failed" ); + EWARN( "hdfsHSync() failed" ); + } +#elif 0 + if (param->verbose >= VERBOSE_4) { + printf("\thdfsHFlush(0x%llx, 0x%llx)\n", hdfs_fs, hdfs_file); + } + if ( hdfsHFlush( hdfs_fs, hdfs_file ) != 0 ) { + EWARN( "hdfsHFlush() failed" ); + } +#else + if (param->verbose >= VERBOSE_4) { + printf("\thdfsFlush(0x%llx, 0x%llx)\n", hdfs_fs, hdfs_file); + } + if ( hdfsFlush( hdfs_fs, hdfs_file ) != 0 ) { + EWARN( "hdfsFlush() failed" ); + } +#endif + + if (param->verbose >= VERBOSE_4) { + printf("<- HDFS_Fsync\n"); } } @@ -420,6 +516,9 @@ static void HDFS_Fsync( void *fd, IOR_param_t * param ) { */ static void HDFS_Close( void *fd, IOR_param_t * param ) { + if (param->verbose >= VERBOSE_4) { + printf("-> HDFS_Close\n"); + } hdfsFS hdfs_fs = param->hdfs_fs; /* (void *) */ hdfsFile hdfs_file = (hdfsFile)fd; /* (void *) */ @@ -436,7 +535,9 @@ static void HDFS_Close( void *fd, IOR_param_t * param ) { ERR( "hdfsCloseFile() failed" ); } - free( fd ); + if (param->verbose >= VERBOSE_4) { + printf("<- HDFS_Close\n"); + } } /* @@ -446,9 +547,15 @@ static void HDFS_Close( void *fd, IOR_param_t * param ) { * select recursive deletes. We'll assume that that is never needed. */ static void HDFS_Delete( char *testFileName, IOR_param_t * param ) { + if (param->verbose >= VERBOSE_4) { + printf("-> HDFS_Delete\n"); + } char errmsg[256]; + /* initialize file-system handle, if needed */ + hdfs_connect( param ); + if ( ! param->hdfs_fs ) ERR_SIMPLE( "Can't delete a file without an HDFS connection" ); @@ -459,6 +566,9 @@ static void HDFS_Delete( char *testFileName, IOR_param_t * param ) { EWARN( errmsg ); } + if (param->verbose >= VERBOSE_4) { + printf("<- HDFS_Delete\n"); + } } /* @@ -466,8 +576,14 @@ static void HDFS_Delete( char *testFileName, IOR_param_t * param ) { */ static void HDFS_SetVersion( IOR_param_t * param ) { + if (param->verbose >= VERBOSE_4) { + printf("-> HDFS_SetVersion\n"); + } strcpy( param->apiVersion, param->api ); + if (param->verbose >= VERBOSE_4) { + printf("<- HDFS_SetVersion\n"); + } } /* @@ -480,6 +596,9 @@ static IOR_offset_t HDFS_GetFileSize(IOR_param_t * param, MPI_Comm testComm, char * testFileName) { + if (param->verbose >= VERBOSE_4) { + printf("-> HDFS_GetFileSize(%s)\n", testFileName); + } IOR_offset_t aggFileSizeFromStat; IOR_offset_t tmpMin, tmpMax, tmpSum; @@ -488,13 +607,23 @@ HDFS_GetFileSize(IOR_param_t * param, hdfs_connect( param ); /* file-info struct includes size in bytes */ + if (param->verbose >= VERBOSE_4) { + printf("\thdfsGetPathInfo(%s) ...", testFileName);fflush(stdout); + } + hdfsFileInfo* info = hdfsGetPathInfo( param->hdfs_fs, testFileName ); if ( ! info ) ERR_SIMPLE( "hdfsGetPathInfo() failed" ); + if (param->verbose >= VERBOSE_4) { + printf("done.\n");fflush(stdout); + } aggFileSizeFromStat = info->mSize; if ( param->filePerProc == TRUE ) { + if (param->verbose >= VERBOSE_4) { + printf("\tall-reduce (1)\n"); + } MPI_CHECK( MPI_Allreduce( &aggFileSizeFromStat, &tmpSum, 1, MPI_LONG_LONG_INT, MPI_SUM, testComm ), @@ -503,11 +632,17 @@ HDFS_GetFileSize(IOR_param_t * param, aggFileSizeFromStat = tmpSum; } else { + if (param->verbose >= VERBOSE_4) { + printf("\tall-reduce (2a)\n"); + } MPI_CHECK( MPI_Allreduce( &aggFileSizeFromStat, &tmpMin, 1, MPI_LONG_LONG_INT, MPI_MIN, testComm ), "cannot total data moved" ); + if (param->verbose >= VERBOSE_4) { + printf("\tall-reduce (2b)\n"); + } MPI_CHECK( MPI_Allreduce( &aggFileSizeFromStat, &tmpMax, 1, MPI_LONG_LONG_INT, MPI_MAX, testComm ), @@ -523,5 +658,8 @@ HDFS_GetFileSize(IOR_param_t * param, } } + if (param->verbose >= VERBOSE_4) { + printf("<- HDFS_GetFileSize [%llu]\n", aggFileSizeFromStat); + } return ( aggFileSizeFromStat ); } diff --git a/src/ior.c b/src/ior.c index fa01a96..c8129c2 100644 --- a/src/ior.c +++ b/src/ior.c @@ -145,6 +145,14 @@ int main(int argc, char **argv) if (rank == 0 && verbose >= VERBOSE_3) { ShowTest(&tptr->params); } +#if 0 + // This is useful for trapping a running MPI process. While + // this is sleeping, run the script 'testing/hdfs/gdb.attach' + printf("\tsleeping ..."); + fflush(stdout); + sleep(5); + printf("done.\n"); +#endif TestIoSys(tptr); } @@ -196,10 +204,12 @@ void init_IOR_Param_t(IOR_param_t * p) p->testComm = MPI_COMM_WORLD; p->setAlignment = 1; p->lustre_start_ost = -1; + + strncpy(p->hdfs_user, getenv("USER"), MAX_STR); p->hdfs_name_node = "default"; p->hdfs_name_node_port = 0; /* ??? */ p->hdfs_fs = NULL; - p->hdfs_replicas = 0; + p->hdfs_replicas = 0; /* invokes the default */ p->hdfs_block_size = 0; } @@ -873,9 +883,10 @@ void GetPlatformName(char *platformName) */ static void GetTestFileName(char *testFileName, IOR_param_t * test) { - char **fileNames, - initialTestFileName[MAXPATHLEN], - testFileNameRoot[MAX_STR], tmpString[MAX_STR]; + char **fileNames; + char initialTestFileName[MAXPATHLEN]; + char testFileNameRoot[MAX_STR]; + char tmpString[MAX_STR]; int count; /* parse filename for multiple file systems */ @@ -1483,7 +1494,7 @@ static void ShowSetup(IOR_param_t *params) printf("\tapi = %s\n", params->apiVersion); printf("\ttest filename = %s\n", params->testFileName); printf("\taccess = "); - printf(params->filePerProc ? "file-per-process" : "single-shared-file"); + printf(params->filePerProc ? "file-per-process" : "single-shared-file"); if (verbose >= VERBOSE_1 && strcmp(params->api, "POSIX") != 0) { printf(params->collective == FALSE ? ", independent" : ", collective"); } @@ -2053,6 +2064,10 @@ static void TestIoSys(IOR_test_t *test) } timer[2][rep] = GetTimeStamp(); dataMoved = WriteOrRead(params, fd, WRITE); + if (params->verbose >= VERBOSE_4) { + printf("* data moved = %llu\n", dataMoved); + fflush(stdout); + } timer[3][rep] = GetTimeStamp(); if (params->intraTestBarriers) MPI_CHECK(MPI_Barrier(testComm), diff --git a/src/ior.h b/src/ior.h index 2a97d66..ee9a21b 100644 --- a/src/ior.h +++ b/src/ior.h @@ -120,6 +120,7 @@ typedef struct IOR_offset_t setAlignment; /* alignment in bytes */ /* HDFS variables */ + char hdfs_user[MAX_STR]; /* copied from ENV, for now */ const char* hdfs_name_node; tPort hdfs_name_node_port; /* (uint16_t) */