/* -*- mode: c; indent-tabs-mode: t; -*- * vim:noexpandtab: * * Editing with tabs allows different users to pick their own indentation * appearance without changing the file. */ /* * Copyright (c) 2009, Los Alamos National Security, LLC All rights reserved. * Copyright 2009. Los Alamos National Security, LLC. This software was produced * under U.S. Government contract DE-AC52-06NA25396 for Los Alamos National * Laboratory (LANL), which is operated by Los Alamos National Security, LLC for * the U.S. Department of Energy. The U.S. Government has rights to use, * reproduce, and distribute this software. NEITHER THE GOVERNMENT NOR LOS * ALAMOS NATIONAL SECURITY, LLC MAKES ANY WARRANTY, EXPRESS OR IMPLIED, OR * ASSUMES ANY LIABILITY FOR THE USE OF THIS SOFTWARE. If software is * modified to produce derivative works, such modified software should be * clearly marked, so as not to confuse it with the version available from * LANL. * * Additionally, redistribution and use in source and binary forms, with or * without modification, are permitted provided that the following conditions are * met: * * • Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * * • Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * * • Neither the name of Los Alamos National Security, LLC, Los Alamos National * Laboratory, LANL, the U.S. Government, nor the names of its contributors may be * used to endorse or promote products derived from this software without specific * prior written permission. * * THIS SOFTWARE IS PROVIDED BY LOS ALAMOS NATIONAL SECURITY, LLC AND CONTRIBUTORS * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. 
IN NO EVENT SHALL LOS ALAMOS NATIONAL SECURITY, LLC OR * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING * IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY * OF SUCH DAMAGE. */ /******************************************************************************\ * * Implement of abstract I/O interface for HDFS. * * HDFS has the added concept of a "File System Handle" which has to be * connected before files are opened. We store this in the IOR_param_t * object that is always passed to our functions. The thing that callers * think of as the "fd" is an hdfsFile, (a pointer). * \******************************************************************************/ #ifdef HAVE_CONFIG_H # include "config.h" #endif #include #include #ifdef __linux__ # include /* necessary for: */ # define __USE_GNU /* O_DIRECT and */ # include /* IO operations */ # undef __USE_GNU #endif /* __linux__ */ #include #include /* IO operations */ #include #include /* #ifdef HAVE_LUSTRE_USER #include #endif */ #include "ior.h" #include "aiori.h" #include "utilities.h" #ifndef open64 /* necessary for TRU64 -- */ # define open64 open /* unlikely, but may pose */ #endif /* not open64 */ /* conflicting prototypes */ #ifndef lseek64 /* necessary for TRU64 -- */ # define lseek64 lseek /* unlikely, but may pose */ #endif /* not lseek64 */ /* conflicting prototypes */ #ifndef O_BINARY /* Required on Windows */ # define O_BINARY 0 #endif #include "hdfs.h" /**************************** P R O T O T Y P E S *****************************/ static aiori_fd_t *HDFS_Create(char *testFileName, int flags, aiori_mod_opt_t * param); static aiori_fd_t *HDFS_Open(char 
*testFileName, int flags, aiori_mod_opt_t * param);
static IOR_offset_t HDFS_Xfer(int access, aiori_fd_t *file, IOR_size_t * buffer,
                              IOR_offset_t length, IOR_offset_t offset, aiori_mod_opt_t * param);
static void HDFS_Close(aiori_fd_t *, aiori_mod_opt_t *);
static void HDFS_Delete(char *testFileName, aiori_mod_opt_t * param);
static void HDFS_Fsync(aiori_fd_t *, aiori_mod_opt_t *);
static IOR_offset_t HDFS_GetFileSize(aiori_mod_opt_t *, char *);
static void hdfs_xfer_hints(aiori_xfer_hint_t * params);
static option_help * HDFS_options(aiori_mod_opt_t ** init_backend_options, aiori_mod_opt_t * init_values);
static int HDFS_mkdir (const char *path, mode_t mode, aiori_mod_opt_t * options);
static int HDFS_rmdir (const char *path, aiori_mod_opt_t * options);
static int HDFS_access (const char *path, int mode, aiori_mod_opt_t * options);
static int HDFS_stat (const char *path, struct stat *buf, aiori_mod_opt_t * options);
static int HDFS_statfs (const char * path, ior_aiori_statfs_t * stat, aiori_mod_opt_t * options);

