Revert some commits that cause subtle API issues

We can not have a static rpc->inbuf buffer since that will no longer guarantee
that the received buffer is valid for the duration of callbacks.

One of the problems is that if we issue new (sync) RPCs from within a
callback, that will overwrite and invalidate the receive buffer that
we passed to the callback.

Revert "init: do not leak rpc->inbuf"
This reverts commit f7bc4c8bb1.

Revert "socket: we have to use memmove in rpc_read_from_socket"
This reverts commit 24429e95b8.

Revert "socket: make rpc->inbuf static and simplify receive logic"
This reverts commit 7000a0aa04.
libnfs-4.0.0-vitalif
Ronnie Sahlberg 2015-12-24 11:46:46 -08:00
parent dc8d86628d
commit f681a2c167
3 changed files with 77 additions and 36 deletions

View File

@ -99,8 +99,8 @@ struct rpc_context {
struct rpc_queue waitpdu[HASHES];
uint32_t inpos;
uint32_t insize;
char *inbuf;
uint32_t inbuflen;
/* special fields for UDP, which can sometimes be BROADCASTed */
int is_udp;

View File

@ -70,13 +70,6 @@ struct rpc_context *rpc_init_context(void)
return NULL;
}
rpc->inbuflen = 2 * (NFS_MAX_XFER_SIZE + 4096);
rpc->inbuf = malloc(rpc->inbuflen);
if (rpc->inbuf == NULL) {
free(rpc);
return NULL;
}
rpc->auth = authunix_create_default();
if (rpc->auth == NULL) {
free(rpc->encodebuf);
@ -284,10 +277,20 @@ void rpc_destroy_context(struct rpc_context *rpc)
close(rpc->fd);
}
free(rpc->encodebuf);
free(rpc->inbuf);
free(rpc->error_string);
free(rpc->udp_dest);
if (rpc->encodebuf != NULL) {
free(rpc->encodebuf);
rpc->encodebuf = NULL;
}
if (rpc->error_string != NULL) {
free(rpc->error_string);
rpc->error_string = NULL;
}
if (rpc->udp_dest != NULL) {
free(rpc->udp_dest);
rpc->udp_dest = NULL;
}
rpc->magic = 0;
free(rpc);

View File

@ -213,6 +213,7 @@ static int rpc_read_from_socket(struct rpc_context *rpc)
assert(rpc->magic == RPC_CONTEXT_MAGIC);
if (rpc->is_udp) {
char *buf;
int available;
socklen_t socklen = sizeof(rpc->udp_src);
@ -226,27 +227,69 @@ static int rpc_read_from_socket(struct rpc_context *rpc)
return -1;
}
if (available > rpc->inbuflen) {
rpc_set_error(rpc, "RPC fragment too big");
buf = malloc(available);
if (buf == NULL) {
rpc_set_error(rpc, "Failed to malloc buffer for recvfrom");
return -1;
}
count = recvfrom(rpc->fd, rpc->inbuf, available, MSG_DONTWAIT, (struct sockaddr *)&rpc->udp_src, &socklen);
count = recvfrom(rpc->fd, buf, available, MSG_DONTWAIT, (struct sockaddr *)&rpc->udp_src, &socklen);
if (count < 0) {
rpc_set_error(rpc, "Failed recvfrom: %s", strerror(errno));
free(buf);
return -1;
}
if (rpc_process_pdu(rpc, rpc->inbuf, count) != 0) {
if (rpc_process_pdu(rpc, buf, count) != 0) {
rpc_set_error(rpc, "Invalid/garbage pdu received from server. Ignoring PDU");
free(buf);
return -1;
}
free(buf);
return 0;
}
size = rpc->inbuflen - rpc->inpos;
/* read record marker, 4 bytes at the beginning of every pdu */
if (rpc->inbuf == NULL) {
rpc->insize = 4;
rpc->inbuf = malloc(rpc->insize);
if (rpc->inbuf == NULL) {
rpc_set_error(rpc, "Failed to allocate buffer for record marker, errno:%d. Closing socket.", errno);
return -1;
}
}
if (rpc->inpos < 4) {
size = 4 - rpc->inpos;
count = recv(rpc->fd, rpc->inbuf + rpc->inpos, size, MSG_DONTWAIT);
if (count == -1) {
if (errno == EINTR || errno == EAGAIN) {
return 0;
}
rpc_set_error(rpc, "Read from socket failed, errno:%d. Closing socket.", errno);
return -1;
}
rpc->inpos += count;
}
pdu_size = rpc_get_pdu_size(rpc->inbuf);
if (pdu_size > NFS_MAX_XFER_SIZE + 4096) {
rpc_set_error(rpc, "Incoming PDU exceeds limit of %d bytes.", NFS_MAX_XFER_SIZE + 4096);
return -1;
}
if (rpc->insize < pdu_size) {
rpc->inbuf = realloc(rpc->inbuf, pdu_size);
if (rpc->inbuf == NULL) {
rpc_set_error(rpc, "Failed to allocate buffer of %d bytes for pdu, errno:%d. Closing socket.", pdu_size, errno);
return -1;
}
rpc->insize = pdu_size;
}
size = rpc->insize - rpc->inpos;
count = recv(rpc->fd, rpc->inbuf + rpc->inpos, size, MSG_DONTWAIT);
if (count < 0) {
if (count == -1) {
if (errno == EINTR || errno == EAGAIN) {
return 0;
}
@ -261,29 +304,25 @@ static int rpc_read_from_socket(struct rpc_context *rpc)
rpc->inpos += count;
while (rpc->inpos >= 4) {
pdu_size = rpc_get_pdu_size(rpc->inbuf);
if (pdu_size > NFS_MAX_XFER_SIZE + 4096) {
rpc_set_error(rpc, "Incoming PDU exceeds limit of %d bytes.", NFS_MAX_XFER_SIZE + 4096);
if (rpc->inpos == rpc->insize) {
char *buf = rpc->inbuf;
rpc->inbuf = NULL;
rpc->insize = 0;
rpc->inpos = 0;
if (rpc_process_pdu(rpc, buf, pdu_size) != 0) {
rpc_set_error(rpc, "Invalid/garbage pdu received from server. Closing socket");
return -1;
}
if (rpc->inpos >= pdu_size) {
if (rpc_process_pdu(rpc, rpc->inbuf, pdu_size) != 0) {
rpc_set_error(rpc, "Invalid/garbage pdu received from server. Closing socket");
return -1;
}
memmove(rpc->inbuf, rpc->inbuf + pdu_size, rpc->inpos - pdu_size);
rpc->inpos -= pdu_size;
continue;
}
break;
}
free(buf);
}
return 0;
}
int rpc_service(struct rpc_context *rpc, int revents)
{
assert(rpc->magic == RPC_CONTEXT_MAGIC);
@ -607,7 +646,6 @@ static int rpc_reconnect_requeue(struct rpc_context *rpc)
}
rpc->fd = -1;
rpc->is_connected = 0;
rpc->inpos = 0;
if (rpc->outqueue.head) {
rpc->outqueue.head->written = 0;