initial pthread support

Signed-off-by: Ronnie Sahlberg <ronniesahlberg@gmail.com>
master
Ronnie Sahlberg 2021-10-05 20:24:55 +10:00
parent 81f4609076
commit f55637619e
17 changed files with 1051 additions and 141 deletions

108
README.multithreading Normal file
View File

@ -0,0 +1,108 @@
Multithreading
==============
Libnfs supports multithreading using the pthread API.
By default libnfs is beuilt without pthread support to avoid the overhead
for locking, but pthread support, and thus support for multithreading, can be
configured using --enable-pthread
$ ./configure --prefix=/usr --enable-examples --enable-pthread
It is not supported to mix the eventdriven ASYNC interface with multithreading
thus once multithreading is enabled from the application you can not use
the async interface any more and must only use the multithread safe
SYNC API.
The process to use multithreading from your application is:
1, nfs_init_context() to create an NFS context. This context defaults to
not (yet) be multithreading aware.
2, nfs_mount(...) to mount the NFS share.
IF the share mounted successfully we can now turn on multithreading for all I/O
to the share:
3, nfs_mt_service_thread_start(nfs)
this starts a dedicated thread to manage all socket
I/O and queue management and also flags the context
to be multithreading ready.
If this returns successfully you can now start using
this context from separate threads in your
application. But remember, only the SYNC API
is supported from this point.
... use the nfs context from all your threads ...
Once the application is finished and ready to terminate, first close all other
threads, or make sure that they will NOT perform any further I/O to the nfs
context. Wait until all in-flight I/O has stopped.
Then call nfs_mt_service_thread_stop(nfs) to terminate the service thread and
disable the multithreading support.
See examples/nfs-pthreads-example.c for an example utility that
mounts a share, enables multithreading and then creates a number of worker
threads that share the same nfs context.
(In the example the threads just runs a busy loop calling nfs_stat64().)
nfs_get_error()
---------------
Avoid using this function from the application. This string is hared across
all threads and we will need a new, possibly complex, API to replace it.
For example, how do we correctly match error strings that occur in the service
thread to the error string that is presented to a worker thread?
Porting
-------
The multithreading support is abstracted out into two separate files to make
it easier to add other threading APIs instead of pthreads.
include/libnfs-multithreading.h and lib/multithreading.c
Multithreading requires two separate defines to be present.
The global one is HAVE_MULTITHREADING which will activate general
multithreading support in the library and the second one that enables a specific
implementation of threading.
HAVE_PTHREAD is defined when you want to use libpthread.
For other threading APIs you will need a new define HAVE_OTHER_API
to select it.
include/libnfs-multithreading.h
-------------------------------
This file consists of two parts.
The first part is creating typedefs for a tread, a mutex and a semaphore.
When porting to a different threads API, add a new section that adds suitable
typedefs for thread/mutex/semaphore.
The second part is the wrapper API for the libnfs threading abstractions and
should not need to be changed.
lib/multithreading.c
--------------------
This file contains the actual abstraction used for multithreading.
When porting to a different threads API you will need to create replacement
functions for :
nfs_mt_service_thread() : This is the service thread that is responsible
for performing all socket I/O.
nfs_mt_service_thread_start()
nfs_mt_service_thread_stop()
: Functions to start/stop the service thread.
nfs_mt_mutex_init()
nfs_mt_mutex_destroy()
nfs_mt_mutex_lock()
nfs_mt_mutex_unlock() : Wrappers that implement the 4 basic mutex calls.
nfs_mt_sem_init()
nfs_mt_sem_destroy()
nfs_mt_sem_post()
nfs_mt_sem_wait() : Wrappers that implement the 4 basic semaphore calls.
TODO
----
* [rpc|nfs]_[set|get]_error() needs to be changed to use threads local storage
or a different API.
* Optimization: have the service thread just read the PDU from the socket and
add it to a new queue. Then have a pool of threads to read from this queue and
unmarshall the pdu's in concurrently.

View File

