mirror of
https://github.com/vitalif/pecl-tools-stomp
synced 2019-04-16 04:14:55 +03:00
Compare commits
17 Commits
release_0_
...
release_0_
Author | SHA1 | Date | |
---|---|---|---|
![]() |
54e91dcec2 | ||
![]() |
b3743c3f77 | ||
![]() |
e2a724ed9b | ||
![]() |
3b149a22b1 | ||
![]() |
ec0f049612 | ||
![]() |
3e73da930b | ||
![]() |
f50e766836 | ||
![]() |
7d0911ffd5 | ||
![]() |
8a19087270 | ||
![]() |
7c135b9664 | ||
![]() |
b986c891d8 | ||
![]() |
d1934cf31b | ||
![]() |
26427f95cd | ||
![]() |
5940fe8264 | ||
![]() |
c7ea4d16e0 | ||
![]() |
332a4d8a05 | ||
![]() |
db3abd3904 |
21
package.xml
21
package.xml
@@ -12,14 +12,12 @@ This extension allows php applications to communicate with any Stomp compliant M
|
||||
<email>pierrick@php.net</email>
|
||||
<active>yes</active>
|
||||
</lead>
|
||||
<date>2009-11-22</date>
|
||||
<version><release>0.3.2</release><api>0.3.2</api></version>
|
||||
<date>2010-01-17</date>
|
||||
<version><release>0.4.0</release><api>0.4.0</api></version>
|
||||
<stability><release>beta</release><api>beta</api></stability>
|
||||
<license uri="http://www.php.net/license">PHP License</license>
|
||||
<notes>
|
||||
- Fixed bug #16936 (Module segfaults on readFrame if Frame > STOMP_BUFSIZE)
|
||||
- Fixed bug #16933 (readFrame does not notice when server shuts down)
|
||||
- Fixed bug #16930 (readFrame reports error-frames as "timeout")
|
||||
- Adds the ability to specify an alternative class for readFrame
|
||||
</notes>
|
||||
|
||||
<contents>
|
||||
@@ -45,6 +43,7 @@ This extension allows php applications to communicate with any Stomp compliant M
|
||||
<file role="test" name="tests/008-unsubscribe/001.phpt" />
|
||||
<file role="test" name="tests/009-readFrame/001.phpt" />
|
||||
<file role="test" name="tests/009-readFrame/002.phpt" />
|
||||
<file role="test" name="tests/009-readFrame/003.phpt" />
|
||||
<file role="test" name="tests/010-timeout/001.phpt" />
|
||||
<file role="test" name="tests/010-timeout/002.phpt" />
|
||||
<file role="test" name="tests/011-commit/001.phpt" />
|
||||
@@ -78,6 +77,18 @@ This extension allows php applications to communicate with any Stomp compliant M
|
||||
</extsrcrelease>
|
||||
|
||||
<changelog>
|
||||
<release>
|
||||
<version><release>0.3.2</release><api>0.3.2</api></version>
|
||||
<stability><release>beta</release><api>beta</api></stability>
|
||||
<license uri="http://www.php.net/license">PHP License</license>
|
||||
<date>2009-11-22</date>
|
||||
<notes>
|
||||
- Adds alt class
|
||||
- Fixed bug #16936 (Module segfaults on readFrame if Frame > STOMP_BUFSIZE)
|
||||
- Fixed bug #16933 (readFrame does not notice when server shuts down)
|
||||
- Fixed bug #16930 (readFrame reports error-frames as "timeout")
|
||||
</notes>
|
||||
</release>
|
||||
<release>
|
||||
<version><release>0.3.1</release><api>0.3.1</api></version>
|
||||
<stability><release>beta</release><api>beta</api></stability>
|
||||
|
1380
php_stomp.c
1380
php_stomp.c
File diff suppressed because it is too large
Load Diff
31
php_stomp.h
31
php_stomp.h
@@ -2,7 +2,7 @@
|
||||
+----------------------------------------------------------------------+
|
||||
| PHP Version 5 |
|
||||
+----------------------------------------------------------------------+
|
||||
| Copyright (c) 1997-2009 The PHP Group |
|
||||
| Copyright (c) 1997-2010 The PHP Group |
|
||||
+----------------------------------------------------------------------+
|
||||
| This source file is subject to version 3.01 of the PHP license, |
|
||||
| that is bundled with this package in the file LICENSE, and is |
|
||||
@@ -24,15 +24,16 @@
|
||||
#include "stomp.h"
|
||||
|
||||
typedef struct _stomp_object {
|
||||
zend_object std;
|
||||
stomp_t *stomp;
|
||||
zend_object std;
|
||||
stomp_t *stomp;
|
||||
} stomp_object_t;
|
||||
|
||||
#define PHP_STOMP_EXTNAME "Stomp"
|
||||
#define PHP_STOMP_MAJOR_VERSION "0"
|
||||
#define PHP_STOMP_MINOR_VERSION "3"
|
||||
#define PHP_STOMP_PATCH_VERSION "2"
|
||||
#define PHP_STOMP_VERSION PHP_STOMP_MAJOR_VERSION "." PHP_STOMP_MINOR_VERSION "." PHP_STOMP_PATCH_VERSION
|
||||
#define PHP_STOMP_MINOR_VERSION "4"
|
||||
#define PHP_STOMP_PATCH_VERSION "0"
|
||||
#define PHP_STOMP_VERSION_STATUS "-dev"
|
||||
#define PHP_STOMP_VERSION PHP_STOMP_MAJOR_VERSION "." PHP_STOMP_MINOR_VERSION "." PHP_STOMP_PATCH_VERSION PHP_STOMP_VERSION_STATUS
|
||||
|
||||
#define PHP_STOMP_RES_NAME "stomp connection"
|
||||
|
||||
@@ -88,16 +89,16 @@ PHP_FUNCTION(stomp_get_read_timeout);
|
||||
PHP_METHOD(stompframe, __construct);
|
||||
|
||||
ZEND_BEGIN_MODULE_GLOBALS(stomp)
|
||||
/* INI */
|
||||
char *default_broker;
|
||||
long read_timeout_sec;
|
||||
long read_timeout_usec;
|
||||
long connection_timeout_sec;
|
||||
long connection_timeout_usec;
|
||||
/* INI */
|
||||
char *default_broker;
|
||||
long read_timeout_sec;
|
||||
long read_timeout_usec;
|
||||
long connection_timeout_sec;
|
||||
long connection_timeout_usec;
|
||||
|
||||
/* Others */
|
||||
long error_no;
|
||||
char *error_msg;
|
||||
/* Others */
|
||||
long error_no;
|
||||
char *error_msg;
|
||||
ZEND_END_MODULE_GLOBALS(stomp)
|
||||
|
||||
#ifdef ZTS
|
||||
|
679
stomp.c
679
stomp.c
@@ -2,7 +2,7 @@
|
||||
+----------------------------------------------------------------------+
|
||||
| PHP Version 5 |
|
||||
+----------------------------------------------------------------------+
|
||||
| Copyright (c) 1997-2009 The PHP Group |
|
||||
| Copyright (c) 1997-2010 The PHP Group |
|
||||
+----------------------------------------------------------------------+
|
||||
| This source file is subject to version 3.01 of the PHP license, |
|
||||
| that is bundled with this package in the file LICENSE, and is |
|
||||
@@ -28,38 +28,37 @@
|
||||
#include "stomp.h"
|
||||
#include "php_stomp.h"
|
||||
|
||||
#define RETURN_READ_FRAME_FAIL { frame_destroy(f); return NULL; }
|
||||
#define RETURN_READ_FRAME_FAIL { stomp_free_frame(f); return NULL; }
|
||||
|
||||
ZEND_EXTERN_MODULE_GLOBALS(stomp);
|
||||
extern stomp_ce_exception;
|
||||
extern zend_class_entry *stomp_ce_exception;
|
||||
|
||||
/* {{{ stomp_new
|
||||
/* {{{ stomp_init
|
||||
*/
|
||||
stomp_t *stomp_new(const char *host, unsigned short port, long read_timeout_sec, long read_timeout_usec TSRMLS_DC)
|
||||
stomp_t *stomp_init()
|
||||
{
|
||||
/* Memory allocation for the stomp */
|
||||
stomp_t *stomp = (stomp_t *) emalloc(sizeof(stomp_t));
|
||||
memset(stomp, 0, sizeof(*stomp));
|
||||
/* Memory allocation */
|
||||
stomp_t *stomp = (stomp_t *) emalloc(sizeof(stomp_t));
|
||||
memset(stomp, 0, sizeof(*stomp));
|
||||
|
||||
/* Define all values */
|
||||
stomp->host = (char *) emalloc(strlen(host) + 1);
|
||||
memcpy(stomp->host, host, strlen(host));
|
||||
stomp->host[strlen(host)] = '\0';
|
||||
|
||||
stomp->port = port;
|
||||
stomp->status = 0;
|
||||
stomp->error = NULL;
|
||||
stomp->errnum = 0;
|
||||
stomp->read_timeout_sec = read_timeout_sec;
|
||||
stomp->read_timeout_usec = read_timeout_usec;
|
||||
stomp->session = NULL;
|
||||
/* Define all values */
|
||||
stomp->host = NULL;
|
||||
stomp->port = 0;
|
||||
stomp->status = 0;
|
||||
stomp->error = NULL;
|
||||
stomp->errnum = 0;
|
||||
stomp->session = NULL;
|
||||
stomp->options.connect_timeout_sec = 2;
|
||||
stomp->options.connect_timeout_usec = 0;
|
||||
stomp->options.read_timeout_sec = 2;
|
||||
stomp->options.read_timeout_usec = 2;
|
||||
|
||||
#if HAVE_STOMP_SSL
|
||||
stomp->ssl_handle = NULL;
|
||||
stomp->use_ssl = 0;
|
||||
stomp->options.use_ssl = 0;
|
||||
stomp->ssl_handle = NULL;
|
||||
#endif
|
||||
|
||||
return stomp;
|
||||
return stomp;
|
||||
}
|
||||
/* }}} */
|
||||
|
||||
@@ -67,112 +66,122 @@ stomp_t *stomp_new(const char *host, unsigned short port, long read_timeout_sec,
|
||||
*/
|
||||
void stomp_set_error(stomp_t *stomp, const char *error, int errnum)
|
||||
{
|
||||
if (error != NULL) {
|
||||
if (stomp->error != NULL) {
|
||||
efree(stomp->error);
|
||||
}
|
||||
stomp->error = estrdup(error);
|
||||
stomp->errnum = errnum;
|
||||
}
|
||||
if (error != NULL) {
|
||||
if (stomp->error != NULL) {
|
||||
efree(stomp->error);
|
||||
}
|
||||
stomp->error = estrdup(error);
|
||||
stomp->errnum = errnum;
|
||||
}
|
||||
}
|
||||
/* }}} */
|
||||
|
||||
/* {{{ stomp_connect
|
||||
*/
|
||||
int stomp_connect(stomp_t *stomp TSRMLS_DC)
|
||||
int stomp_connect(stomp_t *stomp, const char *host, unsigned short port TSRMLS_DC)
|
||||
{
|
||||
char error[1024];
|
||||
socklen_t size;
|
||||
struct timeval tv;
|
||||
fd_set rfds;
|
||||
char error[1024];
|
||||
socklen_t size;
|
||||
struct timeval tv;
|
||||
fd_set rfds;
|
||||
|
||||
tv.tv_sec = STOMP_G(connection_timeout_sec);
|
||||
tv.tv_usec = STOMP_G(connection_timeout_usec);
|
||||
if (stomp->host != NULL)
|
||||
{
|
||||
efree(stomp->host);
|
||||
}
|
||||
stomp->host = (char *) emalloc(strlen(host) + 1);
|
||||
memcpy(stomp->host, host, strlen(host));
|
||||
stomp->host[strlen(host)] = '\0';
|
||||
|
||||
stomp->fd = php_network_connect_socket_to_host(stomp->host, stomp->port, SOCK_STREAM, 0, &tv, NULL, NULL, NULL, 0 TSRMLS_CC);
|
||||
if (stomp->fd == -1) {
|
||||
snprintf(error, sizeof(error), "Unable to connect to %s:%ld", stomp->host, stomp->port);
|
||||
stomp_set_error(stomp, error, errno);
|
||||
return 0;
|
||||
}
|
||||
stomp->port = port;
|
||||
|
||||
size = sizeof(stomp->localaddr);
|
||||
memset(&stomp->localaddr, 0, size);
|
||||
if (getsockname(stomp->fd, (struct sockaddr*) &stomp->localaddr, &size) == -1) {
|
||||
snprintf(error, sizeof(error), "getsockname failed: %s (%d)", strerror(errno), errno);
|
||||
stomp_set_error(stomp, error, errno);
|
||||
return 0;
|
||||
}
|
||||
tv.tv_sec = stomp->options.connect_timeout_sec;
|
||||
tv.tv_usec = stomp->options.connect_timeout_usec;
|
||||
|
||||
tv.tv_sec = 0;
|
||||
tv.tv_usec = 0;
|
||||
stomp->fd = php_network_connect_socket_to_host(stomp->host, stomp->port, SOCK_STREAM, 0, &tv, NULL, NULL, NULL, 0 TSRMLS_CC);
|
||||
if (stomp->fd == -1) {
|
||||
snprintf(error, sizeof(error), "Unable to connect to %s:%ld", stomp->host, stomp->port);
|
||||
stomp_set_error(stomp, error, errno);
|
||||
return 0;
|
||||
}
|
||||
|
||||
FD_ZERO(&rfds);
|
||||
FD_SET(stomp->fd, &rfds);
|
||||
size = sizeof(stomp->localaddr);
|
||||
memset(&stomp->localaddr, 0, size);
|
||||
if (getsockname(stomp->fd, (struct sockaddr*) &stomp->localaddr, &size) == -1) {
|
||||
snprintf(error, sizeof(error), "getsockname failed: %s (%d)", strerror(errno), errno);
|
||||
stomp_set_error(stomp, error, errno);
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (select(stomp->fd + 1, NULL, &rfds, NULL, &tv) > 0) {
|
||||
tv.tv_sec = 0;
|
||||
tv.tv_usec = 0;
|
||||
|
||||
FD_ZERO(&rfds);
|
||||
FD_SET(stomp->fd, &rfds);
|
||||
|
||||
if (select(stomp->fd + 1, NULL, &rfds, NULL, &tv) > 0) {
|
||||
#if HAVE_STOMP_SSL
|
||||
if (stomp->use_ssl) {
|
||||
SSL_CTX *ctx = SSL_CTX_new(SSLv23_client_method());
|
||||
if (NULL == ctx) {
|
||||
stomp_set_error(stomp, "failed to create the SSL context", 0);
|
||||
return 0;
|
||||
}
|
||||
if (stomp->options.use_ssl) {
|
||||
SSL_CTX *ctx = SSL_CTX_new(SSLv23_client_method());
|
||||
if (NULL == ctx) {
|
||||
stomp_set_error(stomp, "failed to create the SSL context", 0);
|
||||
return 0;
|
||||
}
|
||||
|
||||
SSL_CTX_set_options(ctx, SSL_OP_ALL);
|
||||
SSL_CTX_set_options(ctx, SSL_OP_ALL);
|
||||
|
||||
stomp->ssl_handle = SSL_new(ctx);
|
||||
if (stomp->ssl_handle == NULL) {
|
||||
stomp_set_error(stomp, "failed to create the SSL handle", 0);
|
||||
SSL_CTX_free(ctx);
|
||||
return 0;
|
||||
}
|
||||
|
||||
SSL_set_fd(stomp->ssl_handle, stomp->fd);
|
||||
stomp->ssl_handle = SSL_new(ctx);
|
||||
if (stomp->ssl_handle == NULL) {
|
||||
stomp_set_error(stomp, "failed to create the SSL handle", 0);
|
||||
SSL_CTX_free(ctx);
|
||||
return 0;
|
||||
}
|
||||
|
||||
SSL_set_fd(stomp->ssl_handle, stomp->fd);
|
||||
|
||||
if (SSL_connect(stomp->ssl_handle) <= 0) {
|
||||
stomp_set_error(stomp, "SSL/TLS handshake failed", 0);
|
||||
SSL_shutdown(stomp->ssl_handle);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
if (SSL_connect(stomp->ssl_handle) <= 0) {
|
||||
stomp_set_error(stomp, "SSL/TLS handshake failed", 0);
|
||||
SSL_shutdown(stomp->ssl_handle);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
return 1;
|
||||
} else {
|
||||
snprintf(error, sizeof(error), "Unable to connect to %s:%ld", stomp->host, stomp->port);
|
||||
stomp_set_error(stomp, error, errno);
|
||||
return 0;
|
||||
}
|
||||
return 1;
|
||||
} else {
|
||||
snprintf(error, sizeof(error), "Unable to connect to %s:%ld", stomp->host, stomp->port);
|
||||
stomp_set_error(stomp, error, errno);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
/* }}} */
|
||||
|
||||
/* {{{ stomp_close
|
||||
*/
|
||||
void stomp_close(stomp_t *stomp TSRMLS_DC)
|
||||
void stomp_close(stomp_t *stomp)
|
||||
{
|
||||
if (NULL == stomp) {
|
||||
return;
|
||||
}
|
||||
if (NULL == stomp) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (stomp->fd != -1) {
|
||||
if (stomp->fd != -1) {
|
||||
#if HAVE_STOMP_SSL
|
||||
if(stomp->ssl_handle) {
|
||||
SSL_shutdown(stomp->ssl_handle);
|
||||
}
|
||||
if(stomp->ssl_handle) {
|
||||
SSL_shutdown(stomp->ssl_handle);
|
||||
}
|
||||
#endif
|
||||
closesocket(stomp->fd);
|
||||
}
|
||||
if (stomp->host) {
|
||||
efree(stomp->host);
|
||||
}
|
||||
if (stomp->session) {
|
||||
efree(stomp->session);
|
||||
}
|
||||
if (stomp->error) {
|
||||
efree(stomp->error);
|
||||
}
|
||||
closesocket(stomp->fd);
|
||||
}
|
||||
if (stomp->host) {
|
||||
efree(stomp->host);
|
||||
}
|
||||
if (stomp->session) {
|
||||
efree(stomp->session);
|
||||
}
|
||||
if (stomp->error) {
|
||||
efree(stomp->error);
|
||||
}
|
||||
|
||||
efree(stomp);
|
||||
efree(stomp);
|
||||
}
|
||||
/* }}} */
|
||||
|
||||
@@ -180,70 +189,70 @@ void stomp_close(stomp_t *stomp TSRMLS_DC)
|
||||
*/
|
||||
int stomp_send(stomp_t *stomp, stomp_frame_t *frame TSRMLS_DC)
|
||||
{
|
||||
smart_str buf = {0};
|
||||
smart_str buf = {0};
|
||||
|
||||
/* Command */
|
||||
smart_str_appends(&buf, frame->command);
|
||||
smart_str_appendc(&buf, '\n');
|
||||
/* Command */
|
||||
smart_str_appends(&buf, frame->command);
|
||||
smart_str_appendc(&buf, '\n');
|
||||
|
||||
/* Headers */
|
||||
if (frame->headers) {
|
||||
/* Headers */
|
||||
if (frame->headers) {
|
||||
|
||||
char *key;
|
||||
ulong pos;
|
||||
zend_hash_internal_pointer_reset(frame->headers);
|
||||
char *key;
|
||||
ulong pos;
|
||||
zend_hash_internal_pointer_reset(frame->headers);
|
||||
|
||||
while (zend_hash_get_current_key(frame->headers, &key, &pos, 0) == HASH_KEY_IS_STRING) {
|
||||
char *value = NULL;
|
||||
while (zend_hash_get_current_key(frame->headers, &key, &pos, 0) == HASH_KEY_IS_STRING) {
|
||||
char *value = NULL;
|
||||
|
||||
smart_str_appends(&buf, key);
|
||||
smart_str_appendc(&buf, ':');
|
||||
smart_str_appends(&buf, key);
|
||||
smart_str_appendc(&buf, ':');
|
||||
|
||||
if (zend_hash_get_current_data(frame->headers, (void **)&value) == SUCCESS) {
|
||||
smart_str_appends(&buf, value);
|
||||
}
|
||||
if (zend_hash_get_current_data(frame->headers, (void **)&value) == SUCCESS) {
|
||||
smart_str_appends(&buf, value);
|
||||
}
|
||||
|
||||
smart_str_appendc(&buf, '\n');
|
||||
smart_str_appendc(&buf, '\n');
|
||||
|
||||
zend_hash_move_forward(frame->headers);
|
||||
}
|
||||
}
|
||||
zend_hash_move_forward(frame->headers);
|
||||
}
|
||||
}
|
||||
|
||||
if (frame->body_length > 0) {
|
||||
smart_str_appends(&buf, "content-length: ");
|
||||
smart_str_append_long(&buf, frame->body_length);
|
||||
smart_str_appendc(&buf, '\n');
|
||||
}
|
||||
if (frame->body_length > 0) {
|
||||
smart_str_appends(&buf, "content-length: ");
|
||||
smart_str_append_long(&buf, frame->body_length);
|
||||
smart_str_appendc(&buf, '\n');
|
||||
}
|
||||
|
||||
smart_str_appendc(&buf, '\n');
|
||||
smart_str_appendc(&buf, '\n');
|
||||
|
||||
if (frame->body > 0) {
|
||||
smart_str_appends(&buf, frame->body);
|
||||
}
|
||||
if (frame->body > 0) {
|
||||
smart_str_appends(&buf, frame->body);
|
||||
}
|
||||
|
||||
#ifdef HAVE_STOMP_SSL
|
||||
if (stomp->use_ssl) {
|
||||
if (-1 == SSL_write(stomp->ssl_handle, buf.c, buf.len) || -1 == SSL_write(stomp->ssl_handle, "\0\n", 2)) {
|
||||
char error[1024];
|
||||
snprintf(error, sizeof(error), "Unable to send data");
|
||||
stomp_set_error(stomp, error, errno);
|
||||
return 0;
|
||||
}
|
||||
} else {
|
||||
if (stomp->options.use_ssl) {
|
||||
if (-1 == SSL_write(stomp->ssl_handle, buf.c, buf.len) || -1 == SSL_write(stomp->ssl_handle, "\0\n", 2)) {
|
||||
char error[1024];
|
||||
snprintf(error, sizeof(error), "Unable to send data");
|
||||
stomp_set_error(stomp, error, errno);
|
||||
return 0;
|
||||
}
|
||||
} else {
|
||||
#endif
|
||||
if (-1 == send(stomp->fd, buf.c, buf.len, 0) || -1 == send(stomp->fd, "\0\n", 2, 0)) {
|
||||
char error[1024];
|
||||
snprintf(error, sizeof(error), "Unable to send data");
|
||||
stomp_set_error(stomp, error, errno);
|
||||
return 0;
|
||||
}
|
||||
if (-1 == send(stomp->fd, buf.c, buf.len, 0) || -1 == send(stomp->fd, "\0\n", 2, 0)) {
|
||||
char error[1024];
|
||||
snprintf(error, sizeof(error), "Unable to send data");
|
||||
stomp_set_error(stomp, error, errno);
|
||||
return 0;
|
||||
}
|
||||
#ifdef HAVE_STOMP_SSL
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
smart_str_free(&buf);
|
||||
smart_str_free(&buf);
|
||||
|
||||
return 1;
|
||||
return 1;
|
||||
}
|
||||
/* }}} */
|
||||
|
||||
@@ -253,21 +262,21 @@ int stomp_recv(stomp_t *stomp, char *msg, size_t length)
|
||||
{
|
||||
int len;
|
||||
#if HAVE_STOMP_SSL
|
||||
if(stomp->use_ssl) {
|
||||
len = SSL_read(stomp->ssl_handle, msg, length);
|
||||
} else {
|
||||
if(stomp->options.use_ssl) {
|
||||
len = SSL_read(stomp->ssl_handle, msg, length);
|
||||
} else {
|
||||
#endif
|
||||
len = recv(stomp->fd, msg, length, 0);
|
||||
len = recv(stomp->fd, msg, length, 0);
|
||||
#if HAVE_STOMP_SSL
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
if (len == 0) {
|
||||
if (len == 0) {
|
||||
TSRMLS_FETCH();
|
||||
zend_throw_exception_ex(stomp_ce_exception, errno TSRMLS_CC, "Unexpected EOF while reading from socket");
|
||||
stomp->status = -1;
|
||||
}
|
||||
return len;
|
||||
stomp->status = -1;
|
||||
}
|
||||
return len;
|
||||
}
|
||||
/* }}} */
|
||||
|
||||
@@ -275,53 +284,53 @@ int stomp_recv(stomp_t *stomp, char *msg, size_t length)
|
||||
*/
|
||||
static int stomp_read_buffer(stomp_t *stomp, char **data)
|
||||
{
|
||||
int rc = 0;
|
||||
size_t i = 0;
|
||||
size_t bufsize = STOMP_BUFSIZE + 1;
|
||||
char *buffer = (char *) emalloc(STOMP_BUFSIZE + 1);
|
||||
int rc = 0;
|
||||
size_t i = 0;
|
||||
size_t bufsize = STOMP_BUFSIZE + 1;
|
||||
char *buffer = (char *) emalloc(STOMP_BUFSIZE + 1);
|
||||
|
||||
while (1) {
|
||||
while (1) {
|
||||
|
||||
size_t length = 1;
|
||||
rc = stomp_recv(stomp, buffer + i, length);
|
||||
if (rc < 1) {
|
||||
efree(buffer);
|
||||
return -1;
|
||||
}
|
||||
size_t length = 1;
|
||||
rc = stomp_recv(stomp, buffer + i, length);
|
||||
if (rc < 1) {
|
||||
efree(buffer);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (1 == length) {
|
||||
i++;
|
||||
if (1 == length) {
|
||||
i++;
|
||||
|
||||
if (buffer[i-1] == 0) {
|
||||
char endline[1];
|
||||
if (1 != stomp_recv(stomp, endline, 1) && '\n' != endline[0]) {
|
||||
efree(buffer);
|
||||
return 0;
|
||||
}
|
||||
break;
|
||||
}
|
||||
if (buffer[i-1] == 0) {
|
||||
char endline[1];
|
||||
if (1 != stomp_recv(stomp, endline, 1) && '\n' != endline[0]) {
|
||||
efree(buffer);
|
||||
return 0;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
if (i >= bufsize) {
|
||||
buffer = (char *) erealloc(buffer, bufsize + STOMP_BUFSIZE);
|
||||
bufsize += STOMP_BUFSIZE;
|
||||
}
|
||||
if (i >= bufsize) {
|
||||
buffer = (char *) erealloc(buffer, bufsize + STOMP_BUFSIZE);
|
||||
bufsize += STOMP_BUFSIZE;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (i > 1) {
|
||||
*data = (char *) emalloc(i);
|
||||
if (NULL == *data) {
|
||||
efree(buffer);
|
||||
return -1;
|
||||
}
|
||||
if (i > 1) {
|
||||
*data = (char *) emalloc(i);
|
||||
if (NULL == *data) {
|
||||
efree(buffer);
|
||||
return -1;
|
||||
}
|
||||
|
||||
memcpy(*data, buffer, i);
|
||||
}
|
||||
memcpy(*data, buffer, i);
|
||||
}
|
||||
|
||||
efree(buffer);
|
||||
efree(buffer);
|
||||
|
||||
return i-1;
|
||||
return i-1;
|
||||
}
|
||||
/* }}} */
|
||||
|
||||
@@ -329,72 +338,72 @@ static int stomp_read_buffer(stomp_t *stomp, char **data)
|
||||
*/
|
||||
static int stomp_read_line(stomp_t *stomp, char **data)
|
||||
{
|
||||
int rc = 0;
|
||||
size_t i = 0;
|
||||
size_t bufsize = STOMP_BUFSIZE + 1;
|
||||
char *buffer = (char *) emalloc(STOMP_BUFSIZE + 1);
|
||||
int rc = 0;
|
||||
size_t i = 0;
|
||||
size_t bufsize = STOMP_BUFSIZE + 1;
|
||||
char *buffer = (char *) emalloc(STOMP_BUFSIZE + 1);
|
||||
|
||||
while (1) {
|
||||
while (1) {
|
||||
|
||||
size_t length = 1;
|
||||
rc = stomp_recv(stomp, buffer + i, length);
|
||||
if (rc < 1) {
|
||||
efree(buffer);
|
||||
return -1;
|
||||
}
|
||||
size_t length = 1;
|
||||
rc = stomp_recv(stomp, buffer + i, length);
|
||||
if (rc < 1) {
|
||||
efree(buffer);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (1 == length) {
|
||||
i++;
|
||||
if (1 == length) {
|
||||
i++;
|
||||
|
||||
if (buffer[i-1] == '\n') {
|
||||
buffer[i-1] = 0;
|
||||
break;
|
||||
} else if (buffer[i-1] == 0) {
|
||||
efree(buffer);
|
||||
return 0;
|
||||
}
|
||||
if (buffer[i-1] == '\n') {
|
||||
buffer[i-1] = 0;
|
||||
break;
|
||||
} else if (buffer[i-1] == 0) {
|
||||
efree(buffer);
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (i >= bufsize) {
|
||||
buffer = (char *) erealloc(buffer, bufsize + STOMP_BUFSIZE);
|
||||
bufsize += STOMP_BUFSIZE;
|
||||
}
|
||||
}
|
||||
if (i >= bufsize) {
|
||||
buffer = (char *) erealloc(buffer, bufsize + STOMP_BUFSIZE);
|
||||
bufsize += STOMP_BUFSIZE;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
if (i > 1) {
|
||||
*data = (char *) emalloc(i);
|
||||
if (NULL == *data) {
|
||||
efree(buffer);
|
||||
return -1;
|
||||
}
|
||||
if (i > 1) {
|
||||
*data = (char *) emalloc(i);
|
||||
if (NULL == *data) {
|
||||
efree(buffer);
|
||||
return -1;
|
||||
}
|
||||
|
||||
memcpy(*data, buffer, i);
|
||||
}
|
||||
memcpy(*data, buffer, i);
|
||||
}
|
||||
|
||||
efree(buffer);
|
||||
efree(buffer);
|
||||
|
||||
return i-1;
|
||||
return i-1;
|
||||
}
|
||||
/* }}} */
|
||||
|
||||
/* {{{ frame_destroy
|
||||
/* {{{ stomp_free_frame
|
||||
*/
|
||||
void frame_destroy(stomp_frame_t *frame)
|
||||
void stomp_free_frame(stomp_frame_t *frame)
|
||||
{
|
||||
if (frame) {
|
||||
if (frame->command) {
|
||||
efree(frame->command);
|
||||
}
|
||||
if (frame->body) {
|
||||
efree(frame->body);
|
||||
}
|
||||
if (frame->headers) {
|
||||
zend_hash_destroy(frame->headers);
|
||||
efree(frame->headers);
|
||||
}
|
||||
efree(frame);
|
||||
}
|
||||
if (frame) {
|
||||
if (frame->command) {
|
||||
efree(frame->command);
|
||||
}
|
||||
if (frame->body) {
|
||||
efree(frame->body);
|
||||
}
|
||||
if (frame->headers) {
|
||||
zend_hash_destroy(frame->headers);
|
||||
efree(frame->headers);
|
||||
}
|
||||
efree(frame);
|
||||
}
|
||||
}
|
||||
/* }}} */
|
||||
|
||||
@@ -402,110 +411,110 @@ void frame_destroy(stomp_frame_t *frame)
|
||||
*/
|
||||
stomp_frame_t *stomp_read_frame(stomp_t *stomp)
|
||||
{
|
||||
stomp_frame_t *f = NULL;
|
||||
char *cmd = NULL, *length_str = NULL;
|
||||
int length = 0;
|
||||
stomp_frame_t *f = NULL;
|
||||
char *cmd = NULL, *length_str = NULL;
|
||||
int length = 0;
|
||||
|
||||
INIT_STOMP_FRAME(f);
|
||||
INIT_STOMP_FRAME(f);
|
||||
|
||||
if (NULL == f) {
|
||||
return NULL;
|
||||
}
|
||||
if (NULL == f) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/* Parse the command */
|
||||
length = stomp_read_line(stomp, &cmd);
|
||||
if (length < 1) {
|
||||
RETURN_READ_FRAME_FAIL;
|
||||
}
|
||||
/* Parse the command */
|
||||
length = stomp_read_line(stomp, &cmd);
|
||||
if (length < 1) {
|
||||
RETURN_READ_FRAME_FAIL;
|
||||
}
|
||||
|
||||
f->command = cmd;
|
||||
f->command_length = length;
|
||||
f->command = cmd;
|
||||
f->command_length = length;
|
||||
|
||||
/* Parse the header */
|
||||
while (1) {
|
||||
char *p = NULL;
|
||||
length = stomp_read_line(stomp, &p);
|
||||
|
||||
if (length < 0) {
|
||||
RETURN_READ_FRAME_FAIL;
|
||||
}
|
||||
/* Parse the header */
|
||||
while (1) {
|
||||
char *p = NULL;
|
||||
length = stomp_read_line(stomp, &p);
|
||||
|
||||
if (length < 0) {
|
||||
RETURN_READ_FRAME_FAIL;
|
||||
}
|
||||
|
||||
if (0 == length) {
|
||||
break;
|
||||
} else {
|
||||
char *p2 = NULL;
|
||||
char *key;
|
||||
char *value;
|
||||
if (0 == length) {
|
||||
break;
|
||||
} else {
|
||||
char *p2 = NULL;
|
||||
char *key;
|
||||
char *value;
|
||||
|
||||
p2 = strstr(p,":");
|
||||
|
||||
if (p2 == NULL) {
|
||||
efree(p);
|
||||
RETURN_READ_FRAME_FAIL;
|
||||
}
|
||||
p2 = strstr(p,":");
|
||||
|
||||
if (p2 == NULL) {
|
||||
efree(p);
|
||||
RETURN_READ_FRAME_FAIL;
|
||||
}
|
||||
|
||||
/* Null terminate the key */
|
||||
*p2=0;
|
||||
key = p;
|
||||
/* Null terminate the key */
|
||||
*p2=0;
|
||||
key = p;
|
||||
|
||||
/* The rest if the value. */
|
||||
value = p2+1;
|
||||
/* The rest is the value. */
|
||||
value = p2+1;
|
||||
|
||||
/* Insert key/value into hash table. */
|
||||
zend_hash_add(f->headers, key, strlen(key) + 1, value, strlen(value) + 1, NULL);
|
||||
efree(p);
|
||||
}
|
||||
}
|
||||
/* Insert key/value into hash table. */
|
||||
zend_hash_add(f->headers, key, strlen(key) + 1, value, strlen(value) + 1, NULL);
|
||||
efree(p);
|
||||
}
|
||||
}
|
||||
|
||||
/* Check for the content length */
|
||||
if (zend_hash_find(f->headers, "content-length", strlen("content-length"), (void **)&length_str) == SUCCESS) {
|
||||
char endbuffer[2];
|
||||
length = 2;
|
||||
/* Check for the content length */
|
||||
if (zend_hash_find(f->headers, "content-length", strlen("content-length"), (void **)&length_str) == SUCCESS) {
|
||||
char endbuffer[2];
|
||||
length = 2;
|
||||
|
||||
f->body_length = atoi(length_str);
|
||||
f->body = (char *) emalloc(f->body_length);
|
||||
f->body_length = atoi(length_str);
|
||||
f->body = (char *) emalloc(f->body_length);
|
||||
|
||||
if (-1 == stomp_recv(stomp, f->body, f->body_length)) {
|
||||
RETURN_READ_FRAME_FAIL;
|
||||
}
|
||||
if (-1 == stomp_recv(stomp, f->body, f->body_length)) {
|
||||
RETURN_READ_FRAME_FAIL;
|
||||
}
|
||||
|
||||
if (length != stomp_recv(stomp, endbuffer, length) || endbuffer[0] != '\0' || endbuffer[1] != '\n') {
|
||||
RETURN_READ_FRAME_FAIL;
|
||||
}
|
||||
} else {
|
||||
f->body_length = stomp_read_buffer(stomp, &f->body);
|
||||
}
|
||||
if (length != stomp_recv(stomp, endbuffer, length) || endbuffer[0] != '\0' || endbuffer[1] != '\n') {
|
||||
RETURN_READ_FRAME_FAIL;
|
||||
}
|
||||
} else {
|
||||
f->body_length = stomp_read_buffer(stomp, &f->body);
|
||||
}
|
||||
|
||||
return f;
|
||||
return f;
|
||||
}
|
||||
/* }}} */
|
||||
|
||||
/* {{{ stomp_valid_receipt
|
||||
*/
|
||||
int stomp_valid_receipt(stomp_t *stomp, stomp_frame_t *frame) {
|
||||
int success = 1;
|
||||
char *receipt = NULL;
|
||||
if (zend_hash_find(frame->headers, "receipt", sizeof("receipt"), (void **)&receipt) == SUCCESS) {
|
||||
stomp_frame_t *res = stomp_read_frame(stomp);
|
||||
success = 0;
|
||||
if (res) {
|
||||
if (0 == strncmp("RECEIPT", res->command, sizeof("RECEIPT") - 1)) {
|
||||
char *receipt_id = NULL;
|
||||
if (zend_hash_find(res->headers, "receipt-id", sizeof("receipt-id"), (void **)&receipt_id) == SUCCESS
|
||||
&& strlen(receipt) == strlen(receipt_id)
|
||||
&& !strcmp(receipt, receipt_id)) {
|
||||
success = 1;
|
||||
}
|
||||
} else if (0 == strncmp("ERROR", res->command, sizeof("ERROR") - 1)) {
|
||||
char *error_msg = NULL;
|
||||
if (zend_hash_find(res->headers, "message", sizeof("message"), (void **)&error_msg) == SUCCESS) {
|
||||
stomp_set_error(stomp, error_msg, 0);
|
||||
}
|
||||
}
|
||||
frame_destroy(res);
|
||||
}
|
||||
}
|
||||
return success;
|
||||
int success = 1;
|
||||
char *receipt = NULL;
|
||||
if (zend_hash_find(frame->headers, "receipt", sizeof("receipt"), (void **)&receipt) == SUCCESS) {
|
||||
stomp_frame_t *res = stomp_read_frame(stomp);
|
||||
success = 0;
|
||||
if (res) {
|
||||
if (0 == strncmp("RECEIPT", res->command, sizeof("RECEIPT") - 1)) {
|
||||
char *receipt_id = NULL;
|
||||
if (zend_hash_find(res->headers, "receipt-id", sizeof("receipt-id"), (void **)&receipt_id) == SUCCESS
|
||||
&& strlen(receipt) == strlen(receipt_id)
|
||||
&& !strcmp(receipt, receipt_id)) {
|
||||
success = 1;
|
||||
}
|
||||
} else if (0 == strncmp("ERROR", res->command, sizeof("ERROR") - 1)) {
|
||||
char *error_msg = NULL;
|
||||
if (zend_hash_find(res->headers, "message", sizeof("message"), (void **)&error_msg) == SUCCESS) {
|
||||
stomp_set_error(stomp, error_msg, 0);
|
||||
}
|
||||
}
|
||||
stomp_free_frame(res);
|
||||
}
|
||||
}
|
||||
return success;
|
||||
}
|
||||
/* }}} */
|
||||
|
||||
@@ -513,15 +522,15 @@ int stomp_valid_receipt(stomp_t *stomp, stomp_frame_t *frame) {
|
||||
*/
|
||||
int stomp_select(stomp_t *stomp)
|
||||
{
|
||||
struct timeval tv;
|
||||
fd_set rfds;
|
||||
struct timeval tv;
|
||||
fd_set rfds;
|
||||
|
||||
tv.tv_sec = stomp->read_timeout_sec;
|
||||
tv.tv_usec = stomp->read_timeout_usec;
|
||||
tv.tv_sec = stomp->options.read_timeout_sec;
|
||||
tv.tv_usec = stomp->options.read_timeout_usec;
|
||||
|
||||
FD_ZERO(&rfds);
|
||||
FD_SET(stomp->fd, &rfds);
|
||||
FD_ZERO(&rfds);
|
||||
FD_SET(stomp->fd, &rfds);
|
||||
|
||||
return select(stomp->fd + 1, &rfds, NULL, NULL, &tv);
|
||||
return select(stomp->fd + 1, &rfds, NULL, NULL, &tv);
|
||||
}
|
||||
/* }}} */
|
||||
|
61
stomp.h
61
stomp.h
@@ -2,7 +2,7 @@
|
||||
+----------------------------------------------------------------------+
|
||||
| PHP Version 5 |
|
||||
+----------------------------------------------------------------------+
|
||||
| Copyright (c) 1997-2009 The PHP Group |
|
||||
| Copyright (c) 1997-2010 The PHP Group |
|
||||
+----------------------------------------------------------------------+
|
||||
| This source file is subject to version 3.01 of the PHP license, |
|
||||
| that is bundled with this package in the file LICENSE, and is |
|
||||
@@ -30,45 +30,54 @@
|
||||
#define STOMP_BUFSIZE 4096
|
||||
|
||||
#define INIT_STOMP_FRAME(f) \
|
||||
f = (stomp_frame_t *) emalloc(sizeof(stomp_frame_t)); \
|
||||
f->command = NULL; f->body = NULL; \
|
||||
ALLOC_HASHTABLE(f->headers); \
|
||||
zend_hash_init(f->headers, 0, NULL, NULL, 0);
|
||||
f = (stomp_frame_t *) emalloc(sizeof(stomp_frame_t)); \
|
||||
f->command = NULL; f->body = NULL; \
|
||||
ALLOC_HASHTABLE(f->headers); \
|
||||
zend_hash_init(f->headers, 0, NULL, NULL, 0);
|
||||
|
||||
typedef struct _stomp_options {
|
||||
long connect_timeout_sec;
|
||||
long connect_timeout_usec;
|
||||
long read_timeout_sec;
|
||||
long read_timeout_usec;
|
||||
#if HAVE_STOMP_SSL
|
||||
int use_ssl;
|
||||
#endif
|
||||
} stomp_options_t;
|
||||
|
||||
typedef struct _stomp {
|
||||
php_socket_t fd;
|
||||
php_sockaddr_storage localaddr;
|
||||
char *host;
|
||||
unsigned short port;
|
||||
int status;
|
||||
char *error;
|
||||
int errnum;
|
||||
long read_timeout_sec;
|
||||
long read_timeout_usec;
|
||||
char *session;
|
||||
php_socket_t fd;
|
||||
php_sockaddr_storage localaddr;
|
||||
stomp_options_t options;
|
||||
char *host;
|
||||
unsigned short port;
|
||||
int status;
|
||||
char *error;
|
||||
int errnum;
|
||||
char *session;
|
||||
#if HAVE_STOMP_SSL
|
||||
SSL *ssl_handle;
|
||||
int use_ssl;
|
||||
SSL *ssl_handle;
|
||||
#endif
|
||||
} stomp_t;
|
||||
|
||||
|
||||
typedef struct _stomp_frame {
|
||||
char *command;
|
||||
int command_length;
|
||||
HashTable *headers;
|
||||
char *body;
|
||||
int body_length;
|
||||
char *command;
|
||||
int command_length;
|
||||
HashTable *headers;
|
||||
char *body;
|
||||
int body_length;
|
||||
} stomp_frame_t;
|
||||
|
||||
stomp_t *stomp_new(const char *host, unsigned short port, long read_timeout_sec, long read_timeout_usec TSRMLS_DC);
|
||||
int stomp_connect(stomp_t *stomp TSRMLS_DC);
|
||||
void stomp_close(stomp_t *stomp TSRMLS_DC);
|
||||
stomp_t *stomp_init();
|
||||
int stomp_connect(stomp_t *stomp, const char *host, unsigned short port TSRMLS_DC);
|
||||
void stomp_close(stomp_t *stomp);
|
||||
int stomp_send(stomp_t *connection, stomp_frame_t *frame TSRMLS_DC);
|
||||
stomp_frame_t *stomp_read_frame(stomp_t *connection);
|
||||
int stomp_valid_receipt(stomp_t *connection, stomp_frame_t *frame);
|
||||
int stomp_select(stomp_t *connection);
|
||||
void stomp_set_error(stomp_t *stomp, const char *error, int errnum);
|
||||
void frame_destroy(stomp_frame_t *frame);
|
||||
void stomp_free_frame(stomp_frame_t *frame);
|
||||
#endif /* _STOMP_H_ */
|
||||
|
||||
/*
|
||||
|
@@ -11,7 +11,7 @@ $s = new Stomp();
|
||||
$s->send('/queue/test-09', 'A test Message');
|
||||
$s->subscribe('/queue/test-09');
|
||||
var_dump($s->readFrame()->body);
|
||||
var_dump($s->readFrame('frame'));
|
||||
var_dump($s->readFrame());
|
||||
|
||||
?>
|
||||
--EXPECTF--
|
||||
|
@@ -12,11 +12,9 @@ stomp_send($link, '/queue/test-09', 'A test Message');
|
||||
stomp_subscribe($link, '/queue/test-09');
|
||||
$result = stomp_read_frame($link);
|
||||
var_dump($result['body']);
|
||||
var_dump(stomp_read_frame($link, 'frame'));
|
||||
var_dump(stomp_read_frame($link));
|
||||
|
||||
?>
|
||||
--EXPECTF--
|
||||
string(14) "A test Message"
|
||||
|
||||
Warning: stomp_read_frame() expects exactly 1 parameter, 2 given in %s on line %d
|
||||
NULL
|
||||
bool(false)
|
||||
|
27
tests/009-readFrame/003.phpt
Normal file
27
tests/009-readFrame/003.phpt
Normal file
@@ -0,0 +1,27 @@
|
||||
--TEST--
|
||||
Test stomp::readFrame() - custom frame class
|
||||
--SKIPIF--
|
||||
<?php
|
||||
if (!extension_loaded("stomp")) print "skip";
|
||||
if (!stomp_connect()) print "skip";
|
||||
?>
|
||||
--FILE--
|
||||
<?php
|
||||
|
||||
class customFrame extends stompFrame
|
||||
{
|
||||
public function __construct($cmd, $headers, $body)
|
||||
{
|
||||
parent::__construct($cmd, $headers, $body);
|
||||
}
|
||||
}
|
||||
|
||||
$s = new Stomp();
|
||||
$s->send('/queue/test-09', 'A test Message');
|
||||
$s->subscribe('/queue/test-09');
|
||||
$frame = $s->readFrame('customFrame');
|
||||
var_dump(get_class($frame), $frame->body);
|
||||
?>
|
||||
--EXPECT--
|
||||
string(11) "customFrame"
|
||||
string(14) "A test Message"
|
Reference in New Issue
Block a user