/* Transfer hints installed by the IOR core via hdfs_xfer_hints(); read
 * throughout this backend (transferSize, filePerProc, fsyncPerWrite, ...). */
static aiori_xfer_hint_t * hints = NULL;

/************************** D E C L A R A T I O N S ***************************/

/* Dispatch table registering this backend's callbacks with the IOR core. */
ior_aiori_t hdfs_aiori = {
	.name = "HDFS",
	.name_legacy = NULL,
	.create = HDFS_Create,
	.open = HDFS_Open,
	.xfer = HDFS_Xfer,
	.close = HDFS_Close,
	.delete = HDFS_Delete,
	.get_options = HDFS_options,
	.get_version = aiori_get_version,
	.xfer_hints = hdfs_xfer_hints,
	.fsync = HDFS_Fsync,
	.get_file_size = HDFS_GetFileSize,
	.statfs = HDFS_statfs,
	.mkdir = HDFS_mkdir,
	.rmdir = HDFS_rmdir,
	.access = HDFS_access,
	.stat = HDFS_stat,
	.enable_mdtest = true
};

/***************************** F U N C T I O N S ******************************/

/* Record the hint set for use by later open/xfer calls. */
void hdfs_xfer_hints(aiori_xfer_hint_t * params){
	hints = params;
}

/************************** O P T I O N S *****************************/

/* Per-run options for this backend; also carries runtime connection state. */
typedef struct {
	char * user;                 /* HDFS user name (defaults to $USER) */
	char * name_node;            /* namenode host (defaults to "default") */
	int replicas;                /* n block replicas.  (0 gets default) */
	int direct_io;               /* nonzero => request O_DIRECT-style open */
	IOR_offset_t block_size;     /* internal blk-size.
(0 gets default) */

	// runtime state (not user-settable options)
	hdfsFS fs;              /* file-system handle; NULL until hdfs_connect() */
	tPort name_node_port;   /* (uint16_t) */
} hdfs_options_t;

static void hdfs_connect( hdfs_options_t* o );

/*
 * Allocate and initialize this backend's option struct, and return the
 * option table mapping "hdfs.*" command-line options onto its fields.
 * When init_values is non-NULL the new struct starts as a copy of it;
 * otherwise defaults are user = $USER (or "") and name_node = "default".
 * Returns a malloc'd copy of the option_help array (ownership to caller).
 */
option_help * HDFS_options(aiori_mod_opt_t ** init_backend_options, aiori_mod_opt_t * init_values){
	hdfs_options_t * o = malloc(sizeof(hdfs_options_t));
	if (init_values != NULL){
		memcpy(o, init_values, sizeof(hdfs_options_t));
	}else{
		memset(o, 0, sizeof(hdfs_options_t));
		char *hdfs_user;
		hdfs_user = getenv("USER");
		if (!hdfs_user){
			hdfs_user = "";
		}
		o->user = strdup(hdfs_user);
		o->name_node = "default";
	}
	*init_backend_options = (aiori_mod_opt_t*) o;

	option_help h [] = {
		{0, "hdfs.odirect", "Direct I/O Mode", OPTION_FLAG, 'd', & o->direct_io},
		{0, "hdfs.user", "Username", OPTION_OPTIONAL_ARGUMENT, 's', & o->user},
		{0, "hdfs.name_node", "Namenode", OPTION_OPTIONAL_ARGUMENT, 's', & o->name_node},
		{0, "hdfs.replicas", "Number of replicas", OPTION_OPTIONAL_ARGUMENT, 'd', & o->replicas},
		{0, "hdfs.block_size", "Blocksize", OPTION_OPTIONAL_ARGUMENT, 'l', & o->block_size},
		LAST_OPTION
	};
	option_help * help = malloc(sizeof(h));
	memcpy(help, h, sizeof(h));
	return help;
}

