Merge pull request #82 from plieven/readahead

add readahead support
libnfs-4.0.0-vitalif
Ronnie Sahlberg 2014-06-29 14:41:28 -07:00
commit be243cfa9b
5 changed files with 139 additions and 24 deletions

View File

@ -73,6 +73,7 @@ struct rpc_queue {
};
#define HASHES 1024
#define NFS_RA_TIMEOUT 5
struct rpc_context {
uint32_t magic;
@ -115,6 +116,7 @@ struct rpc_context {
int tcp_syncnt;
int uid;
int gid;
uint32_t readahead;
};
struct rpc_pdu {
@ -174,6 +176,7 @@ void rpc_unset_autoreconnect(struct rpc_context *rpc);
void rpc_set_tcp_syncnt(struct rpc_context *rpc, int v);
void rpc_set_uid(struct rpc_context *rpc, int uid);
void rpc_set_gid(struct rpc_context *rpc, int gid);
void rpc_set_readahead(struct rpc_context *rpc, uint32_t v);
int rpc_add_fragment(struct rpc_context *rpc, char *data, uint64_t size);
void rpc_free_all_fragments(struct rpc_context *rpc);

View File

@ -36,6 +36,9 @@
extern "C" {
#endif
#define LIBNFS_FEATURE_READAHEAD
#define NFS_BLKSIZE 4096
struct nfs_context;
struct rpc_context;
@ -184,6 +187,7 @@ EXTERN uint64_t nfs_get_writemax(struct nfs_context *nfs);
EXTERN void nfs_set_tcp_syncnt(struct nfs_context *nfs, int v);
EXTERN void nfs_set_uid(struct nfs_context *nfs, int uid);
EXTERN void nfs_set_gid(struct nfs_context *nfs, int gid);
EXTERN void nfs_set_readahead(struct nfs_context *nfs, uint32_t v);
/*
* MOUNT THE EXPORT

View File

@ -94,6 +94,12 @@ struct rpc_context *rpc_init_context(void)
return rpc;
}
/* Set the maximum readahead size, in bytes, used by the NFS read path.
 * 0 (the default) disables readahead entirely. */
void rpc_set_readahead(struct rpc_context *rpc, uint32_t v)
{
assert(rpc->magic == RPC_CONTEXT_MAGIC);
rpc->readahead = v;
}
struct rpc_context *rpc_init_udp_context(void)
{

View File

@ -70,6 +70,7 @@ nfs_set_auth
nfs_set_gid
nfs_set_tcp_syncnt
nfs_set_uid
nfs_set_readahead
nfs_stat
nfs_stat_async
nfs_stat64

View File

@ -88,11 +88,22 @@ struct nfsdir {
struct nfsdirent *current;
};
/* Per-filehandle readahead state: one cached read buffer plus the
 * bookkeeping used to detect sequential access and grow the window. */
struct nfs_readahead {
uint64_t fh_offset;   /* end offset of the last read delivered to the app */
uint64_t last_offset; /* offset of the most recent read request */
uint64_t buf_offset;  /* file offset of the first byte in buf */
uint64_t buf_count;   /* number of valid bytes in buf */
time_t buf_ts;        /* time buf was filled; cache expires after NFS_RA_TIMEOUT */
void *buf;            /* cached read data, or NULL when nothing is cached */
uint32_t cur_ra;      /* current readahead window in bytes; starts at
                       * NFS_BLKSIZE and doubles up to rpc->readahead */
};
/* State for one open NFS file handle. */
struct nfsfh {
struct nfs_fh3 fh;   /* NFSv3 file handle returned by the server */
int is_sync;         /* NOTE(review): presumably O_SYNC-style writes — confirm */
int is_append;       /* append mode: writes fetch size first and write at EOF */
uint64_t offset;     /* current file position for read/write */
struct nfs_readahead ra; /* readahead cache for this handle */
};
struct nfs_context {
@ -173,9 +184,8 @@ struct nfs_cb_data {
int cancel;
int oom;
int num_calls;
uint64_t start_offset, max_offset, count;
uint64_t offset, count, max_offset, org_offset, org_count;
char *buffer;
size_t request_size;
char *usrbuf;
};
@ -220,12 +230,14 @@ char *nfs_get_error(struct nfs_context *nfs)
/* Apply one URL argument (arg=val) to the nfs context.
 * Unknown argument names are silently ignored.
 * Returns 0 (no recognized argument can currently fail). */
static int nfs_set_context_args(struct nfs_context *nfs, char *arg, char *val)
{
	if (!strcmp(arg, "tcp-syncnt")) {
		rpc_set_tcp_syncnt(nfs_get_rpc_context(nfs), atoi(val));
	} else if (!strcmp(arg, "uid")) {
		rpc_set_uid(nfs_get_rpc_context(nfs), atoi(val));
	} else if (!strcmp(arg, "gid")) {
		rpc_set_gid(nfs_get_rpc_context(nfs), atoi(val));
	} else if (!strcmp(arg, "readahead")) {
		/* BUGFIX: the option name was misspelled "readahaed", so the
		 * readahead URL argument could never actually match. */
		rpc_set_readahead(nfs_get_rpc_context(nfs), atoi(val));
	}
	return 0;
}
@ -1237,8 +1249,8 @@ static void nfs_stat_1_cb(struct rpc_context *rpc, int status, void *command_dat
st.st_rdev = 0;
st.st_size = res->GETATTR3res_u.resok.obj_attributes.size;
#ifndef WIN32
st.st_blksize = 4096;
st.st_blocks = res->GETATTR3res_u.resok.obj_attributes.size / 4096;
st.st_blksize = NFS_BLKSIZE;
st.st_blocks = res->GETATTR3res_u.resok.obj_attributes.size / NFS_BLKSIZE;
#endif//WIN32
st.st_atime = res->GETATTR3res_u.resok.obj_attributes.atime.seconds;
st.st_mtime = res->GETATTR3res_u.resok.obj_attributes.mtime.seconds;
@ -1621,9 +1633,9 @@ static void nfs_pread_mcb(struct rpc_context *rpc, int status, void *command_dat
/* if we have more than one call or we have received a short read we need a reassembly buffer */
if (data->num_calls || (count < mdata->count && !res->READ3res_u.resok.eof)) {
if (data->buffer == NULL) {
data->buffer = malloc(data->request_size);
data->buffer = malloc(data->count);
if (data->buffer == NULL) {
rpc_set_error(nfs->rpc, "Out-Of-Memory: Failed to allocate reassembly buffer for %d bytes", (int)data->request_size);
rpc_set_error(nfs->rpc, "Out-Of-Memory: Failed to allocate reassembly buffer for %d bytes", (int)data->count);
data->oom = 1;
}
}
@ -1632,7 +1644,7 @@ static void nfs_pread_mcb(struct rpc_context *rpc, int status, void *command_dat
if (count <= mdata->count) {
/* copy data into reassembly buffer if we have one */
if (data->buffer != NULL) {
memcpy(&data->buffer[mdata->offset - data->start_offset], res->READ3res_u.resok.data.data_val, count);
memcpy(&data->buffer[mdata->offset - data->offset], res->READ3res_u.resok.data.data_val, count);
}
if (data->max_offset < mdata->offset + count) {
data->max_offset = mdata->offset + count;
@ -1688,14 +1700,35 @@ static void nfs_pread_mcb(struct rpc_context *rpc, int status, void *command_dat
}
if (data->buffer) {
data->cb(data->max_offset - data->start_offset, nfs, data->buffer, data->private_data);
if (data->max_offset > data->org_offset + data->org_count) {
data->max_offset = data->org_offset + data->org_count;
}
data->cb(data->max_offset - data->org_offset, nfs, data->buffer + (data->org_offset - data->offset), data->private_data);
} else {
data->cb(res->READ3res_u.resok.count, nfs, res->READ3res_u.resok.data.data_val, data->private_data);
}
data->nfsfh->ra.fh_offset = data->max_offset;
if (data->nfsfh->ra.cur_ra) {
free(data->nfsfh->ra.buf);
data->nfsfh->ra.buf = data->buffer;
data->nfsfh->ra.buf_offset = data->offset;
data->nfsfh->ra.buf_count = data->count;
data->nfsfh->ra.buf_ts = time(NULL);
data->buffer = NULL;
}
free_nfs_cb_data(data);
}
static void nfs_ra_invalidate(struct nfsfh *nfsfh) {
free(nfsfh->ra.buf);
nfsfh->ra.buf = NULL;
nfsfh->ra.buf_offset = 0;
nfsfh->ra.buf_count = 0;
nfsfh->ra.buf_ts = time(NULL);
nfsfh->ra.cur_ra = NFS_BLKSIZE;
}
static int nfs_pread_async_internal(struct nfs_context *nfs, struct nfsfh *nfsfh, uint64_t offset, uint64_t count, nfs_cb cb, void *private_data, int update_pos)
{
struct nfs_cb_data *data;
@ -1710,16 +1743,78 @@ static int nfs_pread_async_internal(struct nfs_context *nfs, struct nfsfh *nfsfh
data->cb = cb;
data->private_data = private_data;
data->nfsfh = nfsfh;
data->request_size = count;
data->org_offset = offset;
data->org_count = count;
assert(data->num_calls == 0);
if (nfs->rpc->readahead && time(NULL) - nfsfh->ra.buf_ts > NFS_RA_TIMEOUT) {
/* readahead cache timeout */
nfs_ra_invalidate(nfsfh);
}
if (nfs->rpc->readahead) {
if (offset >= nfsfh->ra.last_offset &&
offset - NFS_BLKSIZE <= nfsfh->ra.fh_offset + nfsfh->ra.cur_ra) {
if (nfs->rpc->readahead > nfsfh->ra.cur_ra) {
nfsfh->ra.cur_ra <<= 1;
}
} else {
nfsfh->ra.cur_ra = NFS_BLKSIZE;
}
nfsfh->ra.last_offset = offset;
if (nfsfh->ra.buf_offset <= offset &&
nfsfh->ra.buf_offset + nfsfh->ra.buf_count >= offset + count) {
/* serve request completely from cache */
data->buffer = malloc(count);
if (data->buffer == NULL) {
free_nfs_cb_data(data);
return -ENOMEM;
}
memcpy(data->buffer, nfsfh->ra.buf + (offset - nfsfh->ra.buf_offset), count);
data->cb(count, nfs, data->buffer, data->private_data);
nfsfh->ra.fh_offset = offset + count;
free_nfs_cb_data(data);
return 0;
}
/* align start offset to blocksize */
count += offset & (NFS_BLKSIZE - 1);
offset &= ~(NFS_BLKSIZE - 1);
/* align end offset to blocksize and add readahead */
count += nfsfh->ra.cur_ra - 1;
count &= ~(NFS_BLKSIZE - 1);
data->buffer = malloc(count);
if (data->buffer == NULL) {
free_nfs_cb_data(data);
return -ENOMEM;
}
data->offset = offset;
data->count = count;
if (nfsfh->ra.buf_count && nfsfh->ra.buf_offset <= offset &&
nfsfh->ra.buf_offset + nfsfh->ra.buf_count >= offset) {
/* serve request partially from cache */
size_t overlap = (nfsfh->ra.buf_offset + nfsfh->ra.buf_count) - offset;
if (overlap > count) count = overlap;
memcpy(data->buffer, nfsfh->ra.buf + (offset - nfsfh->ra.buf_offset), overlap);
offset += overlap;
count -= overlap;
}
} else {
data->offset = offset;
data->count = count;
}
data->max_offset = offset;
/* chop requests into chunks of at most READMAX bytes if necessary.
* we send all reads in parallel so that performance is still good.
*/
data->max_offset = offset;
data->start_offset = offset;
do {
uint64_t readcount = count;
struct nfs_mcb_data *mdata;
@ -1838,7 +1933,7 @@ static void nfs_pwrite_mcb(struct rpc_context *rpc, int status, void *command_da
mdata->count -= count;
nfs_fill_WRITE3args(&args, data->nfsfh, mdata->offset, mdata->count,
&data->usrbuf[mdata->offset - data->start_offset]);
&data->usrbuf[mdata->offset - data->offset]);
if (rpc_nfs3_write_async(nfs->rpc, nfs_pwrite_mcb, &args, mdata) == 0) {
data->num_calls++;
return;
@ -1878,7 +1973,7 @@ static void nfs_pwrite_mcb(struct rpc_context *rpc, int status, void *command_da
return;
}
data->cb(data->max_offset - data->start_offset, nfs, NULL, data->private_data);
data->cb(data->max_offset - data->offset, nfs, NULL, data->private_data);
free_nfs_cb_data(data);
}
@ -1907,7 +2002,7 @@ static int nfs_pwrite_async_internal(struct nfs_context *nfs, struct nfsfh *nfsf
* we send all writes in parallel so that performance is still good.
*/
data->max_offset = offset;
data->start_offset = offset;
data->offset = offset;
do {
uint64_t writecount = count;
@ -1934,7 +2029,7 @@ static int nfs_pwrite_async_internal(struct nfs_context *nfs, struct nfsfh *nfsf
mdata->count = writecount;
mdata->update_pos = update_pos;
nfs_fill_WRITE3args(&args, nfsfh, offset, writecount, &buf[offset - data->start_offset]);
nfs_fill_WRITE3args(&args, nfsfh, offset, writecount, &buf[offset - data->offset]);
if (rpc_nfs3_write_async(nfs->rpc, nfs_pwrite_mcb, &args, mdata) != 0) {
rpc_set_error(nfs->rpc, "RPC error: Failed to send WRITE call for %s", data->path);
@ -2000,6 +2095,7 @@ static void nfs_write_append_cb(struct rpc_context *rpc, int status, void *comma
int nfs_write_async(struct nfs_context *nfs, struct nfsfh *nfsfh, uint64_t count, char *buf, nfs_cb cb, void *private_data)
{
nfs_ra_invalidate(nfsfh);
if (nfsfh->is_append) {
struct GETATTR3args args;
struct nfs_cb_data *data;
@ -2043,6 +2139,7 @@ int nfs_close_async(struct nfs_context *nfs, struct nfsfh *nfsfh, nfs_cb cb, voi
free(nfsfh->fh.data.data_val);
nfsfh->fh.data.data_val = NULL;
}
free(nfsfh->ra.buf);
free(nfsfh);
cb(0, nfs, NULL, private_data);
@ -3371,11 +3468,11 @@ static void nfs_statvfs_1_cb(struct rpc_context *rpc, int status, void *command_
return;
}
svfs.f_bsize = 4096;
svfs.f_frsize = 4096;
svfs.f_blocks = res->FSSTAT3res_u.resok.tbytes/4096;
svfs.f_bfree = res->FSSTAT3res_u.resok.fbytes/4096;
svfs.f_bavail = res->FSSTAT3res_u.resok.abytes/4096;
svfs.f_bsize = NFS_BLKSIZE;
svfs.f_frsize = NFS_BLKSIZE;
svfs.f_blocks = res->FSSTAT3res_u.resok.tbytes/NFS_BLKSIZE;
svfs.f_bfree = res->FSSTAT3res_u.resok.fbytes/NFS_BLKSIZE;
svfs.f_bavail = res->FSSTAT3res_u.resok.abytes/NFS_BLKSIZE;
svfs.f_files = res->FSSTAT3res_u.resok.tfiles;
svfs.f_ffree = res->FSSTAT3res_u.resok.ffiles;
#if !defined(ANDROID)
@ -4413,6 +4510,10 @@ void nfs_set_gid(struct nfs_context *nfs, int gid) {
rpc_set_gid(nfs->rpc, gid);
}
/* Public API: set the readahead size in bytes for this NFS context.
 * Forwards to the underlying RPC context; 0 disables readahead. */
void nfs_set_readahead(struct nfs_context *nfs, uint32_t v) {
rpc_set_readahead(nfs->rpc, v);
}
void nfs_set_error(struct nfs_context *nfs, char *error_string, ...)
{
va_list ap;