Merge branch 'read_buffer'
* read_buffer: Reimplement read_line and read_buffer read buffer implementation on top of stomp_recvv1.1
commit
d705083ab3
216
stomp.c
216
stomp.c
|
@ -60,6 +60,7 @@ stomp_t *stomp_init()
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
stomp->frame_stack = NULL;
|
stomp->frame_stack = NULL;
|
||||||
|
stomp->read_buffer.size = 0;
|
||||||
return stomp;
|
return stomp;
|
||||||
}
|
}
|
||||||
/* }}} */
|
/* }}} */
|
||||||
|
@ -336,10 +337,12 @@ int stomp_send(stomp_t *stomp, stomp_frame_t *frame TSRMLS_DC)
|
||||||
|
|
||||||
/* {{{ stomp_recv
|
/* {{{ stomp_recv
|
||||||
*/
|
*/
|
||||||
int stomp_recv(stomp_t *stomp, char *msg, size_t length)
|
static int _stomp_recv(stomp_t *stomp, char *msg, const size_t length)
|
||||||
{
|
{
|
||||||
int len;
|
int len;
|
||||||
|
|
||||||
|
stomp_select(stomp);
|
||||||
|
|
||||||
#if HAVE_STOMP_SSL
|
#if HAVE_STOMP_SSL
|
||||||
if(stomp->options.use_ssl) {
|
if(stomp->options.use_ssl) {
|
||||||
len = SSL_read(stomp->ssl_handle, msg, length);
|
len = SSL_read(stomp->ssl_handle, msg, length);
|
||||||
|
@ -357,61 +360,124 @@ int stomp_recv(stomp_t *stomp, char *msg, size_t length)
|
||||||
}
|
}
|
||||||
return len;
|
return len;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int stomp_recv(stomp_t *stomp, char *msg, const size_t length)
|
||||||
|
{
|
||||||
|
if (stomp->read_buffer.size == 0) {
|
||||||
|
if (length >= STOMP_BUFSIZE) {
|
||||||
|
return _stomp_recv(stomp, msg, length);
|
||||||
|
} else {
|
||||||
|
int recv_size = _stomp_recv(stomp, stomp->read_buffer.buf, STOMP_BUFSIZE);
|
||||||
|
if (recv_size <= length) {
|
||||||
|
memcpy(msg, stomp->read_buffer.buf, recv_size);
|
||||||
|
return recv_size;
|
||||||
|
} else {
|
||||||
|
memcpy(msg, stomp->read_buffer.buf, length);
|
||||||
|
stomp->read_buffer.pos = stomp->read_buffer.buf + length;
|
||||||
|
stomp->read_buffer.size = recv_size - length;
|
||||||
|
return length;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if (stomp->read_buffer.size >= length) {
|
||||||
|
memcpy(msg, stomp->read_buffer.pos, length);
|
||||||
|
stomp->read_buffer.pos += length;
|
||||||
|
stomp->read_buffer.size -= length;
|
||||||
|
return length;
|
||||||
|
} else {
|
||||||
|
int len = stomp->read_buffer.size;
|
||||||
|
memcpy(msg, stomp->read_buffer.pos, stomp->read_buffer.size);
|
||||||
|
stomp->read_buffer.size = 0;
|
||||||
|
if (stomp_select_ex(stomp, 0, 0)) {
|
||||||
|
return len + stomp_recv(stomp, msg + len, length - len);
|
||||||
|
} else {
|
||||||
|
return len;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/* }}} */
|
||||||
|
|
||||||
|
/* {{{ _stomp_read_until
|
||||||
|
*/
|
||||||
|
static size_t _stomp_read_until(stomp_t *stomp, char **data, const char delimiter)
|
||||||
|
{
|
||||||
|
size_t length = 0;
|
||||||
|
size_t bufsize = STOMP_BUFSIZE;
|
||||||
|
char *buffer = (char *) emalloc(STOMP_BUFSIZE);
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
int i, found;
|
||||||
|
found = 0;
|
||||||
|
char *c;
|
||||||
|
|
||||||
|
// First populate the buffer
|
||||||
|
if (stomp->read_buffer.size == 0) {
|
||||||
|
stomp->read_buffer.size = _stomp_recv(stomp, stomp->read_buffer.buf, STOMP_BUFSIZE);
|
||||||
|
stomp->read_buffer.pos = stomp->read_buffer.buf;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Then search the delimiter
|
||||||
|
c = stomp->read_buffer.pos;
|
||||||
|
for (i = 1; i <= stomp->read_buffer.size ; i++) {
|
||||||
|
if (*c == delimiter) {
|
||||||
|
found = 1;
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
c++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!found) i--;
|
||||||
|
|
||||||
|
// Make sure we have enough place in the buffer
|
||||||
|
if ((i+length) >= bufsize) {
|
||||||
|
buffer = (char *) erealloc(buffer, bufsize + STOMP_BUFSIZE);
|
||||||
|
bufsize += STOMP_BUFSIZE;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Copy and update the buffer
|
||||||
|
memcpy(buffer + length, stomp->read_buffer.pos, i);
|
||||||
|
length += i;
|
||||||
|
stomp->read_buffer.pos += i;
|
||||||
|
stomp->read_buffer.size -= i;
|
||||||
|
|
||||||
|
if (found) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (length) {
|
||||||
|
*data = buffer;
|
||||||
|
} else {
|
||||||
|
efree(buffer);
|
||||||
|
*data = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
return length;
|
||||||
|
}
|
||||||
/* }}} */
|
/* }}} */
|
||||||
|
|
||||||
/* {{{ stomp_read_buffer
|
/* {{{ stomp_read_buffer
|
||||||
*/
|
*/
|
||||||
static int stomp_read_buffer(stomp_t *stomp, char **data)
|
static size_t stomp_read_buffer(stomp_t *stomp, char **data)
|
||||||
{
|
{
|
||||||
int rc = 0;
|
size_t length = _stomp_read_until(stomp, data, 0);
|
||||||
size_t i = 0;
|
if (stomp_select_ex(stomp, 0, 0)) {
|
||||||
size_t bufsize = STOMP_BUFSIZE + 1;
|
char endline[1];
|
||||||
char *buffer = (char *) emalloc(STOMP_BUFSIZE + 1);
|
if (1 != stomp_recv(stomp, endline, 1) && '\n' != endline[0]) {
|
||||||
|
if (*data) {
|
||||||
while (1) {
|
efree(*data);
|
||||||
|
*data = NULL;
|
||||||
size_t length = 1;
|
|
||||||
rc = stomp_recv(stomp, buffer + i, length);
|
|
||||||
if (rc < 1) {
|
|
||||||
efree(buffer);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (1 == length) {
|
|
||||||
i++;
|
|
||||||
|
|
||||||
if (buffer[i-1] == 0) {
|
|
||||||
if (stomp_select_ex(stomp, 0, 0)) {
|
|
||||||
char endline[1];
|
|
||||||
if (1 != stomp_recv(stomp, endline, 1) && '\n' != endline[0]) {
|
|
||||||
efree(buffer);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
|
return 0;
|
||||||
if (i >= bufsize) {
|
|
||||||
buffer = (char *) erealloc(buffer, bufsize + STOMP_BUFSIZE);
|
|
||||||
bufsize += STOMP_BUFSIZE;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (length > 1) {
|
||||||
if (i > 1) {
|
length --;
|
||||||
*data = (char *) emalloc(i);
|
} else if (length) {
|
||||||
if (NULL == *data) {
|
efree(*data);
|
||||||
efree(buffer);
|
*data = NULL;
|
||||||
return -1;
|
length = 0;
|
||||||
}
|
|
||||||
|
|
||||||
memcpy(*data, buffer, i);
|
|
||||||
}
|
}
|
||||||
|
return length;
|
||||||
efree(buffer);
|
|
||||||
|
|
||||||
return i-1;
|
|
||||||
}
|
}
|
||||||
/* }}} */
|
/* }}} */
|
||||||
|
|
||||||
|
@ -419,52 +485,16 @@ static int stomp_read_buffer(stomp_t *stomp, char **data)
|
||||||
*/
|
*/
|
||||||
static int stomp_read_line(stomp_t *stomp, char **data)
|
static int stomp_read_line(stomp_t *stomp, char **data)
|
||||||
{
|
{
|
||||||
int rc = 0;
|
size_t length = _stomp_read_until(stomp, data, '\n');
|
||||||
size_t i = 0;
|
if (length > 1) {
|
||||||
size_t bufsize = STOMP_BUFSIZE + 1;
|
(*data)[length - 1] = 0;
|
||||||
char *buffer = (char *) emalloc(STOMP_BUFSIZE + 1);
|
length--;
|
||||||
|
} else if (length) {
|
||||||
while (1) {
|
efree(*data);
|
||||||
|
*data = NULL;
|
||||||
size_t length = 1;
|
length = 0;
|
||||||
rc = stomp_recv(stomp, buffer + i, length);
|
|
||||||
if (rc < 1) {
|
|
||||||
efree(buffer);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (1 == length) {
|
|
||||||
i++;
|
|
||||||
|
|
||||||
if (buffer[i-1] == '\n') {
|
|
||||||
buffer[i-1] = 0;
|
|
||||||
break;
|
|
||||||
} else if (buffer[i-1] == 0) {
|
|
||||||
efree(buffer);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (i >= bufsize) {
|
|
||||||
buffer = (char *) erealloc(buffer, bufsize + STOMP_BUFSIZE);
|
|
||||||
bufsize += STOMP_BUFSIZE;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
return length;
|
||||||
if (i > 1) {
|
|
||||||
*data = (char *) emalloc(i);
|
|
||||||
if (NULL == *data) {
|
|
||||||
efree(buffer);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
memcpy(*data, buffer, i);
|
|
||||||
}
|
|
||||||
|
|
||||||
efree(buffer);
|
|
||||||
|
|
||||||
return i-1;
|
|
||||||
}
|
}
|
||||||
/* }}} */
|
/* }}} */
|
||||||
|
|
||||||
|
@ -639,7 +669,7 @@ int stomp_select_ex(stomp_t *stomp, const long int sec, const long int usec)
|
||||||
int n;
|
int n;
|
||||||
struct timeval tv;
|
struct timeval tv;
|
||||||
|
|
||||||
if (stomp->frame_stack) {
|
if (stomp->read_buffer.size || stomp->frame_stack) {
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
tv.tv_sec = sec;
|
tv.tv_sec = sec;
|
||||||
|
|
5
stomp.h
5
stomp.h
|
@ -73,6 +73,11 @@ typedef struct _stomp {
|
||||||
SSL *ssl_handle;
|
SSL *ssl_handle;
|
||||||
#endif
|
#endif
|
||||||
stomp_frame_stack_t *frame_stack;
|
stomp_frame_stack_t *frame_stack;
|
||||||
|
struct {
|
||||||
|
size_t size;
|
||||||
|
char buf[STOMP_BUFSIZE];
|
||||||
|
char *pos;
|
||||||
|
} read_buffer;
|
||||||
} stomp_t;
|
} stomp_t;
|
||||||
|
|
||||||
stomp_t *stomp_init();
|
stomp_t *stomp_init();
|
||||||
|
|
Loading…
Reference in New Issue