Merge pull request #146 from plieven/pagecache_v2

Pagecache v2
libnfs-4.0.0-vitalif
Ronnie Sahlberg 2016-05-17 06:18:26 -07:00
commit cef46f4803
6 changed files with 214 additions and 130 deletions

View File

@ -78,23 +78,14 @@ fi
AC_SUBST(MAYBE_EXAMPLES)
WERROR_CFLAGS=""
AC_MSG_CHECKING(whether to set -Werror)
AC_ARG_ENABLE(werror, [ --disable-werror do not treat warnings as errors during build],
[ case "${enableval}" in
no)
AC_MSG_RESULT(no)
;;
*)
AC_MSG_RESULT(yes)
WERROR_CFLAGS="-Werror"
;;
esac ],
)
AC_ARG_ENABLE([werror], [AS_HELP_STRING([--disable-werror],
[Disables building with -Werror by default])])
if test "$ac_cv_prog_gcc" = yes; then
WARN_CFLAGS="-Wall -Wshadow -Wno-write-strings -Wstrict-prototypes -Wpointer-arith -Wcast-align -Wno-strict-aliasing $WERROR_CFLAGS"
WARN_CFLAGS="-Wall -Wshadow -Wno-write-strings -Wstrict-prototypes -Wpointer-arith -Wcast-align -Wno-strict-aliasing"
if test "x$enable_werror" != "xno"; then
WARN_CFLAGS="$WARN_CFLAGS -Werror"
fi
fi
AC_SUBST(WARN_CFLAGS)

View File

@ -32,6 +32,13 @@
extern "C" {
#endif
/*
 * Smaller of a and b. This is a function-like macro: the argument that
 * ends up being selected is evaluated twice, so do not pass expressions
 * with side effects. Only defined when MIN is not already defined.
 */
#ifndef MIN
#define MIN(a, b) (((a) < (b)) ? (a) : (b))
#endif
/*
 * Larger of a and b. Same double-evaluation caveat as MIN. Only defined
 * when MAX is not already defined.
 */
#ifndef MAX
#define MAX(a, b) (((a) > (b)) ? (a) : (b))
#endif
#if !defined(HAVE_SOCKADDR_STORAGE) && !defined(WIN32)
/*
* RFC 2553: protocol-independent placeholder for socket addresses
@ -119,6 +126,8 @@ struct rpc_context {
int uid;
int gid;
uint32_t readahead;
uint32_t pagecache;
uint32_t pagecache_ttl;
int debug;
int timeout;
};
@ -187,6 +196,8 @@ void rpc_unset_autoreconnect(struct rpc_context *rpc);
void rpc_set_tcp_syncnt(struct rpc_context *rpc, int v);
void rpc_set_uid(struct rpc_context *rpc, int uid);
void rpc_set_gid(struct rpc_context *rpc, int gid);
void rpc_set_pagecache(struct rpc_context *rpc, uint32_t v);
void rpc_set_pagecache_ttl(struct rpc_context *rpc, uint32_t v);
void rpc_set_readahead(struct rpc_context *rpc, uint32_t v);
void rpc_set_debug(struct rpc_context *rpc, int level);
void rpc_set_timeout(struct rpc_context *rpc, int timeout);

View File

@ -37,8 +37,10 @@ extern "C" {
#endif
#define LIBNFS_FEATURE_READAHEAD
#define LIBNFS_FEATURE_PAGECACHE
#define LIBNFS_FEATURE_DEBUG
#define NFS_BLKSIZE 4096
#define NFS_PAGECACHE_DEFAULT_TTL 5
struct nfs_context;
struct rpc_context;
@ -219,10 +221,17 @@ EXTERN uint64_t nfs_get_writemax(struct nfs_context *nfs);
EXTERN void nfs_set_tcp_syncnt(struct nfs_context *nfs, int v);
EXTERN void nfs_set_uid(struct nfs_context *nfs, int uid);
EXTERN void nfs_set_gid(struct nfs_context *nfs, int gid);
EXTERN void nfs_set_pagecache(struct nfs_context *nfs, uint32_t v);
EXTERN void nfs_set_pagecache_ttl(struct nfs_context *nfs, uint32_t v);
EXTERN void nfs_set_readahead(struct nfs_context *nfs, uint32_t v);
EXTERN void nfs_set_debug(struct nfs_context *nfs, int level);
EXTERN void nfs_set_dircache(struct nfs_context *nfs, int enabled);
/*
* Invalidate the pagecache
*/
EXTERN void nfs_pagecache_invalidate(struct nfs_context *nfs, struct nfsfh *nfsfh);
/*
* Sets timeout in milliseconds. A negative value means infinite timeout.
*/

View File

@ -80,6 +80,7 @@ struct rpc_context *rpc_init_context(void)
salt += 0x01000000;
rpc->fd = -1;
rpc->tcp_syncnt = RPC_PARAM_UNDEFINED;
rpc->pagecache_ttl = NFS_PAGECACHE_DEFAULT_TTL;
#if defined(WIN32) || defined(ANDROID)
rpc->uid = 65534;
rpc->gid = 65534;
@ -97,11 +98,44 @@ struct rpc_context *rpc_init_context(void)
return rpc;
}
/*
 * Round x up to the next power of two.
 *
 * Returns 1 for x == 0 and x itself when x already is a power of two.
 * Values above 2^31 have no 32-bit power of two above them; they are
 * clamped to 2^31. Without the clamp the shift wraps to zero and the
 * loop never terminates.
 */