@ -70,6 +70,26 @@ fi
AC_SUBST(MAYBE_EXAMPLES)
#option: examples
AC_ARG_ENABLE([pthread],
[AC_HELP_STRING([--enable-pthread],
[Build with pthread multithreading support])],
[ENABLE_PTHREAD=$enableval],
[ENABLE_THREAD="no"])
if test x$ENABLE_PTHREAD = xyes; then
# check for lpthread
AC_CACHE_CHECK([for pthread support],libnfs_cv_HAVE_PTHREAD,[
AC_TRY_COMPILE([
#include <pthread.h>],
[pthread_t thread1, thread2;],
libnfs_cv_HAVE_PTHREAD=yes,libnfs_cv_HAVE_PTHREAD=no)])
if test x"$libnfs_cv_HAVE_PTHREAD" = x"yes"; then
AC_DEFINE(HAVE_PTHREAD,1,[Whether we have pthread support])
AC_DEFINE(HAVE_MULTITHREADING,1,[Whether we have multithreading support])
fi
fi
AM_CONDITIONAL([HAVE_PTHREAD], [test $libnfs_cv_HAVE_PTHREAD = yes])
AC_MSG_CHECKING(whether SO_BINDTODEVICE is available)
AC_TRY_COMPILE([#include <net/if.h>], [
int i = SO_BINDTODEVICE;

View File

@ -4,6 +4,11 @@ if HAVE_TALLOC_TEVENT
noinst_PROGRAMS += nfs4-cat-talloc
endif
if HAVE_PTHREAD
LIBS += -lpthread
noinst_PROGRAMS += nfs-pthreads-example
endif
AM_CPPFLAGS = \
-I$(abs_top_srcdir)/include \
-I$(abs_top_srcdir)/include/nfsc \
@ -28,3 +33,4 @@ nfs4_cat_LDADD = $(COMMON_LIBS) -levent
nfs4_cat_talloc_LDADD = $(COMMON_LIBS) -ltevent -ltalloc
portmap_client_LDADD = $(COMMON_LIBS)
portmap_server_LDADD = $(COMMON_LIBS) -levent
nfs_pthreads_example_LDADD = $(COMMON_LIBS)

View File

@ -0,0 +1,205 @@
/* -*- mode:c; tab-width:8; c-basic-offset:8; indent-tabs-mode:nil; -*- */
/*
Copyright (C) by Ronnie Sahlberg <ronniesahlberg@gmail.com> 2021
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, see <http://www.gnu.org/licenses/>.
*/
#define _FILE_OFFSET_BITS 64
#define _GNU_SOURCE
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#ifdef AROS
#include "aros_compat.h"
#endif
#ifdef WIN32
#include <win32/win32_compat.h>
#pragma comment(lib, "ws2_32.lib")
WSADATA wsaData;
#else
#include <sys/stat.h>
#include <string.h>
#endif
#ifdef HAVE_POLL_H
#include <poll.h>
#endif
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <sys/types.h>
#include <fcntl.h>
#include <time.h>
#include <pthread.h>
#include "libnfs.h"
#include "libnfs-raw.h"
#include "libnfs-raw-nfs.h"
void usage(void)
{
fprintf(stderr, "Usage: nfs-pthread-example <url> <num-threads>\n");
fprintf(stderr, "\tExample program using pthreads.\n");
exit(0);
}
struct stat_data {
struct nfs_context *nfs;
int idx;
char *path;
int is_finished;
};
static void *nfs_stat_thread(void *arg)
{
struct stat_data *sd = arg;
struct nfs_stat_64 st;
int i, ret;
printf("Stat thread %03d\n", sd->idx);
i = 0;
while(!sd->is_finished) {
ret = nfs_stat64(sd->nfs, sd->path, &st);
if (ret < 0) {
printf("Stat failed: %s\n", nfs_get_error(sd->nfs));
exit(10);
}
i++;
}
printf("%03d:%d ret:%d st->ino:%d\n", sd->idx, i, ret, (int)st.nfs_ino);
return NULL;
}
int main(int argc, char *argv[])
{
int i, num_threads;
int ret = 0;
struct nfs_context *nfs = NULL;
struct nfsfh *nfsfh = NULL;
struct nfs_fh3 *fh3;
struct nfs_url *url = NULL;
pthread_t *stat_thread;
struct stat_data *sd;
#ifdef WIN32
if (WSAStartup(MAKEWORD(2,2), &wsaData) != 0) {
printf("Failed to start Winsock2\n");
return 1;
}
#endif
#ifdef AROS
aros_init_socket();
#endif
if (argc < 3) {
usage();
}
num_threads = atoi(argv[2]);
printf("Number of threads : %d\n", num_threads);
nfs = nfs_init_context();
if (nfs == NULL) {
fprintf(stderr, "failed to init context\n");
goto finished;
}
url = nfs_parse_url_full(nfs, argv[1]);
if (url == NULL) {
fprintf(stderr, "%s\n", nfs_get_error(nfs));
ret = 1;
goto finished;
}
if (nfs_mount(nfs, url->server, url->path) != 0) {
fprintf(stderr, "Failed to mount nfs share : %s\n",
nfs_get_error(nfs));
ret = 1;
goto finished;
}
/*
* Before we can use multithreading we must initialize and
* start the service thread.
*/
printf("Start the service thread\n");
if (nfs_mt_service_thread_start(nfs)) {
printf("failed to start service thread\n");
exit(10);
}
printf("Service thread is active. Ready to do I/O\n");
printf("Start %d thread(s) calling stat on %s\n", num_threads, url->file);
if ((sd = malloc(sizeof(struct stat_data) * num_threads)) == NULL) {
printf("Failed to allocated stat_data\n");
exit(10);
}
if ((stat_thread = malloc(sizeof(pthread_t) * num_threads)) == NULL) {
printf("Failed to allocated stat_thread\n");
exit(10);
}
for (i = 0; i < num_threads; i++) {
sd[i].nfs = nfs;
sd[i].path = url->file;
sd[i].is_finished = 0;
sd[i].idx = i;
if (pthread_create(&stat_thread[i], NULL,
&nfs_stat_thread, &sd[i])) {
printf("Failed to create stat thread %d\n", i);
exit(10);
}
}
sleep(1);
/*
* Terminate all the worker threads
*/
printf("Closing all worker threads\n");
for (i = 0; i < num_threads; i++) {
sd[i].is_finished = 1;
}
for (i = 0; i < num_threads; i++) {
pthread_join(stat_thread[i], NULL);
}
printf("closing service thread\n");
nfs_mt_service_thread_stop(nfs);
finished:
if (nfsfh) {
nfs_close(nfs, nfsfh);
}
nfs_umount(nfs);
if (url) {
nfs_destroy_url(url);
}
if (nfs) {
nfs_destroy_context(nfs);
}
free(sd);
free(stat_thread);
return ret;
}

View File

@ -0,0 +1,56 @@
/* -*- mode:c; tab-width:8; c-basic-offset:8; indent-tabs-mode:nil; -*- */
/*
Copyright (C) 2021 by Ronnie Sahlberg <ronniesahlberg@gmail.com>
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 2.1 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program; if not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _LIBNFS_MULTITHREADING_H_
#define _LIBNFS_MULTITHREADING_H_
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#ifdef __cplusplus
extern "C" {
#endif
#ifdef HAVE_MULTITHREADING
#ifdef HAVE_PTHREAD
#include <semaphore.h>
#include <pthread.h>
typedef pthread_t libnfs_thread_t;
typedef pthread_mutex_t libnfs_mutex_t;
typedef sem_t libnfs_sem_t;
#endif /* HAVE_PTHREAD */
int nfs_mt_mutex_init(libnfs_mutex_t *mutex);
int nfs_mt_mutex_destroy(libnfs_mutex_t *mutex);
int nfs_mt_mutex_lock(libnfs_mutex_t *mutex);
int nfs_mt_mutex_unlock(libnfs_mutex_t *mutex);
int nfs_mt_sem_init(libnfs_sem_t *sem, int value);
int nfs_mt_sem_destroy(libnfs_sem_t *sem);
int nfs_mt_sem_post(libnfs_sem_t *sem);
int nfs_mt_sem_wait(libnfs_sem_t *sem);
#endif /* HAVE_MULTITHREADING */
#ifdef __cplusplus
}
#endif
#endif /* !_LIBNFS_MULTITHREADING_H_ */

View File

@ -43,6 +43,7 @@
#define IFNAMSIZ 16
#endif
#include "libnfs-multithreading.h"
#include "libnfs-zdr.h"
#include "../nfs/libnfs-raw-nfs.h"
#include "../nfs4/libnfs-raw-nfs4.h"
@ -131,6 +132,9 @@ struct rpc_context {
struct sockaddr_storage udp_src;
struct rpc_queue waitpdu[HASHES];
uint32_t waitpdu_len;
#ifdef HAVE_MULTITHREADING
libnfs_mutex_t rpc_mutex;
#endif /* HAVE_MULTITHREADING */
uint32_t inpos;
char rm_buf[4];
@ -306,6 +310,11 @@ struct nfs_context {
verifier4 setclientid_confirm;
uint32_t seqid;
int has_lock_owner;
#ifdef HAVE_MULTITHREADING
int multithreading_enabled;
libnfs_mutex_t nfs_mutex;
libnfs_thread_t service_thread;
#endif /* HAVE_MULTITHREADING */
};
typedef int (*continue_func)(struct nfs_context *nfs, struct nfs_attr *attr,

View File

@ -1991,6 +1991,20 @@ EXTERN void nfs4_set_client_name(struct nfs_context *nfs, const char *id);
*/
EXTERN void nfs4_set_verifier(struct nfs_context *nfs, const char *verifier);
/*
* MULTITHREADING
*/
/*
* This function starts a separate service thread for multithreading support.
* When multithreading is enabled the eventdriven async API is no longer
* supported and you can only use the synchronous API.
*/
EXTERN int nfs_mt_service_thread_start(struct nfs_context *nfs);
/*
* Shutdown multithreading support.
*/
EXTERN void nfs_mt_service_thread_stop(struct nfs_context *nfs);
#ifdef __cplusplus
}
#endif

View File

@ -2,6 +2,7 @@ set(SOURCES init.c
libnfs.c
libnfs-sync.c
libnfs-zdr.c
multithreading.c
nfs_v3.c
nfs_v4.c
pdu.c

View File

@ -19,6 +19,7 @@ libnfs_la_SOURCES = \
libnfs.c \
libnfs-sync.c \
libnfs-zdr.c \
multithreading.c \
nfs_v3.c \
nfs_v4.c \
pdu.c \

View File

@ -1,3 +1,4 @@
/* -*- mode:c; tab-width:8; c-basic-offset:8; indent-tabs-mode:nil; -*- */
/*
Copyright (C) 2010 by Ronnie Sahlberg <ronniesahlberg@gmail.com>
@ -103,12 +104,21 @@ struct rpc_context *rpc_init_context(void)
rpc->uid = getuid();
rpc->gid = getgid();
#endif
#ifdef HAVE_MULTITHREADING
nfs_mt_mutex_lock(&rpc->rpc_mutex);
#endif /* HAVE_MULTITHREADING */
rpc_reset_queue(&rpc->outqueue);
for (i = 0; i < HASHES; i++)
rpc_reset_queue(&rpc->waitpdu[i]);
#ifdef HAVE_MULTITHREADING
nfs_mt_mutex_unlock(&rpc->rpc_mutex);
#endif /* HAVE_MULTITHREADING */
/* Default is no timeout */
rpc->timeout = -1;
#ifdef HAVE_MULTITHREADING
nfs_mt_mutex_init(&rpc->rpc_mutex);
#endif /* HAVE_MULTITHREADING */
return rpc;
}
@ -131,6 +141,9 @@ struct rpc_context *rpc_init_server_context(int s)
rpc->is_udp = rpc_is_udp_socket(rpc);
rpc_reset_queue(&rpc->outqueue);
#ifdef HAVE_MULTITHREADING
nfs_mt_mutex_init(&rpc->rpc_mutex);
#endif /* HAVE_MULTITHREADING */
return rpc;
}
@ -249,10 +262,12 @@ void rpc_set_gid(struct rpc_context *rpc, int gid) {
void rpc_set_error(struct rpc_context *rpc, const char *error_string, ...)
{
va_list ap;
char *old_error_string = rpc->error_string;
assert(rpc->magic == RPC_CONTEXT_MAGIC);
char *old_error_string;
#ifdef HAVE_MULTITHREADING
nfs_mt_mutex_lock(&rpc->rpc_mutex);
#endif /* HAVE_MULTITHREADING */
old_error_string = rpc->error_string;
va_start(ap, error_string);
rpc->error_string = malloc(1024);
vsnprintf(rpc->error_string, 1024, error_string, ap);
@ -263,6 +278,9 @@ void rpc_set_error(struct rpc_context *rpc, const char *error_string, ...)
if (old_error_string != NULL) {
free(old_error_string);
}
#ifdef HAVE_MULTITHREADING
nfs_mt_mutex_unlock(&rpc->rpc_mutex);
#endif /* HAVE_MULTITHREADING */
}
char *rpc_get_error(struct rpc_context *rpc)
@ -287,20 +305,33 @@ static void rpc_purge_all_pdus(struct rpc_context *rpc, int status, const char *
* pdus when called.
*/
#ifdef HAVE_MULTITHREADING
nfs_mt_mutex_lock(&rpc->rpc_mutex);
#endif /* HAVE_MULTITHREADING */
outqueue = rpc->outqueue;
rpc_reset_queue(&rpc->outqueue);
while ((pdu = outqueue.head) != NULL) {
outqueue.head = pdu->next;
pdu->next = NULL;
pdu->next = NULL;
pdu->cb(rpc, status, (void *) error, pdu->private_data);
rpc_free_pdu(rpc, pdu);
}
#ifdef HAVE_MULTITHREADING
nfs_mt_mutex_unlock(&rpc->rpc_mutex);
#endif /* HAVE_MULTITHREADING */
for (i = 0; i < HASHES; i++) {
struct rpc_queue waitqueue = rpc->waitpdu[i];
struct rpc_queue waitqueue;
#ifdef HAVE_MULTITHREADING
nfs_mt_mutex_lock(&rpc->rpc_mutex);
#endif /* HAVE_MULTITHREADING */
waitqueue = rpc->waitpdu[i];
rpc_reset_queue(&rpc->waitpdu[i]);
#ifdef HAVE_MULTITHREADING
nfs_mt_mutex_unlock(&rpc->rpc_mutex);
#endif /* HAVE_MULTITHREADING */
while((pdu = waitqueue.head) != NULL) {
waitqueue.head = pdu->next;
pdu->next = NULL;
@ -310,8 +341,6 @@ static void rpc_purge_all_pdus(struct rpc_context *rpc, int status, const char *
}
assert(!rpc->outqueue.head);
for (i = 0; i < HASHES; i++)
assert(!rpc->waitpdu[i].head);
}
void rpc_error_all_pdus(struct rpc_context *rpc, const char *error)
@ -388,6 +417,9 @@ void rpc_destroy_context(struct rpc_context *rpc)
rpc->inbuf = NULL;
rpc->magic = 0;
#ifdef HAVE_MULTITHREADING
nfs_mt_mutex_destroy(&rpc->rpc_mutex);
#endif /* HAVE_MULTITHREADING */
free(rpc);
}

File diff suppressed because it is too large Load Diff

View File

@ -52,6 +52,8 @@ nfs_mknod
nfs_mknod_async
nfs_mount
nfs_mount_async
nfs_mt_service_thread_start
nfs_mt_service_thread_stop
nfs_open
nfs_open_async
nfs_open2

View File

@ -542,6 +542,9 @@ nfs_init_context(void)
(int)time(NULL));
nfs4_set_client_name(nfs, client_name);
#ifdef HAVE_MULTITHREADING
nfs_mt_mutex_init(&nfs->nfs_mutex);
#endif /* HAVE_MULTITHREADING */
return nfs;
}
@ -594,6 +597,9 @@ nfs_destroy_context(struct nfs_context *nfs)
nfs_free_nfsdir(nfsdir);
}
#ifdef HAVE_MULTITHREADING
nfs_mt_mutex_destroy(&nfs->nfs_mutex);
#endif /* HAVE_MULTITHREADING */
free(nfs);
}
@ -1917,6 +1923,9 @@ nfs_set_error(struct nfs_context *nfs, char *error_string, ...)
va_list ap;
char *str = NULL;
#ifdef HAVE_MULTITHREADING
nfs_mt_mutex_lock(&nfs->rpc->rpc_mutex);
#endif /* HAVE_MULTITHREADING */
va_start(ap, error_string);
str = malloc(1024);
vsnprintf(str, 1024, error_string, ap);
@ -1925,6 +1934,9 @@ nfs_set_error(struct nfs_context *nfs, char *error_string, ...)
}
nfs->rpc->error_string = str;
va_end(ap);
#ifdef HAVE_MULTITHREADING
nfs_mt_mutex_unlock(&nfs->rpc->rpc_mutex);
#endif /* HAVE_MULTITHREADING */
}
struct mount_cb_data {

152
lib/multithreading.c Normal file
View File

@ -0,0 +1,152 @@
/* -*- mode:c; tab-width:8; c-basic-offset:8; indent-tabs-mode:nil; -*- */
/*
Copyright (C) 2010 by Ronnie Sahlberg <ronniesahlberg@gmail.com>
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 2.1 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program; if not, see <http://www.gnu.org/licenses/>.
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#ifdef AROS
#include "aros_compat.h"
#endif
#ifdef PS2_EE
#include "ps2_compat.h"
#endif
#ifdef PS3_PPU
#include "ps3_compat.h"
#endif
#ifdef WIN32
#include <win32/win32_compat.h>
#endif
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
#ifdef HAVE_POLL_H
#include <poll.h>
#endif
#ifdef HAVE_SYS_TIME_H
#include <sys/time.h>
#endif
#include "libnfs.h"
#include "libnfs-raw.h"
#include "libnfs-private.h"
#ifdef HAVE_MULTITHREADING
#ifdef HAVE_PTHREAD
static void *nfs_mt_service_thread(void *arg)
{
struct nfs_context *nfs = (struct nfs_context *)arg;
struct pollfd pfd;
int revents;
int ret;
nfs->multithreading_enabled = 1;
while (nfs->multithreading_enabled) {
pfd.fd = nfs_get_fd(nfs);
pfd.events = nfs_which_events(nfs);
pfd.revents = 0;
//qqq ~12737 iterations with busy loop
ret = poll(&pfd, 1, 0);
if (ret < 0) {
nfs_set_error(nfs, "Poll failed");
revents = -1;
} else {
revents = pfd.revents;
}
if (nfs_service(nfs, revents) < 0) {
if (revents != -1)
nfs_set_error(nfs, "nfs_service failed");
}
}
return NULL;
}
int nfs_mt_service_thread_start(struct nfs_context *nfs)
{
if (pthread_create(&nfs->service_thread, NULL,
&nfs_mt_service_thread, nfs)) {
nfs_set_error(nfs, "Failed to start service thread");
return -1;
}
while (nfs->multithreading_enabled == 0) {
struct timespec ts = {0, 1000000};
nanosleep(&ts, NULL);
}
return 0;
}
void nfs_mt_service_thread_stop(struct nfs_context *nfs)
{
nfs->multithreading_enabled = 0;
pthread_join(nfs->service_thread, NULL);
}
int nfs_mt_mutex_init(libnfs_mutex_t *mutex)
{
pthread_mutex_init(mutex, NULL);
return 0;
}
int nfs_mt_mutex_destroy(libnfs_mutex_t *mutex)
{
pthread_mutex_destroy(mutex);
return 0;
}
int nfs_mt_mutex_lock(libnfs_mutex_t *mutex)
{
return pthread_mutex_lock(mutex);
}
int nfs_mt_mutex_unlock(libnfs_mutex_t *mutex)
{
return pthread_mutex_unlock(mutex);
}
int nfs_mt_sem_init(libnfs_sem_t *sem, int value)
{
return sem_init(sem, 0, value);
}
int nfs_mt_sem_destroy(libnfs_sem_t *sem)
{
return sem_destroy(sem);
}
int nfs_mt_sem_post(libnfs_sem_t *sem)
{
return sem_post(sem);
}
int nfs_mt_sem_wait(libnfs_sem_t *sem)
{
return sem_wait(sem);
}
#endif /* HAVE_PTHREAD */
#endif /* HAVE_MULTITHREADING */

View File

@ -160,7 +160,13 @@ struct rpc_pdu *rpc_allocate_pdu2(struct rpc_context *rpc, int program, int vers
return NULL;
}
memset(pdu, 0, pdu_size);
#ifdef HAVE_MULTITHREADING
nfs_mt_mutex_lock(&rpc->rpc_mutex);
#endif /* HAVE_MULTITHREADING */
pdu->xid = rpc->xid++;
#ifdef HAVE_MULTITHREADING
nfs_mt_mutex_unlock(&rpc->rpc_mutex);
#endif /* HAVE_MULTITHREADING */
pdu->cb = cb;
pdu->private_data = private_data;
pdu->zdr_decode_fn = zdr_decode_fn;
@ -222,7 +228,13 @@ void rpc_free_pdu(struct rpc_context *rpc, struct rpc_pdu *pdu)
void rpc_set_next_xid(struct rpc_context *rpc, uint32_t xid)
{
#ifdef HAVE_MULTITHREADING
nfs_mt_mutex_lock(&rpc->rpc_mutex);
#endif /* HAVE_MULTITHREADING */
rpc->xid = xid;
#ifdef HAVE_MULTITHREADING
nfs_mt_mutex_unlock(&rpc->rpc_mutex);
#endif /* HAVE_MULTITHREADING */
}
int rpc_queue_pdu(struct rpc_context *rpc, struct rpc_pdu *pdu)
@ -263,8 +275,14 @@ int rpc_queue_pdu(struct rpc_context *rpc, struct rpc_pdu *pdu)
}
hash = rpc_hash_xid(pdu->xid);
#ifdef HAVE_MULTITHREADING
nfs_mt_mutex_lock(&rpc->rpc_mutex);
#endif /* HAVE_MULTITHREADING */
rpc_enqueue(&rpc->waitpdu[hash], pdu);
rpc->waitpdu_len++;
#ifdef HAVE_MULTITHREADING
nfs_mt_mutex_unlock(&rpc->rpc_mutex);
#endif /* HAVE_MULTITHREADING */
return 0;
}
@ -274,7 +292,13 @@ int rpc_queue_pdu(struct rpc_context *rpc, struct rpc_pdu *pdu)
zdr_int(&pdu->zdr, &recordmarker);
pdu->outdata.size = size;
#ifdef HAVE_MULTITHREADING
nfs_mt_mutex_lock(&rpc->rpc_mutex);
#endif /* HAVE_MULTITHREADING */
rpc_enqueue(&rpc->outqueue, pdu);
#ifdef HAVE_MULTITHREADING
nfs_mt_mutex_unlock(&rpc->rpc_mutex);
#endif /* HAVE_MULTITHREADING */
return 0;
}
@ -560,6 +584,9 @@ int rpc_process_pdu(struct rpc_context *rpc, char *buf, int size)
/* Look up the transaction in a hash table of our requests */
hash = rpc_hash_xid(xid);
#ifdef HAVE_MULTITHREADING
nfs_mt_mutex_lock(&rpc->rpc_mutex);
#endif /* HAVE_MULTITHREADING */
q = &rpc->waitpdu[hash];
/* Follow the hash chain. Linear traverse singly-linked list,
@ -580,6 +607,9 @@ int rpc_process_pdu(struct rpc_context *rpc, char *buf, int size)
prev_pdu->next = pdu->next;
rpc->waitpdu_len--;
}
#ifdef HAVE_MULTITHREADING
nfs_mt_mutex_unlock(&rpc->rpc_mutex);
#endif /* HAVE_MULTITHREADING */
if (rpc_process_reply(rpc, pdu, &zdr) != 0) {
rpc_set_error(rpc, "rpc_procdess_reply failed");
}
@ -592,6 +622,9 @@ int rpc_process_pdu(struct rpc_context *rpc, char *buf, int size)
}
return 0;
}
#ifdef HAVE_MULTITHREADING
nfs_mt_mutex_unlock(&rpc->rpc_mutex);
#endif /* HAVE_MULTITHREADING */
zdr_destroy(&zdr);
if (reasbuf != NULL) {

View File

@ -214,9 +214,15 @@ rpc_which_events(struct rpc_context *rpc)
return POLLIN;
}
#ifdef HAVE_MULTITHREADING
nfs_mt_mutex_lock(&rpc->rpc_mutex);
#endif /* HAVE_MULTITHREADING */
if (rpc_has_queue(&rpc->outqueue)) {
events |= POLLOUT;
}
#ifdef HAVE_MULTITHREADING
nfs_mt_mutex_unlock(&rpc->rpc_mutex);
#endif /* HAVE_MULTITHREADING */
return events;
}
@ -225,7 +231,8 @@ rpc_write_to_socket(struct rpc_context *rpc)
{
int32_t count;
struct rpc_pdu *pdu;
int ret = 0;
assert(rpc->magic == RPC_CONTEXT_MAGIC);
if (rpc->fd == -1) {
@ -233,6 +240,9 @@ rpc_write_to_socket(struct rpc_context *rpc)
return -1;
}
#ifdef HAVE_MULTITHREADING
nfs_mt_mutex_lock(&rpc->rpc_mutex);
#endif /* HAVE_MULTITHREADING */
while ((pdu = rpc->outqueue.head) != NULL) {
int64_t total;
@ -242,11 +252,13 @@ rpc_write_to_socket(struct rpc_context *rpc)
(int)(total - pdu->written), MSG_NOSIGNAL);
if (count == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
return 0;
ret = 0;
goto finished;
}
rpc_set_error(rpc, "Error when writing to socket :%s"
"(%d)", strerror(errno), errno);
return -1;
ret = -1;
goto finished;
}
pdu->written += count;
@ -259,7 +271,8 @@ rpc_write_to_socket(struct rpc_context *rpc)
if (pdu->flags & PDU_DISCARD_AFTER_SENDING) {
rpc_free_pdu(rpc, pdu);
return 0;
ret = 0;
goto finished;
}
hash = rpc_hash_xid(pdu->xid);
@ -267,7 +280,12 @@ rpc_write_to_socket(struct rpc_context *rpc)
rpc->waitpdu_len++;
}
}
return 0;
finished:
#ifdef HAVE_MULTITHREADING
nfs_mt_mutex_unlock(&rpc->rpc_mutex);
#endif /* HAVE_MULTITHREADING */
return ret;
}
#define MAX_UDP_SIZE 65536
@ -402,6 +420,9 @@ rpc_timeout_scan(struct rpc_context *rpc)
uint64_t t = rpc_current_time();
unsigned int i;
#ifdef HAVE_MULTITHREADING
nfs_mt_mutex_lock(&rpc->rpc_mutex);
#endif /* HAVE_MULTITHREADING */
for (pdu = rpc->outqueue.head; pdu; pdu = next_pdu) {
next_pdu = pdu->next;
@ -415,7 +436,7 @@ rpc_timeout_scan(struct rpc_context *rpc)
}
LIBNFS_LIST_REMOVE(&rpc->outqueue.head, pdu);
if (!rpc->outqueue.head) {
rpc->outqueue.tail = NULL;
rpc->outqueue.tail = NULL; //done
}
rpc_set_error(rpc, "command timed out");
pdu->cb(rpc, RPC_STATUS_TIMEOUT,
@ -423,8 +444,9 @@ rpc_timeout_scan(struct rpc_context *rpc)
rpc_free_pdu(rpc, pdu);
}
for (i = 0; i < HASHES; i++) {
struct rpc_queue *q = &rpc->waitpdu[i];
struct rpc_queue *q;
q = &rpc->waitpdu[i];
for (pdu = q->head; pdu; pdu = next_pdu) {
next_pdu = pdu->next;
@ -440,12 +462,17 @@ rpc_timeout_scan(struct rpc_context *rpc)
if (!q->head) {
q->tail = NULL;
}
// qqq move to a temporary queue and process after
// we drop the mutex
rpc_set_error(rpc, "command timed out");
pdu->cb(rpc, RPC_STATUS_TIMEOUT,
NULL, pdu->private_data);
rpc_free_pdu(rpc, pdu);
}
}
#ifdef HAVE_MULTITHREADING
nfs_mt_mutex_unlock(&rpc->rpc_mutex);
#endif /* HAVE_MULTITHREADING */
}
int
@ -865,6 +892,10 @@ rpc_reconnect_requeue(struct rpc_context *rpc)
* in flight. Move them all over from the waitpdu queue back to the
* out queue.
*/
printf("reconnect reset waitpdu queues\n");
#ifdef HAVE_MULTITHREADING
nfs_mt_mutex_lock(&rpc->rpc_mutex);
#endif /* HAVE_MULTITHREADING */
for (i = 0; i < HASHES; i++) {
struct rpc_queue *q = &rpc->waitpdu[i];
for (pdu = q->head; pdu; pdu = next) {
@ -876,6 +907,9 @@ rpc_reconnect_requeue(struct rpc_context *rpc)
rpc_reset_queue(q);
}
rpc->waitpdu_len = 0;
#ifdef HAVE_MULTITHREADING
nfs_mt_mutex_unlock(&rpc->rpc_mutex);
#endif /* HAVE_MULTITHREADING */
if (rpc->auto_reconnect < 0 || rpc->num_retries > 0) {
rpc->num_retries--;
@ -997,7 +1031,13 @@ rpc_queue_length(struct rpc_context *rpc)
i++;
}
#ifdef HAVE_MULTITHREADING
nfs_mt_mutex_lock(&rpc->rpc_mutex);
#endif /* HAVE_MULTITHREADING */
i += rpc->waitpdu_len;
#ifdef HAVE_MULTITHREADING
nfs_mt_mutex_unlock(&rpc->rpc_mutex);
#endif /* HAVE_MULTITHREADING */
return i;
}

View File

@ -1,5 +1,9 @@
bin_PROGRAMS = nfs-cat nfs-ls
if HAVE_PTHREAD
LIBS += -lpthread
endif
if !HAVE_WIN32
bin_PROGRAMS += nfs-cp nfs-stat
endif