/* mdtest support: create a directory; returns hdfsCreateDirectory()'s result. */
int HDFS_mkdir (const char *path, mode_t mode, aiori_mod_opt_t * options){
	hdfs_options_t * o = (hdfs_options_t*) options;
	hdfs_connect(o);
	return hdfsCreateDirectory(o->fs, path);
}

/* mdtest support: remove a directory; final '1' requests recursive delete. */
int HDFS_rmdir (const char *path, aiori_mod_opt_t * options){
	hdfs_options_t * o = (hdfs_options_t*) options;
	hdfs_connect(o);
	return hdfsDelete(o->fs, path, 1);
}

/* Existence check: hdfsExists() returns 0 when the path exists. */
int HDFS_access (const char *path, int mode, aiori_mod_opt_t * options){
	hdfs_options_t * o = (hdfs_options_t*) options;
	hdfs_connect(o);
	return hdfsExists(o->fs, path);
}

/*
 * Fill a POSIX struct stat from hdfsGetPathInfo(); returns 1 on lookup
 * failure, 0 on success.  Only atime/size/mtime/mode are populated.
 * NOTE(review): the local named "stat" shadows the libc stat() symbol.
 */
int HDFS_stat (const char *path, struct stat *buf, aiori_mod_opt_t * options){
	hdfsFileInfo * stat;
	hdfs_options_t * o = (hdfs_options_t*) options;
	hdfs_connect(o);
	stat = hdfsGetPathInfo(o->fs, path);
	if(stat == NULL){
		return 1;
	}
	memset(buf, 0, sizeof(struct stat));
	buf->st_atime = stat->mLastAccess;
	buf->st_size =
stat->mSize;
	buf->st_mtime = stat->mLastMod;
	buf->st_mode = stat->mPermissions;
	hdfsFreeFileInfo(stat, 1);   /* hdfsGetPathInfo() result must be freed */
	return 0;
}

/*
 * Approximate statfs from HDFS capacity/usage.  Block counts are derived
 * from the default block size; f_bavail/f_files/f_ffree are placeholder
 * values (1) -- the code computes no real equivalents for them.
 */
int HDFS_statfs (const char * path, ior_aiori_statfs_t * stat, aiori_mod_opt_t * options){
	hdfs_options_t * o = (hdfs_options_t*) options;
	hdfs_connect(o);
	stat->f_bsize = hdfsGetDefaultBlockSize(o->fs);
	stat->f_blocks = hdfsGetCapacity(o->fs) / hdfsGetDefaultBlockSize(o->fs);
	stat->f_bfree = stat->f_blocks - hdfsGetUsed(o->fs) / hdfsGetDefaultBlockSize(o->fs);
	stat->f_bavail = 1;
	stat->f_files = 1;
	stat->f_ffree = 1;
	return 0;
}

/* This is identical to the one in aiori-POSIX.c  Doesn't seem like
 * it would be appropriate in utilities.c.
 *
 * OR the platform's direct-I/O flag into *fd (an open(2) flags word).
 */
void hdfs_set_o_direct_flag(int *fd) {
/* note that TRU64 needs O_DIRECTIO, SunOS uses directio(),
   and everyone else needs O_DIRECT */
#ifndef O_DIRECT
#  ifndef O_DIRECTIO
	WARN("cannot use O_DIRECT");
#    define O_DIRECT 000000     /* no direct I/O available: OR in nothing */
#  else                         /* O_DIRECTIO */
#    define O_DIRECT O_DIRECTIO
#  endif                        /* not O_DIRECTIO */
#endif                          /* not O_DIRECT */
	*fd |= O_DIRECT;
}