static uint32_t round_to_power_of_two(uint32_t x) {
        const uint32_t max_power = (uint32_t)1 << 31;
        uint32_t power = 1;

        while (power < x && power < max_power) {
                power <<= 1;
        }
        return power;
}
/*
 * Set the size of the page cache, in pages of NFS_BLKSIZE bytes.
 *
 * v is rounded up to a power of two. The stored value never shrinks: if
 * the rounded request is smaller than the current setting, the current
 * setting is kept.
 */
void rpc_set_pagecache(struct rpc_context *rpc, uint32_t v)
{
        uint32_t pages;

        assert(rpc->magic == RPC_CONTEXT_MAGIC);

        /* round once; the MAX() macro would run the rounding loop twice */
        pages = round_to_power_of_two(v);
        if (pages < rpc->pagecache) {
                pages = rpc->pagecache;
        }

        /* pages is unsigned, so print it with %u rather than %d */
        RPC_LOG(rpc, 2, "pagecache set to %u pages of size %d", (unsigned int)pages, NFS_BLKSIZE);
        rpc->pagecache = pages;
}
void rpc_set_pagecache_ttl(struct rpc_context *rpc, uint32_t v) {
if (v) {
RPC_LOG(rpc, 2, "set pagecache ttl to %d seconds\n", v);
} else {
RPC_LOG(rpc, 2, "set pagecache ttl to infinite");
}
rpc->pagecache_ttl = v;
}
/*
 * Set the readahead size in bytes.
 *
 * A non-zero v is rounded up to a power of two and raised to at least
 * NFS_BLKSIZE; 0 is stored as is. When readahead is enabled the page
 * cache is also grown to hold twice the readahead size.
 */
void rpc_set_readahead(struct rpc_context *rpc, uint32_t v)
{
        assert(rpc->magic == RPC_CONTEXT_MAGIC);

        if (v) {
                v = round_to_power_of_two(v);
                if (v < NFS_BLKSIZE) {
                        v = NFS_BLKSIZE;
                }
        }

        /* v is unsigned, so print it with %u rather than %d */
        RPC_LOG(rpc, 2, "readahead set to %u byte", (unsigned int)v);
        rpc->readahead = v;

        if (v) {
                /* current pagecache implementation needs a pagecache bigger
                 * than the readahead size to avoid collisions.
                 * v is a power of two >= NFS_BLKSIZE here, so dividing
                 * first is exact and cannot wrap the way (2 * v) does
                 * for v == 2^31. */
                rpc_set_pagecache(rpc, 2 * (v / NFS_BLKSIZE));
        }
}
void rpc_set_debug(struct rpc_context *rpc, int level)

