diff --git a/README.multithreading b/README.multithreading new file mode 100644 index 0000000..77bfcec --- /dev/null +++ b/README.multithreading @@ -0,0 +1,108 @@ +Multithreading +============== +Libnfs supports multithreading using the pthread API. +By default libnfs is built without pthread support to avoid the overhead +for locking, but pthread support, and thus support for multithreading, can be +configured using --enable-pthread + +$ ./configure --prefix=/usr --enable-examples --enable-pthread + +It is not supported to mix the eventdriven ASYNC interface with multithreading +thus once multithreading is enabled from the application you can not use +the async interface any more and must only use the multithread safe +SYNC API. + +The process to use multithreading from your application is: +1, nfs_init_context() to create an NFS context. This context defaults to + not (yet) be multithreading aware. +2, nfs_mount(...) to mount the NFS share. + +If the share mounted successfully we can now turn on multithreading for all I/O +to the share: +3, nfs_mt_service_thread_start(nfs) + this starts a dedicated thread to manage all socket + I/O and queue management and also flags the context + to be multithreading ready. + If this returns successfully you can now start using + this context from separate threads in your + application. But remember, only the SYNC API + is supported from this point. +... use the nfs context from all your threads ... + +Once the application is finished and ready to terminate, first close all other +threads, or make sure that they will NOT perform any further I/O to the nfs +context. Wait until all in-flight I/O has stopped. + +Then call nfs_mt_service_thread_stop(nfs) to terminate the service thread and +disable the multithreading support. + + +See examples/nfs-pthreads-example.c for an example utility that +mounts a share, enables multithreading and then creates a number of worker +threads that share the same nfs context. 
+(In the example the threads just run a busy loop calling nfs_stat64().) + + +nfs_get_error() +--------------- +Avoid using this function from the application. This string is shared across +all threads and we will need a new, possibly complex, API to replace it. +For example, how do we correctly match error strings that occur in the service +thread to the error string that is presented to a worker thread? + + +Porting +------- +The multithreading support is abstracted out into two separate files to make +it easier to add other threading APIs instead of pthreads. +include/libnfs-multithreading.h and lib/multithreading.c + +Multithreading requires two separate defines to be present. +The global one is HAVE_MULTITHREADING which will activate general +multithreading support in the library and the second one that enables a specific +implementation of threading. +HAVE_PTHREAD is defined when you want to use libpthread. +For other threading APIs you will need a new define HAVE_OTHER_API +to select it. + +include/libnfs-multithreading.h +------------------------------- +This file consists of two parts. +The first part is creating typedefs for a thread, a mutex and a semaphore. +When porting to a different threads API, add a new section that adds suitable +typedefs for thread/mutex/semaphore. + +The second part is the wrapper API for the libnfs threading abstractions and +should not need to be changed. + +lib/multithreading.c +-------------------- +This file contains the actual abstraction used for multithreading. +When porting to a different threads API you will need to create replacement +functions for : + +nfs_mt_service_thread() : This is the service thread that is responsible + for performing all socket I/O. +nfs_mt_service_thread_start() +nfs_mt_service_thread_stop() + : Functions to start/stop the service thread. + +nfs_mt_mutex_init() +nfs_mt_mutex_destroy() +nfs_mt_mutex_lock() +nfs_mt_mutex_unlock() : Wrappers that implement the 4 basic mutex calls. 
+ +nfs_mt_sem_init() +nfs_mt_sem_destroy() +nfs_mt_sem_post() +nfs_mt_sem_wait() : Wrappers that implement the 4 basic semaphore calls. + + + +TODO +---- +* [rpc|nfs]_[set|get]_error() needs to be changed to use threads local storage +or a different API. +* Optimization: have the service thread just read the PDU from the socket and +add it to a new queue. Then have a pool of threads to read from this queue and +unmarshal the pdus concurrently. diff --git a/configure.ac b/configure.ac index a11f8a6..0566ff9 100755 --- a/configure.ac +++ b/configure.ac @@ -70,6 +70,26 @@ fi AC_SUBST(MAYBE_EXAMPLES) +#option: pthread +AC_ARG_ENABLE([pthread], + [AC_HELP_STRING([--enable-pthread], + [Build with pthread multithreading support])], + [ENABLE_PTHREAD=$enableval], + [ENABLE_PTHREAD="no"]) +if test x$ENABLE_PTHREAD = xyes; then +# check for lpthread +AC_CACHE_CHECK([for pthread support],libnfs_cv_HAVE_PTHREAD,[ +AC_TRY_COMPILE([ +#include <pthread.h>], +[pthread_t thread1, thread2;], +libnfs_cv_HAVE_PTHREAD=yes,libnfs_cv_HAVE_PTHREAD=no)]) +if test x"$libnfs_cv_HAVE_PTHREAD" = x"yes"; then + AC_DEFINE(HAVE_PTHREAD,1,[Whether we have pthread support]) + AC_DEFINE(HAVE_MULTITHREADING,1,[Whether we have multithreading support]) +fi +fi +AM_CONDITIONAL([HAVE_PTHREAD], [test x"$libnfs_cv_HAVE_PTHREAD" = x"yes"]) + AC_MSG_CHECKING(whether SO_BINDTODEVICE is available) AC_TRY_COMPILE([#include ], [ int i = SO_BINDTODEVICE; diff --git a/examples/Makefile.am b/examples/Makefile.am index 4db9df3..0e08252 100644 --- a/examples/Makefile.am +++ b/examples/Makefile.am @@ -4,6 +4,11 @@ if HAVE_TALLOC_TEVENT noinst_PROGRAMS += nfs4-cat-talloc endif +if HAVE_PTHREAD +LIBS += -lpthread +noinst_PROGRAMS += nfs-pthreads-example +endif + AM_CPPFLAGS = \ -I$(abs_top_srcdir)/include \ -I$(abs_top_srcdir)/include/nfsc \ @@ -28,3 +33,4 @@ nfs4_cat_LDADD = $(COMMON_LIBS) -levent nfs4_cat_talloc_LDADD = $(COMMON_LIBS) -ltevent -ltalloc portmap_client_LDADD = $(COMMON_LIBS) portmap_server_LDADD = $(COMMON_LIBS) 
-levent +nfs_pthreads_example_LDADD = $(COMMON_LIBS) diff --git a/examples/nfs-pthreads-example.c b/examples/nfs-pthreads-example.c new file mode 100644 index 0000000..370bf63 --- /dev/null +++ b/examples/nfs-pthreads-example.c @@ -0,0 +1,205 @@ +/* -*- mode:c; tab-width:8; c-basic-offset:8; indent-tabs-mode:nil; -*- */ +/* + Copyright (C) by Ronnie Sahlberg 2021 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, see . +*/ + +#define _FILE_OFFSET_BITS 64 +#define _GNU_SOURCE + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#ifdef AROS +#include "aros_compat.h" +#endif + + +#ifdef WIN32 +#include +#pragma comment(lib, "ws2_32.lib") +WSADATA wsaData; +#else +#include +#include +#endif + +#ifdef HAVE_POLL_H +#include +#endif + +#ifdef HAVE_UNISTD_H +#include +#endif + +#include +#include +#include +#include +#include +#include +#include +#include "libnfs.h" +#include "libnfs-raw.h" +#include "libnfs-raw-nfs.h" + +void usage(void) +{ + fprintf(stderr, "Usage: nfs-pthread-example \n"); + fprintf(stderr, "\tExample program using pthreads.\n"); + exit(0); +} + +struct stat_data { + struct nfs_context *nfs; + int idx; + char *path; + int is_finished; +}; + +static void *nfs_stat_thread(void *arg) +{ + struct stat_data *sd = arg; + struct nfs_stat_64 st; + int i, ret; + + printf("Stat thread %03d\n", sd->idx); + i = 0; + while(!sd->is_finished) { + ret = nfs_stat64(sd->nfs, sd->path, &st); + if (ret < 0) { + printf("Stat failed: 
%s\n", nfs_get_error(sd->nfs)); + exit(10); + } + i++; + } + printf("%03d:%d ret:%d st->ino:%d\n", sd->idx, i, ret, (int)st.nfs_ino); + return NULL; +} + +int main(int argc, char *argv[]) +{ + int i, num_threads; + int ret = 0; + struct nfs_context *nfs = NULL; + struct nfsfh *nfsfh = NULL; + struct nfs_fh3 *fh3; + struct nfs_url *url = NULL; + pthread_t *stat_thread; + struct stat_data *sd; + +#ifdef WIN32 + if (WSAStartup(MAKEWORD(2,2), &wsaData) != 0) { + printf("Failed to start Winsock2\n"); + return 1; + } +#endif + +#ifdef AROS + aros_init_socket(); +#endif + + if (argc < 3) { + usage(); + } + + num_threads = atoi(argv[2]); + printf("Number of threads : %d\n", num_threads); + + nfs = nfs_init_context(); + if (nfs == NULL) { + fprintf(stderr, "failed to init context\n"); + goto finished; + } + + url = nfs_parse_url_full(nfs, argv[1]); + if (url == NULL) { + fprintf(stderr, "%s\n", nfs_get_error(nfs)); + ret = 1; + goto finished; + } + + if (nfs_mount(nfs, url->server, url->path) != 0) { + fprintf(stderr, "Failed to mount nfs share : %s\n", + nfs_get_error(nfs)); + ret = 1; + goto finished; + } + + /* + * Before we can use multithreading we must initialize and + * start the service thread. + */ + printf("Start the service thread\n"); + if (nfs_mt_service_thread_start(nfs)) { + printf("failed to start service thread\n"); + exit(10); + } + printf("Service thread is active. 
Ready to do I/O\n"); + + + printf("Start %d thread(s) calling stat on %s\n", num_threads, url->file); + if ((sd = malloc(sizeof(struct stat_data) * num_threads)) == NULL) { + printf("Failed to allocated stat_data\n"); + exit(10); + } + if ((stat_thread = malloc(sizeof(pthread_t) * num_threads)) == NULL) { + printf("Failed to allocated stat_thread\n"); + exit(10); + } + for (i = 0; i < num_threads; i++) { + sd[i].nfs = nfs; + sd[i].path = url->file; + sd[i].is_finished = 0; + sd[i].idx = i; + if (pthread_create(&stat_thread[i], NULL, + &nfs_stat_thread, &sd[i])) { + printf("Failed to create stat thread %d\n", i); + exit(10); + } + } + + + sleep(1); + /* + * Terminate all the worker threads + */ + printf("Closing all worker threads\n"); + for (i = 0; i < num_threads; i++) { + sd[i].is_finished = 1; + } + for (i = 0; i < num_threads; i++) { + pthread_join(stat_thread[i], NULL); + } + + printf("closing service thread\n"); + nfs_mt_service_thread_stop(nfs); + + finished: + if (nfsfh) { + nfs_close(nfs, nfsfh); + } + nfs_umount(nfs); + if (url) { + nfs_destroy_url(url); + } + if (nfs) { + nfs_destroy_context(nfs); + } + free(sd); + free(stat_thread); + return ret; + } diff --git a/include/libnfs-multithreading.h b/include/libnfs-multithreading.h new file mode 100644 index 0000000..f30a7ec --- /dev/null +++ b/include/libnfs-multithreading.h @@ -0,0 +1,56 @@ +/* -*- mode:c; tab-width:8; c-basic-offset:8; indent-tabs-mode:nil; -*- */ +/* + Copyright (C) 2021 by Ronnie Sahlberg + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 2.1 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program; if not, see . +*/ + +#ifndef _LIBNFS_MULTITHREADING_H_ +#define _LIBNFS_MULTITHREADING_H_ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#ifdef __cplusplus +extern "C" { +#endif + +#ifdef HAVE_MULTITHREADING + +#ifdef HAVE_PTHREAD +#include +#include +typedef pthread_t libnfs_thread_t; +typedef pthread_mutex_t libnfs_mutex_t; +typedef sem_t libnfs_sem_t; +#endif /* HAVE_PTHREAD */ + +int nfs_mt_mutex_init(libnfs_mutex_t *mutex); +int nfs_mt_mutex_destroy(libnfs_mutex_t *mutex); +int nfs_mt_mutex_lock(libnfs_mutex_t *mutex); +int nfs_mt_mutex_unlock(libnfs_mutex_t *mutex); + +int nfs_mt_sem_init(libnfs_sem_t *sem, int value); +int nfs_mt_sem_destroy(libnfs_sem_t *sem); +int nfs_mt_sem_post(libnfs_sem_t *sem); +int nfs_mt_sem_wait(libnfs_sem_t *sem); + +#endif /* HAVE_MULTITHREADING */ + +#ifdef __cplusplus +} +#endif + +#endif /* !_LIBNFS_MULTITHREADING_H_ */ diff --git a/include/libnfs-private.h b/include/libnfs-private.h index 1cfd204..9ed9ea9 100644 --- a/include/libnfs-private.h +++ b/include/libnfs-private.h @@ -43,6 +43,7 @@ #define IFNAMSIZ 16 #endif +#include "libnfs-multithreading.h" #include "libnfs-zdr.h" #include "../nfs/libnfs-raw-nfs.h" #include "../nfs4/libnfs-raw-nfs4.h" @@ -131,6 +132,9 @@ struct rpc_context { struct sockaddr_storage udp_src; struct rpc_queue waitpdu[HASHES]; uint32_t waitpdu_len; +#ifdef HAVE_MULTITHREADING + libnfs_mutex_t rpc_mutex; +#endif /* HAVE_MULTITHREADING */ uint32_t inpos; char rm_buf[4]; @@ -306,6 +310,11 @@ struct nfs_context { verifier4 setclientid_confirm; uint32_t seqid; int has_lock_owner; +#ifdef HAVE_MULTITHREADING + int multithreading_enabled; + libnfs_mutex_t nfs_mutex; + libnfs_thread_t service_thread; +#endif /* HAVE_MULTITHREADING */ }; typedef int (*continue_func)(struct nfs_context *nfs, struct nfs_attr *attr, diff --git 
a/include/nfsc/libnfs.h b/include/nfsc/libnfs.h index f2fb319..202ff46 100755 --- a/include/nfsc/libnfs.h +++ b/include/nfsc/libnfs.h @@ -1991,6 +1991,20 @@ EXTERN void nfs4_set_client_name(struct nfs_context *nfs, const char *id); */ EXTERN void nfs4_set_verifier(struct nfs_context *nfs, const char *verifier); +/* + * MULTITHREADING + */ +/* + * This function starts a separate service thread for multithreading support. + * When multithreading is enabled the eventdriven async API is no longer + * supported and you can only use the synchronous API. + */ +EXTERN int nfs_mt_service_thread_start(struct nfs_context *nfs); +/* + * Shutdown multithreading support. + */ +EXTERN void nfs_mt_service_thread_stop(struct nfs_context *nfs); + #ifdef __cplusplus } #endif diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt index fb98349..88ba7b8 100644 --- a/lib/CMakeLists.txt +++ b/lib/CMakeLists.txt @@ -2,6 +2,7 @@ set(SOURCES init.c libnfs.c libnfs-sync.c libnfs-zdr.c + multithreading.c nfs_v3.c nfs_v4.c pdu.c diff --git a/lib/Makefile.am b/lib/Makefile.am index b56d486..e9afedd 100644 --- a/lib/Makefile.am +++ b/lib/Makefile.am @@ -19,6 +19,7 @@ libnfs_la_SOURCES = \ libnfs.c \ libnfs-sync.c \ libnfs-zdr.c \ + multithreading.c \ nfs_v3.c \ nfs_v4.c \ pdu.c \ diff --git a/lib/init.c b/lib/init.c index a7d5fd1..bef4633 100644 --- a/lib/init.c +++ b/lib/init.c @@ -1,3 +1,4 @@ +/* -*- mode:c; tab-width:8; c-basic-offset:8; indent-tabs-mode:nil; -*- */ /* Copyright (C) 2010 by Ronnie Sahlberg @@ -103,12 +104,21 @@ struct rpc_context *rpc_init_context(void) rpc->uid = getuid(); rpc->gid = getgid(); #endif +#ifdef HAVE_MULTITHREADING + nfs_mt_mutex_lock(&rpc->rpc_mutex); +#endif /* HAVE_MULTITHREADING */ rpc_reset_queue(&rpc->outqueue); for (i = 0; i < HASHES; i++) rpc_reset_queue(&rpc->waitpdu[i]); +#ifdef HAVE_MULTITHREADING + nfs_mt_mutex_unlock(&rpc->rpc_mutex); +#endif /* HAVE_MULTITHREADING */ /* Default is no timeout */ rpc->timeout = -1; +#ifdef HAVE_MULTITHREADING + 
nfs_mt_mutex_init(&rpc->rpc_mutex); +#endif /* HAVE_MULTITHREADING */ return rpc; } @@ -131,6 +141,9 @@ struct rpc_context *rpc_init_server_context(int s) rpc->is_udp = rpc_is_udp_socket(rpc); rpc_reset_queue(&rpc->outqueue); +#ifdef HAVE_MULTITHREADING + nfs_mt_mutex_init(&rpc->rpc_mutex); +#endif /* HAVE_MULTITHREADING */ return rpc; } @@ -249,10 +262,12 @@ void rpc_set_gid(struct rpc_context *rpc, int gid) { void rpc_set_error(struct rpc_context *rpc, const char *error_string, ...) { va_list ap; - char *old_error_string = rpc->error_string; - - assert(rpc->magic == RPC_CONTEXT_MAGIC); + char *old_error_string; +#ifdef HAVE_MULTITHREADING + nfs_mt_mutex_lock(&rpc->rpc_mutex); +#endif /* HAVE_MULTITHREADING */ + old_error_string = rpc->error_string; va_start(ap, error_string); rpc->error_string = malloc(1024); vsnprintf(rpc->error_string, 1024, error_string, ap); @@ -263,6 +278,9 @@ void rpc_set_error(struct rpc_context *rpc, const char *error_string, ...) if (old_error_string != NULL) { free(old_error_string); } +#ifdef HAVE_MULTITHREADING + nfs_mt_mutex_unlock(&rpc->rpc_mutex); +#endif /* HAVE_MULTITHREADING */ } char *rpc_get_error(struct rpc_context *rpc) @@ -287,20 +305,33 @@ static void rpc_purge_all_pdus(struct rpc_context *rpc, int status, const char * * pdus when called. 
*/ +#ifdef HAVE_MULTITHREADING + nfs_mt_mutex_lock(&rpc->rpc_mutex); +#endif /* HAVE_MULTITHREADING */ outqueue = rpc->outqueue; rpc_reset_queue(&rpc->outqueue); while ((pdu = outqueue.head) != NULL) { outqueue.head = pdu->next; - pdu->next = NULL; + pdu->next = NULL; pdu->cb(rpc, status, (void *) error, pdu->private_data); rpc_free_pdu(rpc, pdu); } +#ifdef HAVE_MULTITHREADING + nfs_mt_mutex_unlock(&rpc->rpc_mutex); +#endif /* HAVE_MULTITHREADING */ for (i = 0; i < HASHES; i++) { - struct rpc_queue waitqueue = rpc->waitpdu[i]; + struct rpc_queue waitqueue; +#ifdef HAVE_MULTITHREADING + nfs_mt_mutex_lock(&rpc->rpc_mutex); +#endif /* HAVE_MULTITHREADING */ + waitqueue = rpc->waitpdu[i]; rpc_reset_queue(&rpc->waitpdu[i]); +#ifdef HAVE_MULTITHREADING + nfs_mt_mutex_unlock(&rpc->rpc_mutex); +#endif /* HAVE_MULTITHREADING */ while((pdu = waitqueue.head) != NULL) { waitqueue.head = pdu->next; pdu->next = NULL; @@ -310,8 +341,6 @@ static void rpc_purge_all_pdus(struct rpc_context *rpc, int status, const char * } assert(!rpc->outqueue.head); - for (i = 0; i < HASHES; i++) - assert(!rpc->waitpdu[i].head); } void rpc_error_all_pdus(struct rpc_context *rpc, const char *error) @@ -388,6 +417,9 @@ void rpc_destroy_context(struct rpc_context *rpc) rpc->inbuf = NULL; rpc->magic = 0; +#ifdef HAVE_MULTITHREADING + nfs_mt_mutex_destroy(&rpc->rpc_mutex); +#endif /* HAVE_MULTITHREADING */ free(rpc); } diff --git a/lib/libnfs-sync.c b/lib/libnfs-sync.c index 6b48da7..0607c94 100644 --- a/lib/libnfs-sync.c +++ b/lib/libnfs-sync.c @@ -114,8 +114,55 @@ struct sync_cb_data { void *return_data; int return_int; const char *call; +#ifdef HAVE_MULTITHREADING + int has_sem; + libnfs_sem_t wait_sem; +#endif /* HAVE_MULTITHREADING */ }; +static inline int +nfs_init_cb_data(struct nfs_context *nfs, struct sync_cb_data *cb_data) +{ + cb_data->is_finished = 0; +#ifdef HAVE_MULTITHREADING + /* + * Create a semaphore and initialize it to zero. 
So that we + * can wait for it and immetiately block until the service thread + * has received the reply. + */ + if (nfs_mt_sem_init(&cb_data->wait_sem, 0)) { + if (nfs) { + nfs_set_error(nfs, "Failed to initialize semaphore"); + } + return -1; + } + cb_data->has_sem = 1; +#endif /* HAVE_MULTITHREADING */ + return 0; +} + +static inline void +nfs_destroy_cb_sem(struct sync_cb_data *cb_data) +{ +#ifdef HAVE_MULTITHREADING + if (cb_data->has_sem) { + nfs_mt_sem_destroy(&cb_data->wait_sem); + } + cb_data->has_sem = 0; +#endif +} + +static inline void +cb_data_is_finished(struct sync_cb_data *cb_data, int status) +{ + cb_data->is_finished = 1; + cb_data->status = status; +#ifdef HAVE_MULTITHREADING + if (cb_data->has_sem) { + nfs_mt_sem_post(&cb_data->wait_sem); + } +#endif +} static void wait_for_reply(struct rpc_context *rpc, struct sync_cb_data *cb_data) @@ -184,6 +231,12 @@ wait_for_nfs_reply(struct nfs_context *nfs, struct sync_cb_data *cb_data) int revents; int ret; +#ifdef HAVE_MULTITHREADING + if(nfs->multithreading_enabled) { + nfs_mt_sem_wait(&cb_data->wait_sem); + return; + } +#endif while (!cb_data->is_finished) { pfd.fd = nfs_get_fd(nfs); @@ -217,8 +270,7 @@ mount_cb(int status, struct nfs_context *nfs, void *data, void *private_data) { struct sync_cb_data *cb_data = private_data; - cb_data->is_finished = 1; - cb_data->status = status; + cb_data_is_finished(cb_data, status); if (status < 0) { nfs_set_error(nfs, "%s: %s", @@ -235,15 +287,19 @@ nfs_mount(struct nfs_context *nfs, const char *server, const char *export) assert(rpc->magic == RPC_CONTEXT_MAGIC); - cb_data.is_finished = 0; + if (nfs_init_cb_data(nfs, &cb_data)) { + return -1; + } if (nfs_mount_async(nfs, server, export, mount_cb, &cb_data) != 0) { nfs_set_error(nfs, "nfs_mount_async failed. 
%s", nfs_get_error(nfs)); + nfs_destroy_cb_sem(&cb_data); return -1; } wait_for_nfs_reply(nfs, &cb_data); + nfs_destroy_cb_sem(&cb_data); /* Dont want any more callbacks even if the socket is closed */ rpc->connect_cb = NULL; @@ -268,8 +324,7 @@ umount_cb(int status, struct nfs_context *nfs, void *data, void *private_data) { struct sync_cb_data *cb_data = private_data; - cb_data->is_finished = 1; - cb_data->status = status; + cb_data_is_finished(cb_data, status); if (status < 0) { nfs_set_error(nfs, "%s: %s", @@ -286,15 +341,19 @@ nfs_umount(struct nfs_context *nfs) assert(rpc->magic == RPC_CONTEXT_MAGIC); - cb_data.is_finished = 0; + if (nfs_init_cb_data(nfs, &cb_data)) { + return -1; + } if (nfs_umount_async(nfs, umount_cb, &cb_data) != 0) { nfs_set_error(nfs, "nfs_umount_async failed. %s", nfs_get_error(nfs)); + nfs_destroy_cb_sem(&cb_data); return -1; } wait_for_nfs_reply(nfs, &cb_data); + nfs_destroy_cb_sem(&cb_data); /* Dont want any more callbacks even if the socket is closed */ rpc->connect_cb = NULL; @@ -319,8 +378,7 @@ stat_cb(int status, struct nfs_context *nfs, void *data, void *private_data) { struct sync_cb_data *cb_data = private_data; - cb_data->is_finished = 1; - cb_data->status = status; + cb_data_is_finished(cb_data, status); if (status < 0) { nfs_set_error(nfs, "stat call failed with \"%s\"", @@ -343,15 +401,19 @@ nfs_stat(struct nfs_context *nfs, const char *path, struct stat *st) { struct sync_cb_data cb_data; - cb_data.is_finished = 0; cb_data.return_data = st; + if (nfs_init_cb_data(nfs, &cb_data)) { + return -1; + } if (nfs_stat_async(nfs, path, stat_cb, &cb_data) != 0) { nfs_set_error(nfs, "nfs_stat_async failed"); + nfs_destroy_cb_sem(&cb_data); return -1; } wait_for_nfs_reply(nfs, &cb_data); + nfs_destroy_cb_sem(&cb_data); return cb_data.status; } @@ -361,8 +423,7 @@ stat64_cb(int status, struct nfs_context *nfs, void *data, void *private_data) { struct sync_cb_data *cb_data = private_data; - cb_data->is_finished = 1; - cb_data->status = 
status; + cb_data_is_finished(cb_data, status); if (status < 0) { nfs_set_error(nfs, "stat call failed with \"%s\"", @@ -377,16 +438,20 @@ nfs_stat64(struct nfs_context *nfs, const char *path, struct nfs_stat_64 *st) { struct sync_cb_data cb_data; - cb_data.is_finished = 0; cb_data.return_data = st; + if (nfs_init_cb_data(nfs, &cb_data)) { + return -1; + } if (nfs_stat64_async(nfs, path, stat64_cb, &cb_data) != 0) { nfs_set_error(nfs, "nfs_stat64_async failed. %s", nfs_get_error(nfs)); + nfs_destroy_cb_sem(&cb_data); return -1; } wait_for_nfs_reply(nfs, &cb_data); + nfs_destroy_cb_sem(&cb_data); return cb_data.status; } @@ -396,16 +461,20 @@ nfs_lstat64(struct nfs_context *nfs, const char *path, struct nfs_stat_64 *st) { struct sync_cb_data cb_data; - cb_data.is_finished = 0; cb_data.return_data = st; + if (nfs_init_cb_data(nfs, &cb_data)) { + return -1; + } if (nfs_lstat64_async(nfs, path, stat64_cb, &cb_data) != 0) { nfs_set_error(nfs, "nfs_lstat64_async failed. %s", nfs_get_error(nfs)); + nfs_destroy_cb_sem(&cb_data); return -1; } wait_for_nfs_reply(nfs, &cb_data); + nfs_destroy_cb_sem(&cb_data); return cb_data.status; } @@ -419,8 +488,7 @@ open_cb(int status, struct nfs_context *nfs, void *data, void *private_data) struct sync_cb_data *cb_data = private_data; struct nfsfh *fh, **nfsfh; - cb_data->is_finished = 1; - cb_data->status = status; + cb_data_is_finished(cb_data, status); if (status < 0) { nfs_set_error(nfs, "open call failed with \"%s\"", @@ -439,16 +507,20 @@ nfs_open(struct nfs_context *nfs, const char *path, int flags, { struct sync_cb_data cb_data; - cb_data.is_finished = 0; cb_data.return_data = nfsfh; + if (nfs_init_cb_data(nfs, &cb_data)) { + return -1; + } if (nfs_open_async(nfs, path, flags, open_cb, &cb_data) != 0) { nfs_set_error(nfs, "nfs_open_async failed. 
%s", nfs_get_error(nfs)); + nfs_destroy_cb_sem(&cb_data); return -1; } wait_for_nfs_reply(nfs, &cb_data); + nfs_destroy_cb_sem(&cb_data); return cb_data.status; } @@ -459,16 +531,20 @@ nfs_open2(struct nfs_context *nfs, const char *path, int flags, { struct sync_cb_data cb_data; - cb_data.is_finished = 0; cb_data.return_data = nfsfh; + if (nfs_init_cb_data(nfs, &cb_data)) { + return -1; + } if (nfs_open2_async(nfs, path, flags, mode, open_cb, &cb_data) != 0) { nfs_set_error(nfs, "nfs_open2_async failed. %s", nfs_get_error(nfs)); + nfs_destroy_cb_sem(&cb_data); return -1; } wait_for_nfs_reply(nfs, &cb_data); + nfs_destroy_cb_sem(&cb_data); return cb_data.status; } @@ -481,8 +557,7 @@ chdir_cb(int status, struct nfs_context *nfs, void *data, void *private_data) { struct sync_cb_data *cb_data = private_data; - cb_data->is_finished = 1; - cb_data->status = status; + cb_data_is_finished(cb_data, status); if (status < 0) { nfs_set_error(nfs, "chdir call failed with \"%s\"", @@ -496,15 +571,19 @@ nfs_chdir(struct nfs_context *nfs, const char *path) { struct sync_cb_data cb_data; - cb_data.is_finished = 0; + if (nfs_init_cb_data(nfs, &cb_data)) { + return -1; + } if (nfs_chdir_async(nfs, path, chdir_cb, &cb_data) != 0) { nfs_set_error(nfs, "nfs_chdir_async failed with %s", nfs_get_error(nfs)); + nfs_destroy_cb_sem(&cb_data); return -1; } wait_for_nfs_reply(nfs, &cb_data); + nfs_destroy_cb_sem(&cb_data); return cb_data.status; } @@ -518,8 +597,8 @@ pread_cb(int status, struct nfs_context *nfs, void *data, void *private_data) { struct sync_cb_data *cb_data = private_data; char *buffer; - cb_data->is_finished = 1; - cb_data->status = status; + + cb_data_is_finished(cb_data, status); if (status < 0) { nfs_set_error(nfs, "%s call failed with \"%s\"", cb_data->call, @@ -537,18 +616,22 @@ nfs_pread(struct nfs_context *nfs, struct nfsfh *nfsfh, uint64_t offset, { struct sync_cb_data cb_data; - cb_data.is_finished = 0; cb_data.return_data = buffer; cb_data.call = "pread"; + if 
(nfs_init_cb_data(nfs, &cb_data)) { + return -1; + } if (nfs_pread_async(nfs, nfsfh, offset, count, pread_cb, &cb_data) != 0) { nfs_set_error(nfs, "nfs_pread_async failed. %s", nfs_get_error(nfs)); + nfs_destroy_cb_sem(&cb_data); return -1; } wait_for_nfs_reply(nfs, &cb_data); + nfs_destroy_cb_sem(&cb_data); return cb_data.status; } @@ -562,17 +645,21 @@ nfs_read(struct nfs_context *nfs, struct nfsfh *nfsfh, uint64_t count, { struct sync_cb_data cb_data; - cb_data.is_finished = 0; cb_data.return_data = buffer; cb_data.call = "read"; + if (nfs_init_cb_data(nfs, &cb_data)) { + return -1; + } if (nfs_read_async(nfs, nfsfh, count, pread_cb, &cb_data) != 0) { nfs_set_error(nfs, "nfs_read_async failed. %s", nfs_get_error(nfs)); + nfs_destroy_cb_sem(&cb_data); return -1; } wait_for_nfs_reply(nfs, &cb_data); + nfs_destroy_cb_sem(&cb_data); return cb_data.status; } @@ -584,8 +671,8 @@ static void close_cb(int status, struct nfs_context *nfs, void *data, void *private_data) { struct sync_cb_data *cb_data = private_data; - cb_data->is_finished = 1; - cb_data->status = status; + + cb_data_is_finished(cb_data, status); if (status < 0) { nfs_set_error(nfs, "close call failed with \"%s\"", @@ -599,15 +686,19 @@ nfs_close(struct nfs_context *nfs, struct nfsfh *nfsfh) { struct sync_cb_data cb_data; - cb_data.is_finished = 0; + if (nfs_init_cb_data(nfs, &cb_data)) { + return -1; + } if (nfs_close_async(nfs, nfsfh, close_cb, &cb_data) != 0) { nfs_set_error(nfs, "nfs_close_async failed. 
%s", nfs_get_error(nfs)); + nfs_destroy_cb_sem(&cb_data); return -1; } wait_for_nfs_reply(nfs, &cb_data); + nfs_destroy_cb_sem(&cb_data); return cb_data.status; } @@ -625,15 +716,19 @@ nfs_fstat(struct nfs_context *nfs, struct nfsfh *nfsfh, struct stat *st) { struct sync_cb_data cb_data; - cb_data.is_finished = 0; cb_data.return_data = st; + if (nfs_init_cb_data(nfs, &cb_data)) { + return -1; + } if (nfs_fstat_async(nfs, nfsfh, stat_cb, &cb_data) != 0) { nfs_set_error(nfs, "nfs_fstat_async failed"); + nfs_destroy_cb_sem(&cb_data); return -1; } wait_for_nfs_reply(nfs, &cb_data); + nfs_destroy_cb_sem(&cb_data); return cb_data.status; } @@ -647,16 +742,20 @@ nfs_fstat64(struct nfs_context *nfs, struct nfsfh *nfsfh, { struct sync_cb_data cb_data; - cb_data.is_finished = 0; cb_data.return_data = st; + if (nfs_init_cb_data(nfs, &cb_data)) { + return -1; + } if (nfs_fstat64_async(nfs, nfsfh, stat64_cb, &cb_data) != 0) { nfs_set_error(nfs, "nfs_fstat64_async failed. %s", nfs_get_error(nfs)); + nfs_destroy_cb_sem(&cb_data); return -1; } wait_for_nfs_reply(nfs, &cb_data); + nfs_destroy_cb_sem(&cb_data); return cb_data.status; } @@ -669,8 +768,8 @@ static void pwrite_cb(int status, struct nfs_context *nfs, void *data, void *private_data) { struct sync_cb_data *cb_data = private_data; - cb_data->is_finished = 1; - cb_data->status = status; + + cb_data_is_finished(cb_data, status); if (status < 0) nfs_set_error(nfs, "%s call failed with \"%s\"", @@ -683,17 +782,21 @@ nfs_pwrite(struct nfs_context *nfs, struct nfsfh *nfsfh, uint64_t offset, { struct sync_cb_data cb_data; - cb_data.is_finished = 0; cb_data.call = "pwrite"; + if (nfs_init_cb_data(nfs, &cb_data)) { + return -1; + } if (nfs_pwrite_async(nfs, nfsfh, offset, count, buf, pwrite_cb, &cb_data) != 0) { nfs_set_error(nfs, "nfs_pwrite_async failed. 
%s", nfs_get_error(nfs)); + nfs_destroy_cb_sem(&cb_data); return -1; } wait_for_nfs_reply(nfs, &cb_data); + nfs_destroy_cb_sem(&cb_data); return cb_data.status; } @@ -707,16 +810,20 @@ nfs_write(struct nfs_context *nfs, struct nfsfh *nfsfh, uint64_t count, { struct sync_cb_data cb_data; - cb_data.is_finished = 0; cb_data.call = "write"; + if (nfs_init_cb_data(nfs, &cb_data)) { + return -1; + } if (nfs_write_async(nfs, nfsfh, count, buf, pwrite_cb, &cb_data) != 0) { nfs_set_error(nfs, "nfs_write_async failed. %s", nfs_get_error(nfs)); + nfs_destroy_cb_sem(&cb_data); return -1; } wait_for_nfs_reply(nfs, &cb_data); + nfs_destroy_cb_sem(&cb_data); return cb_data.status; } @@ -729,8 +836,8 @@ static void fsync_cb(int status, struct nfs_context *nfs, void *data, void *private_data) { struct sync_cb_data *cb_data = private_data; - cb_data->is_finished = 1; - cb_data->status = status; + + cb_data_is_finished(cb_data, status); if (status < 0) { nfs_set_error(nfs, "fsync call failed with \"%s\"", @@ -744,15 +851,19 @@ nfs_fsync(struct nfs_context *nfs, struct nfsfh *nfsfh) { struct sync_cb_data cb_data; - cb_data.is_finished = 0; + if (nfs_init_cb_data(nfs, &cb_data)) { + return -1; + } if (nfs_fsync_async(nfs, nfsfh, fsync_cb, &cb_data) != 0) { nfs_set_error(nfs, "nfs_fsync_async failed. 
%s", nfs_get_error(nfs)); + nfs_destroy_cb_sem(&cb_data); return -1; } wait_for_nfs_reply(nfs, &cb_data); + nfs_destroy_cb_sem(&cb_data); return cb_data.status; } @@ -766,8 +877,8 @@ ftruncate_cb(int status, struct nfs_context *nfs, void *data, void *private_data) { struct sync_cb_data *cb_data = private_data; - cb_data->is_finished = 1; - cb_data->status = status; + + cb_data_is_finished(cb_data, status); if (status < 0) { nfs_set_error(nfs, "ftruncate call failed with \"%s\"", @@ -781,16 +892,20 @@ nfs_ftruncate(struct nfs_context *nfs, struct nfsfh *nfsfh, uint64_t length) { struct sync_cb_data cb_data; - cb_data.is_finished = 0; + if (nfs_init_cb_data(nfs, &cb_data)) { + return -1; + } if (nfs_ftruncate_async(nfs, nfsfh, length, ftruncate_cb, &cb_data) != 0) { nfs_set_error(nfs, "nfs_ftruncate_async failed. %s", nfs_get_error(nfs)); + nfs_destroy_cb_sem(&cb_data); return -1; } wait_for_nfs_reply(nfs, &cb_data); + nfs_destroy_cb_sem(&cb_data); return cb_data.status; } @@ -804,8 +919,8 @@ static void truncate_cb(int status, struct nfs_context *nfs, void *data, void *private_data) { struct sync_cb_data *cb_data = private_data; - cb_data->is_finished = 1; - cb_data->status = status; + + cb_data_is_finished(cb_data, status); if (status < 0) { nfs_set_error(nfs, "truncate call failed with \"%s\"", @@ -818,15 +933,19 @@ int nfs_truncate(struct nfs_context *nfs, const char *path, uint64_t length) { struct sync_cb_data cb_data; - cb_data.is_finished = 0; + if (nfs_init_cb_data(nfs, &cb_data)) { + return -1; + } if (nfs_truncate_async(nfs, path, length, truncate_cb, &cb_data) != 0) { nfs_set_error(nfs, "nfs_ftruncate_async failed. 
%s", nfs_get_error(nfs)); + nfs_destroy_cb_sem(&cb_data); return -1; } wait_for_nfs_reply(nfs, &cb_data); + nfs_destroy_cb_sem(&cb_data); return cb_data.status; } @@ -839,8 +958,8 @@ static void mkdir_cb(int status, struct nfs_context *nfs, void *data, void *private_data) { struct sync_cb_data *cb_data = private_data; - cb_data->is_finished = 1; - cb_data->status = status; + + cb_data_is_finished(cb_data, status); if (status < 0) { nfs_set_error(nfs, "mkdir call failed with \"%s\"", @@ -854,15 +973,19 @@ nfs_mkdir(struct nfs_context *nfs, const char *path) { struct sync_cb_data cb_data; - cb_data.is_finished = 0; + if (nfs_init_cb_data(nfs, &cb_data)) { + return -1; + } if (nfs_mkdir_async(nfs, path, mkdir_cb, &cb_data) != 0) { nfs_set_error(nfs, "nfs_mkdir_async failed. %s", nfs_get_error(nfs)); + nfs_destroy_cb_sem(&cb_data); return -1; } wait_for_nfs_reply(nfs, &cb_data); + nfs_destroy_cb_sem(&cb_data); return cb_data.status; } @@ -872,14 +995,18 @@ nfs_mkdir2(struct nfs_context *nfs, const char *path, int mode) { struct sync_cb_data cb_data; - cb_data.is_finished = 0; + if (nfs_init_cb_data(nfs, &cb_data)) { + return -1; + } if (nfs_mkdir2_async(nfs, path, mode, mkdir_cb, &cb_data) != 0) { nfs_set_error(nfs, "nfs_mkdir2_async failed"); + nfs_destroy_cb_sem(&cb_data); return -1; } wait_for_nfs_reply(nfs, &cb_data); + nfs_destroy_cb_sem(&cb_data); return cb_data.status; } @@ -894,8 +1021,8 @@ static void rmdir_cb(int status, struct nfs_context *nfs, void *data, void *private_data) { struct sync_cb_data *cb_data = private_data; - cb_data->is_finished = 1; - cb_data->status = status; + + cb_data_is_finished(cb_data, status); if (status < 0) { nfs_set_error(nfs, "rmdir call failed with \"%s\"", @@ -908,16 +1035,20 @@ int nfs_rmdir(struct nfs_context *nfs, const char *path) { struct sync_cb_data cb_data; - cb_data.is_finished = 0; + if (nfs_init_cb_data(nfs, &cb_data)) { + return -1; + } if (nfs_rmdir_async(nfs, path, rmdir_cb, &cb_data) != 0) { nfs_set_error(nfs, 
"nfs_rmdir_async failed. %s", nfs_get_error(nfs)); + nfs_destroy_cb_sem(&cb_data); return -1; } wait_for_nfs_reply(nfs, &cb_data); - + nfs_destroy_cb_sem(&cb_data); + return cb_data.status; } @@ -932,8 +1063,7 @@ creat_cb(int status, struct nfs_context *nfs, void *data, void *private_data) struct sync_cb_data *cb_data = private_data; struct nfsfh *fh, **nfsfh; - cb_data->is_finished = 1; - cb_data->status = status; + cb_data_is_finished(cb_data, status); if (status < 0) { nfs_set_error(nfs, "creat call failed with \"%s\"", @@ -952,16 +1082,20 @@ nfs_create(struct nfs_context *nfs, const char *path, int flags, int mode, { struct sync_cb_data cb_data; - cb_data.is_finished = 0; cb_data.return_data = nfsfh; + if (nfs_init_cb_data(nfs, &cb_data)) { + return -1; + } if (nfs_create_async(nfs, path, flags, mode, creat_cb, &cb_data) != 0) { nfs_set_error(nfs, "nfs_create_async failed. %s", nfs_get_error(nfs)); + nfs_destroy_cb_sem(&cb_data); return -1; } wait_for_nfs_reply(nfs, &cb_data); + nfs_destroy_cb_sem(&cb_data); return cb_data.status; } @@ -981,8 +1115,7 @@ mknod_cb(int status, struct nfs_context *nfs, void *data, void *private_data) { struct sync_cb_data *cb_data = private_data; - cb_data->is_finished = 1; - cb_data->status = status; + cb_data_is_finished(cb_data, status); if (status < 0) { nfs_set_error(nfs, "mknod call failed with \"%s\"", @@ -996,15 +1129,19 @@ nfs_mknod(struct nfs_context *nfs, const char *path, int mode, int dev) { struct sync_cb_data cb_data; - cb_data.is_finished = 0; + if (nfs_init_cb_data(nfs, &cb_data)) { + return -1; + } if (nfs_mknod_async(nfs, path, mode, dev, mknod_cb, &cb_data) != 0) { nfs_set_error(nfs, "nfs_creat_async failed. 
%s", nfs_get_error(nfs)); + nfs_destroy_cb_sem(&cb_data); return -1; } wait_for_nfs_reply(nfs, &cb_data); + nfs_destroy_cb_sem(&cb_data); return cb_data.status; } @@ -1018,8 +1155,7 @@ unlink_cb(int status, struct nfs_context *nfs, void *data, void *private_data) { struct sync_cb_data *cb_data = private_data; - cb_data->is_finished = 1; - cb_data->status = status; + cb_data_is_finished(cb_data, status); if (status < 0) { nfs_set_error(nfs, "unlink call failed with \"%s\"", @@ -1033,15 +1169,19 @@ nfs_unlink(struct nfs_context *nfs, const char *path) { struct sync_cb_data cb_data; - cb_data.is_finished = 0; + if (nfs_init_cb_data(nfs, &cb_data)) { + return -1; + } if (nfs_unlink_async(nfs, path, unlink_cb, &cb_data) != 0) { nfs_set_error(nfs, "nfs_unlink_async failed. %s", nfs_get_error(nfs)); + nfs_destroy_cb_sem(&cb_data); return -1; } wait_for_nfs_reply(nfs, &cb_data); + nfs_destroy_cb_sem(&cb_data); return cb_data.status; } @@ -1057,8 +1197,7 @@ opendir_cb(int status, struct nfs_context *nfs, void *data, void *private_data) struct sync_cb_data *cb_data = private_data; struct nfsdir *dir, **nfsdir; - cb_data->is_finished = 1; - cb_data->status = status; + cb_data_is_finished(cb_data, status); if (status < 0) { nfs_set_error(nfs, "opendir call failed with \"%s\"", @@ -1076,16 +1215,20 @@ nfs_opendir(struct nfs_context *nfs, const char *path, struct nfsdir **nfsdir) { struct sync_cb_data cb_data; - cb_data.is_finished = 0; cb_data.return_data = nfsdir; + if (nfs_init_cb_data(nfs, &cb_data)) { + return -1; + } if (nfs_opendir_async(nfs, path, opendir_cb, &cb_data) != 0) { nfs_set_error(nfs, "nfs_opendir_async failed. 
%s", nfs_get_error(nfs)); + nfs_destroy_cb_sem(&cb_data); return -1; } wait_for_nfs_reply(nfs, &cb_data); + nfs_destroy_cb_sem(&cb_data); return cb_data.status; } @@ -1099,8 +1242,7 @@ lseek_cb(int status, struct nfs_context *nfs, void *data, void *private_data) { struct sync_cb_data *cb_data = private_data; - cb_data->is_finished = 1; - cb_data->status = status; + cb_data_is_finished(cb_data, status); if (status < 0) { nfs_set_error(nfs, "lseek call failed with \"%s\"", @@ -1118,17 +1260,21 @@ nfs_lseek(struct nfs_context *nfs, struct nfsfh *nfsfh, int64_t offset, int when { struct sync_cb_data cb_data; - cb_data.is_finished = 0; cb_data.return_data = current_offset; + if (nfs_init_cb_data(nfs, &cb_data)) { + return -1; + } if (nfs_lseek_async(nfs, nfsfh, offset, whence, lseek_cb, &cb_data) != 0) { nfs_set_error(nfs, "nfs_lseek_async failed. %s", nfs_get_error(nfs)); + nfs_destroy_cb_sem(&cb_data); return -1; } wait_for_nfs_reply(nfs, &cb_data); + nfs_destroy_cb_sem(&cb_data); return cb_data.status; } @@ -1142,8 +1288,7 @@ lockf_cb(int status, struct nfs_context *nfs, void *data, void *private_data) { struct sync_cb_data *cb_data = private_data; - cb_data->is_finished = 1; - cb_data->status = status; + cb_data_is_finished(cb_data, status); if (status < 0) { nfs_set_error(nfs, "lockf call failed with \"%s\"", @@ -1158,16 +1303,20 @@ nfs_lockf(struct nfs_context *nfs, struct nfsfh *nfsfh, { struct sync_cb_data cb_data; - cb_data.is_finished = 0; + if (nfs_init_cb_data(nfs, &cb_data)) { + return -1; + } if (nfs_lockf_async(nfs, nfsfh, cmd, count, lockf_cb, &cb_data) != 0) { nfs_set_error(nfs, "nfs_lockf_async failed. 
%s", nfs_get_error(nfs)); + nfs_destroy_cb_sem(&cb_data); return -1; } wait_for_nfs_reply(nfs, &cb_data); + nfs_destroy_cb_sem(&cb_data); return cb_data.status; } @@ -1180,8 +1329,7 @@ fcntl_cb(int status, struct nfs_context *nfs, void *data, void *private_data) { struct sync_cb_data *cb_data = private_data; - cb_data->is_finished = 1; - cb_data->status = status; + cb_data_is_finished(cb_data, status); if (status < 0) { nfs_set_error(nfs, "fcntl call failed with \"%s\"", @@ -1196,16 +1344,20 @@ nfs_fcntl(struct nfs_context *nfs, struct nfsfh *nfsfh, { struct sync_cb_data cb_data; - cb_data.is_finished = 0; + if (nfs_init_cb_data(nfs, &cb_data)) { + return -1; + } if (nfs_fcntl_async(nfs, nfsfh, cmd, arg, fcntl_cb, &cb_data) != 0) { nfs_set_error(nfs, "nfs_fcntl_async failed. %s", nfs_get_error(nfs)); + nfs_destroy_cb_sem(&cb_data); return -1; } wait_for_nfs_reply(nfs, &cb_data); + nfs_destroy_cb_sem(&cb_data); return cb_data.status; } @@ -1218,8 +1370,7 @@ statvfs_cb(int status, struct nfs_context *nfs, void *data, void *private_data) { struct sync_cb_data *cb_data = private_data; - cb_data->is_finished = 1; - cb_data->status = status; + cb_data_is_finished(cb_data, status); if (status < 0) { nfs_set_error(nfs, "statvfs call failed with \"%s\"", @@ -1235,16 +1386,20 @@ nfs_statvfs(struct nfs_context *nfs, const char *path, struct statvfs *svfs) { struct sync_cb_data cb_data; - cb_data.is_finished = 0; cb_data.return_data = svfs; + if (nfs_init_cb_data(nfs, &cb_data)) { + return -1; + } if (nfs_statvfs_async(nfs, path, statvfs_cb, &cb_data) != 0) { nfs_set_error(nfs, "nfs_statvfs_async failed. 
%s", nfs_get_error(nfs)); + nfs_destroy_cb_sem(&cb_data); return -1; } wait_for_nfs_reply(nfs, &cb_data); + nfs_destroy_cb_sem(&cb_data); return cb_data.status; } @@ -1258,8 +1413,7 @@ statvfs64_cb(int status, struct nfs_context *nfs, void *data, { struct sync_cb_data *cb_data = private_data; - cb_data->is_finished = 1; - cb_data->status = status; + cb_data_is_finished(cb_data, status); if (status < 0) { nfs_set_error(nfs, "statvfs64 call failed with \"%s\"", @@ -1276,16 +1430,20 @@ nfs_statvfs64(struct nfs_context *nfs, const char *path, { struct sync_cb_data cb_data; - cb_data.is_finished = 0; cb_data.return_data = svfs; + if (nfs_init_cb_data(nfs, &cb_data)) { + return -1; + } if (nfs_statvfs64_async(nfs, path, statvfs64_cb, &cb_data) != 0) { nfs_set_error(nfs, "nfs_statvfs64_async failed. %s", nfs_get_error(nfs)); + nfs_destroy_cb_sem(&cb_data); return -1; } wait_for_nfs_reply(nfs, &cb_data); + nfs_destroy_cb_sem(&cb_data); return cb_data.status; } @@ -1298,8 +1456,7 @@ readlink_cb(int status, struct nfs_context *nfs, void *data, void *private_data) { struct sync_cb_data *cb_data = private_data; - cb_data->is_finished = 1; - cb_data->status = status; + cb_data_is_finished(cb_data, status); if (status < 0) { nfs_set_error(nfs, "readlink call failed with \"%s\"", @@ -1321,17 +1478,21 @@ nfs_readlink(struct nfs_context *nfs, const char *path, char *buf, int bufsize) { struct sync_cb_data cb_data; - cb_data.is_finished = 0; cb_data.return_data = buf; cb_data.return_int = bufsize; + if (nfs_init_cb_data(nfs, &cb_data)) { + return -1; + } if (nfs_readlink_async(nfs, path, readlink_cb, &cb_data) != 0) { nfs_set_error(nfs, "nfs_readlink_async failed. 
%s", nfs_get_error(nfs)); + nfs_destroy_cb_sem(&cb_data); return -1; } wait_for_nfs_reply(nfs, &cb_data); + nfs_destroy_cb_sem(&cb_data); return cb_data.status; } @@ -1343,8 +1504,7 @@ readlink2_cb(int status, struct nfs_context *nfs, void *data, void *private_data char **bufptr; char *buf; - cb_data->is_finished = 1; - cb_data->status = status; + cb_data_is_finished(cb_data, status); if (status < 0) { nfs_set_error(nfs, "readlink call failed with \"%s\"", @@ -1369,15 +1529,19 @@ nfs_readlink2(struct nfs_context *nfs, const char *path, char **bufptr) struct sync_cb_data cb_data; *bufptr = NULL; - cb_data.is_finished = 0; cb_data.return_data = bufptr; + if (nfs_init_cb_data(nfs, &cb_data)) { + return -1; + } if (nfs_readlink_async(nfs, path, readlink2_cb, &cb_data) != 0) { nfs_set_error(nfs, "nfs_readlink_async failed"); + nfs_destroy_cb_sem(&cb_data); return -1; } wait_for_nfs_reply(nfs, &cb_data); + nfs_destroy_cb_sem(&cb_data); return cb_data.status; } @@ -1392,8 +1556,7 @@ chmod_cb(int status, struct nfs_context *nfs, void *data, void *private_data) { struct sync_cb_data *cb_data = private_data; - cb_data->is_finished = 1; - cb_data->status = status; + cb_data_is_finished(cb_data, status); if (status < 0) { nfs_set_error(nfs, "chmod call failed with \"%s\"", @@ -1407,15 +1570,19 @@ nfs_chmod(struct nfs_context *nfs, const char *path, int mode) { struct sync_cb_data cb_data; - cb_data.is_finished = 0; + if (nfs_init_cb_data(nfs, &cb_data)) { + return -1; + } if (nfs_chmod_async(nfs, path, mode, chmod_cb, &cb_data) != 0) { nfs_set_error(nfs, "nfs_chmod_async failed. 
%s", nfs_get_error(nfs)); + nfs_destroy_cb_sem(&cb_data); return -1; } wait_for_nfs_reply(nfs, &cb_data); + nfs_destroy_cb_sem(&cb_data); return cb_data.status; } @@ -1425,15 +1592,19 @@ nfs_lchmod(struct nfs_context *nfs, const char *path, int mode) { struct sync_cb_data cb_data; - cb_data.is_finished = 0; + if (nfs_init_cb_data(nfs, &cb_data)) { + return -1; + } if (nfs_lchmod_async(nfs, path, mode, chmod_cb, &cb_data) != 0) { nfs_set_error(nfs, "nfs_lchmod_async failed. %s", nfs_get_error(nfs)); + nfs_destroy_cb_sem(&cb_data); return -1; } wait_for_nfs_reply(nfs, &cb_data); + nfs_destroy_cb_sem(&cb_data); return cb_data.status; } @@ -1449,8 +1620,7 @@ fchmod_cb(int status, struct nfs_context *nfs, void *data, void *private_data) { struct sync_cb_data *cb_data = private_data; - cb_data->is_finished = 1; - cb_data->status = status; + cb_data_is_finished(cb_data, status); if (status < 0) { nfs_set_error(nfs, "fchmod call failed with \"%s\"", @@ -1464,15 +1634,19 @@ nfs_fchmod(struct nfs_context *nfs, struct nfsfh *nfsfh, int mode) { struct sync_cb_data cb_data; - cb_data.is_finished = 0; + if (nfs_init_cb_data(nfs, &cb_data)) { + return -1; + } if (nfs_fchmod_async(nfs, nfsfh, mode, fchmod_cb, &cb_data) != 0) { nfs_set_error(nfs, "nfs_fchmod_async failed. 
%s", nfs_get_error(nfs)); + nfs_destroy_cb_sem(&cb_data); return -1; } wait_for_nfs_reply(nfs, &cb_data); + nfs_destroy_cb_sem(&cb_data); return cb_data.status; } @@ -1488,8 +1662,7 @@ chown_cb(int status, struct nfs_context *nfs, void *data, void *private_data) { struct sync_cb_data *cb_data = private_data; - cb_data->is_finished = 1; - cb_data->status = status; + cb_data_is_finished(cb_data, status); if (status < 0) { nfs_set_error(nfs, "chown call failed with \"%s\"", @@ -1503,15 +1676,19 @@ nfs_chown(struct nfs_context *nfs, const char *path, int uid, int gid) { struct sync_cb_data cb_data; - cb_data.is_finished = 0; + if (nfs_init_cb_data(nfs, &cb_data)) { + return -1; + } if (nfs_chown_async(nfs, path, uid, gid, chown_cb, &cb_data) != 0) { nfs_set_error(nfs, "nfs_chown_async failed. %s", nfs_get_error(nfs)); + nfs_destroy_cb_sem(&cb_data); return -1; } wait_for_nfs_reply(nfs, &cb_data); + nfs_destroy_cb_sem(&cb_data); return cb_data.status; } @@ -1524,15 +1701,19 @@ nfs_lchown(struct nfs_context *nfs, const char *path, int uid, int gid) { struct sync_cb_data cb_data; - cb_data.is_finished = 0; + if (nfs_init_cb_data(nfs, &cb_data)) { + return -1; + } if (nfs_lchown_async(nfs, path, uid, gid, chown_cb, &cb_data) != 0) { nfs_set_error(nfs, "nfs_lchown_async failed. 
%s", nfs_get_error(nfs)); + nfs_destroy_cb_sem(&cb_data); return -1; } wait_for_nfs_reply(nfs, &cb_data); + nfs_destroy_cb_sem(&cb_data); return cb_data.status; } @@ -1545,8 +1726,7 @@ fchown_cb(int status, struct nfs_context *nfs, void *data, void *private_data) { struct sync_cb_data *cb_data = private_data; - cb_data->is_finished = 1; - cb_data->status = status; + cb_data_is_finished(cb_data, status); if (status < 0) { nfs_set_error(nfs, "fchown call failed with \"%s\"", @@ -1560,15 +1740,19 @@ nfs_fchown(struct nfs_context *nfs, struct nfsfh *nfsfh, int uid, int gid) { struct sync_cb_data cb_data; - cb_data.is_finished = 0; + if (nfs_init_cb_data(nfs, &cb_data)) { + return -1; + } if (nfs_fchown_async(nfs, nfsfh, uid, gid, fchown_cb, &cb_data) != 0) { nfs_set_error(nfs, "nfs_fchown_async failed. %s", nfs_get_error(nfs)); + nfs_destroy_cb_sem(&cb_data); return -1; } wait_for_nfs_reply(nfs, &cb_data); + nfs_destroy_cb_sem(&cb_data); return cb_data.status; } @@ -1583,8 +1767,7 @@ utimes_cb(int status, struct nfs_context *nfs, void *data, void *private_data) { struct sync_cb_data *cb_data = private_data; - cb_data->is_finished = 1; - cb_data->status = status; + cb_data_is_finished(cb_data, status); if (status < 0) { nfs_set_error(nfs, "utimes call failed with \"%s\"", @@ -1598,15 +1781,19 @@ nfs_utimes(struct nfs_context *nfs, const char *path, struct timeval *times) { struct sync_cb_data cb_data; - cb_data.is_finished = 0; + if (nfs_init_cb_data(nfs, &cb_data)) { + return -1; + } if (nfs_utimes_async(nfs, path, times, utimes_cb, &cb_data) != 0) { nfs_set_error(nfs, "nfs_utimes_async failed. 
%s", nfs_get_error(nfs)); + nfs_destroy_cb_sem(&cb_data); return -1; } wait_for_nfs_reply(nfs, &cb_data); + nfs_destroy_cb_sem(&cb_data); return cb_data.status; } @@ -1616,15 +1803,19 @@ nfs_lutimes(struct nfs_context *nfs, const char *path, struct timeval *times) { struct sync_cb_data cb_data; - cb_data.is_finished = 0; + if (nfs_init_cb_data(nfs, &cb_data)) { + return -1; + } if (nfs_lutimes_async(nfs, path, times, utimes_cb, &cb_data) != 0) { nfs_set_error(nfs, "nfs_lutimes_async failed. %s", nfs_get_error(nfs)); + nfs_destroy_cb_sem(&cb_data); return -1; } wait_for_nfs_reply(nfs, &cb_data); + nfs_destroy_cb_sem(&cb_data); return cb_data.status; } @@ -1639,8 +1830,7 @@ utime_cb(int status, struct nfs_context *nfs, void *data, void *private_data) { struct sync_cb_data *cb_data = private_data; - cb_data->is_finished = 1; - cb_data->status = status; + cb_data_is_finished(cb_data, status); if (status < 0) { nfs_set_error(nfs, "utime call failed with \"%s\"", @@ -1654,14 +1844,18 @@ nfs_utime(struct nfs_context *nfs, const char *path, struct utimbuf *times) { struct sync_cb_data cb_data; - cb_data.is_finished = 0; + if (nfs_init_cb_data(nfs, &cb_data)) { + return -1; + } if (nfs_utime_async(nfs, path, times, utime_cb, &cb_data) != 0) { nfs_set_error(nfs, "nfs_utimes_async failed"); + nfs_destroy_cb_sem(&cb_data); return -1; } wait_for_nfs_reply(nfs, &cb_data); + nfs_destroy_cb_sem(&cb_data); return cb_data.status; } @@ -1675,8 +1869,7 @@ access_cb(int status, struct nfs_context *nfs, void *data, void *private_data) { struct sync_cb_data *cb_data = private_data; - cb_data->is_finished = 1; - cb_data->status = status; + cb_data_is_finished(cb_data, status); if (status < 0) { nfs_set_error(nfs, "access call failed with \"%s\"", @@ -1690,15 +1883,19 @@ nfs_access(struct nfs_context *nfs, const char *path, int mode) { struct sync_cb_data cb_data; - cb_data.is_finished = 0; + if (nfs_init_cb_data(nfs, &cb_data)) { + return -1; + } if (nfs_access_async(nfs, path, mode, 
access_cb, &cb_data) != 0) { nfs_set_error(nfs, "nfs_access_async failed. %s", nfs_get_error(nfs)); + nfs_destroy_cb_sem(&cb_data); return -1; } wait_for_nfs_reply(nfs, &cb_data); + nfs_destroy_cb_sem(&cb_data); return cb_data.status; } @@ -1713,8 +1910,7 @@ access2_cb(int status, struct nfs_context *nfs, void *data, void *private_data) { struct sync_cb_data *cb_data = private_data; - cb_data->is_finished = 1; - cb_data->status = status; + cb_data_is_finished(cb_data, status); if (status < 0) { nfs_set_error(nfs, "access2 call failed with \"%s\"", @@ -1728,15 +1924,19 @@ nfs_access2(struct nfs_context *nfs, const char *path) { struct sync_cb_data cb_data; - cb_data.is_finished = 0; + if (nfs_init_cb_data(nfs, &cb_data)) { + return -1; + } if (nfs_access2_async(nfs, path, access2_cb, &cb_data) != 0) { nfs_set_error(nfs, "nfs_access2_async failed. %s", nfs_get_error(nfs)); + nfs_destroy_cb_sem(&cb_data); return -1; } wait_for_nfs_reply(nfs, &cb_data); + nfs_destroy_cb_sem(&cb_data); return cb_data.status; } @@ -1751,8 +1951,7 @@ symlink_cb(int status, struct nfs_context *nfs, void *data, void *private_data) { struct sync_cb_data *cb_data = private_data; - cb_data->is_finished = 1; - cb_data->status = status; + cb_data_is_finished(cb_data, status); if (status < 0) { nfs_set_error(nfs, "symlink call failed with \"%s\"", @@ -1766,16 +1965,20 @@ nfs_symlink(struct nfs_context *nfs, const char *target, const char *linkname) { struct sync_cb_data cb_data; - cb_data.is_finished = 0; + if (nfs_init_cb_data(nfs, &cb_data)) { + return -1; + } if (nfs_symlink_async(nfs, target, linkname, symlink_cb, &cb_data) != 0) { - nfs_set_error(nfs, "nfs_symlink_async failed: %s", - nfs_get_error(nfs)); + nfs_set_error(nfs, "nfs_symlink_async failed: %s", + nfs_get_error(nfs)); + nfs_destroy_cb_sem(&cb_data); return -1; } wait_for_nfs_reply(nfs, &cb_data); + nfs_destroy_cb_sem(&cb_data); return cb_data.status; } @@ -1789,8 +1992,7 @@ rename_cb(int status, struct nfs_context *nfs, void 
*data, void *private_data) { struct sync_cb_data *cb_data = private_data; - cb_data->is_finished = 1; - cb_data->status = status; + cb_data_is_finished(cb_data, status); if (status < 0) { nfs_set_error(nfs, "rename call failed with \"%s\"", @@ -1804,15 +2006,19 @@ nfs_rename(struct nfs_context *nfs, const char *oldpath, const char *newpath) { struct sync_cb_data cb_data; - cb_data.is_finished = 0; + if (nfs_init_cb_data(nfs, &cb_data)) { + return -1; + } if (nfs_rename_async(nfs, oldpath, newpath, rename_cb, &cb_data) != 0) { nfs_set_error(nfs, "nfs_rename_async failed: %s", nfs_get_error(nfs)); + nfs_destroy_cb_sem(&cb_data); return -1; } wait_for_nfs_reply(nfs, &cb_data); + nfs_destroy_cb_sem(&cb_data); return cb_data.status; } @@ -1827,8 +2033,7 @@ link_cb(int status, struct nfs_context *nfs, void *data, void *private_data) { struct sync_cb_data *cb_data = private_data; - cb_data->is_finished = 1; - cb_data->status = status; + cb_data_is_finished(cb_data, status); if (status < 0) { nfs_set_error(nfs, "link call failed with \"%s\"", @@ -1842,15 +2047,19 @@ nfs_link(struct nfs_context *nfs, const char *oldpath, const char *newpath) { struct sync_cb_data cb_data; - cb_data.is_finished = 0; + if (nfs_init_cb_data(nfs, &cb_data)) { + return -1; + } if (nfs_link_async(nfs, oldpath, newpath, link_cb, &cb_data) != 0) { nfs_set_error(nfs, "nfs_link_async failed: %s", nfs_get_error(nfs)); + nfs_destroy_cb_sem(&cb_data); return -1; } wait_for_nfs_reply(nfs, &cb_data); + nfs_destroy_cb_sem(&cb_data); return cb_data.status; } @@ -1877,8 +2086,7 @@ nfs4_getacl_cb(int status, struct nfs_context *nfs, void *data, void *private_da fattr4_acl *dst = cb_data->return_data; int i; - cb_data->is_finished = 1; - cb_data->status = status; + cb_data_is_finished(cb_data, status); if (status < 0) { nfs_set_error(nfs, "getacl call failed with \"%s\"", @@ -1916,16 +2124,20 @@ nfs4_getacl(struct nfs_context *nfs, struct nfsfh *nfsfh, { struct sync_cb_data cb_data; - cb_data.is_finished = 0; 
cb_data.return_data = acl; + if (nfs_init_cb_data(nfs, &cb_data)) { + return -1; + } if (nfs4_getacl_async(nfs, nfsfh, nfs4_getacl_cb, &cb_data) != 0) { nfs_set_error(nfs, "nfs_getacl_async failed. %s", nfs_get_error(nfs)); + nfs_destroy_cb_sem(&cb_data); return -1; } wait_for_nfs_reply(nfs, &cb_data); + nfs_destroy_cb_sem(&cb_data); return cb_data.status; } @@ -1939,8 +2151,7 @@ mount_getexports_cb(struct rpc_context *mount_context, int status, void *data, assert(mount_context->magic == RPC_CONTEXT_MAGIC); - cb_data->is_finished = 1; - cb_data->status = status; + cb_data_is_finished(cb_data, status); cb_data->return_data = NULL; if (status != 0) { @@ -1971,18 +2182,22 @@ mount_getexports_timeout(const char *server, int timeout) struct rpc_context *rpc; - cb_data.is_finished = 0; cb_data.return_data = NULL; + if (nfs_init_cb_data(NULL, &cb_data)) { + return NULL; + } rpc = rpc_init_context(); rpc_set_timeout(rpc, timeout); if (mount_getexports_async(rpc, server, mount_getexports_cb, &cb_data) != 0) { rpc_destroy_context(rpc); + nfs_destroy_cb_sem(&cb_data); return NULL; } wait_for_reply(rpc, &cb_data); + nfs_destroy_cb_sem(&cb_data); rpc_destroy_context(rpc); return cb_data.return_data; diff --git a/lib/libnfs-win32.def b/lib/libnfs-win32.def index 341eef0..e3198f2 100644 --- a/lib/libnfs-win32.def +++ b/lib/libnfs-win32.def @@ -52,6 +52,8 @@ nfs_mknod nfs_mknod_async nfs_mount nfs_mount_async +nfs_mt_service_thread_start +nfs_mt_service_thread_stop nfs_open nfs_open_async nfs_open2 diff --git a/lib/libnfs.c b/lib/libnfs.c index 93bb4a2..a8009fd 100755 --- a/lib/libnfs.c +++ b/lib/libnfs.c @@ -542,6 +542,9 @@ nfs_init_context(void) (int)time(NULL)); nfs4_set_client_name(nfs, client_name); +#ifdef HAVE_MULTITHREADING + nfs_mt_mutex_init(&nfs->nfs_mutex); +#endif /* HAVE_MULTITHREADING */ return nfs; } @@ -594,6 +597,9 @@ nfs_destroy_context(struct nfs_context *nfs) nfs_free_nfsdir(nfsdir); } +#ifdef HAVE_MULTITHREADING + nfs_mt_mutex_destroy(&nfs->nfs_mutex); 
+#endif /* HAVE_MULTITHREADING */ free(nfs); } @@ -1917,6 +1923,9 @@ nfs_set_error(struct nfs_context *nfs, char *error_string, ...) va_list ap; char *str = NULL; +#ifdef HAVE_MULTITHREADING + nfs_mt_mutex_lock(&nfs->rpc->rpc_mutex); +#endif /* HAVE_MULTITHREADING */ va_start(ap, error_string); str = malloc(1024); vsnprintf(str, 1024, error_string, ap); @@ -1925,6 +1934,9 @@ nfs_set_error(struct nfs_context *nfs, char *error_string, ...) } nfs->rpc->error_string = str; va_end(ap); +#ifdef HAVE_MULTITHREADING + nfs_mt_mutex_unlock(&nfs->rpc->rpc_mutex); +#endif /* HAVE_MULTITHREADING */ } struct mount_cb_data { diff --git a/lib/multithreading.c b/lib/multithreading.c new file mode 100644 index 0000000..2a270df --- /dev/null +++ b/lib/multithreading.c @@ -0,0 +1,152 @@ +/* -*- mode:c; tab-width:8; c-basic-offset:8; indent-tabs-mode:nil; -*- */ +/* + Copyright (C) 2010 by Ronnie Sahlberg + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 2.1 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program; if not, see . 
+*/ +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#ifdef AROS +#include "aros_compat.h" +#endif + +#ifdef PS2_EE +#include "ps2_compat.h" +#endif + +#ifdef PS3_PPU +#include "ps3_compat.h" +#endif + +#ifdef WIN32 +#include +#endif + +#ifndef _GNU_SOURCE +#define _GNU_SOURCE +#endif + +#ifdef HAVE_POLL_H +#include +#endif + +#ifdef HAVE_SYS_TIME_H +#include +#endif + +#include "libnfs.h" +#include "libnfs-raw.h" +#include "libnfs-private.h" + +#ifdef HAVE_MULTITHREADING + +#ifdef HAVE_PTHREAD +static void *nfs_mt_service_thread(void *arg) +{ + struct nfs_context *nfs = (struct nfs_context *)arg; + struct pollfd pfd; + int revents; + int ret; + + nfs->multithreading_enabled = 1; + + while (nfs->multithreading_enabled) { + pfd.fd = nfs_get_fd(nfs); + pfd.events = nfs_which_events(nfs); + pfd.revents = 0; + + //qqq ~12737 iterations with busy loop + ret = poll(&pfd, 1, 0); + if (ret < 0) { + nfs_set_error(nfs, "Poll failed"); + revents = -1; + } else { + revents = pfd.revents; + } + if (nfs_service(nfs, revents) < 0) { + if (revents != -1) + nfs_set_error(nfs, "nfs_service failed"); + } + } + return NULL; +} + +int nfs_mt_service_thread_start(struct nfs_context *nfs) +{ + if (pthread_create(&nfs->service_thread, NULL, + &nfs_mt_service_thread, nfs)) { + nfs_set_error(nfs, "Failed to start service thread"); + return -1; + } + while (nfs->multithreading_enabled == 0) { + struct timespec ts = {0, 1000000}; + nanosleep(&ts, NULL); + } + return 0; +} + +void nfs_mt_service_thread_stop(struct nfs_context *nfs) +{ + nfs->multithreading_enabled = 0; + pthread_join(nfs->service_thread, NULL); +} + +int nfs_mt_mutex_init(libnfs_mutex_t *mutex) +{ + pthread_mutex_init(mutex, NULL); + return 0; +} + +int nfs_mt_mutex_destroy(libnfs_mutex_t *mutex) +{ + pthread_mutex_destroy(mutex); + return 0; +} + +int nfs_mt_mutex_lock(libnfs_mutex_t *mutex) +{ + return pthread_mutex_lock(mutex); +} + +int nfs_mt_mutex_unlock(libnfs_mutex_t *mutex) +{ + return 
pthread_mutex_unlock(mutex); +} + +int nfs_mt_sem_init(libnfs_sem_t *sem, int value) +{ + return sem_init(sem, 0, value); +} + +int nfs_mt_sem_destroy(libnfs_sem_t *sem) +{ + return sem_destroy(sem); +} + +int nfs_mt_sem_post(libnfs_sem_t *sem) +{ + return sem_post(sem); +} + +int nfs_mt_sem_wait(libnfs_sem_t *sem) +{ + return sem_wait(sem); +} + +#endif /* HAVE_PTHREAD */ + +#endif /* HAVE_MULTITHREADING */ + diff --git a/lib/pdu.c b/lib/pdu.c index 4f587ee..5388a3f 100644 --- a/lib/pdu.c +++ b/lib/pdu.c @@ -160,7 +160,13 @@ struct rpc_pdu *rpc_allocate_pdu2(struct rpc_context *rpc, int program, int vers return NULL; } memset(pdu, 0, pdu_size); +#ifdef HAVE_MULTITHREADING + nfs_mt_mutex_lock(&rpc->rpc_mutex); +#endif /* HAVE_MULTITHREADING */ pdu->xid = rpc->xid++; +#ifdef HAVE_MULTITHREADING + nfs_mt_mutex_unlock(&rpc->rpc_mutex); +#endif /* HAVE_MULTITHREADING */ pdu->cb = cb; pdu->private_data = private_data; pdu->zdr_decode_fn = zdr_decode_fn; @@ -222,7 +228,13 @@ void rpc_free_pdu(struct rpc_context *rpc, struct rpc_pdu *pdu) void rpc_set_next_xid(struct rpc_context *rpc, uint32_t xid) { +#ifdef HAVE_MULTITHREADING + nfs_mt_mutex_lock(&rpc->rpc_mutex); +#endif /* HAVE_MULTITHREADING */ rpc->xid = xid; +#ifdef HAVE_MULTITHREADING + nfs_mt_mutex_unlock(&rpc->rpc_mutex); +#endif /* HAVE_MULTITHREADING */ } int rpc_queue_pdu(struct rpc_context *rpc, struct rpc_pdu *pdu) @@ -263,8 +275,14 @@ int rpc_queue_pdu(struct rpc_context *rpc, struct rpc_pdu *pdu) } hash = rpc_hash_xid(pdu->xid); +#ifdef HAVE_MULTITHREADING + nfs_mt_mutex_lock(&rpc->rpc_mutex); +#endif /* HAVE_MULTITHREADING */ rpc_enqueue(&rpc->waitpdu[hash], pdu); rpc->waitpdu_len++; +#ifdef HAVE_MULTITHREADING + nfs_mt_mutex_unlock(&rpc->rpc_mutex); +#endif /* HAVE_MULTITHREADING */ return 0; } @@ -274,7 +292,13 @@ int rpc_queue_pdu(struct rpc_context *rpc, struct rpc_pdu *pdu) zdr_int(&pdu->zdr, &recordmarker); pdu->outdata.size = size; +#ifdef HAVE_MULTITHREADING + nfs_mt_mutex_lock(&rpc->rpc_mutex); 
+#endif /* HAVE_MULTITHREADING */ rpc_enqueue(&rpc->outqueue, pdu); +#ifdef HAVE_MULTITHREADING + nfs_mt_mutex_unlock(&rpc->rpc_mutex); +#endif /* HAVE_MULTITHREADING */ return 0; } @@ -560,6 +584,9 @@ int rpc_process_pdu(struct rpc_context *rpc, char *buf, int size) /* Look up the transaction in a hash table of our requests */ hash = rpc_hash_xid(xid); +#ifdef HAVE_MULTITHREADING + nfs_mt_mutex_lock(&rpc->rpc_mutex); +#endif /* HAVE_MULTITHREADING */ q = &rpc->waitpdu[hash]; /* Follow the hash chain. Linear traverse singly-linked list, @@ -580,6 +607,9 @@ int rpc_process_pdu(struct rpc_context *rpc, char *buf, int size) prev_pdu->next = pdu->next; rpc->waitpdu_len--; } +#ifdef HAVE_MULTITHREADING + nfs_mt_mutex_unlock(&rpc->rpc_mutex); +#endif /* HAVE_MULTITHREADING */ if (rpc_process_reply(rpc, pdu, &zdr) != 0) { rpc_set_error(rpc, "rpc_procdess_reply failed"); } @@ -592,6 +622,9 @@ int rpc_process_pdu(struct rpc_context *rpc, char *buf, int size) } return 0; } +#ifdef HAVE_MULTITHREADING + nfs_mt_mutex_unlock(&rpc->rpc_mutex); +#endif /* HAVE_MULTITHREADING */ zdr_destroy(&zdr); if (reasbuf != NULL) { diff --git a/lib/socket.c b/lib/socket.c index ac1d004..07185d5 100644 --- a/lib/socket.c +++ b/lib/socket.c @@ -214,9 +214,15 @@ rpc_which_events(struct rpc_context *rpc) return POLLIN; } +#ifdef HAVE_MULTITHREADING + nfs_mt_mutex_lock(&rpc->rpc_mutex); +#endif /* HAVE_MULTITHREADING */ if (rpc_has_queue(&rpc->outqueue)) { events |= POLLOUT; } +#ifdef HAVE_MULTITHREADING + nfs_mt_mutex_unlock(&rpc->rpc_mutex); +#endif /* HAVE_MULTITHREADING */ return events; } @@ -225,7 +231,8 @@ rpc_write_to_socket(struct rpc_context *rpc) { int32_t count; struct rpc_pdu *pdu; - + int ret = 0; + assert(rpc->magic == RPC_CONTEXT_MAGIC); if (rpc->fd == -1) { @@ -233,6 +240,9 @@ rpc_write_to_socket(struct rpc_context *rpc) return -1; } +#ifdef HAVE_MULTITHREADING + nfs_mt_mutex_lock(&rpc->rpc_mutex); +#endif /* HAVE_MULTITHREADING */ while ((pdu = rpc->outqueue.head) != NULL) { 
int64_t total; @@ -242,11 +252,13 @@ rpc_write_to_socket(struct rpc_context *rpc) (int)(total - pdu->written), MSG_NOSIGNAL); if (count == -1) { if (errno == EAGAIN || errno == EWOULDBLOCK) { - return 0; + ret = 0; + goto finished; } rpc_set_error(rpc, "Error when writing to socket :%s" "(%d)", strerror(errno), errno); - return -1; + ret = -1; + goto finished; } pdu->written += count; @@ -259,7 +271,8 @@ rpc_write_to_socket(struct rpc_context *rpc) if (pdu->flags & PDU_DISCARD_AFTER_SENDING) { rpc_free_pdu(rpc, pdu); - return 0; + ret = 0; + goto finished; } hash = rpc_hash_xid(pdu->xid); @@ -267,7 +280,12 @@ rpc_write_to_socket(struct rpc_context *rpc) rpc->waitpdu_len++; } } - return 0; + + finished: +#ifdef HAVE_MULTITHREADING + nfs_mt_mutex_unlock(&rpc->rpc_mutex); +#endif /* HAVE_MULTITHREADING */ + return ret; } #define MAX_UDP_SIZE 65536 @@ -402,6 +420,9 @@ rpc_timeout_scan(struct rpc_context *rpc) uint64_t t = rpc_current_time(); unsigned int i; +#ifdef HAVE_MULTITHREADING + nfs_mt_mutex_lock(&rpc->rpc_mutex); +#endif /* HAVE_MULTITHREADING */ for (pdu = rpc->outqueue.head; pdu; pdu = next_pdu) { next_pdu = pdu->next; @@ -415,7 +436,7 @@ rpc_timeout_scan(struct rpc_context *rpc) } LIBNFS_LIST_REMOVE(&rpc->outqueue.head, pdu); if (!rpc->outqueue.head) { - rpc->outqueue.tail = NULL; + rpc->outqueue.tail = NULL; //done } rpc_set_error(rpc, "command timed out"); pdu->cb(rpc, RPC_STATUS_TIMEOUT, @@ -423,8 +444,9 @@ rpc_timeout_scan(struct rpc_context *rpc) rpc_free_pdu(rpc, pdu); } for (i = 0; i < HASHES; i++) { - struct rpc_queue *q = &rpc->waitpdu[i]; + struct rpc_queue *q; + q = &rpc->waitpdu[i]; for (pdu = q->head; pdu; pdu = next_pdu) { next_pdu = pdu->next; @@ -440,12 +462,17 @@ rpc_timeout_scan(struct rpc_context *rpc) if (!q->head) { q->tail = NULL; } + // qqq move to a temporary queue and process after + // we drop the mutex rpc_set_error(rpc, "command timed out"); pdu->cb(rpc, RPC_STATUS_TIMEOUT, NULL, pdu->private_data); rpc_free_pdu(rpc, pdu); } } 
+#ifdef HAVE_MULTITHREADING + nfs_mt_mutex_unlock(&rpc->rpc_mutex); +#endif /* HAVE_MULTITHREADING */ } int @@ -865,6 +892,10 @@ rpc_reconnect_requeue(struct rpc_context *rpc) * in flight. Move them all over from the waitpdu queue back to the * out queue. */ + printf("reconnect reset waitpdu queues\n"); +#ifdef HAVE_MULTITHREADING + nfs_mt_mutex_lock(&rpc->rpc_mutex); +#endif /* HAVE_MULTITHREADING */ for (i = 0; i < HASHES; i++) { struct rpc_queue *q = &rpc->waitpdu[i]; for (pdu = q->head; pdu; pdu = next) { @@ -876,6 +907,9 @@ rpc_reconnect_requeue(struct rpc_context *rpc) rpc_reset_queue(q); } rpc->waitpdu_len = 0; +#ifdef HAVE_MULTITHREADING + nfs_mt_mutex_unlock(&rpc->rpc_mutex); +#endif /* HAVE_MULTITHREADING */ if (rpc->auto_reconnect < 0 || rpc->num_retries > 0) { rpc->num_retries--; @@ -997,7 +1031,13 @@ rpc_queue_length(struct rpc_context *rpc) i++; } +#ifdef HAVE_MULTITHREADING + nfs_mt_mutex_lock(&rpc->rpc_mutex); +#endif /* HAVE_MULTITHREADING */ i += rpc->waitpdu_len; +#ifdef HAVE_MULTITHREADING + nfs_mt_mutex_unlock(&rpc->rpc_mutex); +#endif /* HAVE_MULTITHREADING */ return i; } diff --git a/utils/Makefile.am b/utils/Makefile.am index be472e1..2ae7636 100644 --- a/utils/Makefile.am +++ b/utils/Makefile.am @@ -1,5 +1,9 @@ bin_PROGRAMS = nfs-cat nfs-ls +if HAVE_PTHREAD +LIBS += -lpthread +endif + if !HAVE_WIN32 bin_PROGRAMS += nfs-cp nfs-stat endif