/*
 * "Connect" to an HDFS file-system.  HDFS requires this be done before and
 * files are opened.  It is easy for ior_aiori.open/create to assure that
 * we connect, if we haven't already done so.  However, there's not a
 * simple way to assure that we disconnect after the last file-close.  For
 * now, we'll make a special call at the end of ior.c
 *
 * NOTE: It's okay to call this thing whenever you need to be sure the HDFS
 * filesystem is connected.
 */
void hdfs_connect( hdfs_options_t* o ) {
	if (verbose >= VERBOSE_4) {
		printf("-> hdfs_connect [nn:\"%s\", port:%d, user:%s]\n", o->name_node, o->name_node_port, o->user );
	}

	/* already connected: nothing to do */
	if ( o->fs ) {
		if (verbose >= VERBOSE_4) {
			printf("<- hdfs_connect [nothing to do]\n"); /* DEBUGGING */
		}
		return;
	}

	/* initialize a builder, holding parameters for hdfsBuilderConnect() */
	struct hdfsBuilder* builder = hdfsNewBuilder();
	if ( !
builder ){ ERR("couldn't create an hdfsBuilder"); } hdfsBuilderSetForceNewInstance ( builder ); /* don't use cached instance */ hdfsBuilderSetNameNode ( builder, o->name_node ); hdfsBuilderSetNameNodePort( builder, o->name_node_port ); hdfsBuilderSetUserName ( builder, o->user ); /* NOTE: hdfsBuilderConnect() frees the builder */ o->fs = hdfsBuilderConnect( builder ); if ( ! o->fs ) ERR("hdsfsBuilderConnect failed"); if (verbose >= VERBOSE_4) { printf("<- hdfs_connect [success]\n"); } } static void hdfs_disconnect( hdfs_options_t* o ) { if (verbose >= VERBOSE_4) { printf("-> hdfs_disconnect\n"); } if ( o->fs ) { hdfsDisconnect( o->fs ); o->fs = NULL; } if (verbose >= VERBOSE_4) { printf("<- hdfs_disconnect\n"); } } /* * Create or open the file. Pass TRUE if creating and FALSE if opening an existing file. * Return an hdfsFile. */ static void *HDFS_Create_Or_Open( char *testFileName, int flags, aiori_mod_opt_t *param, unsigned char createFile ) { if (verbose >= VERBOSE_4) { printf("-> HDFS_Create_Or_Open\n"); } hdfs_options_t * o = (hdfs_options_t*) param; hdfsFile hdfs_file = NULL; int fd_oflags = 0, hdfs_return; /* initialize file-system handle, if needed */ hdfs_connect( o ); /* * Check for unsupported flags. * * If they want RDWR, we don't know if they're going to try to do both, so we * can't default to either O_RDONLY or O_WRONLY. Thus, we error and exit. * * The other two, we just note that they are not supported and don't do them. */ if ( flags & IOR_RDWR ) { ERR( "Opening or creating a file in RDWR is not implemented in HDFS" ); } if ( flags & IOR_EXCL ) { fprintf( stdout, "Opening or creating a file in Exclusive mode is not implemented in HDFS\n" ); } if ( flags & IOR_APPEND ) { fprintf( stdout, "Opening or creating a file for appending is not implemented in HDFS\n" ); } /* * Setup the flags to be used. */ if ( createFile == TRUE ) { fd_oflags = O_CREAT; } if ( flags & IOR_WRONLY ) { if ( ! 
hints->filePerProc ) { // in N-1 mode, only rank 0 truncates the file
			if ( rank != 0 ) {
				fd_oflags |= O_WRONLY;
			} else {
				fd_oflags |= O_TRUNC;
				fd_oflags |= O_WRONLY;
			}
		} else { // in N-N mode, everyone does truncate
			fd_oflags |= O_TRUNC;
			fd_oflags |= O_WRONLY;
		}
	} else {
		fd_oflags |= O_RDONLY;
	}

	/*
	 * Now see if O_DIRECT is needed.
	 */
	if ( o->direct_io == TRUE ) {
		hdfs_set_o_direct_flag( &fd_oflags );
	}

	/*
	 * For N-1 write, All other ranks wait for Rank 0 to open the file.
	 * this is bec 0 does truncate and rest don't
	 * it would be dangerous for all to do truncate as they might
	 * truncate each other's writes
	 */
	if (( flags & IOR_WRONLY ) &&
	    ( ! hints->filePerProc ) &&
	    ( rank != 0 )) {
		MPI_CHECK(MPI_Barrier(testComm), "barrier error");
	}

	/*
	 * Now rank zero can open and truncate, if necessary.
	 */
	if (verbose >= VERBOSE_4) {
		printf("\thdfsOpenFile(%p, %s, 0%o, %lld, %d, %lld)\n",
		       o->fs, testFileName,
		       fd_oflags,              /* shown in octal to compare w/ */
		       hints->transferSize,
		       o->replicas,
		       o->block_size);
	}
	hdfs_file = hdfsOpenFile( o->fs,
	                          testFileName,
	                          fd_oflags,
	                          hints->transferSize,
	                          o->replicas,
	                          o->block_size);
	if ( ! hdfs_file ) {
		ERR( "Failed to open the file" );
	}

	/*
	 * For N-1 write, Rank 0 waits for the other ranks to open the file after it has.
	 */
	if (( flags & IOR_WRONLY ) &&
	    ( !hints->filePerProc ) &&
	    ( rank == 0 )) {
		MPI_CHECK(MPI_Barrier(testComm), "barrier error");
	}

	if (verbose >= VERBOSE_4) {
		printf("<- HDFS_Create_Or_Open\n");
	}
	return ((void *) hdfs_file );
}

