diff --git a/stomp.c b/stomp.c index fba3466..30ea164 100644 --- a/stomp.c +++ b/stomp.c @@ -59,10 +59,51 @@ stomp_t *stomp_init() stomp->ssl_handle = NULL; #endif + stomp->buffer = NULL; return stomp; } /* }}} */ +/* {{{ stomp_frame_buffer_push + */ +void stomp_frame_buffer_push(stomp_frame_cell_t **pcell, stomp_frame_t *frame) +{ + stomp_frame_cell_t *cell = (stomp_frame_cell_t *) emalloc(sizeof(stomp_frame_cell_t)); + cell->frame = frame; + cell->next = NULL; + + if (!*pcell) { + *pcell = cell; + } else { + stomp_frame_cell_t *cursor = *pcell; + while (cursor->next != NULL) cursor = cursor->next; + cursor->next = cell; + } +} +/* }}} */ + +/* {{{ stomp_frame_buffer_shift + */ +stomp_frame_t *stomp_frame_buffer_shift(stomp_frame_cell_t **pcell) { + stomp_frame_t *frame = NULL; + if (*pcell) { + stomp_frame_cell_t *cell = *pcell; + *pcell = cell->next; + frame = cell->frame; + efree(cell); + } + return frame; +} +/* }}} */ + +/* {{{ stomp_frame_buffer_clear + */ +void stomp_frame_buffer_clear(stomp_frame_cell_t **pcell) { + stomp_frame_t *frame = NULL; + while (frame = stomp_frame_buffer_shift(pcell)) efree(frame); +} +/* }}} */ + /* {{{ stomp_set_error */ void stomp_set_error(stomp_t *stomp, const char *error, int errnum, const char *details) @@ -205,7 +246,7 @@ void stomp_close(stomp_t *stomp) if (stomp->error_details) { efree(stomp->error_details); } - + stomp_frame_buffer_clear(&stomp->buffer); efree(stomp); } /* }}} */ @@ -451,6 +492,10 @@ stomp_frame_t *stomp_read_frame(stomp_t *stomp) char *cmd = NULL, *length_str = NULL; int length = 0; + if (stomp->buffer) { + return stomp_frame_buffer_shift(&stomp->buffer); + } + if (!stomp_select(stomp)) { return NULL; } @@ -535,30 +580,41 @@ int stomp_valid_receipt(stomp_t *stomp, stomp_frame_t *frame) { int success = 1; char error[1024]; char *receipt = NULL; + if (zend_hash_find(frame->headers, "receipt", sizeof("receipt"), (void **)&receipt) == SUCCESS) { - stomp_frame_t *res = stomp_read_frame(stomp); + stomp_frame_cell_t *buffer = NULL; success = 0; - if (res) { - if (0 == strncmp("RECEIPT", res->command, sizeof("RECEIPT") - 1)) { - char *receipt_id = NULL; - if (zend_hash_find(res->headers, "receipt-id", sizeof("receipt-id"), (void **)&receipt_id) == SUCCESS - && strlen(receipt) == strlen(receipt_id) - && !strcmp(receipt, receipt_id)) { - success = 1; + while (1) { + stomp_frame_t *res = stomp_read_frame(stomp); + if (res) { + if (0 == strncmp("RECEIPT", res->command, sizeof("RECEIPT") - 1)) { + char *receipt_id = NULL; + if (zend_hash_find(res->headers, "receipt-id", sizeof("receipt-id"), (void **)&receipt_id) == SUCCESS + && strlen(receipt) == strlen(receipt_id) + && !strcmp(receipt, receipt_id)) { + success = 1; + } else { + snprintf(error, sizeof(error), "Unexpected receipt id : %s", receipt_id); + stomp_set_error(stomp, error, 0, NULL); + } + stomp_free_frame(res); + stomp->buffer = buffer; + return success; + } else if (0 == strncmp("ERROR", res->command, sizeof("ERROR") - 1)) { + char *error_msg = NULL; + if (zend_hash_find(res->headers, "message", sizeof("message"), (void **)&error_msg) == SUCCESS) { + stomp_set_error(stomp, error_msg, 0, res->body); + } + stomp_free_frame(res); + stomp->buffer = buffer; + return success; } else { - snprintf(error, sizeof(error), "Unexpected receipt id : %s", receipt_id); - stomp_set_error(stomp, error, 0, NULL); - } - } else if (0 == strncmp("ERROR", res->command, sizeof("ERROR") - 1)) { - char *error_msg = NULL; - if (zend_hash_find(res->headers, "message", sizeof("message"), (void **)&error_msg) == SUCCESS) { - stomp_set_error(stomp, error_msg, 0, res->body); + stomp_frame_buffer_push(&buffer, res); } } else { - snprintf(error, sizeof(error), "Receipt not received, unexpected command : %s", res->command); - stomp_set_error(stomp, error, 0, NULL); + stomp->buffer = buffer; + return success; } - stomp_free_frame(res); } } return success; @@ -572,6 +628,10 @@ int stomp_select(stomp_t *stomp) int n; struct timeval tv; + if (stomp->buffer) { + return 1; + } + tv.tv_sec = stomp->options.read_timeout_sec; tv.tv_usec = stomp->options.read_timeout_usec; diff --git a/stomp.h b/stomp.h index ebbfd1b..fcf7259 100755 --- a/stomp.h +++ b/stomp.h @@ -45,6 +45,19 @@ typedef struct _stomp_options { #endif } stomp_options_t; +typedef struct _stomp_frame { + char *command; + int command_length; + HashTable *headers; + char *body; + int body_length; +} stomp_frame_t; + +typedef struct _stomp_frame_cell { + stomp_frame_t *frame; + struct _stomp_frame_cell *next; +} stomp_frame_cell_t; + typedef struct _stomp { php_socket_t fd; php_sockaddr_storage localaddr; @@ -59,17 +72,9 @@ typedef struct _stomp { #if HAVE_STOMP_SSL SSL *ssl_handle; #endif + stomp_frame_cell_t *buffer; } stomp_t; - -typedef struct _stomp_frame { - char *command; - int command_length; - HashTable *headers; - char *body; - int body_length; -} stomp_frame_t; - stomp_t *stomp_init(); int stomp_connect(stomp_t *stomp, const char *host, unsigned short port TSRMLS_DC); void stomp_close(stomp_t *stomp);