Only process pdus once when propagating cancellations or errors

Signed-off-by: Earl Chew <earl_chew@yahoo.com>
libnfs-4.0.0-vitalif
Earl Chew 2017-05-10 19:20:34 -07:00
parent 2edbd14779
commit f986a53835
1 changed files with 33 additions and 30 deletions

View File

@ -229,31 +229,51 @@ char *rpc_get_error(struct rpc_context *rpc)
return rpc->error_string;
}
void rpc_error_all_pdus(struct rpc_context *rpc, const char *error)
static void rpc_purge_all_pdus(struct rpc_context *rpc, int status, const char *error)
{
struct rpc_queue outqueue;
struct rpc_pdu *pdu;
unsigned int i;
int i;
assert(rpc->magic == RPC_CONTEXT_MAGIC);
while ((pdu = rpc->outqueue.head) != NULL) {
pdu->cb(rpc, RPC_STATUS_ERROR, (void *)error, pdu->private_data);
rpc->outqueue.head = pdu->next;
/* Remove all entries from each queue before cancellation to prevent
* the callbacks manipulating entries that are about to be removed.
*
* This code assumes that the callbacks will not enqueue any new
* pdus when called.
*/
outqueue = rpc->outqueue;
rpc_reset_queue(&rpc->outqueue);
while ((pdu = outqueue.head) != NULL) {
outqueue.head = pdu->next;
pdu->next = NULL;
pdu->cb(rpc, status, (void *) error, pdu->private_data);
rpc_free_pdu(rpc, pdu);
}
rpc->outqueue.tail = NULL;
for (i = 0; i < HASHES; i++) {
struct rpc_queue *q = &rpc->waitpdu[i];
struct rpc_queue waitqueue = rpc->waitpdu[i];
while((pdu = q->head) != NULL) {
pdu->cb(rpc, RPC_STATUS_ERROR, (void *)error,
pdu->private_data);
q->head = pdu->next;
rpc_reset_queue(&rpc->waitpdu[i]);
while((pdu = waitqueue.head) != NULL) {
waitqueue.head = pdu->next;
pdu->next = NULL;
pdu->cb(rpc, status, (void *) error, pdu->private_data);
rpc_free_pdu(rpc, pdu);
}
q->tail = NULL;
}
assert(!rpc->outqueue.head);
for (i = 0; i < HASHES; i++)
assert(!rpc->waitpdu[i].head);
}
void rpc_error_all_pdus(struct rpc_context *rpc, const char *error)
{
rpc_purge_all_pdus(rpc, RPC_STATUS_ERROR, error);
}
static void rpc_free_fragment(struct rpc_fragment *fragment)
@ -301,26 +321,9 @@ int rpc_add_fragment(struct rpc_context *rpc, char *data, uint32_t size)
void rpc_destroy_context(struct rpc_context *rpc)
{
struct rpc_pdu *pdu;
unsigned int i;
assert(rpc->magic == RPC_CONTEXT_MAGIC);
while((pdu = rpc->outqueue.head) != NULL) {
pdu->cb(rpc, RPC_STATUS_CANCEL, NULL, pdu->private_data);
LIBNFS_LIST_REMOVE(&rpc->outqueue.head, pdu);
rpc_free_pdu(rpc, pdu);
}
for (i = 0; i < HASHES; i++) {
struct rpc_queue *q = &rpc->waitpdu[i];
while((pdu = q->head) != NULL) {
pdu->cb(rpc, RPC_STATUS_CANCEL, NULL, pdu->private_data);
LIBNFS_LIST_REMOVE(&q->head, pdu);
rpc_free_pdu(rpc, pdu);
}
}
rpc_purge_all_pdus(rpc, RPC_STATUS_CANCEL, NULL);
rpc_free_all_fragments(rpc);