/*
 * Create and open a file through the HDFS interface.
 * NOTE(review): the "<- HDFS_Create" trace is printed before the work is
 * actually delegated to HDFS_Create_Or_Open(), so at VERBOSE_4 the exit
 * marker appears before the nested traces.
 */
static aiori_fd_t *HDFS_Create(char *testFileName, int flags, aiori_mod_opt_t * param) {
	if (verbose >= VERBOSE_4) {
		printf("-> HDFS_Create\n");
	}
	if (verbose >= VERBOSE_4) {
		printf("<- HDFS_Create\n");
	}
	return HDFS_Create_Or_Open( testFileName, flags, param, TRUE );
}

/*
 * Open a file through the HDFS interface.
*/
static aiori_fd_t *HDFS_Open(char *testFileName, int flags, aiori_mod_opt_t * param) {
	if (verbose >= VERBOSE_4) {
		printf("-> HDFS_Open\n");
	}
	/* IOR_CREAT on open behaves like create; otherwise open existing. */
	if ( flags & IOR_CREAT ) {
		if (verbose >= VERBOSE_4) {
			printf("<- HDFS_Open( ... TRUE)\n");
		}
		return HDFS_Create_Or_Open( testFileName, flags, param, TRUE );
	}
	else {
		if (verbose >= VERBOSE_4) {
			printf("<- HDFS_Open( ... FALSE)\n");
		}
		return HDFS_Create_Or_Open( testFileName, flags, param, FALSE );
	}
}

/*
 * Write or read to file using the HDFS interface.
 * Loops until <length> bytes have moved: writes are sequential via
 * hdfsWrite(); reads are positioned via hdfsPread() at <offset>.
 * Returns <length> (errors abort via ERR()).
 */
