nfs4 multithreading: create a new lock_owner for every single open()

This allows us to perform an unlimited number of concurrent opens, as we are
no longer bound by RFC 3530 8.1.5, which limits us to only one active OPEN
command per lock_owner.

Signed-off-by: Ronnie Sahlberg <ronniesahlberg@gmail.com>
Ronnie Sahlberg 2022-01-05 16:16:27 +10:00
parent 61e5863e46
commit 0550a5c83f
3 changed files with 38 additions and 48 deletions
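
Conceptually, the change gives every open() its own open_owner: a fresh 4-byte value is drawn from the new open_counter field (protected by nfs4_open_mutex), and the OPEN seqid is tracked per filehandle in open_seqid instead of per context. A minimal sketch of that idea follows; the names ctx, fh and take_open_owner are hypothetical and only illustrate the pattern, not the actual libnfs API.

/*
 * Illustrative only: one distinct owner value per open() means the
 * server keeps an independent seqid sequence for each open, so OPENs
 * issued from different threads no longer serialize on a shared
 * per-context seqid (RFC 3530 8.1.5).
 */
#include <stdint.h>
#include <pthread.h>

struct ctx {
        uint32_t open_counter;        /* next owner value to hand out */
        pthread_mutex_t open_mutex;   /* protects open_counter */
};

struct fh {
        uint32_t lock_owner;          /* 4-byte owner used for this OPEN */
        uint32_t open_seqid;          /* per-open seqid, starts at 1 */
};

/* Take a unique owner for one open() and seed its seqid. */
static void take_open_owner(struct ctx *c, struct fh *f)
{
        pthread_mutex_lock(&c->open_mutex);
        f->lock_owner = c->open_counter++;
        pthread_mutex_unlock(&c->open_mutex);
        f->open_seqid = 1;
}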


@@ -312,11 +312,10 @@ struct nfs_context {
char *client_name;
uint64_t clientid;
verifier4 setclientid_confirm;
uint32_t seqid;
uint32_t open_counter;
int has_lock_owner;
#ifdef HAVE_MULTITHREADING
int multithreading_enabled;
libnfs_mutex_t nfs_mutex;
libnfs_thread_t service_thread;
libnfs_mutex_t nfs4_open_mutex;
#endif /* HAVE_MULTITHREADING */
@@ -407,6 +406,7 @@ struct nfsfh {
/* NFSv4 */
struct stateid stateid;
/* locking */
uint32_t open_seqid;
uint32_t lock_seqid;
struct stateid lock_stateid;
};


@@ -542,13 +542,12 @@ nfs_init_context(void)
v >>= 8;
}
nfs4_set_verifier(nfs, verifier);
snprintf(client_name, MAX_CLIENT_NAME, "Libnfs pid:%d %d", getpid(),
(int)time(NULL));
nfs4_set_client_name(nfs, client_name);
#ifdef HAVE_MULTITHREADING
nfs_mt_mutex_init(&nfs->nfs_mutex);
nfs_mt_mutex_init(&nfs->nfs4_open_mutex);
#endif /* HAVE_MULTITHREADING */
return nfs;
@@ -605,7 +604,6 @@ nfs_destroy_context(struct nfs_context *nfs)
#ifdef HAVE_MULTITHREADING
nfs_mt_mutex_destroy(&nfs->nfs4_open_mutex);
nfs_mt_mutex_destroy(&nfs->nfs_mutex);
#endif /* HAVE_MULTITHREADING */
free(nfs);
}


@@ -162,6 +162,9 @@ struct nfs4_cb_data {
nfs_cb cb;
void *private_data;
/* Used to track lock_owner during open() */
uint32_t lock_owner;
/* internal callback */
rpc_cb continue_cb;
@@ -244,11 +247,6 @@ nfs4_resolve_path(struct nfs_context *nfs, const char *path)
static void
free_nfs4_cb_data(struct nfs4_cb_data *data)
{
#ifdef HAVE_MULTITHREADING
if (data->flags & MUTEX_HELD) {
nfs_mt_mutex_unlock(&data->nfs->nfs4_open_mutex);
}
#endif
free(data->path);
free(data->filler.data);
if (data->filler.blob0.val && data->filler.blob0.free) {
@@ -657,7 +655,7 @@ nfs4_op_close(struct nfs_context *nfs, nfs_argop4 *op, struct nfsfh *fh)
op[i].argop = OP_CLOSE;
clargs = &op[i++].nfs_argop4_u.opclose;
clargs->seqid = nfs->seqid;
clargs->seqid = fh->open_seqid;
clargs->open_stateid.seqid = fh->stateid.seqid;
memcpy(clargs->open_stateid.other, fh->stateid.other, 12);
@@ -706,8 +704,7 @@ nfs4_op_setclientid(struct nfs_context *nfs, nfs_argop4 *op, verifier4 verifier,
}
static int
nfs4_op_open_confirm(struct nfs_context *nfs, nfs_argop4 *op, uint32_t seqid,
struct nfsfh *fh)
nfs4_op_open_confirm(struct nfs_context *nfs, nfs_argop4 *op, struct nfsfh *fh)
{
OPEN_CONFIRM4args *ocargs;
@@ -715,7 +712,7 @@ nfs4_op_open_confirm(struct nfs_context *nfs, nfs_argop4 *op, uint32_t seqid,
ocargs = &op[0].nfs_argop4_u.opopen_confirm;
ocargs->open_stateid.seqid = fh->stateid.seqid;
memcpy(&ocargs->open_stateid.other, fh->stateid.other, 12);
ocargs->seqid = seqid;
ocargs->seqid = fh->open_seqid;
return 1;
}
@@ -958,7 +955,7 @@ nfs4_op_lock(struct nfs_context *nfs, nfs_argop4 *op, struct nfsfh *fh,
} else {
largs->locker.new_lock_owner = 1;
largs->locker.locker4_u.open_owner.open_seqid =
nfs->seqid;
fh->open_seqid;
largs->locker.locker4_u.open_owner.open_stateid.seqid =
fh->stateid.seqid;
memcpy(largs->locker.locker4_u.open_owner.open_stateid.other,
@@ -1923,7 +1920,7 @@ nfs4_rmdir_async(struct nfs_context *nfs, const char *path,
}
static void
nfs_increment_seqid(struct nfs_context *nfs, uint32_t status)
nfs_increment_seqid(struct nfsfh *nfsfh, uint32_t status)
{
/* RFC3530 8.1.5 */
switch (status) {
@@ -1936,7 +1933,7 @@ nfs_increment_seqid(struct nfs_context *nfs, uint32_t status)
case NFS4ERR_NOFILEHANDLE:
break;
default:
nfs->seqid++;
nfsfh->open_seqid++;
}
}
@@ -2032,12 +2029,12 @@ nfs4_open_confirm_cb(struct rpc_context *rpc, int status, void *command_data,
COMPOUND4res *res = command_data;
OPEN_CONFIRM4resok *ocresok;
int i;
struct nfsfh *fh;
struct nfsfh *fh = data->filler.blob0.val;
assert(rpc->magic == RPC_CONTEXT_MAGIC);
if (res) {
nfs_increment_seqid(nfs, res->status);
nfs_increment_seqid(fh, res->status);
}
if (check_nfs4_error(nfs, status, data, res, "OPEN_CONFIRM")) {
@@ -2079,10 +2076,6 @@ nfs4_open_cb(struct rpc_context *rpc, int status, void *command_data,
assert(rpc->magic == RPC_CONTEXT_MAGIC);
if (res) {
nfs_increment_seqid(nfs, res->status);
}
if (check_nfs4_error(nfs, status, data, res, "OPEN")) {
return;
}
@@ -2129,6 +2122,7 @@ nfs4_open_cb(struct rpc_context *rpc, int status, void *command_data,
return;
}
memcpy(fh->fh.val, gresok->object.nfs_fh4_val, fh->fh.len);
fh->open_seqid = 1;
if (data->filler.flags & O_SYNC) {
fh->is_sync = 1;
@@ -2153,7 +2147,7 @@ nfs4_open_cb(struct rpc_context *rpc, int status, void *command_data,
memset(op, 0, sizeof(op));
i = nfs4_op_putfh(nfs, &op[0], fh);
i += nfs4_op_open_confirm(nfs, &op[i], nfs->seqid, fh);
i += nfs4_op_open_confirm(nfs, &op[i], fh);
memset(&args, 0, sizeof(args));
args.argarray.argarray_len = i;
@@ -2227,7 +2221,6 @@ nfs4_populate_open(struct nfs4_cb_data *data, nfs_argop4 *op)
oargs = &op[i++].nfs_argop4_u.opopen;
memset(oargs, 0, sizeof(*oargs));
oargs->seqid = nfs->seqid;
if (access_mask & ACCESS4_READ) {
oargs->share_access |= OPEN4_SHARE_ACCESS_READ;
}
@@ -2236,8 +2229,9 @@ nfs4_populate_open(struct nfs4_cb_data *data, nfs_argop4 *op)
}
oargs->share_deny = OPEN4_SHARE_DENY_NONE;
oargs->owner.clientid = nfs->clientid;
oargs->owner.owner.owner_len = strlen(nfs->client_name);
oargs->owner.owner.owner_val = nfs->client_name;
oargs->owner.owner.owner_len = 4;
oargs->owner.owner.owner_val = (char *)&data->lock_owner;
oargs->seqid = 0;
if (data->filler.flags & O_CREAT) {
createhow4 *ch;
fattr4 *fa;
@@ -2328,6 +2322,13 @@ nfs4_open_readlink_cb(struct rpc_context *rpc, int status, void *command_data,
data_split_path(data);
#ifdef HAVE_MULTITHREADING
nfs_mt_mutex_lock(&data->nfs->nfs4_open_mutex);
#endif
data->lock_owner = nfs->open_counter++;
#ifdef HAVE_MULTITHREADING
nfs_mt_mutex_unlock(&data->nfs->nfs4_open_mutex);
#endif
data->filler.func = nfs4_populate_open;
data->filler.max_op = 3;
@@ -2444,6 +2445,13 @@ nfs4_open_async_internal(struct nfs_context *nfs, struct nfs4_cb_data *data,
data->filler.blob2.free = free;
}
#ifdef HAVE_MULTITHREADING
nfs_mt_mutex_lock(&data->nfs->nfs4_open_mutex);
#endif
data->lock_owner = nfs->open_counter++;
#ifdef HAVE_MULTITHREADING
nfs_mt_mutex_unlock(&data->nfs->nfs4_open_mutex);
#endif
data->filler.func = nfs4_populate_open;
data->filler.max_op = 3;
data->filler.flags = flags;
@@ -2509,12 +2517,6 @@ nfs4_open_async(struct nfs_context *nfs, const char *path, int flags,
memcpy(data->filler.blob3.val, &m, sizeof(uint32_t));
}
#ifdef HAVE_MULTITHREADING
if (nfs->multithreading_enabled) {
nfs_mt_mutex_lock(&nfs->nfs4_open_mutex);
data->flags |= MUTEX_HELD;
}
#endif
ret = nfs4_open_async_internal(nfs, data, flags, mode);
return ret;
}
@@ -2643,11 +2645,12 @@ nfs4_close_cb(struct rpc_context *rpc, int status, void *command_data,
struct nfs4_cb_data *data = private_data;
struct nfs_context *nfs = data->nfs;
COMPOUND4res *res = command_data;
struct nfsfh *nfsfh = data->filler.blob0.val;
assert(rpc->magic == RPC_CONTEXT_MAGIC);
if (res) {
nfs_increment_seqid(nfs, res->status);
nfs_increment_seqid(nfsfh, res->status);
}
if (check_nfs4_error(nfs, status, data, res, "CLOSE")) {
@@ -2675,12 +2678,6 @@ nfs4_close_async(struct nfs_context *nfs, struct nfsfh *nfsfh, nfs_cb cb,
}
memset(data, 0, sizeof(*data));
#ifdef HAVE_MULTITHREADING
if (nfs->multithreading_enabled) {
nfs_mt_mutex_lock(&nfs->nfs4_open_mutex);
data->flags |= MUTEX_HELD;
}
#endif
data->nfs = nfs;
data->cb = cb;
data->private_data = private_data;
@@ -3797,11 +3794,12 @@ nfs4_truncate_close_cb(struct rpc_context *rpc, int status, void *command_data,
struct nfs4_cb_data *data = private_data;
struct nfs_context *nfs = data->nfs;
COMPOUND4res *res = command_data;
struct nfsfh *fh = data->filler.blob0.val;
assert(rpc->magic == RPC_CONTEXT_MAGIC);
if (res) {
nfs_increment_seqid(nfs, res->status);
nfs_increment_seqid(fh, res->status);
}
if (check_nfs4_error(nfs, status, data, res, "CLOSE")) {
@@ -3875,12 +3873,6 @@ nfs4_truncate_async(struct nfs_context *nfs, const char *path, uint64_t length,
length = nfs_hton64(length);
memcpy(data->filler.blob3.val, &length, sizeof(uint64_t));
#ifdef HAVE_MULTITHREADING
if (nfs->multithreading_enabled) {
nfs_mt_mutex_lock(&nfs->nfs4_open_mutex);
data->flags |= MUTEX_HELD;
}
#endif
if (nfs4_open_async_internal(nfs, data, O_WRONLY, 0) < 0) {
return -1;
}