View File

@ -53,6 +53,7 @@ nfs_parse_url_full
nfs_parse_url_dir
nfs_parse_url_incomplete
nfs_destroy_url
nfs_pagecache_invalidate
nfs_pread
nfs_pread_async
nfs_pwrite
@ -71,6 +72,8 @@ nfs_set_auth
nfs_set_debug
nfs_set_dircache
nfs_set_gid
nfs_set_pagecache
nfs_set_pagecache_ttl
nfs_set_readahead
nfs_set_tcp_syncnt
nfs_set_timeout

View File

@ -27,6 +27,9 @@
#ifdef WIN32
#include "win32_compat.h"
#define PRIu64 "llu"
#else
#include <inttypes.h>
#endif
#ifdef HAVE_UTIME_H
@ -100,20 +103,28 @@ struct nfsdir {
/*
 * Per file handle readahead state.
 * NOTE(review): this view interleaves lines from two revisions of the
 * struct; confirm which fields exist in the final file.
 */
struct nfs_readahead {
/* set by the read path to the end offset of the data most recently read or served from the cache */
uint64_t fh_offset;
/* start offset of the most recent read request */
uint64_t last_offset;
/* file offset of the first byte held in buf */
uint64_t buf_offset;
/* number of bytes held in buf */
uint64_t buf_count;
/* time(NULL) when buf was last filled or invalidated */
time_t buf_ts;
/* heap buffer with readahead data; released with free() */
char *buf;
/* current readahead size in bytes; doubled on sequential reads until it reaches the configured readahead */
uint32_t cur_ra;
};
/*
 * One slot of the page cache: a single page of file data.
 */
struct nfs_pagecache_entry {
/* cached data of one page */
char buf[NFS_BLKSIZE];
/* file offset of the cached page; nfs_pagecache_put stores the block-aligned offset */
uint64_t offset;
/* time(NULL) of the last store into this slot; 0 marks the slot as empty */
time_t ts;
};
/*
 * Page cache of one open file: a table of pages indexed by a hash of the
 * page offset.
 */
struct nfs_pagecache {
/* array of num_entries slots; allocated with malloc when the file is opened */
struct nfs_pagecache_entry *entries;
/* number of slots; the hash masks with (num_entries - 1), so this is expected to be a power of two */
uint32_t num_entries;
/* maximum age of a slot in seconds; 0 disables the age check */
uint32_t ttl;
};
/*
 * State of one open file.
 */
struct nfsfh {
/* NFSv3 file handle; its data buffer is released in free_nfsfh */
struct nfs_fh3 fh;
int is_sync;
int is_append;
/* current file position; moved by reads and writes that are issued with update_pos set */
uint64_t offset;
/* readahead state */
struct nfs_readahead ra;
/* per file page cache; set up in nfs_open_cb when a page cache size is configured */
struct nfs_pagecache pagecache;
};
struct nested_mounts {
@ -192,6 +203,50 @@ static void nfs_dircache_drop(struct nfs_context *nfs, struct nfs_fh3 *fh)
}
}
/*
 * Map a file offset to a slot index of the page cache.
 *
 * The page number (offset truncated to 32 bits, divided by NFS_BLKSIZE)
 * plus one is multiplied by a fixed constant and masked with
 * (num_entries - 1). The mask only yields a valid index when num_entries
 * is a non-zero power of two.
 */