static IOR_offset_t HDFS_Xfer(int access, aiori_fd_t *file, IOR_size_t * buffer,
                              IOR_offset_t length, IOR_offset_t offset, aiori_mod_opt_t * param) {
	if (verbose >= VERBOSE_4) {
		printf("-> HDFS_Xfer(acc:%d, file:%p, buf:%p, len:%llu, %p)\n",
		       access, file, buffer, length, param);
	}
	hdfs_options_t * o = (hdfs_options_t*) param;
	int        xferRetries = 0;
	long long  remaining = (long long)length;   /* bytes still to transfer */
	char*      ptr = (char *)buffer;            /* advances as bytes move */
	long long  rc;
	hdfsFS     hdfs_fs = o->fs;                 /* (void*) */
	hdfsFile   hdfs_file = (hdfsFile)file;      /* (void*) */

	while ( remaining > 0 ) {
		/* write/read file */
		if (access == WRITE) {	/* WRITE */
			if (verbose >= VERBOSE_4) {
				fprintf( stdout, "task %d writing to offset %lld\n",
				         rank, offset + length - remaining);
			}
			if (verbose >= VERBOSE_4) {
				printf("\thdfsWrite( %p, %p, %p, %lld)\n",
				       hdfs_fs, hdfs_file, ptr, remaining ); /* DEBUGGING */
			}
			rc = hdfsWrite( hdfs_fs, hdfs_file, ptr, remaining );
			if ( rc < 0 ) {
				ERR( "hdfsWrite() failed" );
			}
			offset += rc;

			if ( hints->fsyncPerWrite == TRUE ) {
				HDFS_Fsync( file, param );
			}
		}
		else {				/* READ or CHECK */
			if (verbose >= VERBOSE_4) {
				fprintf( stdout, "task %d reading from offset %lld\n",
				         rank, offset + length - remaining );
			}
			if (verbose >= VERBOSE_4) {
				printf("\thdfsRead( %p, %p, %p, %lld)\n",
				       hdfs_fs, hdfs_file, ptr, remaining ); /* DEBUGGING */
			}
			rc = hdfsPread(hdfs_fs, hdfs_file, offset, ptr, remaining);
			if ( rc == 0 ) {
				ERR( "hdfs_read() returned EOF prematurely" );
			}
			if ( rc < 0 ) {
				ERR( "hdfs_read() failed" );
			}
			offset +=
rc;
		}

		/* Partial transfer: warn, then retry the remainder on the next pass.
		 * NOTE(review): xferRetries is incremented on every iteration, even
		 * complete ones, so MAX_RETRY effectively bounds loop passes. */
		if ( rc < remaining ) {
			fprintf(stdout, "WARNING: Task %d, partial %s, %lld of %lld bytes at offset %lld\n",
			        rank,
			        access == WRITE ? "hdfsWrite()" : "hdfs_read()",
			        rc, remaining,
			        offset + length - remaining );
			if ( hints->singleXferAttempt == TRUE ) {
				/* NOTE(review): the "barrier error" label here describes
				 * an MPI_Abort, not a barrier -- confirm intent. */
				MPI_CHECK( MPI_Abort( MPI_COMM_WORLD, -1 ), "barrier error" );
			}
			if ( xferRetries > MAX_RETRY ) {
				ERR( "too many retries -- aborting" );
			}
		}
		assert( rc >= 0 );
		assert( rc <= remaining );
		remaining -= rc;
		ptr += rc;
		xferRetries++;
	}

	if(access == WRITE){
		// flush user buffer, this makes the write visible to readers
		// it is the expected semantics of read/writes
		rc = hdfsHFlush(hdfs_fs, hdfs_file);
		if(rc != 0){
			WARN("Error during flush");
		}
	}

	if (verbose >= VERBOSE_4) {
		printf("<- HDFS_Xfer\n");
	}
	return ( length );
}

/*
 * Perform hdfs_sync().
 * Uses hdfsHSync() (flush-to-datanodes with newer Hadoop versions);
 * failure is only a warning, not fatal.
 */
static void HDFS_Fsync(aiori_fd_t * fd, aiori_mod_opt_t * param) {
	hdfs_options_t * o = (hdfs_options_t*) param;
	hdfsFS hdfs_fs = o->fs;               /* (void *) */
	hdfsFile hdfs_file = (hdfsFile)fd;    /* (void *) */

	if (verbose >= VERBOSE_4) {
		printf("\thdfsFlush(%p, %p)\n", hdfs_fs, hdfs_file);
	}
	if ( hdfsHSync( hdfs_fs, hdfs_file ) != 0 ) {
		// Hsync is implemented to flush out data with newer Hadoop versions
		EWARN( "hdfsFlush() failed" );
	}
}

