HDFS module ported to current AIORI API and improved (#302)

* HDFS module ported to current AIORI API and extended
* Added instructions on how to build and run with HDFS.
* Replaced read with pread to support offsets.
* Implemented HDFS MDTest API
* Improved sync semantics
master
Julian Kunkel 2020-12-23 11:51:31 +00:00 committed by GitHub
parent 3e4a0f69d9
commit 8de13884a7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 257 additions and 228 deletions

View File

@ -81,10 +81,9 @@
#include <lustre/lustre_user.h>
#endif
*/
#include "ior.h"
#include "aiori.h"
#include "iordef.h"
#include "utilities.h"
#ifndef open64 /* necessary for TRU64 -- */
# define open64 open /* unlikely, but may pose */
@ -101,15 +100,23 @@
#include "hdfs.h"
/**************************** P R O T O T Y P E S *****************************/
static void *HDFS_Create(char *, IOR_param_t *);
static void *HDFS_Open(char *, IOR_param_t *);
static IOR_offset_t HDFS_Xfer(int, void *, IOR_size_t *,
IOR_offset_t, IOR_param_t *);
static void HDFS_Close(void *, IOR_param_t *);
static void HDFS_Delete(char *, IOR_param_t *);
static void HDFS_SetVersion(IOR_param_t *);
static void HDFS_Fsync(void *, IOR_param_t *);
static IOR_offset_t HDFS_GetFileSize(IOR_param_t *, MPI_Comm, char *);
static aiori_fd_t *HDFS_Create(char *testFileName, int flags, aiori_mod_opt_t * param);
static aiori_fd_t *HDFS_Open(char *testFileName, int flags, aiori_mod_opt_t * param);
static IOR_offset_t HDFS_Xfer(int access, aiori_fd_t *file, IOR_size_t * buffer,
IOR_offset_t length, IOR_offset_t offset, aiori_mod_opt_t * param);
static void HDFS_Close(aiori_fd_t *, aiori_mod_opt_t *);
static void HDFS_Delete(char *testFileName, aiori_mod_opt_t * param);
static void HDFS_Fsync(aiori_fd_t *, aiori_mod_opt_t *);
static IOR_offset_t HDFS_GetFileSize(aiori_mod_opt_t *,char *);
static void hdfs_xfer_hints(aiori_xfer_hint_t * params);
static option_help * HDFS_options(aiori_mod_opt_t ** init_backend_options, aiori_mod_opt_t * init_values);
static int HDFS_mkdir (const char *path, mode_t mode, aiori_mod_opt_t * options);
static int HDFS_rmdir (const char *path, aiori_mod_opt_t * options);
static int HDFS_access (const char *path, int mode, aiori_mod_opt_t * options);
static int HDFS_stat (const char *path, struct stat *buf, aiori_mod_opt_t * options);
static int HDFS_statfs (const char * path, ior_aiori_statfs_t * stat, aiori_mod_opt_t * options);
static aiori_xfer_hint_t * hints = NULL;
/************************** D E C L A R A T I O N S ***************************/
@ -121,13 +128,120 @@ ior_aiori_t hdfs_aiori = {
.xfer = HDFS_Xfer,
.close = HDFS_Close,
.delete = HDFS_Delete,
.set_version = HDFS_SetVersion,
.get_options = HDFS_options,
.get_version = aiori_get_version,
.xfer_hints = hdfs_xfer_hints,
.fsync = HDFS_Fsync,
.get_file_size = HDFS_GetFileSize,
.statfs = HDFS_statfs,
.mkdir = HDFS_mkdir,
.rmdir = HDFS_rmdir,
.access = HDFS_access,
.stat = HDFS_stat,
.enable_mdtest = true
};
/***************************** F U N C T I O N S ******************************/
/* Record the transfer hints handed in by the IOR core; they are read later
 * through the file-scope `hints` pointer (e.g. transferSize/filePerProc in
 * HDFS_Create_Or_Open, fsyncPerWrite/singleXferAttempt in HDFS_Xfer). */
void hdfs_xfer_hints(aiori_xfer_hint_t * params){
hints = params;
}
/************************** O P T I O N S *****************************/
/* Per-backend options for HDFS: CLI-configurable values (filled by
 * HDFS_options) plus runtime connection state (filled by hdfs_connect). */
typedef struct {
char * user; /* HDFS user name; defaults to $USER (see HDFS_options) */
char * name_node; /* namenode host; defaults to "default" */
int replicas; /* n block replicas. (0 gets default) */
int direct_io; /* when set, request O_DIRECT-style open flags */
IOR_offset_t block_size; /* internal blk-size. (0 gets default) */
// runtime options
hdfsFS fs; /* file-system handle */
tPort name_node_port; /* (uint16_t) NOTE(review): no CLI option sets this — stays 0 from memset; confirm intended */
} hdfs_options_t;
static void hdfs_connect( hdfs_options_t* o );
/*
 * Build the option table for the HDFS backend and allocate its option struct.
 *
 * If init_values is non-NULL the new struct starts as a byte copy of it
 * (note: the user/name_node string pointers are then shared with
 * init_values); otherwise defaults apply: user from $USER (empty string if
 * unset), name node "default", everything else zeroed.
 *
 * *init_backend_options receives the freshly allocated hdfs_options_t.
 * Returns a heap-allocated option table; the caller owns both allocations.
 */
option_help * HDFS_options(aiori_mod_opt_t ** init_backend_options, aiori_mod_opt_t * init_values){
  hdfs_options_t * o = malloc(sizeof(hdfs_options_t));
  if (o == NULL){
    ERR("Unable to allocate HDFS options");
  }
  if (init_values != NULL){
    memcpy(o, init_values, sizeof(hdfs_options_t));
  }else{
    memset(o, 0, sizeof(hdfs_options_t));
    /* default the HDFS user to the current login; empty string if unset */
    char *hdfs_user = getenv("USER");
    if (!hdfs_user){
      hdfs_user = "";
    }
    o->user = strdup(hdfs_user);
    o->name_node = "default";
  }
  *init_backend_options = (aiori_mod_opt_t*) o;

  option_help h [] = {
    {0, "hdfs.odirect", "Direct I/O Mode", OPTION_FLAG, 'd', & o->direct_io},
    {0, "hdfs.user", "Username", OPTION_OPTIONAL_ARGUMENT, 's', & o->user},
    {0, "hdfs.name_node", "Namenode", OPTION_OPTIONAL_ARGUMENT, 's', & o->name_node},
    {0, "hdfs.replicas", "Number of replicas", OPTION_OPTIONAL_ARGUMENT, 'd', & o->replicas},
    {0, "hdfs.block_size", "Blocksize", OPTION_OPTIONAL_ARGUMENT, 'l', & o->block_size},
    LAST_OPTION
  };

  /* return a heap copy so the table outlives this stack frame */
  option_help * help = malloc(sizeof(h));
  if (help == NULL){
    ERR("Unable to allocate HDFS option table");
  }
  memcpy(help, h, sizeof(h));
  return help;
}
/* Create a directory on HDFS (mdtest support). Returns libhdfs status
 * (0 on success). */
int HDFS_mkdir (const char *path, mode_t mode, aiori_mod_opt_t * options){
  hdfs_options_t *opts = (hdfs_options_t *) options;
  /* make sure the filesystem handle is live before issuing the call */
  hdfs_connect(opts);
  return hdfsCreateDirectory(opts->fs, path);
}
/* Remove a directory on HDFS (mdtest support). Returns libhdfs status
 * (0 on success). */
int HDFS_rmdir (const char *path, aiori_mod_opt_t * options){
  hdfs_options_t *opts = (hdfs_options_t *) options;
  hdfs_connect(opts);
  /* third argument 1 => recursive delete */
  return hdfsDelete(opts->fs, path, 1);
}
/* Existence check via hdfsExists(): 0 when the path is present,
 * non-zero otherwise. NOTE(review): `mode` is ignored here — only
 * existence is reported, not permissions. */
int HDFS_access (const char *path, int mode, aiori_mod_opt_t * options){
  hdfs_options_t *opts = (hdfs_options_t *) options;
  hdfs_connect(opts);
  return hdfsExists(opts->fs, path);
}
/*
 * Fill a struct stat for `path` from hdfsGetPathInfo().
 * Returns 0 on success, 1 when the path cannot be queried.
 * HDFS only provides atime/size/mtime/mode; all other fields stay zeroed.
 */
int HDFS_stat (const char *path, struct stat *buf, aiori_mod_opt_t * options){
  hdfs_options_t * o = (hdfs_options_t*) options;
  hdfs_connect(o);
  /* renamed from `stat`: the old name shadowed the stat() function and the
   * struct stat tag used in this very signature */
  hdfsFileInfo * info = hdfsGetPathInfo(o->fs, path);
  if(info == NULL){
    return 1;
  }
  memset(buf, 0, sizeof(struct stat));
  buf->st_atime = info->mLastAccess;
  buf->st_size  = info->mSize;
  buf->st_mtime = info->mLastMod;
  buf->st_mode  = info->mPermissions;
  hdfsFreeFileInfo(info, 1);
  return 0;
}
/*
 * Report filesystem capacity/usage in units of the HDFS default block size.
 * Hoists the previously repeated hdfsGetDefaultBlockSize() call and guards
 * against a zero/negative block size, which would have been a division by
 * zero in the original. f_bavail/f_files/f_ffree are not provided by HDFS
 * and are filled with placeholder 1s.
 * Returns 0 on success, 1 on error.
 */
int HDFS_statfs (const char * path, ior_aiori_statfs_t * stat, aiori_mod_opt_t * options){
  hdfs_options_t * o = (hdfs_options_t*) options;
  hdfs_connect(o);
  tOffset block_size = hdfsGetDefaultBlockSize(o->fs);
  if (block_size <= 0){
    WARN("hdfsGetDefaultBlockSize() returned an invalid block size");
    return 1;
  }
  stat->f_bsize  = block_size;
  stat->f_blocks = hdfsGetCapacity(o->fs) / block_size;
  stat->f_bfree  = stat->f_blocks - hdfsGetUsed(o->fs) / block_size;
  stat->f_bavail = 1;
  stat->f_files  = 1;
  stat->f_ffree  = 1;
  return 0;
}
/* This is identical to the one in aiori-POSIX.c Doesn't seem like
* it would be appropriate in utilities.c.
*/
@ -159,16 +273,16 @@ void hdfs_set_o_direct_flag(int *fd)
* NOTE: It's okay to call this thing whenever you need to be sure the HDFS
* filesystem is connected.
*/
static void hdfs_connect( IOR_param_t* param ) {
if (param->verbose >= VERBOSE_4) {
void hdfs_connect( hdfs_options_t* o ) {
if (verbose >= VERBOSE_4) {
printf("-> hdfs_connect [nn:\"%s\", port:%d, user:%s]\n",
param->hdfs_name_node,
param->hdfs_name_node_port,
param->hdfs_user );
o->name_node,
o->name_node_port,
o->user );
}
if ( param->hdfs_fs ) {
if (param->verbose >= VERBOSE_4) {
if ( o->fs ) {
if (verbose >= VERBOSE_4) {
printf("<- hdfs_connect [nothing to do]\n"); /* DEBUGGING */
}
return;
@ -176,34 +290,35 @@ static void hdfs_connect( IOR_param_t* param ) {
/* initialize a builder, holding parameters for hdfsBuilderConnect() */
struct hdfsBuilder* builder = hdfsNewBuilder();
if ( ! builder )
ERR_SIMPLE("couldn't create an hdfsBuilder");
if ( ! builder ){
ERR("couldn't create an hdfsBuilder");
}
hdfsBuilderSetForceNewInstance ( builder ); /* don't use cached instance */
hdfsBuilderSetNameNode ( builder, param->hdfs_name_node );
hdfsBuilderSetNameNodePort( builder, param->hdfs_name_node_port );
hdfsBuilderSetUserName ( builder, param->hdfs_user );
hdfsBuilderSetNameNode ( builder, o->name_node );
hdfsBuilderSetNameNodePort( builder, o->name_node_port );
hdfsBuilderSetUserName ( builder, o->user );
/* NOTE: hdfsBuilderConnect() frees the builder */
param->hdfs_fs = hdfsBuilderConnect( builder );
if ( ! param->hdfs_fs )
ERR_SIMPLE("hdsfsBuilderConnect failed");
o->fs = hdfsBuilderConnect( builder );
if ( ! o->fs )
ERR("hdsfsBuilderConnect failed");
if (param->verbose >= VERBOSE_4) {
if (verbose >= VERBOSE_4) {
printf("<- hdfs_connect [success]\n");
}
}
static void hdfs_disconnect( IOR_param_t* param ) {
if (param->verbose >= VERBOSE_4) {
static void hdfs_disconnect( hdfs_options_t* o ) {
if (verbose >= VERBOSE_4) {
printf("-> hdfs_disconnect\n");
}
if ( param->hdfs_fs ) {
hdfsDisconnect( param->hdfs_fs );
param->hdfs_fs = NULL;
if ( o->fs ) {
hdfsDisconnect( o->fs );
o->fs = NULL;
}
if (param->verbose >= VERBOSE_4) {
if (verbose >= VERBOSE_4) {
printf("<- hdfs_disconnect\n");
}
}
@ -214,16 +329,17 @@ static void hdfs_disconnect( IOR_param_t* param ) {
* Return an hdfsFile.
*/
static void *HDFS_Create_Or_Open( char *testFileName, IOR_param_t *param, unsigned char createFile ) {
if (param->verbose >= VERBOSE_4) {
static void *HDFS_Create_Or_Open( char *testFileName, int flags, aiori_mod_opt_t *param, unsigned char createFile ) {
if (verbose >= VERBOSE_4) {
printf("-> HDFS_Create_Or_Open\n");
}
hdfs_options_t * o = (hdfs_options_t*) param;
hdfsFile hdfs_file = NULL;
int fd_oflags = 0, hdfs_return;
/* initialize file-system handle, if needed */
hdfs_connect( param );
hdfs_connect( o );
/*
* Check for unsupported flags.
@ -234,15 +350,15 @@ static void *HDFS_Create_Or_Open( char *testFileName, IOR_param_t *param, unsign
* The other two, we just note that they are not supported and don't do them.
*/
if ( param->openFlags & IOR_RDWR ) {
if ( flags & IOR_RDWR ) {
ERR( "Opening or creating a file in RDWR is not implemented in HDFS" );
}
if ( param->openFlags & IOR_EXCL ) {
if ( flags & IOR_EXCL ) {
fprintf( stdout, "Opening or creating a file in Exclusive mode is not implemented in HDFS\n" );
}
if ( param->openFlags & IOR_APPEND ) {
if ( flags & IOR_APPEND ) {
fprintf( stdout, "Opening or creating a file for appending is not implemented in HDFS\n" );
}
@ -254,8 +370,8 @@ static void *HDFS_Create_Or_Open( char *testFileName, IOR_param_t *param, unsign
fd_oflags = O_CREAT;
}
if ( param->openFlags & IOR_WRONLY ) {
if ( !param->filePerProc ) {
if ( flags & IOR_WRONLY ) {
if ( ! hints->filePerProc ) {
// in N-1 mode, only rank 0 truncates the file
if ( rank != 0 ) {
@ -279,7 +395,7 @@ static void *HDFS_Create_Or_Open( char *testFileName, IOR_param_t *param, unsign
* Now see if O_DIRECT is needed.
*/
if ( param->useO_DIRECT == TRUE ) {
if ( o->direct_io == TRUE ) {
hdfs_set_o_direct_flag( &fd_oflags );
}
@ -290,10 +406,7 @@ static void *HDFS_Create_Or_Open( char *testFileName, IOR_param_t *param, unsign
* truncate each other's writes
*/
if (( param->openFlags & IOR_WRONLY ) &&
( !param->filePerProc ) &&
( rank != 0 )) {
if (( flags & IOR_WRONLY ) && ( ! hints->filePerProc ) && ( rank != 0 )) {
MPI_CHECK(MPI_Barrier(testComm), "barrier error");
}
@ -301,21 +414,16 @@ static void *HDFS_Create_Or_Open( char *testFileName, IOR_param_t *param, unsign
* Now rank zero can open and truncate, if necessary.
*/
if (param->verbose >= VERBOSE_4) {
printf("\thdfsOpenFile(0x%llx, %s, 0%o, %d, %d, %d)\n",
param->hdfs_fs,
if (verbose >= VERBOSE_4) {
printf("\thdfsOpenFile(%p, %s, 0%o, %lld, %d, %lld)\n",
o->fs,
testFileName,
fd_oflags, /* shown in octal to compare w/ <bits/fcntl.h> */
param->transferSize,
param->hdfs_replicas,
param->hdfs_block_size);
hints->transferSize,
o->replicas,
o->block_size);
}
hdfs_file = hdfsOpenFile( param->hdfs_fs,
testFileName,
fd_oflags,
param->transferSize,
param->hdfs_replicas,
param->hdfs_block_size);
hdfs_file = hdfsOpenFile( o->fs, testFileName, fd_oflags, hints->transferSize, o->replicas, o->block_size);
if ( ! hdfs_file ) {
ERR( "Failed to open the file" );
}
@ -324,14 +432,14 @@ static void *HDFS_Create_Or_Open( char *testFileName, IOR_param_t *param, unsign
* For N-1 write, Rank 0 waits for the other ranks to open the file after it has.
*/
if (( param->openFlags & IOR_WRONLY ) &&
( !param->filePerProc ) &&
if (( flags & IOR_WRONLY ) &&
( !hints->filePerProc ) &&
( rank == 0 )) {
MPI_CHECK(MPI_Barrier(testComm), "barrier error");
}
if (param->verbose >= VERBOSE_4) {
if (verbose >= VERBOSE_4) {
printf("<- HDFS_Create_Or_Open\n");
}
return ((void *) hdfs_file );
@ -341,36 +449,36 @@ static void *HDFS_Create_Or_Open( char *testFileName, IOR_param_t *param, unsign
* Create and open a file through the HDFS interface.
*/
static void *HDFS_Create( char *testFileName, IOR_param_t * param ) {
if (param->verbose >= VERBOSE_4) {
static aiori_fd_t *HDFS_Create(char *testFileName, int flags, aiori_mod_opt_t * param) {
if (verbose >= VERBOSE_4) {
printf("-> HDFS_Create\n");
}
if (param->verbose >= VERBOSE_4) {
if (verbose >= VERBOSE_4) {
printf("<- HDFS_Create\n");
}
return HDFS_Create_Or_Open( testFileName, param, TRUE );
return HDFS_Create_Or_Open( testFileName, flags, param, TRUE );
}
/*
* Open a file through the HDFS interface.
*/
static void *HDFS_Open( char *testFileName, IOR_param_t * param ) {
if (param->verbose >= VERBOSE_4) {
static aiori_fd_t *HDFS_Open(char *testFileName, int flags, aiori_mod_opt_t * param) {
if (verbose >= VERBOSE_4) {
printf("-> HDFS_Open\n");
}
if ( param->openFlags & IOR_CREAT ) {
if (param->verbose >= VERBOSE_4) {
if ( flags & IOR_CREAT ) {
if (verbose >= VERBOSE_4) {
printf("<- HDFS_Open( ... TRUE)\n");
}
return HDFS_Create_Or_Open( testFileName, param, TRUE );
return HDFS_Create_Or_Open( testFileName, flags, param, TRUE );
}
else {
if (param->verbose >= VERBOSE_4) {
if (verbose >= VERBOSE_4) {
printf("<- HDFS_Open( ... FALSE)\n");
}
return HDFS_Create_Or_Open( testFileName, param, FALSE );
return HDFS_Create_Or_Open( testFileName, flags, param, FALSE );
}
}
@ -378,19 +486,18 @@ static void *HDFS_Open( char *testFileName, IOR_param_t * param ) {
* Write or read to file using the HDFS interface.
*/
static IOR_offset_t HDFS_Xfer(int access, void *file, IOR_size_t * buffer,
IOR_offset_t length, IOR_param_t * param) {
if (param->verbose >= VERBOSE_4) {
printf("-> HDFS_Xfer(acc:%d, file:0x%llx, buf:0x%llx, len:%llu, 0x%llx)\n",
static IOR_offset_t HDFS_Xfer(int access, aiori_fd_t *file, IOR_size_t * buffer,
IOR_offset_t length, IOR_offset_t offset, aiori_mod_opt_t * param) {
if (verbose >= VERBOSE_4) {
printf("-> HDFS_Xfer(acc:%d, file:%p, buf:%p, len:%llu, %p)\n",
access, file, buffer, length, param);
}
hdfs_options_t * o = (hdfs_options_t*) param;
int xferRetries = 0;
long long remaining = (long long)length;
char* ptr = (char *)buffer;
long long rc;
off_t offset = param->offset;
hdfsFS hdfs_fs = param->hdfs_fs; /* (void*) */
hdfsFS hdfs_fs = o->fs; /* (void*) */
hdfsFile hdfs_file = (hdfsFile)file; /* (void*) */
@ -401,37 +508,34 @@ static IOR_offset_t HDFS_Xfer(int access, void *file, IOR_size_t * buffer,
if (verbose >= VERBOSE_4) {
fprintf( stdout, "task %d writing to offset %lld\n",
rank,
param->offset + length - remaining);
offset + length - remaining);
}
if (param->verbose >= VERBOSE_4) {
printf("\thdfsWrite( 0x%llx, 0x%llx, 0x%llx, %lld)\n",
if (verbose >= VERBOSE_4) {
printf("\thdfsWrite( %p, %p, %p, %lld)\n",
hdfs_fs, hdfs_file, ptr, remaining ); /* DEBUGGING */
}
rc = hdfsWrite( hdfs_fs, hdfs_file, ptr, remaining );
if ( rc < 0 ) {
ERR( "hdfsWrite() failed" );
}
offset += rc;
if ( param->fsyncPerWrite == TRUE ) {
HDFS_Fsync( hdfs_file, param );
if ( hints->fsyncPerWrite == TRUE ) {
HDFS_Fsync( file, param );
}
}
else { /* READ or CHECK */
if (verbose >= VERBOSE_4) {
fprintf( stdout, "task %d reading from offset %lld\n",
rank,
param->offset + length - remaining );
rank, offset + length - remaining );
}
if (param->verbose >= VERBOSE_4) {
printf("\thdfsRead( 0x%llx, 0x%llx, 0x%llx, %lld)\n",
if (verbose >= VERBOSE_4) {
printf("\thdfsRead( %p, %p, %p, %lld)\n",
hdfs_fs, hdfs_file, ptr, remaining ); /* DEBUGGING */
}
rc = hdfsRead( hdfs_fs, hdfs_file, ptr, remaining );
rc = hdfsPread(hdfs_fs, hdfs_file, offset, ptr, remaining);
if ( rc == 0 ) {
ERR( "hdfs_read() returned EOF prematurely" );
}
@ -449,9 +553,9 @@ static IOR_offset_t HDFS_Xfer(int access, void *file, IOR_size_t * buffer,
rank,
access == WRITE ? "hdfsWrite()" : "hdfs_read()",
rc, remaining,
param->offset + length - remaining );
offset + length - remaining );
if ( param->singleXferAttempt == TRUE ) {
if ( hints->singleXferAttempt == TRUE ) {
MPI_CHECK( MPI_Abort( MPI_COMM_WORLD, -1 ), "barrier error" );
}
@ -467,7 +571,16 @@ static IOR_offset_t HDFS_Xfer(int access, void *file, IOR_size_t * buffer,
xferRetries++;
}
if (param->verbose >= VERBOSE_4) {
if(access == WRITE){
// flush user buffer, this makes the write visible to readers
// it is the expected semantics of read/writes
rc = hdfsHFlush(hdfs_fs, hdfs_file);
if(rc != 0){
WARN("Error during flush");
}
}
if (verbose >= VERBOSE_4) {
printf("<- HDFS_Xfer\n");
}
return ( length );
@ -476,67 +589,38 @@ static IOR_offset_t HDFS_Xfer(int access, void *file, IOR_size_t * buffer,
/*
* Perform hdfs_sync().
*/
static void HDFS_Fsync( void *fd, IOR_param_t * param ) {
if (param->verbose >= VERBOSE_4) {
printf("-> HDFS_Fsync\n");
}
hdfsFS hdfs_fs = param->hdfs_fs; /* (void *) */
static void HDFS_Fsync(aiori_fd_t * fd, aiori_mod_opt_t * param) {
hdfs_options_t * o = (hdfs_options_t*) param;
hdfsFS hdfs_fs = o->fs; /* (void *) */
hdfsFile hdfs_file = (hdfsFile)fd; /* (void *) */
#if 0
if (param->verbose >= VERBOSE_4) {
printf("\thdfsHSync(0x%llx, 0x%llx)\n", hdfs_fs, hdfs_file);
if (verbose >= VERBOSE_4) {
printf("\thdfsFlush(%p, %p)\n", hdfs_fs, hdfs_file);
}
if ( hdfsHSync( hdfs_fs, hdfs_file ) != 0 ) {
EWARN( "hdfsHSync() failed" );
}
#elif 0
if (param->verbose >= VERBOSE_4) {
printf("\thdfsHFlush(0x%llx, 0x%llx)\n", hdfs_fs, hdfs_file);
}
if ( hdfsHFlush( hdfs_fs, hdfs_file ) != 0 ) {
EWARN( "hdfsHFlush() failed" );
}
#else
if (param->verbose >= VERBOSE_4) {
printf("\thdfsFlush(0x%llx, 0x%llx)\n", hdfs_fs, hdfs_file);
}
if ( hdfsFlush( hdfs_fs, hdfs_file ) != 0 ) {
// Hsync is implemented to flush out data with newer Hadoop versions
EWARN( "hdfsFlush() failed" );
}
#endif
if (param->verbose >= VERBOSE_4) {
printf("<- HDFS_Fsync\n");
}
}
/*
* Close a file through the HDFS interface.
*/
static void HDFS_Close( void *fd, IOR_param_t * param ) {
if (param->verbose >= VERBOSE_4) {
static void HDFS_Close(aiori_fd_t * fd, aiori_mod_opt_t * param) {
if (verbose >= VERBOSE_4) {
printf("-> HDFS_Close\n");
}
hdfs_options_t * o = (hdfs_options_t*) param;
hdfsFS hdfs_fs = param->hdfs_fs; /* (void *) */
hdfsFS hdfs_fs = o->fs; /* (void *) */
hdfsFile hdfs_file = (hdfsFile)fd; /* (void *) */
int open_flags;
if ( param->openFlags & IOR_WRONLY ) {
open_flags = O_CREAT | O_WRONLY;
} else {
open_flags = O_RDONLY;
}
if ( hdfsCloseFile( hdfs_fs, hdfs_file ) != 0 ) {
ERR( "hdfsCloseFile() failed" );
}
if (param->verbose >= VERBOSE_4) {
if (verbose >= VERBOSE_4) {
printf("<- HDFS_Close\n");
}
}
@ -547,119 +631,66 @@ static void HDFS_Close( void *fd, IOR_param_t * param ) {
* NOTE: The signature for ior_aiori.delete doesn't include a parameter to
* select recursive deletes. We'll assume that that is never needed.
*/
static void HDFS_Delete( char *testFileName, IOR_param_t * param ) {
if (param->verbose >= VERBOSE_4) {
static void HDFS_Delete( char *testFileName, aiori_mod_opt_t * param ) {
if (verbose >= VERBOSE_4) {
printf("-> HDFS_Delete\n");
}
hdfs_options_t * o = (hdfs_options_t*) param;
char errmsg[256];
/* initialize file-system handle, if needed */
hdfs_connect( param );
hdfs_connect(o);
if ( ! param->hdfs_fs )
ERR_SIMPLE( "Can't delete a file without an HDFS connection" );
if ( ! o->fs )
ERR( "Can't delete a file without an HDFS connection" );
if ( hdfsDelete( param->hdfs_fs, testFileName, 0 ) != 0 ) {
sprintf(errmsg,
"[RANK %03d]: hdfsDelete() of file \"%s\" failed\n",
if ( hdfsDelete( o->fs, testFileName, 0 ) != 0 ) {
sprintf(errmsg, "[RANK %03d]: hdfsDelete() of file \"%s\" failed\n",
rank, testFileName);
EWARN( errmsg );
}
if (param->verbose >= VERBOSE_4) {
if (verbose >= VERBOSE_4) {
printf("<- HDFS_Delete\n");
}
}
/*
* Determine api version.
*/
static void HDFS_SetVersion( IOR_param_t * param ) {
if (param->verbose >= VERBOSE_4) {
printf("-> HDFS_SetVersion\n");
}
strcpy( param->apiVersion, param->api );
if (param->verbose >= VERBOSE_4) {
printf("<- HDFS_SetVersion\n");
}
}
/*
* Use hdfsGetPathInfo() to get info about file?
* Is there an fstat we can use on hdfs?
* Should we just use POSIX fstat?
*/
static IOR_offset_t
HDFS_GetFileSize(IOR_param_t * param,
MPI_Comm testComm,
static IOR_offset_t HDFS_GetFileSize(aiori_mod_opt_t * param,
char * testFileName) {
if (param->verbose >= VERBOSE_4) {
if (verbose >= VERBOSE_4) {
printf("-> HDFS_GetFileSize(%s)\n", testFileName);
}
hdfs_options_t * o = (hdfs_options_t*) param;
IOR_offset_t aggFileSizeFromStat;
IOR_offset_t tmpMin, tmpMax, tmpSum;
/* make sure file-system is connected */
hdfs_connect( param );
hdfs_connect( o );
/* file-info struct includes size in bytes */
if (param->verbose >= VERBOSE_4) {
printf("\thdfsGetPathInfo(%s) ...", testFileName);fflush(stdout);
if (verbose >= VERBOSE_4) {
printf("\thdfsGetPathInfo(%s) ...", testFileName);
fflush(stdout);
}
hdfsFileInfo* info = hdfsGetPathInfo( param->hdfs_fs, testFileName );
hdfsFileInfo* info = hdfsGetPathInfo( o->fs, testFileName );
if ( ! info )
ERR_SIMPLE( "hdfsGetPathInfo() failed" );
if (param->verbose >= VERBOSE_4) {
ERR( "hdfsGetPathInfo() failed" );
if (verbose >= VERBOSE_4) {
printf("done.\n");fflush(stdout);
}
aggFileSizeFromStat = info->mSize;
if ( param->filePerProc == TRUE ) {
if (param->verbose >= VERBOSE_4) {
printf("\tall-reduce (1)\n");
}
MPI_CHECK(
MPI_Allreduce(
&aggFileSizeFromStat, &tmpSum, 1, MPI_LONG_LONG_INT, MPI_SUM, testComm ),
"cannot total data moved" );
aggFileSizeFromStat = tmpSum;
}
else {
if (param->verbose >= VERBOSE_4) {
printf("\tall-reduce (2a)\n");
}
MPI_CHECK(
MPI_Allreduce(
&aggFileSizeFromStat, &tmpMin, 1, MPI_LONG_LONG_INT, MPI_MIN, testComm ),
"cannot total data moved" );
if (param->verbose >= VERBOSE_4) {
printf("\tall-reduce (2b)\n");
}
MPI_CHECK(
MPI_Allreduce(
&aggFileSizeFromStat, &tmpMax, 1, MPI_LONG_LONG_INT, MPI_MAX, testComm ),
"cannot total data moved" );
if ( tmpMin != tmpMax ) {
if ( rank == 0 ) {
WARN( "inconsistent file size by different tasks" );
}
/* incorrect, but now consistent across tasks */
aggFileSizeFromStat = tmpMin;
}
}
if (param->verbose >= VERBOSE_4) {
if (verbose >= VERBOSE_4) {
printf("<- HDFS_GetFileSize [%llu]\n", aggFileSizeFromStat);
}
return ( aggFileSizeFromStat );

View File

@ -204,8 +204,6 @@ int ior_main(int argc, char **argv)
void init_IOR_Param_t(IOR_param_t * p)
{
const char *default_aiori = aiori_default ();
char *hdfs_user;
assert (NULL != default_aiori);
memset(p, 0, sizeof(IOR_param_t));
@ -235,16 +233,6 @@ void init_IOR_Param_t(IOR_param_t * p)
p->incompressibleSeed = 573;
p->testComm = mpi_comm_world;
hdfs_user = getenv("USER");
if (!hdfs_user)
hdfs_user = "";
p->hdfs_user = strdup(hdfs_user);
p->hdfs_name_node = "default";
p->hdfs_name_node_port = 0; /* ??? */
p->hdfs_fs = NULL;
p->hdfs_replicas = 0; /* invokes the default */
p->hdfs_block_size = 0;
p->URI = NULL;
}

View File

@ -160,14 +160,6 @@ typedef struct
int fsyncPerWrite; /* fsync() after each write */
int fsync; /* fsync() after write */
/* HDFS variables */
char * hdfs_user; /* copied from ENV, for now */
const char* hdfs_name_node;
tPort hdfs_name_node_port; /* (uint16_t) */
hdfsFS hdfs_fs; /* file-system handle */
int hdfs_replicas; /* n block replicas. (0 gets default) */
int hdfs_block_size; /* internal blk-size. (0 gets default) */
char* URI; /* "path" to target object */
/* RADOS variables */

18
testing/build-hdfs.sh Executable file
View File

@ -0,0 +1,18 @@
#!/bin/bash
# Build IOR with HDFS support against a locally downloaded Hadoop release.
set -e  # abort on the first failing command

# -p: don't fail when the build directory already exists (re-runs)
mkdir -p build-hdfs
cd build-hdfs

VER=hadoop-3.2.1

# Download once. archive.apache.org serves the tarball directly;
# closer.cgi (previously used) returns an HTML mirror-list page instead.
if [[ ! -e $VER.tar.gz ]] ; then
	wget "https://archive.apache.org/dist/hadoop/common/$VER/$VER.tar.gz"
fi
# Extract separately, so a present tarball with a missing tree still unpacks.
if [[ ! -d $VER ]] ; then
	tar -xf $VER.tar.gz
fi

../configure --with-hdfs CFLAGS="-I$PWD/$VER/include/ -O0 -g3" LDFLAGS="-L$PWD/$VER/lib/native -Wl,-rpath=$PWD/$VER/lib/native"
make -j

echo "To run execute:"
echo export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64/
echo export CLASSPATH=$(find $VER/ -name "*.jar" -printf "%p:")
echo ./src/ior -a HDFS