static uint32_t nfs_pagecache_hash(struct nfs_pagecache *pagecache, uint64_t offset) {
        const uint32_t slot_mask = pagecache->num_entries - 1;
        const uint32_t page_no = (uint32_t)(offset) / NFS_BLKSIZE;

        return (2654435761 * (1 + page_no)) & slot_mask;
}
void nfs_pagecache_invalidate(struct nfs_context *nfs, struct nfsfh *nfsfh) {
if (nfsfh->pagecache.entries) {
RPC_LOG(nfs->rpc, 2, "invalidating pagecache");
memset(nfsfh->pagecache.entries, 0x00, sizeof(struct nfs_pagecache_entry) * nfsfh->pagecache.num_entries);
}
}
/*
 * Store file data in the page cache.
 *
 * buf holds len bytes of file data starting at file offset `offset`. The
 * range is split at page boundaries. A chunk is stored when it covers a
 * whole page, or when it partially updates a page that is already cached
 * and has not expired. Other partial chunks are dropped, because the
 * rest of the page is unknown.
 *
 * Does nothing when the cache is disabled (num_entries == 0) or when its
 * table was never allocated (entries == NULL).
 */
void nfs_pagecache_put(struct nfs_pagecache *pagecache, uint64_t offset, char *buf, int len) {
        time_t ts = time(NULL);

        /* entries stays NULL if the allocation at open time failed */
        if (!pagecache->num_entries || !pagecache->entries) return;

        while (len > 0) {
                uint64_t page_offset = offset & ~(NFS_BLKSIZE - 1);
                uint32_t entry = nfs_pagecache_hash(pagecache, page_offset);
                struct nfs_pagecache_entry *e = &pagecache->entries[entry];
                /* bytes of this chunk: up to the end of the page or of the data */
                uint64_t n = MIN(NFS_BLKSIZE - offset % NFS_BLKSIZE, len);
                /* we can only write to the cache if we add a full page or
                 * partially update a page that is still valid */
                if (n == NFS_BLKSIZE ||
                    (e->ts && e->offset == page_offset &&
                     (!pagecache->ttl || ts - e->ts <= pagecache->ttl))) {
                        e->ts = ts;
                        e->offset = page_offset;
                        memcpy(e->buf + offset % NFS_BLKSIZE, buf, n);
                }
                buf += n;
                offset += n;
                len -= n;
        }
}
/*
 * Look up the page starting at file offset `offset`.
 *
 * offset must be a multiple of NFS_BLKSIZE (asserted). Returns a pointer
 * to NFS_BLKSIZE bytes of cached data owned by the cache, or NULL when
 * the page is not cached, has expired, or the cache is disabled or was
 * never allocated.
 */
