diff --git a/src/aiori-HDFS.c b/src/aiori-HDFS.c index 118de15..8c528ab 100755 --- a/src/aiori-HDFS.c +++ b/src/aiori-HDFS.c @@ -81,10 +81,9 @@ #include #endif */ - #include "ior.h" #include "aiori.h" -#include "iordef.h" +#include "utilities.h" #ifndef open64 /* necessary for TRU64 -- */ # define open64 open /* unlikely, but may pose */ @@ -101,15 +100,23 @@ #include "hdfs.h" /**************************** P R O T O T Y P E S *****************************/ -static void *HDFS_Create(char *, IOR_param_t *); -static void *HDFS_Open(char *, IOR_param_t *); -static IOR_offset_t HDFS_Xfer(int, void *, IOR_size_t *, - IOR_offset_t, IOR_param_t *); -static void HDFS_Close(void *, IOR_param_t *); -static void HDFS_Delete(char *, IOR_param_t *); -static void HDFS_SetVersion(IOR_param_t *); -static void HDFS_Fsync(void *, IOR_param_t *); -static IOR_offset_t HDFS_GetFileSize(IOR_param_t *, MPI_Comm, char *); +static aiori_fd_t *HDFS_Create(char *testFileName, int flags, aiori_mod_opt_t * param); +static aiori_fd_t *HDFS_Open(char *testFileName, int flags, aiori_mod_opt_t * param); +static IOR_offset_t HDFS_Xfer(int access, aiori_fd_t *file, IOR_size_t * buffer, + IOR_offset_t length, IOR_offset_t offset, aiori_mod_opt_t * param); +static void HDFS_Close(aiori_fd_t *, aiori_mod_opt_t *); +static void HDFS_Delete(char *testFileName, aiori_mod_opt_t * param); +static void HDFS_Fsync(aiori_fd_t *, aiori_mod_opt_t *); +static IOR_offset_t HDFS_GetFileSize(aiori_mod_opt_t *,char *); +static void hdfs_xfer_hints(aiori_xfer_hint_t * params); +static option_help * HDFS_options(aiori_mod_opt_t ** init_backend_options, aiori_mod_opt_t * init_values); +static int HDFS_mkdir (const char *path, mode_t mode, aiori_mod_opt_t * options); +static int HDFS_rmdir (const char *path, aiori_mod_opt_t * options); +static int HDFS_access (const char *path, int mode, aiori_mod_opt_t * options); +static int HDFS_stat (const char *path, struct stat *buf, aiori_mod_opt_t * options); +static int 
HDFS_statfs (const char * path, ior_aiori_statfs_t * stat, aiori_mod_opt_t * options); + +static aiori_xfer_hint_t * hints = NULL; /************************** D E C L A R A T I O N S ***************************/ @@ -121,13 +128,120 @@ ior_aiori_t hdfs_aiori = { .xfer = HDFS_Xfer, .close = HDFS_Close, .delete = HDFS_Delete, - .set_version = HDFS_SetVersion, + .get_options = HDFS_options, + .get_version = aiori_get_version, + .xfer_hints = hdfs_xfer_hints, .fsync = HDFS_Fsync, .get_file_size = HDFS_GetFileSize, + .statfs = HDFS_statfs, + .mkdir = HDFS_mkdir, + .rmdir = HDFS_rmdir, + .access = HDFS_access, + .stat = HDFS_stat, + .enable_mdtest = true }; /***************************** F U N C T I O N S ******************************/ +void hdfs_xfer_hints(aiori_xfer_hint_t * params){ + hints = params; +} + +/************************** O P T I O N S *****************************/ +typedef struct { + char * user; + char * name_node; + int replicas; /* n block replicas. (0 gets default) */ + int direct_io; + IOR_offset_t block_size; /* internal blk-size. 
(0 gets default) */ + // runtime options + hdfsFS fs; /* file-system handle */ + tPort name_node_port; /* (uint16_t) */ +} hdfs_options_t; + +static void hdfs_connect( hdfs_options_t* o ); + +option_help * HDFS_options(aiori_mod_opt_t ** init_backend_options, aiori_mod_opt_t * init_values){ + hdfs_options_t * o = malloc(sizeof(hdfs_options_t)); + + if (init_values != NULL){ + memcpy(o, init_values, sizeof(hdfs_options_t)); + }else{ + memset(o, 0, sizeof(hdfs_options_t)); + char *hdfs_user; + hdfs_user = getenv("USER"); + if (!hdfs_user){ + hdfs_user = ""; + } + o->user = strdup(hdfs_user); + o->name_node = "default"; + } + + *init_backend_options = (aiori_mod_opt_t*) o; + + option_help h [] = { + {0, "hdfs.odirect", "Direct I/O Mode", OPTION_FLAG, 'd', & o->direct_io}, + {0, "hdfs.user", "Username", OPTION_OPTIONAL_ARGUMENT, 's', & o->user}, + {0, "hdfs.name_node", "Namenode", OPTION_OPTIONAL_ARGUMENT, 's', & o->name_node}, + {0, "hdfs.replicas", "Number of replicas", OPTION_OPTIONAL_ARGUMENT, 'd', & o->replicas}, + {0, "hdfs.block_size", "Blocksize", OPTION_OPTIONAL_ARGUMENT, 'l', & o->block_size}, + LAST_OPTION + }; + option_help * help = malloc(sizeof(h)); + memcpy(help, h, sizeof(h)); + return help; +} + + +int HDFS_mkdir (const char *path, mode_t mode, aiori_mod_opt_t * options){ + hdfs_options_t * o = (hdfs_options_t*) options; + hdfs_connect(o); + return hdfsCreateDirectory(o->fs, path); +} + +int HDFS_rmdir (const char *path, aiori_mod_opt_t * options){ + hdfs_options_t * o = (hdfs_options_t*) options; + hdfs_connect(o); + return hdfsDelete(o->fs, path, 1); +} + +int HDFS_access (const char *path, int mode, aiori_mod_opt_t * options){ + hdfs_options_t * o = (hdfs_options_t*) options; + hdfs_connect(o); + return hdfsExists(o->fs, path); +} + +int HDFS_stat (const char *path, struct stat *buf, aiori_mod_opt_t * options){ + hdfsFileInfo * stat; + hdfs_options_t * o = (hdfs_options_t*) options; + hdfs_connect(o); + stat = hdfsGetPathInfo(o->fs, path); + if(stat 
== NULL){ + return 1; + } + memset(buf, 0, sizeof(struct stat)); + buf->st_atime = stat->mLastAccess; + buf->st_size = stat->mSize; + buf->st_mtime = stat->mLastMod; + buf->st_mode = stat->mPermissions; + + hdfsFreeFileInfo(stat, 1); + return 0; +} + +int HDFS_statfs (const char * path, ior_aiori_statfs_t * stat, aiori_mod_opt_t * options){ + hdfs_options_t * o = (hdfs_options_t*) options; + hdfs_connect(o); + + stat->f_bsize = hdfsGetDefaultBlockSize(o->fs); + stat->f_blocks = hdfsGetCapacity(o->fs) / hdfsGetDefaultBlockSize(o->fs); + stat->f_bfree = stat->f_blocks - hdfsGetUsed(o->fs) / hdfsGetDefaultBlockSize(o->fs); + stat->f_bavail = 1; + stat->f_files = 1; + stat->f_ffree = 1; + return 0; +} + /* This is identical to the one in aiori-POSIX.c Doesn't seem like * it would be appropriate in utilities.c. */ @@ -159,16 +273,16 @@ void hdfs_set_o_direct_flag(int *fd) * NOTE: It's okay to call this thing whenever you need to be sure the HDFS * filesystem is connected. */ -static void hdfs_connect( IOR_param_t* param ) { - if (param->verbose >= VERBOSE_4) { +void hdfs_connect( hdfs_options_t* o ) { + if (verbose >= VERBOSE_4) { printf("-> hdfs_connect [nn:\"%s\", port:%d, user:%s]\n", - param->hdfs_name_node, - param->hdfs_name_node_port, - param->hdfs_user ); + o->name_node, + o->name_node_port, + o->user ); } - if ( param->hdfs_fs ) { - if (param->verbose >= VERBOSE_4) { + if ( o->fs ) { + if (verbose >= VERBOSE_4) { printf("<- hdfs_connect [nothing to do]\n"); /* DEBUGGING */ } return; @@ -176,34 +290,35 @@ static void hdfs_connect( IOR_param_t* param ) { /* initialize a builder, holding parameters for hdfsBuilderConnect() */ struct hdfsBuilder* builder = hdfsNewBuilder(); - if ( ! builder ) - ERR_SIMPLE("couldn't create an hdfsBuilder"); + if ( ! 
builder ){ + ERR("couldn't create an hdfsBuilder"); + } hdfsBuilderSetForceNewInstance ( builder ); /* don't use cached instance */ - hdfsBuilderSetNameNode ( builder, param->hdfs_name_node ); - hdfsBuilderSetNameNodePort( builder, param->hdfs_name_node_port ); - hdfsBuilderSetUserName ( builder, param->hdfs_user ); + hdfsBuilderSetNameNode ( builder, o->name_node ); + hdfsBuilderSetNameNodePort( builder, o->name_node_port ); + hdfsBuilderSetUserName ( builder, o->user ); /* NOTE: hdfsBuilderConnect() frees the builder */ - param->hdfs_fs = hdfsBuilderConnect( builder ); - if ( ! param->hdfs_fs ) - ERR_SIMPLE("hdsfsBuilderConnect failed"); + o->fs = hdfsBuilderConnect( builder ); + if ( ! o->fs ) + ERR("hdfsBuilderConnect failed"); - if (param->verbose >= VERBOSE_4) { + if (verbose >= VERBOSE_4) { printf("<- hdfs_connect [success]\n"); } } -static void hdfs_disconnect( IOR_param_t* param ) { - if (param->verbose >= VERBOSE_4) { +static void hdfs_disconnect( hdfs_options_t* o ) { + if (verbose >= VERBOSE_4) { printf("-> hdfs_disconnect\n"); } - if ( param->hdfs_fs ) { - hdfsDisconnect( param->hdfs_fs ); - param->hdfs_fs = NULL; + if ( o->fs ) { + hdfsDisconnect( o->fs ); + o->fs = NULL; } - if (param->verbose >= VERBOSE_4) { + if (verbose >= VERBOSE_4) { printf("<- hdfs_disconnect\n"); } } @@ -214,16 +329,17 @@ static void hdfs_disconnect( IOR_param_t* param ) { * Return an hdfsFile. */ -static void *HDFS_Create_Or_Open( char *testFileName, IOR_param_t *param, unsigned char createFile ) { - if (param->verbose >= VERBOSE_4) { +static void *HDFS_Create_Or_Open( char *testFileName, int flags, aiori_mod_opt_t *param, unsigned char createFile ) { + if (verbose >= VERBOSE_4) { printf("-> HDFS_Create_Or_Open\n"); } + hdfs_options_t * o = (hdfs_options_t*) param; hdfsFile hdfs_file = NULL; int fd_oflags = 0, hdfs_return; /* initialize file-system handle, if needed */ - hdfs_connect( param ); + hdfs_connect( o ); /* * Check for unsupported flags. 
@@ -234,15 +350,15 @@ static void *HDFS_Create_Or_Open( char *testFileName, IOR_param_t *param, unsign * The other two, we just note that they are not supported and don't do them. */ - if ( param->openFlags & IOR_RDWR ) { + if ( flags & IOR_RDWR ) { ERR( "Opening or creating a file in RDWR is not implemented in HDFS" ); } - if ( param->openFlags & IOR_EXCL ) { + if ( flags & IOR_EXCL ) { fprintf( stdout, "Opening or creating a file in Exclusive mode is not implemented in HDFS\n" ); } - if ( param->openFlags & IOR_APPEND ) { + if ( flags & IOR_APPEND ) { fprintf( stdout, "Opening or creating a file for appending is not implemented in HDFS\n" ); } @@ -254,8 +370,8 @@ static void *HDFS_Create_Or_Open( char *testFileName, IOR_param_t *param, unsign fd_oflags = O_CREAT; } - if ( param->openFlags & IOR_WRONLY ) { - if ( !param->filePerProc ) { + if ( flags & IOR_WRONLY ) { + if ( ! hints->filePerProc ) { // in N-1 mode, only rank 0 truncates the file if ( rank != 0 ) { @@ -279,7 +395,7 @@ static void *HDFS_Create_Or_Open( char *testFileName, IOR_param_t *param, unsign * Now see if O_DIRECT is needed. */ - if ( param->useO_DIRECT == TRUE ) { + if ( o->direct_io == TRUE ) { hdfs_set_o_direct_flag( &fd_oflags ); } @@ -290,10 +406,7 @@ static void *HDFS_Create_Or_Open( char *testFileName, IOR_param_t *param, unsign * truncate each other's writes */ - if (( param->openFlags & IOR_WRONLY ) && - ( !param->filePerProc ) && - ( rank != 0 )) { - + if (( flags & IOR_WRONLY ) && ( ! hints->filePerProc ) && ( rank != 0 )) { MPI_CHECK(MPI_Barrier(testComm), "barrier error"); } @@ -301,21 +414,16 @@ static void *HDFS_Create_Or_Open( char *testFileName, IOR_param_t *param, unsign * Now rank zero can open and truncate, if necessary. 
*/ - if (param->verbose >= VERBOSE_4) { - printf("\thdfsOpenFile(0x%llx, %s, 0%o, %d, %d, %d)\n", - param->hdfs_fs, + if (verbose >= VERBOSE_4) { + printf("\thdfsOpenFile(%p, %s, 0%o, %lld, %d, %lld)\n", + o->fs, testFileName, fd_oflags, /* shown in octal to compare w/ */ - param->transferSize, - param->hdfs_replicas, - param->hdfs_block_size); + hints->transferSize, + o->replicas, + o->block_size); } - hdfs_file = hdfsOpenFile( param->hdfs_fs, - testFileName, - fd_oflags, - param->transferSize, - param->hdfs_replicas, - param->hdfs_block_size); + hdfs_file = hdfsOpenFile( o->fs, testFileName, fd_oflags, hints->transferSize, o->replicas, o->block_size); if ( ! hdfs_file ) { ERR( "Failed to open the file" ); } @@ -324,14 +432,14 @@ static void *HDFS_Create_Or_Open( char *testFileName, IOR_param_t *param, unsign * For N-1 write, Rank 0 waits for the other ranks to open the file after it has. */ - if (( param->openFlags & IOR_WRONLY ) && - ( !param->filePerProc ) && + if (( flags & IOR_WRONLY ) && + ( !hints->filePerProc ) && ( rank == 0 )) { MPI_CHECK(MPI_Barrier(testComm), "barrier error"); } - if (param->verbose >= VERBOSE_4) { + if (verbose >= VERBOSE_4) { printf("<- HDFS_Create_Or_Open\n"); } return ((void *) hdfs_file ); @@ -341,36 +449,36 @@ static void *HDFS_Create_Or_Open( char *testFileName, IOR_param_t *param, unsign * Create and open a file through the HDFS interface. */ -static void *HDFS_Create( char *testFileName, IOR_param_t * param ) { - if (param->verbose >= VERBOSE_4) { +static aiori_fd_t *HDFS_Create(char *testFileName, int flags, aiori_mod_opt_t * param) { + if (verbose >= VERBOSE_4) { printf("-> HDFS_Create\n"); } - if (param->verbose >= VERBOSE_4) { + if (verbose >= VERBOSE_4) { printf("<- HDFS_Create\n"); } - return HDFS_Create_Or_Open( testFileName, param, TRUE ); + return HDFS_Create_Or_Open( testFileName, flags, param, TRUE ); } /* * Open a file through the HDFS interface. 
*/ -static void *HDFS_Open( char *testFileName, IOR_param_t * param ) { - if (param->verbose >= VERBOSE_4) { +static aiori_fd_t *HDFS_Open(char *testFileName, int flags, aiori_mod_opt_t * param) { + if (verbose >= VERBOSE_4) { printf("-> HDFS_Open\n"); } - if ( param->openFlags & IOR_CREAT ) { - if (param->verbose >= VERBOSE_4) { + if ( flags & IOR_CREAT ) { + if (verbose >= VERBOSE_4) { printf("<- HDFS_Open( ... TRUE)\n"); } - return HDFS_Create_Or_Open( testFileName, param, TRUE ); + return HDFS_Create_Or_Open( testFileName, flags, param, TRUE ); } else { - if (param->verbose >= VERBOSE_4) { + if (verbose >= VERBOSE_4) { printf("<- HDFS_Open( ... FALSE)\n"); } - return HDFS_Create_Or_Open( testFileName, param, FALSE ); + return HDFS_Create_Or_Open( testFileName, flags, param, FALSE ); } } @@ -378,19 +486,18 @@ static void *HDFS_Open( char *testFileName, IOR_param_t * param ) { * Write or read to file using the HDFS interface. */ -static IOR_offset_t HDFS_Xfer(int access, void *file, IOR_size_t * buffer, - IOR_offset_t length, IOR_param_t * param) { - if (param->verbose >= VERBOSE_4) { - printf("-> HDFS_Xfer(acc:%d, file:0x%llx, buf:0x%llx, len:%llu, 0x%llx)\n", +static IOR_offset_t HDFS_Xfer(int access, aiori_fd_t *file, IOR_size_t * buffer, + IOR_offset_t length, IOR_offset_t offset, aiori_mod_opt_t * param) { + if (verbose >= VERBOSE_4) { + printf("-> HDFS_Xfer(acc:%d, file:%p, buf:%p, len:%llu, %p)\n", access, file, buffer, length, param); } - + hdfs_options_t * o = (hdfs_options_t*) param; int xferRetries = 0; long long remaining = (long long)length; char* ptr = (char *)buffer; long long rc; - off_t offset = param->offset; - hdfsFS hdfs_fs = param->hdfs_fs; /* (void*) */ + hdfsFS hdfs_fs = o->fs; /* (void*) */ hdfsFile hdfs_file = (hdfsFile)file; /* (void*) */ @@ -401,37 +508,34 @@ static IOR_offset_t HDFS_Xfer(int access, void *file, IOR_size_t * buffer, if (verbose >= VERBOSE_4) { fprintf( stdout, "task %d writing to offset %lld\n", rank, - param->offset + 
length - remaining); + offset + length - remaining); } - if (param->verbose >= VERBOSE_4) { - printf("\thdfsWrite( 0x%llx, 0x%llx, 0x%llx, %lld)\n", + if (verbose >= VERBOSE_4) { + printf("\thdfsWrite( %p, %p, %p, %lld)\n", hdfs_fs, hdfs_file, ptr, remaining ); /* DEBUGGING */ } rc = hdfsWrite( hdfs_fs, hdfs_file, ptr, remaining ); if ( rc < 0 ) { ERR( "hdfsWrite() failed" ); } - offset += rc; - if ( param->fsyncPerWrite == TRUE ) { - HDFS_Fsync( hdfs_file, param ); + if ( hints->fsyncPerWrite == TRUE ) { + HDFS_Fsync( file, param ); } } else { /* READ or CHECK */ if (verbose >= VERBOSE_4) { fprintf( stdout, "task %d reading from offset %lld\n", - rank, - param->offset + length - remaining ); + rank, offset + length - remaining ); } - if (param->verbose >= VERBOSE_4) { - printf("\thdfsRead( 0x%llx, 0x%llx, 0x%llx, %lld)\n", + if (verbose >= VERBOSE_4) { + printf("\thdfsRead( %p, %p, %p, %lld)\n", hdfs_fs, hdfs_file, ptr, remaining ); /* DEBUGGING */ } - rc = hdfsRead( hdfs_fs, hdfs_file, ptr, remaining ); - + rc = hdfsPread(hdfs_fs, hdfs_file, offset, ptr, remaining); if ( rc == 0 ) { ERR( "hdfs_read() returned EOF prematurely" ); } @@ -449,9 +553,9 @@ static IOR_offset_t HDFS_Xfer(int access, void *file, IOR_size_t * buffer, rank, access == WRITE ? 
"hdfsWrite()" : "hdfs_read()", rc, remaining, - param->offset + length - remaining ); + offset + length - remaining ); - if ( param->singleXferAttempt == TRUE ) { + if ( hints->singleXferAttempt == TRUE ) { MPI_CHECK( MPI_Abort( MPI_COMM_WORLD, -1 ), "barrier error" ); } @@ -467,7 +571,16 @@ static IOR_offset_t HDFS_Xfer(int access, void *file, IOR_size_t * buffer, xferRetries++; } - if (param->verbose >= VERBOSE_4) { + if(access == WRITE){ + // flush user buffer, this makes the write visible to readers + // it is the expected semantics of read/writes + rc = hdfsHFlush(hdfs_fs, hdfs_file); + if(rc != 0){ + WARN("Error during flush"); + } + } + + if (verbose >= VERBOSE_4) { printf("<- HDFS_Xfer\n"); } return ( length ); @@ -476,67 +589,38 @@ static IOR_offset_t HDFS_Xfer(int access, void *file, IOR_size_t * buffer, /* * Perform hdfs_sync(). */ - -static void HDFS_Fsync( void *fd, IOR_param_t * param ) { - if (param->verbose >= VERBOSE_4) { - printf("-> HDFS_Fsync\n"); - } - hdfsFS hdfs_fs = param->hdfs_fs; /* (void *) */ +static void HDFS_Fsync(aiori_fd_t * fd, aiori_mod_opt_t * param) { + hdfs_options_t * o = (hdfs_options_t*) param; + hdfsFS hdfs_fs = o->fs; /* (void *) */ hdfsFile hdfs_file = (hdfsFile)fd; /* (void *) */ -#if 0 - if (param->verbose >= VERBOSE_4) { - printf("\thdfsHSync(0x%llx, 0x%llx)\n", hdfs_fs, hdfs_file); + if (verbose >= VERBOSE_4) { + printf("\thdfsFlush(%p, %p)\n", hdfs_fs, hdfs_file); } if ( hdfsHSync( hdfs_fs, hdfs_file ) != 0 ) { - EWARN( "hdfsHSync() failed" ); - } -#elif 0 - if (param->verbose >= VERBOSE_4) { - printf("\thdfsHFlush(0x%llx, 0x%llx)\n", hdfs_fs, hdfs_file); - } - if ( hdfsHFlush( hdfs_fs, hdfs_file ) != 0 ) { - EWARN( "hdfsHFlush() failed" ); - } -#else - if (param->verbose >= VERBOSE_4) { - printf("\thdfsFlush(0x%llx, 0x%llx)\n", hdfs_fs, hdfs_file); - } - if ( hdfsFlush( hdfs_fs, hdfs_file ) != 0 ) { + // Hsync is implemented to flush out data with newer Hadoop versions EWARN( "hdfsFlush() failed" ); } -#endif - - if 
(param->verbose >= VERBOSE_4) { - printf("<- HDFS_Fsync\n"); - } } /* * Close a file through the HDFS interface. */ -static void HDFS_Close( void *fd, IOR_param_t * param ) { - if (param->verbose >= VERBOSE_4) { +static void HDFS_Close(aiori_fd_t * fd, aiori_mod_opt_t * param) { + if (verbose >= VERBOSE_4) { printf("-> HDFS_Close\n"); } + hdfs_options_t * o = (hdfs_options_t*) param; - hdfsFS hdfs_fs = param->hdfs_fs; /* (void *) */ + hdfsFS hdfs_fs = o->fs; /* (void *) */ hdfsFile hdfs_file = (hdfsFile)fd; /* (void *) */ - int open_flags; - - if ( param->openFlags & IOR_WRONLY ) { - open_flags = O_CREAT | O_WRONLY; - } else { - open_flags = O_RDONLY; - } - if ( hdfsCloseFile( hdfs_fs, hdfs_file ) != 0 ) { ERR( "hdfsCloseFile() failed" ); } - if (param->verbose >= VERBOSE_4) { + if (verbose >= VERBOSE_4) { printf("<- HDFS_Close\n"); } } @@ -547,119 +631,66 @@ static void HDFS_Close( void *fd, IOR_param_t * param ) { * NOTE: The signature for ior_aiori.delete doesn't include a parameter to * select recursive deletes. We'll assume that that is never needed. */ -static void HDFS_Delete( char *testFileName, IOR_param_t * param ) { - if (param->verbose >= VERBOSE_4) { +static void HDFS_Delete( char *testFileName, aiori_mod_opt_t * param ) { + if (verbose >= VERBOSE_4) { printf("-> HDFS_Delete\n"); } + hdfs_options_t * o = (hdfs_options_t*) param; char errmsg[256]; /* initialize file-system handle, if needed */ - hdfs_connect( param ); + hdfs_connect(o); - if ( ! param->hdfs_fs ) - ERR_SIMPLE( "Can't delete a file without an HDFS connection" ); + if ( ! 
o->fs ) + ERR( "Can't delete a file without an HDFS connection" ); - if ( hdfsDelete( param->hdfs_fs, testFileName, 0 ) != 0 ) { - sprintf(errmsg, - "[RANK %03d]: hdfsDelete() of file \"%s\" failed\n", + if ( hdfsDelete( o->fs, testFileName, 0 ) != 0 ) { + sprintf(errmsg, "[RANK %03d]: hdfsDelete() of file \"%s\" failed\n", rank, testFileName); EWARN( errmsg ); } - if (param->verbose >= VERBOSE_4) { + if (verbose >= VERBOSE_4) { printf("<- HDFS_Delete\n"); } } -/* - * Determine api version. - */ - -static void HDFS_SetVersion( IOR_param_t * param ) { - if (param->verbose >= VERBOSE_4) { - printf("-> HDFS_SetVersion\n"); - } - - strcpy( param->apiVersion, param->api ); - if (param->verbose >= VERBOSE_4) { - printf("<- HDFS_SetVersion\n"); - } -} - /* * Use hdfsGetPathInfo() to get info about file? * Is there an fstat we can use on hdfs? * Should we just use POSIX fstat? */ -static IOR_offset_t -HDFS_GetFileSize(IOR_param_t * param, - MPI_Comm testComm, +static IOR_offset_t HDFS_GetFileSize(aiori_mod_opt_t * param, char * testFileName) { - if (param->verbose >= VERBOSE_4) { + if (verbose >= VERBOSE_4) { printf("-> HDFS_GetFileSize(%s)\n", testFileName); } + hdfs_options_t * o = (hdfs_options_t*) param; IOR_offset_t aggFileSizeFromStat; IOR_offset_t tmpMin, tmpMax, tmpSum; /* make sure file-system is connected */ - hdfs_connect( param ); + hdfs_connect( o ); /* file-info struct includes size in bytes */ - if (param->verbose >= VERBOSE_4) { - printf("\thdfsGetPathInfo(%s) ...", testFileName);fflush(stdout); + if (verbose >= VERBOSE_4) { + printf("\thdfsGetPathInfo(%s) ...", testFileName); + fflush(stdout); } - hdfsFileInfo* info = hdfsGetPathInfo( param->hdfs_fs, testFileName ); + hdfsFileInfo* info = hdfsGetPathInfo( o->fs, testFileName ); if ( ! 
info ) - ERR_SIMPLE( "hdfsGetPathInfo() failed" ); - if (param->verbose >= VERBOSE_4) { + ERR( "hdfsGetPathInfo() failed" ); + if (verbose >= VERBOSE_4) { printf("done.\n");fflush(stdout); } aggFileSizeFromStat = info->mSize; - if ( param->filePerProc == TRUE ) { - if (param->verbose >= VERBOSE_4) { - printf("\tall-reduce (1)\n"); - } - MPI_CHECK( - MPI_Allreduce( - &aggFileSizeFromStat, &tmpSum, 1, MPI_LONG_LONG_INT, MPI_SUM, testComm ), - "cannot total data moved" ); - - aggFileSizeFromStat = tmpSum; - } - else { - if (param->verbose >= VERBOSE_4) { - printf("\tall-reduce (2a)\n"); - } - MPI_CHECK( - MPI_Allreduce( - &aggFileSizeFromStat, &tmpMin, 1, MPI_LONG_LONG_INT, MPI_MIN, testComm ), - "cannot total data moved" ); - - if (param->verbose >= VERBOSE_4) { - printf("\tall-reduce (2b)\n"); - } - MPI_CHECK( - MPI_Allreduce( - &aggFileSizeFromStat, &tmpMax, 1, MPI_LONG_LONG_INT, MPI_MAX, testComm ), - "cannot total data moved" ); - - if ( tmpMin != tmpMax ) { - if ( rank == 0 ) { - WARN( "inconsistent file size by different tasks" ); - } - - /* incorrect, but now consistent across tasks */ - aggFileSizeFromStat = tmpMin; - } - } - - if (param->verbose >= VERBOSE_4) { + if (verbose >= VERBOSE_4) { printf("<- HDFS_GetFileSize [%llu]\n", aggFileSizeFromStat); } return ( aggFileSizeFromStat ); diff --git a/src/ior.c b/src/ior.c index aa841de..64e3665 100755 --- a/src/ior.c +++ b/src/ior.c @@ -204,8 +204,6 @@ int ior_main(int argc, char **argv) void init_IOR_Param_t(IOR_param_t * p) { const char *default_aiori = aiori_default (); - char *hdfs_user; - assert (NULL != default_aiori); memset(p, 0, sizeof(IOR_param_t)); @@ -235,16 +233,6 @@ void init_IOR_Param_t(IOR_param_t * p) p->incompressibleSeed = 573; p->testComm = mpi_comm_world; - hdfs_user = getenv("USER"); - if (!hdfs_user) - hdfs_user = ""; - p->hdfs_user = strdup(hdfs_user); - p->hdfs_name_node = "default"; - p->hdfs_name_node_port = 0; /* ??? 
*/ - p->hdfs_fs = NULL; - p->hdfs_replicas = 0; /* invokes the default */ - p->hdfs_block_size = 0; - p->URI = NULL; } diff --git a/src/ior.h b/src/ior.h index 3009720..9073d6a 100755 --- a/src/ior.h +++ b/src/ior.h @@ -160,14 +160,6 @@ typedef struct int fsyncPerWrite; /* fsync() after each write */ int fsync; /* fsync() after write */ - /* HDFS variables */ - char * hdfs_user; /* copied from ENV, for now */ - const char* hdfs_name_node; - tPort hdfs_name_node_port; /* (uint16_t) */ - hdfsFS hdfs_fs; /* file-system handle */ - int hdfs_replicas; /* n block replicas. (0 gets default) */ - int hdfs_block_size; /* internal blk-size. (0 gets default) */ - char* URI; /* "path" to target object */ /* RADOS variables */ diff --git a/testing/build-hdfs.sh b/testing/build-hdfs.sh new file mode 100755 index 0000000..0165dfb --- /dev/null +++ b/testing/build-hdfs.sh @@ -0,0 +1,18 @@ +#!/bin/bash +mkdir build-hdfs +cd build-hdfs + +VER=hadoop-3.2.1 +if [[ ! -e $VER.tar.gz ]] ; then + wget https://www.apache.org/dyn/closer.cgi/hadoop/common/$VER/$VER.tar.gz + tar -xf $VER.tar.gz +fi + +../configure --with-hdfs CFLAGS="-I$PWD/$VER/include/ -O0 -g3" LDFLAGS="-L$PWD/$VER/lib/native -Wl,-rpath=$PWD/$VER/lib/native" +make -j + + +echo "To run execute:" +echo export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64/ +echo export CLASSPATH=$(find $VER/ -name "*.jar" -printf "%p:") +echo ./src/ior -a HDFS