Add a buffer for receipts

v1.1
Pierrick Charron 2012-11-18 11:09:14 -05:00
parent d83dfa7117
commit 5793a7efee
2 changed files with 93 additions and 28 deletions

98
stomp.c
View File

@ -59,10 +59,51 @@ stomp_t *stomp_init()
stomp->ssl_handle = NULL; stomp->ssl_handle = NULL;
#endif #endif
stomp->buffer = NULL;
return stomp; return stomp;
} }
/* }}} */ /* }}} */
/* {{{ stomp_frame_buffer_push
*/
void stomp_frame_buffer_push(stomp_frame_cell_t **pcell, stomp_frame_t *frame)
{
stomp_frame_cell_t *cell = (stomp_frame_cell_t *) emalloc(sizeof(stomp_frame_cell_t));
cell->frame = frame;
cell->next = NULL;
if (!*pcell) {
*pcell = cell;
} else {
stomp_frame_cell_t *cursor = *pcell;
while (cursor->next != NULL) cursor = cursor->next;
cursor->next = cell;
}
}
/* }}} */
/* {{{ stomp_frame_buffer_shift
*/
stomp_frame_t *stomp_frame_buffer_shift(stomp_frame_cell_t **pcell) {
stomp_frame_t *frame = NULL;
if (*pcell) {
stomp_frame_cell_t *cell = *pcell;
*pcell = cell->next;
frame = cell->frame;
efree(cell);
}
return frame;
}
/* }}} */
/* {{{ stomp_frame_buffer_clear
*/
void stomp_frame_buffer_clear(stomp_frame_cell_t **pcell) {
stomp_frame_t *frame = NULL;
while (frame = stomp_frame_buffer_shift(pcell)) efree(frame);
}
/* }}} */
/* {{{ stomp_set_error /* {{{ stomp_set_error
*/ */
void stomp_set_error(stomp_t *stomp, const char *error, int errnum, const char *details) void stomp_set_error(stomp_t *stomp, const char *error, int errnum, const char *details)
@ -205,7 +246,7 @@ void stomp_close(stomp_t *stomp)
if (stomp->error_details) { if (stomp->error_details) {
efree(stomp->error_details); efree(stomp->error_details);
} }
stomp_frame_buffer_clear(&stomp->buffer);
efree(stomp); efree(stomp);
} }
/* }}} */ /* }}} */
@ -451,6 +492,10 @@ stomp_frame_t *stomp_read_frame(stomp_t *stomp)
char *cmd = NULL, *length_str = NULL; char *cmd = NULL, *length_str = NULL;
int length = 0; int length = 0;
if (stomp->buffer) {
return stomp_frame_buffer_shift(&stomp->buffer);
}
if (!stomp_select(stomp)) { if (!stomp_select(stomp)) {
return NULL; return NULL;
} }
@ -535,30 +580,41 @@ int stomp_valid_receipt(stomp_t *stomp, stomp_frame_t *frame) {
int success = 1; int success = 1;
char error[1024]; char error[1024];
char *receipt = NULL; char *receipt = NULL;
if (zend_hash_find(frame->headers, "receipt", sizeof("receipt"), (void **)&receipt) == SUCCESS) { if (zend_hash_find(frame->headers, "receipt", sizeof("receipt"), (void **)&receipt) == SUCCESS) {
stomp_frame_t *res = stomp_read_frame(stomp); stomp_frame_cell_t *buffer = NULL;
success = 0; success = 0;
if (res) { while (1) {
if (0 == strncmp("RECEIPT", res->command, sizeof("RECEIPT") - 1)) { stomp_frame_t *res = stomp_read_frame(stomp);
char *receipt_id = NULL; if (res) {
if (zend_hash_find(res->headers, "receipt-id", sizeof("receipt-id"), (void **)&receipt_id) == SUCCESS if (0 == strncmp("RECEIPT", res->command, sizeof("RECEIPT") - 1)) {
&& strlen(receipt) == strlen(receipt_id) char *receipt_id = NULL;
&& !strcmp(receipt, receipt_id)) { if (zend_hash_find(res->headers, "receipt-id", sizeof("receipt-id"), (void **)&receipt_id) == SUCCESS
success = 1; && strlen(receipt) == strlen(receipt_id)
&& !strcmp(receipt, receipt_id)) {
success = 1;
} else {
snprintf(error, sizeof(error), "Unexpected receipt id : %s", receipt_id);
stomp_set_error(stomp, error, 0, NULL);
}
stomp_free_frame(res);
stomp->buffer = buffer;
return success;
} else if (0 == strncmp("ERROR", res->command, sizeof("ERROR") - 1)) {
char *error_msg = NULL;
if (zend_hash_find(res->headers, "message", sizeof("message"), (void **)&error_msg) == SUCCESS) {
stomp_set_error(stomp, error_msg, 0, res->body);
}
stomp_free_frame(res);
stomp->buffer = buffer;
return success;
} else { } else {
snprintf(error, sizeof(error), "Unexpected receipt id : %s", receipt_id); stomp_frame_buffer_push(&buffer, res);
stomp_set_error(stomp, error, 0, NULL);
}
} else if (0 == strncmp("ERROR", res->command, sizeof("ERROR") - 1)) {
char *error_msg = NULL;
if (zend_hash_find(res->headers, "message", sizeof("message"), (void **)&error_msg) == SUCCESS) {
stomp_set_error(stomp, error_msg, 0, res->body);
} }
} else { } else {
snprintf(error, sizeof(error), "Receipt not received, unexpected command : %s", res->command); stomp->buffer = buffer;
stomp_set_error(stomp, error, 0, NULL); return success;
} }
stomp_free_frame(res);
} }
} }
return success; return success;
@ -572,6 +628,10 @@ int stomp_select(stomp_t *stomp)
int n; int n;
struct timeval tv; struct timeval tv;
if (stomp->buffer) {
return 1;
}
tv.tv_sec = stomp->options.read_timeout_sec; tv.tv_sec = stomp->options.read_timeout_sec;
tv.tv_usec = stomp->options.read_timeout_usec; tv.tv_usec = stomp->options.read_timeout_usec;

23
stomp.h
View File

@ -45,6 +45,19 @@ typedef struct _stomp_options {
#endif #endif
} stomp_options_t; } stomp_options_t;
typedef struct _stomp_frame {
char *command;
int command_length;
HashTable *headers;
char *body;
int body_length;
} stomp_frame_t;
typedef struct _stomp_frame_cell {
stomp_frame_t *frame;
struct _stomp_frame_cell *next;
} stomp_frame_cell_t;
typedef struct _stomp { typedef struct _stomp {
php_socket_t fd; php_socket_t fd;
php_sockaddr_storage localaddr; php_sockaddr_storage localaddr;
@ -59,17 +72,9 @@ typedef struct _stomp {
#if HAVE_STOMP_SSL #if HAVE_STOMP_SSL
SSL *ssl_handle; SSL *ssl_handle;
#endif #endif
stomp_frame_cell_t *buffer;
} stomp_t; } stomp_t;
typedef struct _stomp_frame {
char *command;
int command_length;
HashTable *headers;
char *body;
int body_length;
} stomp_frame_t;
stomp_t *stomp_init(); stomp_t *stomp_init();
int stomp_connect(stomp_t *stomp, const char *host, unsigned short port TSRMLS_DC); int stomp_connect(stomp_t *stomp, const char *host, unsigned short port TSRMLS_DC);
void stomp_close(stomp_t *stomp); void stomp_close(stomp_t *stomp);