char *nfs_pagecache_get(struct nfs_pagecache *pagecache, uint64_t offset) {
        uint32_t entry;
        struct nfs_pagecache_entry *e;

        assert(!(offset % NFS_BLKSIZE));

        /* without a table there is nothing to index; with num_entries == 0
         * the hash mask would be all-ones and index out of bounds */
        if (!pagecache->num_entries || !pagecache->entries) return NULL;

        entry = nfs_pagecache_hash(pagecache, offset);
        e = &pagecache->entries[entry];

        /* slot holds a different page */
        if (offset != e->offset) return NULL;
        /* slot is empty */
        if (!e->ts) return NULL;
        /* slot is older than the ttl; a ttl of 0 disables the check */
        if (pagecache->ttl && time(NULL) - e->ts > pagecache->ttl) return NULL;
        return e->buf;
}
struct nfs_cb_data;
typedef int (*continue_func)(struct nfs_context *nfs, fattr3 *attr,
struct nfs_cb_data *data);
@ -220,13 +275,13 @@ struct nfs_cb_data {
uint64_t offset, count, max_offset, org_offset, org_count;
char *buffer;
char *usrbuf;
int update_pos;
};
struct nfs_mcb_data {
struct nfs_cb_data *data;
uint64_t offset;
uint64_t count;
int update_pos;
};
static int nfs_lookup_path_async_internal(struct nfs_context *nfs, fattr3 *attr, struct nfs_cb_data *data, struct nfs_fh3 *fh);
@ -272,6 +327,8 @@ static int nfs_set_context_args(struct nfs_context *nfs, const char *arg, const
rpc_set_gid(nfs_get_rpc_context(nfs), atoi(val));
} else if (!strcmp(arg, "readahead")) {
rpc_set_readahead(nfs_get_rpc_context(nfs), atoi(val));
} else if (!strcmp(arg, "pagecache")) {
rpc_set_pagecache(nfs_get_rpc_context(nfs), atoi(val));
} else if (!strcmp(arg, "debug")) {
rpc_set_debug(nfs_get_rpc_context(nfs), atoi(val));
} else if (!strcmp(arg, "auto-traverse-mounts")) {
@ -752,7 +809,7 @@ static void free_nfsfh(struct nfsfh *nfsfh)
free(nfsfh->fh.data.data_val);
nfsfh->fh.data.data_val = NULL;
}
free(nfsfh->ra.buf);
free(nfsfh->pagecache.entries);
free(nfsfh);
}
@ -887,7 +944,7 @@ static void nfs_mount_10_cb(struct rpc_context *rpc, int status, void *command_d
nfs->writemax = res->FSINFO3res_u.resok.wtmax;
if (nfs->readmax > NFS_MAX_XFER_SIZE) {
rpc_set_error(rpc, "server max rsize of %lu is greater than libnfs supported %d bytes",
rpc_set_error(rpc, "server max rsize of %" PRIu64 " is greater than libnfs supported %d bytes",
nfs->readmax, NFS_MAX_XFER_SIZE);
data->cb(-EINVAL, nfs, command_data, data->private_data);
free_nfs_cb_data(data);
@ -895,7 +952,7 @@ static void nfs_mount_10_cb(struct rpc_context *rpc, int status, void *command_d
}
if (nfs->writemax > NFS_MAX_XFER_SIZE) {
rpc_set_error(rpc, "server max wsize of %lu is greater than libnfs supported %d bytes",
rpc_set_error(rpc, "server max wsize of %" PRIu64 " is greater than libnfs supported %d bytes",
nfs->writemax, NFS_MAX_XFER_SIZE);
data->cb(-EINVAL, nfs, command_data, data->private_data);
free_nfs_cb_data(data);
@ -2065,6 +2122,15 @@ static void nfs_open_cb(struct rpc_context *rpc, int status, void *command_data,
nfsfh->fh = data->fh;
data->fh.data.data_val = NULL;
/* init page cache */
if (rpc->pagecache) {
nfsfh->pagecache.num_entries = rpc->pagecache;
nfsfh->pagecache.ttl = rpc->pagecache_ttl;
nfsfh->pagecache.entries = malloc(sizeof(struct nfs_pagecache_entry) * nfsfh->pagecache.num_entries);
nfs_pagecache_invalidate(nfs, nfsfh);
RPC_LOG(nfs->rpc, 2, "init pagecache entries %d pagesize %d\n", nfsfh->pagecache.num_entries, NFS_BLKSIZE);
}
data->cb(0, nfs, nfsfh, data->private_data);
free_nfs_cb_data(data);
}
@ -2178,25 +2244,10 @@ static void nfs_pread_mcb(struct rpc_context *rpc, int status, void *command_dat
} else {
uint64_t count = res->READ3res_u.resok.count;
if (mdata->update_pos)
data->nfsfh->offset += count;
/* if we have more than one call or we have received a short read we need a reassembly buffer */
if (data->num_calls || (count < mdata->count && !res->READ3res_u.resok.eof)) {
if (data->buffer == NULL) {
data->buffer = malloc(data->count);
if (data->buffer == NULL) {
rpc_set_error(nfs->rpc, "Out-Of-Memory: Failed to allocate reassembly buffer for %d bytes", (int)data->count);
data->oom = 1;
}
}
}
if (count > 0) {
if (count <= mdata->count) {
/* copy data into reassembly buffer if we have one */
if (data->buffer != NULL) {
memcpy(&data->buffer[mdata->offset - data->offset], res->READ3res_u.resok.data.data_val, count);
}
/* copy data into reassembly buffer */
memcpy(&data->buffer[mdata->offset - data->offset], res->READ3res_u.resok.data.data_val, count);
if (data->max_offset < mdata->offset + count) {
data->max_offset = mdata->offset + count;
}
@ -2250,39 +2301,22 @@ static void nfs_pread_mcb(struct rpc_context *rpc, int status, void *command_dat
return;
}
if (data->buffer) {
if (data->max_offset > data->org_offset + data->org_count) {
data->max_offset = data->org_offset + data->org_count;
}
cb_err = data->max_offset - data->org_offset;
cb_data = data->buffer + (data->org_offset - data->offset);
} else {
res = command_data;
cb_err = res->READ3res_u.resok.count;
cb_data = res->READ3res_u.resok.data.data_val;
}
data->nfsfh->ra.fh_offset = data->max_offset;
if (data->nfsfh->ra.cur_ra) {
free(data->nfsfh->ra.buf);
data->nfsfh->ra.buf = data->buffer;
data->nfsfh->ra.buf_offset = data->offset;
data->nfsfh->ra.buf_count = data->count;
data->nfsfh->ra.buf_ts = time(NULL);
data->buffer = NULL;
nfs_pagecache_put(&data->nfsfh->pagecache, data->offset, data->buffer, data->max_offset - data->offset);
if (data->max_offset > data->org_offset + data->org_count) {
data->max_offset = data->org_offset + data->org_count;
}
if (data->update_pos) {
data->nfsfh->offset = data->max_offset;
}
cb_err = data->max_offset - data->org_offset;
cb_data = data->buffer + (data->org_offset - data->offset);
data->cb(cb_err, nfs, cb_data, data->private_data);
free_nfs_cb_data(data);
}
static void nfs_ra_invalidate(struct nfsfh *nfsfh) {
free(nfsfh->ra.buf);
nfsfh->ra.buf = NULL;
nfsfh->ra.buf_offset = 0;
nfsfh->ra.buf_count = 0;
nfsfh->ra.buf_ts = time(NULL);
nfsfh->ra.cur_ra = NFS_BLKSIZE;
return;
}
static int nfs_pread_async_internal(struct nfs_context *nfs, struct nfsfh *nfsfh, uint64_t offset, uint64_t count, nfs_cb cb, void *private_data, int update_pos)
@ -2301,72 +2335,65 @@ static int nfs_pread_async_internal(struct nfs_context *nfs, struct nfsfh *nfsfh
data->nfsfh = nfsfh;
data->org_offset = offset;
data->org_count = count;
data->update_pos = update_pos;
assert(data->num_calls == 0);
if (nfs->rpc->readahead && time(NULL) - nfsfh->ra.buf_ts > NFS_RA_TIMEOUT) {
/* readahead cache timeout */
nfs_ra_invalidate(nfsfh);
if (nfsfh->pagecache.num_entries) {
/* align start offset to blocksize */
count += offset & (NFS_BLKSIZE - 1);
offset &= ~(NFS_BLKSIZE - 1);
/* align end offset to blocksize */
count += NFS_BLKSIZE - 1 ;
count &= ~(NFS_BLKSIZE - 1);
}
data->offset = offset;
data->count = count;
nfsfh->ra.cur_ra = MAX(NFS_BLKSIZE, nfsfh->ra.cur_ra);
data->buffer = malloc(count + nfs->rpc->readahead);
if (data->buffer == NULL) {
free_nfs_cb_data(data);
return -ENOMEM;
}
if (nfsfh->pagecache.num_entries) {
while (count > 0) {
char *cdata = nfs_pagecache_get(&nfsfh->pagecache, offset);
if (!cdata) {
break;
}
memcpy(data->buffer + offset - data->offset, cdata, NFS_BLKSIZE);
offset += NFS_BLKSIZE;
count -= NFS_BLKSIZE;
}
if (!count) {
data->nfsfh->ra.fh_offset = data->offset + data->count;
if (update_pos) {
data->nfsfh->offset = data->org_offset + data->org_count;
}
data->cb(data->org_count, nfs, data->buffer + (data->org_offset - data->offset), data->private_data);
free_nfs_cb_data(data);
return 0;
}
}
if (nfs->rpc->readahead) {
if (offset >= nfsfh->ra.last_offset &&
if (offset >= nfsfh->ra.fh_offset &&
offset - NFS_BLKSIZE <= nfsfh->ra.fh_offset + nfsfh->ra.cur_ra) {
if (nfs->rpc->readahead > nfsfh->ra.cur_ra) {
nfsfh->ra.cur_ra <<= 1;
}
} else {
nfsfh->ra.cur_ra = NFS_BLKSIZE;
nfsfh->ra.cur_ra = 0;
}
nfsfh->ra.last_offset = offset;
if (nfsfh->ra.buf_offset <= offset &&
nfsfh->ra.buf_offset + nfsfh->ra.buf_count >= offset + count) {
/* serve request completely from cache */
data->buffer = malloc(count);
if (data->buffer == NULL) {
free_nfs_cb_data(data);
return -ENOMEM;
}
memcpy(data->buffer, nfsfh->ra.buf + (offset - nfsfh->ra.buf_offset), count);
data->cb(count, nfs, data->buffer, data->private_data);
nfsfh->ra.fh_offset = offset + count;
free_nfs_cb_data(data);
return 0;
}
/* align start offset to blocksize */
count += offset & (NFS_BLKSIZE - 1);
offset &= ~(NFS_BLKSIZE - 1);
/* align end offset to blocksize and add readahead */
count += nfsfh->ra.cur_ra - 1;
count &= ~(NFS_BLKSIZE - 1);
data->buffer = malloc(count);
if (data->buffer == NULL) {
free_nfs_cb_data(data);
return -ENOMEM;
}
data->offset = offset;
data->count = count;
if (nfsfh->ra.buf_count && nfsfh->ra.buf_offset <= offset &&
nfsfh->ra.buf_offset + nfsfh->ra.buf_count >= offset) {
/* serve request partially from cache */
size_t overlap = (nfsfh->ra.buf_offset + nfsfh->ra.buf_count) - offset;
if (overlap > count) count = overlap;
memcpy(data->buffer, nfsfh->ra.buf + (offset - nfsfh->ra.buf_offset), overlap);
offset += overlap;
count -= overlap;
}
} else {
data->offset = offset;
data->count = count;
count += nfsfh->ra.cur_ra;
data->count += nfsfh->ra.cur_ra;
}
data->max_offset = offset;
data->max_offset = data->offset;
/* chop requests into chunks of at most READMAX bytes if necessary.
* we send all reads in parallel so that performance is still good.
@ -2394,7 +2421,6 @@ static int nfs_pread_async_internal(struct nfs_context *nfs, struct nfsfh *nfsfh
mdata->data = data;
mdata->offset = offset;
mdata->count = readcount;
mdata->update_pos = update_pos;
nfs_fill_READ3args(&args, nfsfh, offset, readcount);
@ -2475,9 +2501,6 @@ static void nfs_pwrite_mcb(struct rpc_context *rpc, int status, void *command_da
} else {
uint64_t count = res->WRITE3res_u.resok.count;
if (mdata->update_pos)
data->nfsfh->offset += count;
if (count < mdata->count) {
if (count == 0) {
rpc_set_error(nfs->rpc, "NFS: Write failed. No bytes written!");
@ -2529,6 +2552,12 @@ static void nfs_pwrite_mcb(struct rpc_context *rpc, int status, void *command_da
return;
}
if (data->update_pos) {
data->nfsfh->offset = data->max_offset;
}
nfs_pagecache_put(&data->nfsfh->pagecache, data->offset, data->usrbuf, data->count);
data->cb(data->max_offset - data->offset, nfs, NULL, data->private_data);
free_nfs_cb_data(data);
@ -2539,7 +2568,6 @@ static int nfs_pwrite_async_internal(struct nfs_context *nfs, struct nfsfh *nfsf
{
struct nfs_cb_data *data;
nfs_ra_invalidate(nfsfh);
data = malloc(sizeof(struct nfs_cb_data));
if (data == NULL) {
rpc_set_error(nfs->rpc, "out of memory: failed to allocate nfs_cb_data structure");
@ -2551,6 +2579,7 @@ static int nfs_pwrite_async_internal(struct nfs_context *nfs, struct nfsfh *nfsf
data->private_data = private_data;
data->nfsfh = nfsfh;
data->usrbuf = buf;
data->update_pos = update_pos;
/* hello, clang-analyzer */
assert(data->num_calls == 0);
@ -2560,6 +2589,7 @@ static int nfs_pwrite_async_internal(struct nfs_context *nfs, struct nfsfh *nfsf
*/
data->max_offset = offset;
data->offset = offset;
data->count = count;
do {
uint64_t writecount = count;
@ -2584,7 +2614,6 @@ static int nfs_pwrite_async_internal(struct nfs_context *nfs, struct nfsfh *nfsf
mdata->data = data;
mdata->offset = offset;
mdata->count = writecount;
mdata->update_pos = update_pos;
nfs_fill_WRITE3args(&args, nfsfh, offset, writecount, &buf[offset - data->offset]);
@ -2656,7 +2685,6 @@ int nfs_write_async(struct nfs_context *nfs, struct nfsfh *nfsfh, uint64_t count
struct GETATTR3args args;
struct nfs_cb_data *data;
nfs_ra_invalidate(nfsfh);
data = malloc(sizeof(struct nfs_cb_data));
if (data == NULL) {
rpc_set_error(nfs->rpc, "out of memory: failed to allocate nfs_cb_data structure");
@ -2867,7 +2895,7 @@ int nfs_ftruncate_async(struct nfs_context *nfs, struct nfsfh *nfsfh, uint64_t l
struct nfs_cb_data *data;
SETATTR3args args;
nfs_ra_invalidate(nfsfh);
nfs_pagecache_invalidate(nfs, nfsfh);
data = malloc(sizeof(struct nfs_cb_data));
if (data == NULL) {
rpc_set_error(nfs->rpc, "out of memory: failed to allocate nfs_cb_data structure");
@ -5393,6 +5421,14 @@ void nfs_set_gid(struct nfs_context *nfs, int gid) {
rpc_set_gid(nfs->rpc, gid);
}
/*
 * Set the page cache size (in pages) for this context.
 * Forwards v unchanged to rpc_set_pagecache on the context's rpc_context.
 */
void nfs_set_pagecache(struct nfs_context *nfs, uint32_t v) {
rpc_set_pagecache(nfs->rpc, v);
}
/*
 * Set the page cache ttl for this context.
 * Forwards v unchanged to rpc_set_pagecache_ttl on the context's rpc_context.
 */
void nfs_set_pagecache_ttl(struct nfs_context *nfs, uint32_t v) {
rpc_set_pagecache_ttl(nfs->rpc, v);
}
/*
 * Set the readahead size for this context.
 * Forwards v unchanged to rpc_set_readahead on the context's rpc_context.
 */
void nfs_set_readahead(struct nfs_context *nfs, uint32_t v) {
rpc_set_readahead(nfs->rpc, v);
}