/*
 * Close a file through the HDFS interface.
 */
static void HDFS_Close(aiori_fd_t * fd, aiori_mod_opt_t * param) {
	if (verbose >= VERBOSE_4) {
		printf("-> HDFS_Close\n");
	}
	hdfs_options_t * o = (hdfs_options_t*) param;
	hdfsFS hdfs_fs = o->fs;               /* (void *) */
	hdfsFile hdfs_file = (hdfsFile)fd;    /* (void *) */

	if ( hdfsCloseFile( hdfs_fs, hdfs_file ) != 0 ) {
		ERR( "hdfsCloseFile() failed" );
	}
	if (verbose >= VERBOSE_4) {
		printf("<- HDFS_Close\n");
	}
}

/*
 * Delete a file through the HDFS interface.
 *
 * NOTE: The signature for ior_aiori.delete doesn't include a parameter to
 * select recursive deletes.  We'll assume that that is never needed.
*/
static void HDFS_Delete( char *testFileName, aiori_mod_opt_t * param ) {
	if (verbose >= VERBOSE_4) {
		printf("-> HDFS_Delete\n");
	}
	hdfs_options_t * o = (hdfs_options_t*) param;
	char errmsg[256];

	/* initialize file-system handle, if needed */
	hdfs_connect(o);

	if ( ! o->fs )
		ERR( "Can't delete a file without an HDFS connection" );

	if ( hdfsDelete( o->fs, testFileName, 0 ) != 0 ) {
		/* snprintf (was sprintf): testFileName is caller-supplied and can
		 * exceed the scratch buffer; bound and NUL-terminate the message. */
		snprintf(errmsg, sizeof(errmsg),
		         "[RANK %03d]: hdfsDelete() of file \"%s\" failed\n",
		         rank, testFileName);
		EWARN( errmsg );
	}
	if (verbose >= VERBOSE_4) {
		printf("<- HDFS_Delete\n");
	}
}

/*
 * Use hdfsGetPathInfo() to get info about file?
 * Is there an fstat we can use on hdfs?
 * Should we just use POSIX fstat?
 *
 * Returns the file's size in bytes from hdfsGetPathInfo(); aborts via
 * ERR() if the path lookup fails.
 */
static IOR_offset_t HDFS_GetFileSize(aiori_mod_opt_t * param, char * testFileName) {
	if (verbose >= VERBOSE_4) {
		printf("-> HDFS_GetFileSize(%s)\n", testFileName);
	}
	hdfs_options_t * o = (hdfs_options_t*) param;
	IOR_offset_t aggFileSizeFromStat;
	/* removed unused locals tmpMin, tmpMax, tmpSum */

	/* make sure file-system is connected */
	hdfs_connect( o );

	/* file-info struct includes size in bytes */
	if (verbose >= VERBOSE_4) {
		printf("\thdfsGetPathInfo(%s) ...", testFileName);
		fflush(stdout);
	}

	hdfsFileInfo* info = hdfsGetPathInfo( o->fs, testFileName );
	if ( ! info )
		ERR( "hdfsGetPathInfo() failed" );
	if (verbose >= VERBOSE_4) {
		printf("done.\n");
		fflush(stdout);
	}

	aggFileSizeFromStat = info->mSize;
	/* fix: hdfsGetPathInfo() results were leaked here; HDFS_stat() already
	 * frees them the same way. */
	hdfsFreeFileInfo( info, 1 );

	if (verbose >= VERBOSE_4) {
		printf("<- HDFS_GetFileSize [%llu]\n", aggFileSizeFromStat);
	}
	return ( aggFileSizeFromStat );
}