Revert some commits that cause subtle API issues
We can not have a static rpc->inbuf buffer since that will no longer guarantee that the received buffer is valid for the duration of callbacks. One of the problems is that if we issue new (sync) RPCs from within a callback, that will overwrite and invalidate the receive buffer that we passed to the callback. Revert "init: do not leak rpc->inbuf" This reverts commitlibnfs-4.0.0-vitaliff7bc4c8bb1
. Revert "socket: we have to use memmove in rpc_read_from_socket" This reverts commit24429e95b8
. Revert "socket: make rpc->inbuf static and simplify receive logic" This reverts commit7000a0aa04
.
parent
dc8d86628d
commit
f681a2c167
|
@ -99,8 +99,8 @@ struct rpc_context {
|
|||
struct rpc_queue waitpdu[HASHES];
|
||||
|
||||
uint32_t inpos;
|
||||
uint32_t insize;
|
||||
char *inbuf;
|
||||
uint32_t inbuflen;
|
||||
|
||||
/* special fields for UDP, which can sometimes be BROADCASTed */
|
||||
int is_udp;
|
||||
|
|
25
lib/init.c
25
lib/init.c
|
@ -70,13 +70,6 @@ struct rpc_context *rpc_init_context(void)
|
|||
return NULL;
|
||||
}
|
||||
|
||||
rpc->inbuflen = 2 * (NFS_MAX_XFER_SIZE + 4096);
|
||||
rpc->inbuf = malloc(rpc->inbuflen);
|
||||
if (rpc->inbuf == NULL) {
|
||||
free(rpc);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
rpc->auth = authunix_create_default();
|
||||
if (rpc->auth == NULL) {
|
||||
free(rpc->encodebuf);
|
||||
|
@ -284,10 +277,20 @@ void rpc_destroy_context(struct rpc_context *rpc)
|
|||
close(rpc->fd);
|
||||
}
|
||||
|
||||
free(rpc->encodebuf);
|
||||
free(rpc->inbuf);
|
||||
free(rpc->error_string);
|
||||
free(rpc->udp_dest);
|
||||
if (rpc->encodebuf != NULL) {
|
||||
free(rpc->encodebuf);
|
||||
rpc->encodebuf = NULL;
|
||||
}
|
||||
|
||||
if (rpc->error_string != NULL) {
|
||||
free(rpc->error_string);
|
||||
rpc->error_string = NULL;
|
||||
}
|
||||
|
||||
if (rpc->udp_dest != NULL) {
|
||||
free(rpc->udp_dest);
|
||||
rpc->udp_dest = NULL;
|
||||
}
|
||||
|
||||
rpc->magic = 0;
|
||||
free(rpc);
|
||||
|
|
86
lib/socket.c
86
lib/socket.c
|
@ -213,6 +213,7 @@ static int rpc_read_from_socket(struct rpc_context *rpc)
|
|||
assert(rpc->magic == RPC_CONTEXT_MAGIC);
|
||||
|
||||
if (rpc->is_udp) {
|
||||
char *buf;
|
||||
int available;
|
||||
socklen_t socklen = sizeof(rpc->udp_src);
|
||||
|
||||
|
@ -226,27 +227,69 @@ static int rpc_read_from_socket(struct rpc_context *rpc)
|
|||
return -1;
|
||||
}
|
||||
|
||||
if (available > rpc->inbuflen) {
|
||||
rpc_set_error(rpc, "RPC fragment too big");
|
||||
buf = malloc(available);
|
||||
if (buf == NULL) {
|
||||
rpc_set_error(rpc, "Failed to malloc buffer for recvfrom");
|
||||
return -1;
|
||||
}
|
||||
|
||||
count = recvfrom(rpc->fd, rpc->inbuf, available, MSG_DONTWAIT, (struct sockaddr *)&rpc->udp_src, &socklen);
|
||||
count = recvfrom(rpc->fd, buf, available, MSG_DONTWAIT, (struct sockaddr *)&rpc->udp_src, &socklen);
|
||||
if (count < 0) {
|
||||
rpc_set_error(rpc, "Failed recvfrom: %s", strerror(errno));
|
||||
free(buf);
|
||||
return -1;
|
||||
}
|
||||
if (rpc_process_pdu(rpc, rpc->inbuf, count) != 0) {
|
||||
if (rpc_process_pdu(rpc, buf, count) != 0) {
|
||||
rpc_set_error(rpc, "Invalid/garbage pdu received from server. Ignoring PDU");
|
||||
free(buf);
|
||||
return -1;
|
||||
}
|
||||
free(buf);
|
||||
return 0;
|
||||
}
|
||||
|
||||
size = rpc->inbuflen - rpc->inpos;
|
||||
/* read record marker, 4 bytes at the beginning of every pdu */
|
||||
if (rpc->inbuf == NULL) {
|
||||
rpc->insize = 4;
|
||||
rpc->inbuf = malloc(rpc->insize);
|
||||
if (rpc->inbuf == NULL) {
|
||||
rpc_set_error(rpc, "Failed to allocate buffer for record marker, errno:%d. Closing socket.", errno);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
if (rpc->inpos < 4) {
|
||||
size = 4 - rpc->inpos;
|
||||
|
||||
count = recv(rpc->fd, rpc->inbuf + rpc->inpos, size, MSG_DONTWAIT);
|
||||
if (count == -1) {
|
||||
if (errno == EINTR || errno == EAGAIN) {
|
||||
return 0;
|
||||
}
|
||||
rpc_set_error(rpc, "Read from socket failed, errno:%d. Closing socket.", errno);
|
||||
return -1;
|
||||
}
|
||||
rpc->inpos += count;
|
||||
}
|
||||
|
||||
pdu_size = rpc_get_pdu_size(rpc->inbuf);
|
||||
|
||||
if (pdu_size > NFS_MAX_XFER_SIZE + 4096) {
|
||||
rpc_set_error(rpc, "Incoming PDU exceeds limit of %d bytes.", NFS_MAX_XFER_SIZE + 4096);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (rpc->insize < pdu_size) {
|
||||
rpc->inbuf = realloc(rpc->inbuf, pdu_size);
|
||||
if (rpc->inbuf == NULL) {
|
||||
rpc_set_error(rpc, "Failed to allocate buffer of %d bytes for pdu, errno:%d. Closing socket.", pdu_size, errno);
|
||||
return -1;
|
||||
}
|
||||
rpc->insize = pdu_size;
|
||||
}
|
||||
|
||||
size = rpc->insize - rpc->inpos;
|
||||
count = recv(rpc->fd, rpc->inbuf + rpc->inpos, size, MSG_DONTWAIT);
|
||||
if (count < 0) {
|
||||
if (count == -1) {
|
||||
if (errno == EINTR || errno == EAGAIN) {
|
||||
return 0;
|
||||
}
|
||||
|
@ -261,29 +304,25 @@ static int rpc_read_from_socket(struct rpc_context *rpc)
|
|||
|
||||
rpc->inpos += count;
|
||||
|
||||
while (rpc->inpos >= 4) {
|
||||
pdu_size = rpc_get_pdu_size(rpc->inbuf);
|
||||
if (pdu_size > NFS_MAX_XFER_SIZE + 4096) {
|
||||
rpc_set_error(rpc, "Incoming PDU exceeds limit of %d bytes.", NFS_MAX_XFER_SIZE + 4096);
|
||||
if (rpc->inpos == rpc->insize) {
|
||||
char *buf = rpc->inbuf;
|
||||
|
||||
rpc->inbuf = NULL;
|
||||
rpc->insize = 0;
|
||||
rpc->inpos = 0;
|
||||
|
||||
if (rpc_process_pdu(rpc, buf, pdu_size) != 0) {
|
||||
rpc_set_error(rpc, "Invalid/garbage pdu received from server. Closing socket");
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (rpc->inpos >= pdu_size) {
|
||||
if (rpc_process_pdu(rpc, rpc->inbuf, pdu_size) != 0) {
|
||||
rpc_set_error(rpc, "Invalid/garbage pdu received from server. Closing socket");
|
||||
return -1;
|
||||
}
|
||||
memmove(rpc->inbuf, rpc->inbuf + pdu_size, rpc->inpos - pdu_size);
|
||||
rpc->inpos -= pdu_size;
|
||||
continue;
|
||||
}
|
||||
break;
|
||||
}
|
||||
free(buf);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
|
||||
int rpc_service(struct rpc_context *rpc, int revents)
|
||||
{
|
||||
assert(rpc->magic == RPC_CONTEXT_MAGIC);
|
||||
|
@ -607,7 +646,6 @@ static int rpc_reconnect_requeue(struct rpc_context *rpc)
|
|||
}
|
||||
rpc->fd = -1;
|
||||
rpc->is_connected = 0;
|
||||
rpc->inpos = 0;
|
||||
|
||||
if (rpc->outqueue.head) {
|
||||
rpc->outqueue.head->written = 0;
|
||||
|
|
Loading…
Reference in New Issue