diff --git a/stomp.c b/stomp.c index 357020e..b7fec1c 100644 --- a/stomp.c +++ b/stomp.c @@ -60,6 +60,7 @@ stomp_t *stomp_init() #endif stomp->frame_stack = NULL; + stomp->read_buffer.size = 0; return stomp; } /* }}} */ @@ -336,10 +337,12 @@ int stomp_send(stomp_t *stomp, stomp_frame_t *frame TSRMLS_DC) /* {{{ stomp_recv */ -int stomp_recv(stomp_t *stomp, char *msg, size_t length) +static int _stomp_recv(stomp_t *stomp, char *msg, const size_t length) { int len; + stomp_select(stomp); + #if HAVE_STOMP_SSL if(stomp->options.use_ssl) { len = SSL_read(stomp->ssl_handle, msg, length); @@ -357,61 +360,124 @@ int stomp_recv(stomp_t *stomp, char *msg, size_t length) } return len; } + +int stomp_recv(stomp_t *stomp, char *msg, const size_t length) +{ + if (stomp->read_buffer.size == 0) { + if (length >= STOMP_BUFSIZE) { + return _stomp_recv(stomp, msg, length); + } else { + int recv_size = _stomp_recv(stomp, stomp->read_buffer.buf, STOMP_BUFSIZE); + if (recv_size <= length) { + memcpy(msg, stomp->read_buffer.buf, recv_size); + return recv_size; + } else { + memcpy(msg, stomp->read_buffer.buf, length); + stomp->read_buffer.pos = stomp->read_buffer.buf + length; + stomp->read_buffer.size = recv_size - length; + return length; + } + } + } else if (stomp->read_buffer.size >= length) { + memcpy(msg, stomp->read_buffer.pos, length); + stomp->read_buffer.pos += length; + stomp->read_buffer.size -= length; + return length; + } else { + int len = stomp->read_buffer.size; + memcpy(msg, stomp->read_buffer.pos, stomp->read_buffer.size); + stomp->read_buffer.size = 0; + if (stomp_select_ex(stomp, 0, 0)) { + return len + stomp_recv(stomp, msg + len, length - len); + } else { + return len; + } + } +} +/* }}} */ + +/* {{{ _stomp_read_until + */ +static size_t _stomp_read_until(stomp_t *stomp, char **data, const char delimiter) +{ + size_t length = 0; + size_t bufsize = STOMP_BUFSIZE; + char *buffer = (char *) emalloc(STOMP_BUFSIZE); + + while (1) { + int i, found; + found = 0; + char *c; + + // First populate the buffer + if (stomp->read_buffer.size == 0) { + stomp->read_buffer.size = _stomp_recv(stomp, stomp->read_buffer.buf, STOMP_BUFSIZE); + stomp->read_buffer.pos = stomp->read_buffer.buf; + } + + // Then search the delimiter + c = stomp->read_buffer.pos; + for (i = 1; i <= stomp->read_buffer.size ; i++) { + if (*c == delimiter) { + found = 1; + break; + } else { + c++; + } + } + if (!found) i--; + + // Make sure we have enough place in the buffer + if ((i+length) >= bufsize) { + buffer = (char *) erealloc(buffer, bufsize + STOMP_BUFSIZE); + bufsize += STOMP_BUFSIZE; + } + + // Copy and update the buffer + memcpy(buffer + length, stomp->read_buffer.pos, i); + length += i; + stomp->read_buffer.pos += i; + stomp->read_buffer.size -= i; + + if (found) { + break; + } + } + + if (length) { + *data = buffer; + } else { + efree(buffer); + *data = NULL; + } + + return length; +} /* }}} */ /* {{{ stomp_read_buffer */ -static int stomp_read_buffer(stomp_t *stomp, char **data) +static size_t stomp_read_buffer(stomp_t *stomp, char **data) { - int rc = 0; - size_t i = 0; - size_t bufsize = STOMP_BUFSIZE + 1; - char *buffer = (char *) emalloc(STOMP_BUFSIZE + 1); - - while (1) { - - size_t length = 1; - rc = stomp_recv(stomp, buffer + i, length); - if (rc < 1) { - efree(buffer); - return -1; - } - - if (1 == length) { - i++; - - if (buffer[i-1] == 0) { - if (stomp_select_ex(stomp, 0, 0)) { - char endline[1]; - if (1 != stomp_recv(stomp, endline, 1) && '\n' != endline[0]) { - efree(buffer); - return 0; - } - } - break; + size_t length = _stomp_read_until(stomp, data, 0); + if (stomp_select_ex(stomp, 0, 0)) { + char endline[1]; + if (1 != stomp_recv(stomp, endline, 1) && '\n' != endline[0]) { + if (*data) { + efree(*data); + *data = NULL; } - - if (i >= bufsize) { - buffer = (char *) erealloc(buffer, bufsize + STOMP_BUFSIZE); - bufsize += STOMP_BUFSIZE; - } - + return 0; } } - - if (i > 1) { - *data = (char *) emalloc(i); - if (NULL == *data) { - efree(buffer); - return -1; - } - - memcpy(*data, buffer, i); + if (length > 1) { + length --; + } else if (length) { + efree(*data); + *data = NULL; + length = 0; } - - efree(buffer); - - return i-1; + return length; } /* }}} */ @@ -419,52 +485,16 @@ static int stomp_read_buffer(stomp_t *stomp, char **data) */ static int stomp_read_line(stomp_t *stomp, char **data) { - int rc = 0; - size_t i = 0; - size_t bufsize = STOMP_BUFSIZE + 1; - char *buffer = (char *) emalloc(STOMP_BUFSIZE + 1); - - while (1) { - - size_t length = 1; - rc = stomp_recv(stomp, buffer + i, length); - if (rc < 1) { - efree(buffer); - return -1; - } - - if (1 == length) { - i++; - - if (buffer[i-1] == '\n') { - buffer[i-1] = 0; - break; - } else if (buffer[i-1] == 0) { - efree(buffer); - return 0; - } - - if (i >= bufsize) { - buffer = (char *) erealloc(buffer, bufsize + STOMP_BUFSIZE); - bufsize += STOMP_BUFSIZE; - } - } - + size_t length = _stomp_read_until(stomp, data, '\n'); + if (length > 1) { + (*data)[length - 1] = 0; + length--; + } else if (length) { + efree(*data); + *data = NULL; + length = 0; } - - if (i > 1) { - *data = (char *) emalloc(i); - if (NULL == *data) { - efree(buffer); - return -1; - } - - memcpy(*data, buffer, i); - } - - efree(buffer); - - return i-1; + return length; } /* }}} */ @@ -639,7 +669,7 @@ int stomp_select_ex(stomp_t *stomp, const long int sec, const long int usec) int n; struct timeval tv; - if (stomp->frame_stack) { + if (stomp->read_buffer.size || stomp->frame_stack) { return 1; } tv.tv_sec = sec; diff --git a/stomp.h b/stomp.h index 8f31a4f..a973160 100755 --- a/stomp.h +++ b/stomp.h @@ -73,6 +73,11 @@ typedef struct _stomp { SSL *ssl_handle; #endif stomp_frame_stack_t *frame_stack; + struct { + size_t size; + char buf[STOMP_BUFSIZE]; + char *pos; + } read_buffer; } stomp_t; stomp_t *stomp_init();