Compare commits

..

1 Commits

Author SHA1 Message Date
Pierrick Charron ced3c2904e Tag release 1.0.0 2010-02-12 04:58:52 +00:00
25 changed files with 264 additions and 868 deletions

28
.gitignore vendored
View File

@ -1,28 +0,0 @@
.deps
.libs/
Makefile
Makefile.fragments
Makefile.global
Makefile.objects
acinclude.m4
aclocal.m4
autom4te.cache/
config.guess
config.h
config.h.in
config.log
config.nice
config.status
config.sub
configure
configure.in
install-sh
libtool
ltmain.sh
missing
mkinstalldirs
modules/
php_stomp.lo
run-tests.php
stomp.la
stomp.lo

68
LICENSE
View File

@ -1,68 +0,0 @@
--------------------------------------------------------------------
The PHP License, version 3.01
Copyright (c) 1999 - 2014 The PHP Group. All rights reserved.
--------------------------------------------------------------------
Redistribution and use in source and binary forms, with or without
modification, is permitted provided that the following conditions
are met:
1. Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in
the documentation and/or other materials provided with the
distribution.
3. The name "PHP" must not be used to endorse or promote products
derived from this software without prior written permission. For
written permission, please contact group@php.net.
4. Products derived from this software may not be called "PHP", nor
may "PHP" appear in their name, without prior written permission
from group@php.net. You may indicate that your software works in
conjunction with PHP by saying "Foo for PHP" instead of calling
it "PHP Foo" or "phpfoo"
5. The PHP Group may publish revised and/or new versions of the
license from time to time. Each version will be given a
distinguishing version number.
Once covered code has been published under a particular version
of the license, you may always continue to use it under the terms
of that version. You may also choose to use such covered code
under the terms of any subsequent version of the license
published by the PHP Group. No one other than the PHP Group has
the right to modify the terms applicable to covered code created
under this License.
6. Redistributions of any form whatsoever must retain the following
acknowledgment:
"This product includes PHP software, freely available from
<http://www.php.net/software/>".
THIS SOFTWARE IS PROVIDED BY THE PHP DEVELOPMENT TEAM ``AS IS'' AND
ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE PHP
DEVELOPMENT TEAM OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
OF THE POSSIBILITY OF SUCH DAMAGE.
--------------------------------------------------------------------
This software consists of voluntary contributions made by many
individuals on behalf of the PHP Group.
The PHP Group can be contacted via Email at group@php.net.
For more information on the PHP Group and the PHP project,
please see <http://www.php.net>.
PHP includes the Zend Engine, freely available at
<http://www.zend.com>.

5
TODO
View File

@ -1,5 +0,0 @@
- Add the ability to connect to a network of brokers
(Cf: http://activemq.apache.org/networks-of-brokers.html)
- Add Stomp 1.1 full support
- Verify if we need to call recv with MSG_PEEK for the newline char
or if the current implementation is enough (cf bug #59217)

View File

@ -5,12 +5,11 @@ class Stomp {
/**
* Connect to server
*
* @param string $broker The broker URI
* @param string $broker Broker URI
* @param string $username The username
* @param string $password The password
* @param array $headers additional headers (example: receipt).
*/
public function __construct($broker = null, $username = null, $password = null, array $headers = array()) {
public function __construct($broker = null, $username = null, $password = null) {
}
/**
@ -34,30 +33,30 @@ class Stomp {
*
* @param string $destination indicates where to send the message
* @param string|StompFrame $msg message to be sent
* @param array $headers additional headers (example: receipt).
* @param array $properties extra properties (example: receipt, transaction)
* @return boolean TRUE on success, or FALSE on failure
*/
public function send($destination, $msg, array $headers = array()) {
public function send($destination, $msg, array $properties = array()) {
}
/**
* Register to listen to a given destination
*
* @param string $destination indicates which destination to subscribe to
* @param array $headers additional headers (example: receipt).
* @param array $properties extra properties (example: receipt, transaction, id)
* @return boolean TRUE on success, or FALSE on failure
*/
public function subscribe($destination, array $headers = array()) {
public function subscribe($destination, array $properties = array()) {
}
/**
* Remove an existing subscription
*
* @param string $destination indicates which subscription to remove
* @param array $headers additional headers (example: receipt).
* @param array $properties extra properties (example: receipt, transaction, id)
* @return boolean TRUE on success, or FALSE on failure
*/
public function unsubscribe($destination, array $headers = array()) {
public function unsubscribe($destination, array $properties = array()) {
}
/**
@ -108,10 +107,10 @@ class Stomp {
* Acknowledge consumption of a message from a subscription using client acknowledgment
*
* @param string|StompFrame $msg message/messageId to be acknowledged
* @param array $headers additional headers (example: receipt).
* @param array $properties extra properties (example: receipt, transaction)
* @return boolean TRUE on success, or FALSE on failure
*/
public function ack($msg, array $headers = array()) {
public function ack($msg, array $properties = array()) {
}
/**
@ -164,12 +163,4 @@ class StompFrame {
}
class StompException extends Exception {
/**
* Get the stomp server error details
*
* @return string
*/
public function getDetails() {
}
}

View File

@ -14,10 +14,9 @@ function stomp_version() {
* @param string $broker broker URI
* @param string $username The username
* @param string $password The password
* @param array $headers additional headers (example: receipt).
* @return Ressource stomp connection identifier on success, or FALSE on failure
*/
function stomp_connect($broker = null, $username = null, $password = null, array $headers = array()) {
function stomp_connect($broker = null, $username = null, $password = null) {
}
/**
@ -44,10 +43,10 @@ function stomp_close($link) {
* @param ressource $link identifier returned by stomp_connect
* @param string $destination indicates where to send the message
* @param string|StompFrame $msg message to be sent
* @param array $headers additional headers (example: receipt).
* @param array $properties extra properties (example: receipt, transaction)
* @return boolean TRUE on success, or FALSE on failure
*/
function stomp_send($link, $destination, $msg, array $headers = array()) {
function stomp_send($link, $destination, $msg, array $properties = array()) {
}
/**
@ -55,10 +54,10 @@ function stomp_send($link, $destination, $msg, array $headers = array()) {
*
* @param ressource $link identifier returned by stomp_connect
* @param string $destination indicates which destination to subscribe to
* @param array $headers additional headers (example: receipt).
* @param array $properties extra properties (example: receipt, transaction, id)
* @return boolean TRUE on success, or FALSE on failure
*/
function stomp_subscribe($link, $destination, array $headers = array()) {
function stomp_subscribe($link, $destination, array $properties = array()) {
}
/**
@ -66,10 +65,10 @@ function stomp_subscribe($link, $destination, array $headers = array()) {
*
* @param ressource $link identifier returned by stomp_connect
* @param string $destination indicates which subscription to remove
* @param array $headers additional headers (example: receipt).
* @param array $properties extra properties (example: receipt, transaction, id)
* @return boolean TRUE on success, or FALSE on failure
*/
function stomp_unsubscribe($link, $destination, array $headers = array()) {
function stomp_unsubscribe($link, $destination, array $properties = array()) {
}
/**
@ -125,10 +124,10 @@ function stomp_abort($link, $transaction_id) {
*
* @param ressource $link identifier returned by stomp_connect
* @param string|StompFrame $msg message/messageId to be acknowledged
* @param array $headers additional headers (example: receipt).
* @param array $properties extra properties (example: receipt, transaction)
* @return boolean TRUE on success, or FALSE on failure
*/
function stomp_ack($link, $msg, array $headers = array()) {
function stomp_ack($link, $msg, array $properties = array()) {
}
/**

View File

@ -12,22 +12,17 @@ This extension allows php applications to communicate with any Stomp compliant M
<email>pierrick@php.net</email>
<active>yes</active>
</lead>
<lead>
<name>Gennady Feldman</name>
<user>gena01</user>
<email>gena01@php.net</email>
<active>yes</active>
</lead>
<date>XXXX-XX-XX</date>
<version><release>1.0.X</release><api>1.0.X</api></version>
<date>2010-02-11</date>
<version><release>1.0.0</release><api>1.0.0</api></version>
<stability><release>stable</release><api>stable</api></stability>
<license uri="http://www.php.net/license">PHP License</license>
<notes>
- Bump to stable
</notes>
<contents>
<dir name="/">
<file role="doc" name="CREDITS" />
<file role="doc" name="LICENSE" />
<file role="src" name="config.m4" />
<file role="src" name="config.w32" />
<file role="src" name="php_stomp.c" />
@ -48,8 +43,6 @@ This extension allows php applications to communicate with any Stomp compliant M
<file role="test" name="tests/009-readFrame/001.phpt" />
<file role="test" name="tests/009-readFrame/002.phpt" />
<file role="test" name="tests/009-readFrame/003.phpt" />
<file role="test" name="tests/009-readFrame/004.phpt" />
<file role="test" name="tests/009-readFrame/005.phpt" />
<file role="test" name="tests/010-timeout/001.phpt" />
<file role="test" name="tests/010-timeout/002.phpt" />
<file role="test" name="tests/011-commit/001.phpt" />
@ -83,106 +76,6 @@ This extension allows php applications to communicate with any Stomp compliant M
</extsrcrelease>
<changelog>
<release>
<version><release>1.0.8</release><api>1.0.8</api></version>
<stability><release>stable</release><api>stable</api></stability>
<license uri="http://www.php.net/license">PHP License</license>
<date>2015-05-18</date>
<notes>
- Fix perm on source files. (Remi)
- Fixing PHP_STOMP_VERSION constant, per Remi's request. (Gennady)
</notes>
</release>
<release>
<version><release>1.0.7</release><api>1.0.7</api></version>
<stability><release>stable</release><api>stable</api></stability>
<license uri="http://www.php.net/license">PHP License</license>
<date>2015-05-15</date>
<notes>
- add LICENSE file as documentation (Remi)
- Fixed Windows compilation regression due to new TCP_NODELAY code. (Gennady Feldman)
- Fixed bug where error checking was missing after stomp_send(). (Gennady Feldman)
</notes>
</release>
<release>
<version><release>1.0.6</release><api>1.0.6</api></version>
<stability><release>stable</release><api>stable</api></stability>
<license uri="http://www.php.net/license">PHP License</license>
<date>2014-12-07</date>
<notes>
- Add two new ini options stomp.default_username and stomp.default_passowrd (Pierrick)
- General performance improvements (Pierrick)
- Fix stomp_read_frame when buffered (Pierrick)
- Fixed bug #59217 (Connections to RabbitMQ via CLI). (Pierrick).
- Fixed bug #59970 (acking a message makes rabbitmq disconnect the server). (Pierrick)
- Fixed bug #67170 (Disable Nagle's Algorithm with TCP_NODELAY, it delays sending small messages). (Yarek Tyshchenko)
- Fixed bug #68497 (Stomp client doesn't parse ERROR response on CONNECT). (Lorenzo Fontana)
- Fixed bug #64671 (Add stomp_nack and Stomp::nack functions). (Pierrick)
</notes>
</release>
<release>
<version><release>1.0.5</release><api>1.0.5</api></version>
<stability><release>stable</release><api>stable</api></stability>
<license uri="http://www.php.net/license">PHP License</license>
<date>2012-11-18</date>
<notes>
- Fix memory leak when Stomp can't write the message on the queue. (Pierrick)
- Add a buffer for receipts. (Pierrick)
- Fixed bug #62831 (Stomp module seems not initializing SSL library first).
(Patch by lwhsu at lwhsu dot org)
- Fixed bug #59972 (Message body are not binary safe). (Pierrick)
</notes>
</release>
<release>
<version><release>1.0.4</release><api>1.0.4</api></version>
<stability><release>stable</release><api>stable</api></stability>
<license uri="http://www.php.net/license">PHP License</license>
<date>2012-09-17</date>
<notes>
- Fix compatibility with 5.4
</notes>
</release>
<release>
<version><release>1.0.3</release><api>1.0.3</api></version>
<stability><release>stable</release><api>stable</api></stability>
<license uri="http://www.php.net/license">PHP License</license>
<date>2010-10-12</date>
<notes>
- Fixed bug #18772 (setTimeout usecs not honored)
</notes>
</release>
<release>
<version><release>1.0.2</release><api>1.0.2</api></version>
<stability><release>stable</release><api>stable</api></stability>
<license uri="http://www.php.net/license">PHP License</license>
<date>2010-08-13</date>
<notes>
- Fixed SSL connection bug introduced in 1.0.1
</notes>
</release>
<release>
<version><release>1.0.1</release><api>1.0.1</api></version>
<stability><release>stable</release><api>stable</api></stability>
<license uri="http://www.php.net/license">PHP License</license>
<date>2010-08-03</date>
<notes>
- Add new parameter to the constructor to allow client to send extra informations
- Add zend engine runtime cache support (introduced into trunk)
- Add new details property in the StompException class
- Add new StompException::getDetails() method
- Add the frame body content in the Stomp::Error() method
- Fixed bug #17262 (Server is not responding on win32)
</notes>
</release>
<release>
<version><release>1.0.0</release><api>1.0.0</api></version>
<stability><release>stable</release><api>stable</api></stability>
<license uri="http://www.php.net/license">PHP License</license>
<date>2010-02-11</date>
<notes>
- Bump to stable
</notes>
</release>
<release>
<version><release>0.4.1</release><api>0.4.1</api></version>
<stability><release>beta</release><api>beta</api></stability>

178
php_stomp.c Normal file → Executable file
View File

@ -51,6 +51,7 @@
zval **value = NULL; \
char *string_key = NULL; \
ulong num_key; \
zend_hash_internal_pointer_reset(headers_ht); \
for (zend_hash_internal_pointer_reset(headers_ht); \
zend_hash_get_current_data(headers_ht, (void **)&value) == SUCCESS; \
zend_hash_move_forward(headers_ht)) { \
@ -62,9 +63,7 @@
SEPARATE_ZVAL(value); \
convert_to_string(*value); \
} \
if (strcmp(string_key, "content-length") != 0) { \
zend_hash_add(h, string_key, strlen(string_key)+1, Z_STRVAL_PP(value), Z_STRLEN_PP(value)+1, NULL); \
}\
efree(string_key); \
} \
}
@ -73,27 +72,14 @@
zend_hash_destroy(frame.headers); \
efree(frame.headers);
#define STOMP_ERROR(errno, msg) \
#define STOMP_ERROR(errno, msg, ... ) \
STOMP_G(error_no) = errno; \
if (STOMP_G(error_msg)) { \
efree(STOMP_G(error_msg)); \
} \
STOMP_G(error_msg) = estrdup(msg); \
if (stomp_object) { \
zend_throw_exception_ex(stomp_ce_exception, errno TSRMLS_CC, msg); \
}
#define STOMP_ERROR_DETAILS(errno, msg, details) \
STOMP_G(error_no) = errno; \
if (STOMP_G(error_msg)) { \
efree(STOMP_G(error_msg)); \
} \
STOMP_G(error_msg) = estrdup(msg); \
if (stomp_object) { \
zval *object = zend_throw_exception_ex(stomp_ce_exception, errno TSRMLS_CC, msg); \
if (details) { \
zend_update_property_string(stomp_ce_exception, object, "details", sizeof("details")-1, (char *) details TSRMLS_CC); \
} \
zend_throw_exception_ex(stomp_ce_exception, errno TSRMLS_CC, msg, ##__VA_ARGS__); \
}
static int le_stomp;
@ -115,7 +101,6 @@ ZEND_BEGIN_ARG_INFO_EX(stomp_connect_args, 0, 0, 0)
ZEND_ARG_INFO(0, broker)
ZEND_ARG_INFO(0, username)
ZEND_ARG_INFO(0, password)
ZEND_ARG_ARRAY_INFO(0, headers, 1)
ZEND_END_ARG_INFO()
ZEND_BEGIN_ARG_INFO_EX(stomp_link_only, 0, 0, 1)
@ -177,17 +162,6 @@ ZEND_ARG_INFO(0, msg)
ZEND_ARG_ARRAY_INFO(0, headers, 1)
ZEND_END_ARG_INFO()
ZEND_BEGIN_ARG_INFO_EX(stomp_nack_args, 0, 0, 2)
ZEND_ARG_INFO(0, link)
ZEND_ARG_INFO(0, msg)
ZEND_ARG_ARRAY_INFO(0, headers, 1)
ZEND_END_ARG_INFO()
ZEND_BEGIN_ARG_INFO_EX(stomp_oop_nack_args, 0, 0, 1)
ZEND_ARG_INFO(0, msg)
ZEND_ARG_ARRAY_INFO(0, headers, 1)
ZEND_END_ARG_INFO()
ZEND_BEGIN_ARG_INFO_EX(stomp_set_read_timeout_args, 0, 0, 2)
ZEND_ARG_INFO(0, link)
ZEND_ARG_INFO(0, seconds)
@ -222,7 +196,6 @@ zend_function_entry stomp_functions[] = {
PHP_FE(stomp_commit, stomp_transaction_args)
PHP_FE(stomp_abort, stomp_transaction_args)
PHP_FE(stomp_ack, stomp_ack_args)
PHP_FE(stomp_nack, stomp_nack_args)
PHP_FE(stomp_error, stomp_link_only)
PHP_FE(stomp_set_read_timeout, stomp_set_read_timeout_args)
PHP_FE(stomp_get_read_timeout, stomp_link_only)
@ -244,7 +217,6 @@ static zend_function_entry stomp_methods[] = {
PHP_FALIAS(commit, stomp_commit, stomp_oop_transaction_args)
PHP_FALIAS(abort, stomp_abort, stomp_oop_transaction_args)
PHP_FALIAS(ack, stomp_ack, stomp_oop_ack_args)
PHP_FALIAS(nack, stomp_nack, stomp_oop_nack_args)
PHP_FALIAS(error, stomp_error, stomp_no_args)
PHP_FALIAS(setReadTimeout, stomp_set_read_timeout, stomp_oop_set_read_timeout_args)
PHP_FALIAS(getReadTimeout, stomp_get_read_timeout, stomp_no_args)
@ -259,13 +231,6 @@ static zend_function_entry stomp_frame_methods[] = {
};
/* }}} */
/* {{{ stomp_exception_methods[] */
static zend_function_entry stomp_exception_methods[] = {
PHP_ME(stompexception, getDetails, stomp_no_args, ZEND_ACC_PUBLIC)
{NULL, NULL, NULL}
};
/* }}} */
/* {{{ stomp_module_entry */
zend_module_entry stomp_module_entry = {
#if ZEND_MODULE_API_NO >= 20010901
@ -291,8 +256,6 @@ zend_module_entry stomp_module_entry = {
PHP_INI_BEGIN()
STD_PHP_INI_ENTRY("stomp.default_broker", "tcp://localhost:61613", PHP_INI_ALL, OnUpdateString, default_broker, zend_stomp_globals, stomp_globals)
STD_PHP_INI_ENTRY("stomp.default_username", "", PHP_INI_ALL, OnUpdateString, default_username, zend_stomp_globals, stomp_globals)
STD_PHP_INI_ENTRY("stomp.default_password", "", PHP_INI_ALL, OnUpdateString, default_password, zend_stomp_globals, stomp_globals)
STD_PHP_INI_ENTRY("stomp.default_read_timeout_sec", "2", PHP_INI_ALL, OnUpdateLong, read_timeout_sec, zend_stomp_globals, stomp_globals)
STD_PHP_INI_ENTRY("stomp.default_read_timeout_usec", "0", PHP_INI_ALL, OnUpdateLong, read_timeout_usec, zend_stomp_globals, stomp_globals)
STD_PHP_INI_ENTRY("stomp.default_connection_timeout_sec", "2", PHP_INI_ALL, OnUpdateLong, connection_timeout_sec, zend_stomp_globals, stomp_globals)
@ -303,15 +266,10 @@ PHP_INI_END()
static PHP_GINIT_FUNCTION(stomp)
{
stomp_globals->default_broker = NULL;
stomp_globals->default_username = NULL;
stomp_globals->default_password = NULL;
stomp_globals->read_timeout_sec = 2;
stomp_globals->read_timeout_usec = 0;
stomp_globals->connection_timeout_sec = 2;
stomp_globals->connection_timeout_usec = 0;
#if HAVE_STOMP_SSL
SSL_library_init();
#endif
}
/* }}} */
@ -349,26 +307,17 @@ static void stomp_object_free_storage(stomp_object_t *intern TSRMLS_DC)
}
#if (PHP_MAJOR_VERSION == 5 && PHP_MINOR_VERSION >= 4) || (PHP_MAJOR_VERSION > 5)
#define PHP_STOMP_RUNTIME_CACHE
#endif
static zend_object_value php_stomp_new(zend_class_entry *ce TSRMLS_DC)
{
zend_object_value retval;
stomp_object_t *intern;
#ifndef PHP_STOMP_RUNTIME_CACHE
zval *tmp;
#endif
intern = (stomp_object_t *) ecalloc(1, sizeof(stomp_object_t));
intern->stomp = NULL;
zend_object_std_init(&intern->std, ce TSRMLS_CC);
#ifdef PHP_STOMP_RUNTIME_CACHE
object_properties_init(&intern->std, ce);
#else
zend_hash_copy(intern->std.properties, &ce->default_properties, (copy_ctor_func_t) zval_add_ref, (void *) &tmp, sizeof(zval *));
#endif
retval.handle = zend_objects_store_put(intern, (zend_objects_store_dtor_t)zend_objects_destroy_object, (zend_objects_free_object_storage_t) stomp_object_free_storage, NULL TSRMLS_CC);
retval.handlers = zend_get_std_object_handlers();
@ -400,12 +349,9 @@ PHP_MINIT_FUNCTION(stomp)
zend_declare_property_null(stomp_ce_frame, "body", sizeof("body")-1, ZEND_ACC_PUBLIC TSRMLS_CC);
/* Register StompException class */
INIT_CLASS_ENTRY(ce, PHP_STOMP_EXCEPTION_CLASSNAME, stomp_exception_methods);
INIT_CLASS_ENTRY(ce, PHP_STOMP_EXCEPTION_CLASSNAME, NULL);
stomp_ce_exception = zend_register_internal_class_ex(&ce, zend_exception_get_default(TSRMLS_C), NULL TSRMLS_CC);
/* Properties */
zend_declare_property_null(stomp_ce_exception, "details", sizeof("details")-1, ZEND_ACC_PRIVATE TSRMLS_CC);
/** Register INI entries **/
REGISTER_INI_ENTRIES();
@ -451,8 +397,6 @@ PHP_MINFO_FUNCTION(stomp)
php_info_print_table_row(2, "API version", PHP_STOMP_VERSION);
#if HAVE_STOMP_SSL
php_info_print_table_row(2, "SSL Support", "enabled");
#else
php_info_print_table_row(2, "SSL Support", "disabled");
#endif
php_info_print_table_end();
DISPLAY_INI_ENTRIES();
@ -467,22 +411,25 @@ PHP_FUNCTION(stomp_version)
}
/* }}} */
/* {{{ proto Stomp::__construct([string broker [, string username [, string password [, array headers]]]])
/* {{{ proto Stomp::__construct([string broker [, string username [, string password]]])
Connect to server */
PHP_FUNCTION(stomp_connect)
{
zval *stomp_object = getThis();
zval *headers = NULL;
stomp_t *stomp = NULL;
char *broker = NULL, *username = NULL, *password = NULL;
int broker_len = 0, username_len = 0, password_len = 0;
struct timeval tv;
php_url *url_parts;
#ifdef HAVE_STOMP_SSL
int use_ssl = 0;
#endif
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "|sssa!", &broker, &broker_len, &username, &username_len, &password, &password_len, &headers) == FAILURE) {
tv.tv_sec = 2;
tv.tv_usec = 0;
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "|sss", &broker, &broker_len, &username, &username_len, &password, &password_len) == FAILURE) {
return;
}
@ -531,52 +478,28 @@ PHP_FUNCTION(stomp_connect)
if (stomp->status) {
stomp_frame_t *res;
int rres;
stomp_frame_t frame = {0};
INIT_FRAME(frame, "CONNECT");
if (!username) {
username = STOMP_G(default_username);
username_len = strlen(username);
if (username_len == 0) {
username = "";
}
if (!password) {
password = STOMP_G(default_password);
password_len = strlen(password);
if (password_len == 0) {
password = "";
}
zend_hash_add(frame.headers, "login", sizeof("login"), username, username_len + 1, NULL);
zend_hash_add(frame.headers, "passcode", sizeof("passcode"), password, password_len + 1, NULL);
if (NULL != headers) {
FRAME_HEADER_FROM_HASHTABLE(frame.headers, Z_ARRVAL_P(headers));
}
rres = stomp_send(stomp, &frame TSRMLS_CC);
stomp_send(stomp, &frame TSRMLS_CC);
CLEAR_FRAME(frame);
if (0 == rres) {
zval *excobj = zend_throw_exception_ex(stomp_ce_exception, stomp->errnum TSRMLS_CC, stomp->error);
if (stomp->error_details) {
zend_update_property_string(stomp_ce_exception, excobj, "details", sizeof("details")-1, stomp->error_details TSRMLS_CC);
}
return;
}
/* Retreive Response */
res = stomp_read_frame(stomp);
if (NULL == res) {
STOMP_ERROR(0, PHP_STOMP_ERR_SERVER_NOT_RESPONDING);
} else if (0 == strncmp("ERROR", res->command, sizeof("ERROR") - 1)) {
char *error_msg = NULL;
if (zend_hash_find(res->headers, "message", sizeof("message"), (void **)&error_msg) == SUCCESS) {
zval *excobj = zend_throw_exception_ex(stomp_ce_exception, 0 TSRMLS_CC, error_msg);
if (res->body) {
zend_update_property_string(stomp_ce_exception, excobj, "details", sizeof("details")-1, (char *) res->body TSRMLS_CC);
}
stomp_free_frame(res);
}
} else if (0 != strncmp("CONNECTED", res->command, sizeof("CONNECTED")-1)) {
if (stomp->error) {
STOMP_ERROR_DETAILS(stomp->errnum, stomp->error, stomp->error_details);
STOMP_ERROR(stomp->errnum, stomp->error);
} else {
STOMP_ERROR(0, PHP_STOMP_ERR_UNKNOWN);
}
@ -605,7 +528,7 @@ PHP_FUNCTION(stomp_connect)
}
}
} else {
STOMP_ERROR_DETAILS(0, stomp->error, stomp->error_details);
STOMP_ERROR(0, stomp->error);
}
stomp_close(stomp);
@ -725,7 +648,7 @@ PHP_FUNCTION(stomp_send)
if (Z_TYPE_P(msg) == IS_STRING) {
frame.body = Z_STRVAL_P(msg);
frame.body_length = Z_STRLEN_P(msg);
frame.body_length = -1;
} else if (Z_TYPE_P(msg) == IS_OBJECT && instanceof_function(Z_OBJCE_P(msg), stomp_ce_frame TSRMLS_CC)) {
zval *frame_obj_prop = NULL;
frame_obj_prop = zend_read_property(stomp_ce_frame, msg, "command", sizeof("command")-1, 1 TSRMLS_CC);
@ -736,7 +659,7 @@ PHP_FUNCTION(stomp_send)
frame_obj_prop = zend_read_property(stomp_ce_frame, msg, "body", sizeof("body")-1, 1 TSRMLS_CC);
if (Z_TYPE_P(frame_obj_prop) == IS_STRING) {
frame.body = Z_STRVAL_P(frame_obj_prop);
frame.body_length = Z_STRLEN_P(frame_obj_prop);
frame.body_length = -1;
}
frame_obj_prop = zend_read_property(stomp_ce_frame, msg, "headers", sizeof("headers")-1, 1 TSRMLS_CC);
if (Z_TYPE_P(frame_obj_prop) == IS_ARRAY) {
@ -747,8 +670,6 @@ PHP_FUNCTION(stomp_send)
CLEAR_FRAME(frame);
RETURN_FALSE;
}
if (frame.body_length > 0 && strnlen(frame.body, frame.body_length) >= frame.body_length)
frame.body_length = 0;
if (stomp_send(stomp, &frame TSRMLS_CC) > 0) {
success = stomp_valid_receipt(stomp, &frame);
@ -891,6 +812,7 @@ PHP_FUNCTION(stomp_read_frame)
zval *stomp_object = getThis();
stomp_t *stomp = NULL;
stomp_frame_t *res = NULL;
int sel_res = 0;
char *class_name = NULL;
int class_name_len = 0;
zend_class_entry *ce = NULL;
@ -920,17 +842,13 @@ PHP_FUNCTION(stomp_read_frame)
}
if ((res = stomp_read_frame(stomp))) {
if ((sel_res = stomp_select(stomp)) > 0 && (res = stomp_read_frame(stomp))) {
zval *headers = NULL;
if (0 == strncmp("ERROR", res->command, sizeof("ERROR") - 1)) {
char *error_msg = NULL;
if (zend_hash_find(res->headers, "message", sizeof("message"), (void **)&error_msg) == SUCCESS) {
zval *excobj = zend_throw_exception_ex(stomp_ce_exception, 0 TSRMLS_CC, error_msg);
if (res->body) {
zend_update_property_string(stomp_ce_exception, excobj, "details", sizeof("details")-1, (char *) res->body TSRMLS_CC);
}
STOMP_ERROR(0, error_msg)
stomp_free_frame(res);
RETURN_FALSE;
}
@ -1034,13 +952,16 @@ PHP_FUNCTION(stomp_read_frame)
array_init(return_value);
add_assoc_string_ex(return_value, "command", sizeof("command"), res->command, 1);
if (res->body) {
add_assoc_stringl_ex(return_value, "body", sizeof("body"), res->body, res->body_length, 1);
add_assoc_string_ex(return_value, "body", sizeof("body"), res->body, 1);
}
add_assoc_zval_ex(return_value, "headers", sizeof("headers"), headers);
}
stomp_free_frame(res);
} else {
if (sel_res == -1) {
STOMP_ERROR(0, "Error while selecting from socket: %d", errno);
}
RETURN_FALSE;
}
}
@ -1114,9 +1035,10 @@ PHP_FUNCTION(stomp_abort)
}
/* }}} */
/* {{{ _php_stomp_acknowledgment
*/
static void _php_stomp_acknowledgment(INTERNAL_FUNCTION_PARAMETERS, char *cmd) {
/* {{{ proto boolean Stomp::ack(mixed msg [, array headers])
Acknowledge consumption of a message from a subscription using client acknowledgment */
PHP_FUNCTION(stomp_ack)
{
zval *stomp_object = getThis();
zval *msg = NULL, *headers = NULL;
stomp_t *stomp = NULL;
@ -1137,7 +1059,7 @@ static void _php_stomp_acknowledgment(INTERNAL_FUNCTION_PARAMETERS, char *cmd) {
ZEND_FETCH_RESOURCE(stomp, stomp_t *, &arg, -1, PHP_STOMP_RES_NAME, le_stomp);
}
INIT_FRAME(frame, cmd);
INIT_FRAME(frame, "ACK");
if (NULL != headers) {
FRAME_HEADER_FROM_HASHTABLE(frame.headers, Z_ARRVAL_P(headers));
@ -1165,22 +1087,6 @@ static void _php_stomp_acknowledgment(INTERNAL_FUNCTION_PARAMETERS, char *cmd) {
}
/* }}} */
/* {{{ proto boolean Stomp::ack(mixed msg [, array headers])
Acknowledge consumption of a message from a subscription using client acknowledgment */
PHP_FUNCTION(stomp_ack)
{
_php_stomp_acknowledgment(INTERNAL_FUNCTION_PARAM_PASSTHRU, "ACK");
}
/* }}} */
/* {{{ proto boolean Stomp::nack(mixed msg [, array headers])
Negative Acknowledgment of a message from a subscription */
PHP_FUNCTION(stomp_nack)
{
_php_stomp_acknowledgment(INTERNAL_FUNCTION_PARAM_PASSTHRU, "NACK");
}
/* }}} */
/* {{{ proto string Stomp::error()
Get the last error message */
PHP_FUNCTION(stomp_error)
@ -1199,15 +1105,7 @@ PHP_FUNCTION(stomp_error)
}
if (stomp->error) {
if (stomp->error_details) {
char *error_msg = (char *) emalloc(strlen(stomp->error) + strlen(stomp->error_details) + 10);
strcpy(error_msg, stomp->error);
strcat(error_msg, "\n\n");
strcat(error_msg, stomp->error_details);
RETURN_STRING(error_msg, 0);
} else {
RETURN_STRING(stomp->error, 1);
}
} else {
RETURN_FALSE;
}
@ -1287,17 +1185,3 @@ PHP_METHOD(stompframe, __construct)
}
}
/* }}} */
/* {{{ proto string StompException::getDetails()
Get error details */
PHP_METHOD(stompexception, getDetails)
{
zval *object = getThis();
zval *details = NULL;
details = zend_read_property(stomp_ce_exception, object, "details", sizeof("details")-1, 1 TSRMLS_CC);
RETURN_STRINGL(Z_STRVAL_P(details), Z_STRLEN_P(details), 1);
}
/* }}} */

View File

@ -29,7 +29,11 @@ typedef struct _stomp_object {
} stomp_object_t;
#define PHP_STOMP_EXTNAME "Stomp"
#define PHP_STOMP_VERSION "1.0.9-dev"
#define PHP_STOMP_MAJOR_VERSION "1"
#define PHP_STOMP_MINOR_VERSION "0"
#define PHP_STOMP_PATCH_VERSION "0"
#define PHP_STOMP_VERSION_STATUS ""
#define PHP_STOMP_VERSION PHP_STOMP_MAJOR_VERSION "." PHP_STOMP_MINOR_VERSION "." PHP_STOMP_PATCH_VERSION PHP_STOMP_VERSION_STATUS
#define PHP_STOMP_RES_NAME "stomp connection"
@ -78,15 +82,12 @@ PHP_FUNCTION(stomp_begin);
PHP_FUNCTION(stomp_commit);
PHP_FUNCTION(stomp_abort);
PHP_FUNCTION(stomp_ack);
PHP_FUNCTION(stomp_nack);
PHP_FUNCTION(stomp_error);
PHP_FUNCTION(stomp_set_read_timeout);
PHP_FUNCTION(stomp_get_read_timeout);
PHP_METHOD(stompframe, __construct);
PHP_METHOD(stompexception, getDetails);
ZEND_BEGIN_MODULE_GLOBALS(stomp)
/* INI */
char *default_broker;
@ -94,8 +95,6 @@ ZEND_BEGIN_MODULE_GLOBALS(stomp)
long read_timeout_usec;
long connection_timeout_sec;
long connection_timeout_usec;
char *default_username;
char *default_password;
/* Others */
long error_no;

489
stomp.c
View File

@ -23,47 +23,16 @@
#endif
#include "php.h"
#include "zend_exceptions.h"
#include "ext/standard/php_smart_str.h"
#include "stomp.h"
#include "php_stomp.h"
#ifdef HAVE_NETINET_IN_H
#include <netinet/tcp.h>
#endif
#define RETURN_READ_FRAME_FAIL { stomp_free_frame(f); return NULL; }
ZEND_EXTERN_MODULE_GLOBALS(stomp);
extern zend_class_entry *stomp_ce_exception;
/* {{{ DEBUG */
#if PHP_DEBUG
static void print_stomp_frame(stomp_frame_t *frame TSRMLS_DC) {
php_printf("------ START FRAME ------\n");
php_printf("%s\n", frame->command);
/* Headers */
if (frame->headers) {
char *key;
ulong pos;
zend_hash_internal_pointer_reset(frame->headers);
while (zend_hash_get_current_key(frame->headers, &key, &pos, 0) == HASH_KEY_IS_STRING) {
char *value = NULL;
php_printf("%s:", key);
if (zend_hash_get_current_data(frame->headers, (void **)&value) == SUCCESS) {
php_printf("%s", value);
}
php_printf("\n");
zend_hash_move_forward(frame->headers);
}
}
php_printf("\n%s\n", frame->body);
php_printf("------ END FRAME ------\n");
}
#endif
/* }}} */
/* {{{ stomp_init
*/
stomp_t *stomp_init()
@ -77,120 +46,33 @@ stomp_t *stomp_init()
stomp->port = 0;
stomp->status = 0;
stomp->error = NULL;
stomp->error_details = NULL;
stomp->errnum = 0;
stomp->session = NULL;
stomp->options.connect_timeout_sec = 2;
stomp->options.connect_timeout_usec = 0;
stomp->options.read_timeout_sec = 2;
stomp->options.read_timeout_usec = 0;
stomp->options.read_timeout_usec = 2;
#if HAVE_STOMP_SSL
stomp->options.use_ssl = 0;
stomp->ssl_handle = NULL;
#endif
stomp->frame_stack = NULL;
stomp->read_buffer.size = 0;
return stomp;
}
/* }}} */
/* {{{ stomp_frame_stack_push
*/
static void stomp_frame_stack_push(stomp_frame_stack_t **stack, stomp_frame_t *frame)
{
stomp_frame_stack_t *cell = (stomp_frame_stack_t *) emalloc(sizeof(stomp_frame_stack_t));
cell->frame = frame;
cell->next = NULL;
if (!*stack) {
*stack = cell;
} else {
stomp_frame_stack_t *cursor = *stack;
while (cursor->next != NULL) cursor = cursor->next;
cursor->next = cell;
}
}
/* }}} */
/* {{{ stomp_frame_stack_shift
*/
static stomp_frame_t *stomp_frame_stack_shift(stomp_frame_stack_t **stack) {
stomp_frame_t *frame = NULL;
if (*stack) {
stomp_frame_stack_t *cell = *stack;
*stack = cell->next;
frame = cell->frame;
efree(cell);
}
return frame;
}
/* }}} */
/* {{{ stomp_frame_stack_clear
*/
static void stomp_frame_stack_clear(stomp_frame_stack_t **stack) {
stomp_frame_t *frame = NULL;
while ((frame = stomp_frame_stack_shift(stack))) efree(frame);
}
/* }}} */
/* {{{ stomp_set_error
*/
void stomp_set_error(stomp_t *stomp, const char *error, int errnum, const char *fmt, ...)
void stomp_set_error(stomp_t *stomp, const char *error, int errnum)
{
va_list ap;
int len;
if (error != NULL) {
if (stomp->error != NULL) {
efree(stomp->error);
stomp->error = NULL;
}
if (stomp->error_details != NULL) {
efree(stomp->error_details);
stomp->error_details = NULL;
}
stomp->errnum = errnum;
if (error != NULL) {
stomp->error = estrdup(error);
stomp->errnum = errnum;
}
if (fmt != NULL) {
stomp->error_details = emalloc(STOMP_BUFSIZE);
if (stomp->error_details == NULL) {
return; /* Nothing else can be done */
}
va_start(ap, fmt);
/*
* Would've been better to call vasprintf(), but that
* function is missing on some platforms...
*/
len = vsnprintf(stomp->error_details, STOMP_BUFSIZE, fmt, ap);
va_end(ap);
if (len < STOMP_BUFSIZE) {
stomp->error_details = erealloc(stomp->error_details, len+1);
}
}
}
/* }}} */
/* {{{ stomp_writable
*/
int stomp_writable(stomp_t *stomp)
{
int n;
n = php_pollfd_for_ms(stomp->fd, POLLOUT, 1000);
if (n != POLLOUT) {
#ifndef PHP_WIN32
if (n == 0) {
errno = ETIMEDOUT;
}
#endif
return 0;
}
return 1;
}
/* }}} */
@ -201,7 +83,7 @@ int stomp_connect(stomp_t *stomp, const char *host, unsigned short port TSRMLS_D
char error[1024];
socklen_t size;
struct timeval tv;
int flag = 1;
fd_set rfds;
if (stomp->host != NULL)
{
@ -219,30 +101,30 @@ int stomp_connect(stomp_t *stomp, const char *host, unsigned short port TSRMLS_D
stomp->fd = php_network_connect_socket_to_host(stomp->host, stomp->port, SOCK_STREAM, 0, &tv, NULL, NULL, NULL, 0 TSRMLS_CC);
if (stomp->fd == -1) {
snprintf(error, sizeof(error), "Unable to connect to %s:%ld", stomp->host, stomp->port);
stomp_set_error(stomp, error, errno, "%s", strerror(errno));
stomp_set_error(stomp, error, errno);
return 0;
}
#ifdef HAVE_NETINET_IN_H
setsockopt(stomp->fd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int));
#endif
size = sizeof(stomp->localaddr);
memset(&stomp->localaddr, 0, size);
if (getsockname(stomp->fd, (struct sockaddr*) &stomp->localaddr, &size) == -1) {
snprintf(error, sizeof(error), "getsockname failed: %s (%d)", strerror(errno), errno);
stomp_set_error(stomp, error, errno, NULL);
stomp_set_error(stomp, error, errno);
return 0;
}
if (stomp_writable(stomp)) {
tv.tv_sec = 0;
tv.tv_usec = 0;
FD_ZERO(&rfds);
FD_SET(stomp->fd, &rfds);
if (select(stomp->fd + 1, NULL, &rfds, NULL, &tv) > 0) {
#if HAVE_STOMP_SSL
if (stomp->options.use_ssl) {
SSL_CTX *ctx = SSL_CTX_new(SSLv23_client_method());
int ret;
if (NULL == ctx) {
stomp_set_error(stomp, "failed to create the SSL context", 0, NULL);
stomp_set_error(stomp, "failed to create the SSL context", 0);
return 0;
}
@ -250,15 +132,15 @@ int stomp_connect(stomp_t *stomp, const char *host, unsigned short port TSRMLS_D
stomp->ssl_handle = SSL_new(ctx);
if (stomp->ssl_handle == NULL) {
stomp_set_error(stomp, "failed to create the SSL handle", 0, NULL);
stomp_set_error(stomp, "failed to create the SSL handle", 0);
SSL_CTX_free(ctx);
return 0;
}
SSL_set_fd(stomp->ssl_handle, stomp->fd);
if ((ret = SSL_connect(stomp->ssl_handle)) <= 0) {
stomp_set_error(stomp, "SSL/TLS handshake failed", 0, "SSL error %d", SSL_get_error(stomp->ssl_handle, ret));
if (SSL_connect(stomp->ssl_handle) <= 0) {
stomp_set_error(stomp, "SSL/TLS handshake failed", 0);
SSL_shutdown(stomp->ssl_handle);
return 0;
}
@ -267,7 +149,7 @@ int stomp_connect(stomp_t *stomp, const char *host, unsigned short port TSRMLS_D
return 1;
} else {
snprintf(error, sizeof(error), "Unable to connect to %s:%ld", stomp->host, stomp->port);
stomp_set_error(stomp, error, errno, "%s", strerror(errno));
stomp_set_error(stomp, error, errno);
return 0;
}
}
@ -298,10 +180,7 @@ void stomp_close(stomp_t *stomp)
if (stomp->error) {
efree(stomp->error);
}
if (stomp->error_details) {
efree(stomp->error_details);
}
stomp_frame_stack_clear(&stomp->frame_stack);
efree(stomp);
}
/* }}} */
@ -340,7 +219,7 @@ int stomp_send(stomp_t *stomp, stomp_frame_t *frame TSRMLS_DC)
}
if (frame->body_length > 0) {
smart_str_appendl(&buf, "content-length:", sizeof("content-length:") - 1);
smart_str_appends(&buf, "content-length: ");
smart_str_append_long(&buf, frame->body_length);
smart_str_appendc(&buf, '\n');
}
@ -348,30 +227,23 @@ int stomp_send(stomp_t *stomp, stomp_frame_t *frame TSRMLS_DC)
smart_str_appendc(&buf, '\n');
if (frame->body > 0) {
smart_str_appendl(&buf, frame->body, frame->body_length > 0 ? frame->body_length : strlen(frame->body));
}
smart_str_appendl(&buf, "\0", sizeof("\0")-1);
if (!stomp_writable(stomp)) {
smart_str_free(&buf);
stomp_set_error(stomp, "Unable to send data", errno, "%s", strerror(errno));
return 0;
smart_str_appends(&buf, frame->body);
}
#ifdef HAVE_STOMP_SSL
if (stomp->options.use_ssl) {
int ret;
if (-1 == (ret = SSL_write(stomp->ssl_handle, buf.c, buf.len))) {
smart_str_free(&buf);
stomp_set_error(stomp, "Unable to send data", errno, "SSL error %d", SSL_get_error(stomp->ssl_handle, ret));
if (-1 == SSL_write(stomp->ssl_handle, buf.c, buf.len) || -1 == SSL_write(stomp->ssl_handle, "\0\n", 2)) {
char error[1024];
snprintf(error, sizeof(error), "Unable to send data");
stomp_set_error(stomp, error, errno);
return 0;
}
} else {
#endif
if (-1 == send(stomp->fd, buf.c, buf.len, 0)) {
smart_str_free(&buf);
stomp_set_error(stomp, "Unable to send data", errno, "%s", strerror(errno));
if (-1 == send(stomp->fd, buf.c, buf.len, 0) || -1 == send(stomp->fd, "\0\n", 2, 0)) {
char error[1024];
snprintf(error, sizeof(error), "Unable to send data");
stomp_set_error(stomp, error, errno);
return 0;
}
#ifdef HAVE_STOMP_SSL
@ -386,12 +258,9 @@ int stomp_send(stomp_t *stomp, stomp_frame_t *frame TSRMLS_DC)
/* {{{ stomp_recv
*/
static int _stomp_recv(stomp_t *stomp, char *msg, const size_t length)
int stomp_recv(stomp_t *stomp, char *msg, size_t length)
{
int len;
stomp_select(stomp);
#if HAVE_STOMP_SSL
if(stomp->options.use_ssl) {
len = SSL_read(stomp->ssl_handle, msg, length);
@ -402,142 +271,66 @@ static int _stomp_recv(stomp_t *stomp, char *msg, const size_t length)
}
#endif
if (len == -1) {
#if HAVE_STOMP_SSL
if (stomp->options.use_ssl) {
stomp_set_error(stomp, "Error reading from socket", errno, "%s. (SSL in use)", strerror(errno));
} else {
#endif
stomp_set_error(stomp, "Error reading from socket", errno, "%s. (SSL not in use)", strerror(errno));
#if HAVE_STOMP_SSL
}
#endif
stomp->status = -1;
} else if (len == 0) {
stomp_set_error(stomp, "Sender closed connection unexpectedly", 0, NULL);
if (len == 0) {
TSRMLS_FETCH();
zend_throw_exception_ex(stomp_ce_exception, errno TSRMLS_CC, "Unexpected EOF while reading from socket");
stomp->status = -1;
}
return len;
}
int stomp_recv(stomp_t *stomp, char *msg, const size_t length)
{
if (stomp->read_buffer.size == 0) {
if (length >= STOMP_BUFSIZE) {
return _stomp_recv(stomp, msg, length);
} else {
size_t recv_size = _stomp_recv(stomp, stomp->read_buffer.buf, STOMP_BUFSIZE);
if (recv_size <= length) {
memcpy(msg, stomp->read_buffer.buf, recv_size);
return recv_size;
} else {
memcpy(msg, stomp->read_buffer.buf, length);
stomp->read_buffer.pos = stomp->read_buffer.buf + length;
stomp->read_buffer.size = recv_size - length;
return length;
}
}
} else if (stomp->read_buffer.size >= length) {
memcpy(msg, stomp->read_buffer.pos, length);
stomp->read_buffer.pos += length;
stomp->read_buffer.size -= length;
return length;
} else {
int len = stomp->read_buffer.size;
memcpy(msg, stomp->read_buffer.pos, stomp->read_buffer.size);
stomp->read_buffer.size = 0;
if (stomp_select_ex(stomp, 0, 0)) {
return len + stomp_recv(stomp, msg + len, length - len);
} else {
return len;
}
}
}
/* }}} */
/* {{{ _stomp_read_until
*/
static size_t _stomp_read_until(stomp_t *stomp, char **data, const char delimiter)
{
size_t length = 0;
size_t bufsize = STOMP_BUFSIZE;
char *buffer = (char *) emalloc(STOMP_BUFSIZE);
while (1) {
unsigned int i, found;
char *c;
found = 0;
// First populate the buffer
if (stomp->read_buffer.size == 0) {
stomp->read_buffer.size = _stomp_recv(stomp, stomp->read_buffer.buf, STOMP_BUFSIZE);
stomp->read_buffer.pos = stomp->read_buffer.buf;
}
// Then search the delimiter
c = stomp->read_buffer.pos;
for (i = 1; i <= stomp->read_buffer.size ; i++) {
if (*c == delimiter) {
found = 1;
break;
} else {
c++;
}
}
if (!found) i--;
// Make sure we have enough place in the buffer
if ((i+length) >= bufsize) {
buffer = (char *) erealloc(buffer, bufsize + STOMP_BUFSIZE);
bufsize += STOMP_BUFSIZE;
}
// Copy and update the buffer
memcpy(buffer + length, stomp->read_buffer.pos, i);
length += i;
stomp->read_buffer.pos += i;
stomp->read_buffer.size -= i;
if (found) {
break;
}
}
if (length) {
*data = buffer;
} else {
efree(buffer);
*data = NULL;
}
return length;
}
/* }}} */
/* {{{ stomp_read_buffer
*/
static size_t stomp_read_buffer(stomp_t *stomp, char **data)
static int stomp_read_buffer(stomp_t *stomp, char **data)
{
size_t length = _stomp_read_until(stomp, data, 0);
if (stomp_select_ex(stomp, 0, 0)) {
int rc = 0;
size_t i = 0;
size_t bufsize = STOMP_BUFSIZE + 1;
char *buffer = (char *) emalloc(STOMP_BUFSIZE + 1);
while (1) {
size_t length = 1;
rc = stomp_recv(stomp, buffer + i, length);
if (rc < 1) {
efree(buffer);
return -1;
}
if (1 == length) {
i++;
if (buffer[i-1] == 0) {
char endline[1];
if (1 != stomp_recv(stomp, endline, 1) && '\n' != endline[0]) {
if (*data) {
efree(*data);
*data = NULL;
}
efree(buffer);
return 0;
}
break;
}
if (length > 1) {
length --;
} else if (length) {
efree(*data);
*data = NULL;
length = 0;
if (i >= bufsize) {
buffer = (char *) erealloc(buffer, bufsize + STOMP_BUFSIZE);
bufsize += STOMP_BUFSIZE;
}
return length;
}
}
if (i > 1) {
*data = (char *) emalloc(i);
if (NULL == *data) {
efree(buffer);
return -1;
}
memcpy(*data, buffer, i);
}
efree(buffer);
return i-1;
}
/* }}} */
@ -545,16 +338,52 @@ static size_t stomp_read_buffer(stomp_t *stomp, char **data)
*/
static int stomp_read_line(stomp_t *stomp, char **data)
{
size_t length = _stomp_read_until(stomp, data, '\n');
if (length > 1) {
(*data)[length - 1] = 0;
length--;
} else if (length) {
efree(*data);
*data = NULL;
length = 0;
int rc = 0;
size_t i = 0;
size_t bufsize = STOMP_BUFSIZE + 1;
char *buffer = (char *) emalloc(STOMP_BUFSIZE + 1);
while (1) {
size_t length = 1;
rc = stomp_recv(stomp, buffer + i, length);
if (rc < 1) {
efree(buffer);
return -1;
}
return length;
if (1 == length) {
i++;
if (buffer[i-1] == '\n') {
buffer[i-1] = 0;
break;
} else if (buffer[i-1] == 0) {
efree(buffer);
return 0;
}
if (i >= bufsize) {
buffer = (char *) erealloc(buffer, bufsize + STOMP_BUFSIZE);
bufsize += STOMP_BUFSIZE;
}
}
}
if (i > 1) {
*data = (char *) emalloc(i);
if (NULL == *data) {
efree(buffer);
return -1;
}
memcpy(*data, buffer, i);
}
efree(buffer);
return i-1;
}
/* }}} */
@ -580,20 +409,12 @@ void stomp_free_frame(stomp_frame_t *frame)
/* {{{ stomp_read_frame
*/
stomp_frame_t *stomp_read_frame_ex(stomp_t *stomp, int use_stack)
stomp_frame_t *stomp_read_frame(stomp_t *stomp)
{
stomp_frame_t *f = NULL;
char *cmd = NULL, *length_str = NULL;
int length = 0;
if (use_stack && stomp->frame_stack) {
return stomp_frame_stack_shift(&stomp->frame_stack);
}
if (!stomp_select(stomp)) {
return NULL;
}
INIT_STOMP_FRAME(f);
if (NULL == f) {
@ -646,27 +467,20 @@ stomp_frame_t *stomp_read_frame_ex(stomp_t *stomp, int use_stack)
}
/* Check for the content length */
if (zend_hash_find(f->headers, "content-length", sizeof("content-length"), (void **)&length_str) == SUCCESS) {
int recv_size = 0;
if (zend_hash_find(f->headers, "content-length", strlen("content-length"), (void **)&length_str) == SUCCESS) {
char endbuffer[2];
length = 2;
f->body_length = atoi(length_str);
f->body = (char *) emalloc(f->body_length);
while (recv_size != f->body_length) {
int l = stomp_recv(stomp, f->body + recv_size, f->body_length - recv_size);
if (-1 == l) {
RETURN_READ_FRAME_FAIL;
} else {
recv_size += l;
}
}
length = stomp_recv(stomp, endbuffer, 2);
if (endbuffer[0] != '\0' || ((2 == length) && (endbuffer[1] != '\n'))) {
if (-1 == stomp_recv(stomp, f->body, f->body_length)) {
RETURN_READ_FRAME_FAIL;
}
if (length != stomp_recv(stomp, endbuffer, length) || endbuffer[0] != '\0' || endbuffer[1] != '\n') {
RETURN_READ_FRAME_FAIL;
}
} else {
f->body_length = stomp_read_buffer(stomp, &f->body);
}
@ -680,11 +494,9 @@ stomp_frame_t *stomp_read_frame_ex(stomp_t *stomp, int use_stack)
int stomp_valid_receipt(stomp_t *stomp, stomp_frame_t *frame) {
int success = 1;
char *receipt = NULL;
if (zend_hash_find(frame->headers, "receipt", sizeof("receipt"), (void **)&receipt) == SUCCESS) {
stomp_frame_t *res = stomp_read_frame(stomp);
success = 0;
while (1) {
stomp_frame_t *res = stomp_read_frame_ex(stomp, 0);
if (res) {
if (0 == strncmp("RECEIPT", res->command, sizeof("RECEIPT") - 1)) {
char *receipt_id = NULL;
@ -692,24 +504,14 @@ int stomp_valid_receipt(stomp_t *stomp, stomp_frame_t *frame) {
&& strlen(receipt) == strlen(receipt_id)
&& !strcmp(receipt, receipt_id)) {
success = 1;
} else {
stomp_set_error(stomp, "Invalid receipt", 0, "%s", receipt_id);
}
stomp_free_frame(res);
return success;
} else if (0 == strncmp("ERROR", res->command, sizeof("ERROR") - 1)) {
char *error_msg = NULL;
if (zend_hash_find(res->headers, "message", sizeof("message"), (void **)&error_msg) == SUCCESS) {
stomp_set_error(stomp, error_msg, 0, "%s", res->body);
stomp_set_error(stomp, error_msg, 0);
}
}
stomp_free_frame(res);
return success;
} else {
stomp_frame_stack_push(&stomp->frame_stack, res);
}
} else {
return success;
}
}
}
return success;
@ -718,26 +520,17 @@ int stomp_valid_receipt(stomp_t *stomp, stomp_frame_t *frame) {
/* {{{ stomp_select
*/
int stomp_select_ex(stomp_t *stomp, const long int sec, const long int usec)
int stomp_select(stomp_t *stomp)
{
int n;
struct timeval tv;
fd_set rfds;
if (stomp->read_buffer.size || stomp->frame_stack) {
return 1;
}
tv.tv_sec = sec;
tv.tv_usec = usec;
tv.tv_sec = stomp->options.read_timeout_sec;
tv.tv_usec = stomp->options.read_timeout_usec;
n = php_pollfd_for(stomp->fd, PHP_POLLREADABLE, &tv);
if (n < 1) {
#if !defined(PHP_WIN32) && !(defined(NETWARE) && defined(USE_WINSOCK))
if (n == 0) {
errno = ETIMEDOUT;
}
#endif
return 0;
}
return 1;
FD_ZERO(&rfds);
FD_SET(stomp->fd, &rfds);
return select(stomp->fd + 1, &rfds, NULL, NULL, &tv);
}
/* }}} */

38
stomp.h Normal file → Executable file
View File

@ -45,19 +45,6 @@ typedef struct _stomp_options {
#endif
} stomp_options_t;
typedef struct _stomp_frame {
char *command;
int command_length;
HashTable *headers;
char *body;
int body_length;
} stomp_frame_t;
typedef struct _stomp_frame_stack {
stomp_frame_t *frame;
struct _stomp_frame_stack *next;
} stomp_frame_stack_t;
typedef struct _stomp {
php_socket_t fd;
php_sockaddr_storage localaddr;
@ -67,31 +54,30 @@ typedef struct _stomp {
int status;
char *error;
int errnum;
char *error_details;
char *session;
#if HAVE_STOMP_SSL
SSL *ssl_handle;
#endif
stomp_frame_stack_t *frame_stack;
struct {
size_t size;
char buf[STOMP_BUFSIZE];
char *pos;
} read_buffer;
} stomp_t;
typedef struct _stomp_frame {
char *command;
int command_length;
HashTable *headers;
char *body;
int body_length;
} stomp_frame_t;
stomp_t *stomp_init();
int stomp_connect(stomp_t *stomp, const char *host, unsigned short port TSRMLS_DC);
void stomp_close(stomp_t *stomp);
int stomp_send(stomp_t *connection, stomp_frame_t *frame TSRMLS_DC);
stomp_frame_t *stomp_read_frame_ex(stomp_t *connection, int use_stack);
stomp_frame_t *stomp_read_frame(stomp_t *connection);
int stomp_valid_receipt(stomp_t *connection, stomp_frame_t *frame);
int stomp_select_ex(stomp_t *connection, const long int sec, const long int usec);
void stomp_set_error(stomp_t *stomp, const char *error, int errnum, const char *fmt, ...) ZEND_ATTRIBUTE_PTR_FORMAT(printf, 4, 0);
int stomp_select(stomp_t *connection);
void stomp_set_error(stomp_t *stomp, const char *error, int errnum);
void stomp_free_frame(stomp_frame_t *frame);
#define stomp_select(s) stomp_select_ex(s, s->options.read_timeout_sec, s->options.read_timeout_sec)
#define stomp_read_frame(c) stomp_read_frame_ex(c, 1)
#endif /* _STOMP_H_ */
/*

View File

@ -7,4 +7,4 @@ Test stomp_version()
echo stomp_version();
?>
--EXPECTF--
%d.%d.%s
%d.%d.%d

View File

@ -1,14 +0,0 @@
--TEST--
Test stomp_connect() - Test error on CONNECT
--SKIPIF--
<?php if (!extension_loaded("stomp")) print "skip"; ?>
--FILE--
<?php
try {
$stomp = new Stomp('tcp://localhost', 'anotpresentusername1234');
} catch (Exception $e) {
var_dump(get_class($e));
}
?>
--EXPECTF--
string(14) "StompException"

View File

@ -14,6 +14,7 @@ stomp_send($link, '/queue/test-06', array());
var_dump(stomp_send($link, '/queue/test-06', ''));
var_dump(stomp_send($link, '/queue/test-06', 'A realMessage'));
var_dump(stomp_send($link, '/queue/test-06', 'بياريك شارون'));
var_dump(stomp_send($link, 'بياريك شارون', 'بياريك شارون', array('receipt' => 'message-123')), stomp_error($link));
?>
--EXPECTF--
@ -23,3 +24,5 @@ Warning: stomp_send(): Expects parameter %d to be a string or a StompFrame objec
bool(true)
bool(true)
bool(true)
bool(false)
string(%d) "%s"

View File

@ -14,6 +14,7 @@ $s->send('/queue/test-06', array());
var_dump($s->send('/queue/test-06', ''));
var_dump($s->send('/queue/test-06', 'A realMessage'));
var_dump($s->send('/queue/test-06', 'بياريك شارون'));
var_dump($s->send('بياريك شارون', 'بياريك شارون', array('receipt' => 'message-123')), $s->error());
?>
--EXPECTF--
@ -23,3 +24,5 @@ Warning: Stomp::send(): Expects parameter %d to be a string or a StompFrame obje
bool(true)
bool(true)
bool(true)
bool(false)
string(%d) "%s"

View File

@ -1,14 +0,0 @@
--TEST--
Test stomp::send() - test send with receipt
--SKIPIF--
<?php
if (!extension_loaded("stomp")) print "skip";
if (!stomp_connect()) print "skip";
?>
--FILE--
<?php
$s = new Stomp();
var_dump($s->send('/queue/test-06', 'A real message', array('receipt' => 'message-12345')));
?>
--EXPECTF--
bool(true)

View File

@ -14,4 +14,4 @@ $s->subscribe('/queue/test', 'string');
--EXPECTF--
Warning: Stomp::subscribe(): Destination can not be empty in %s on line %d
Catchable fatal error: Argument 2 passed to Stomp::subscribe() must be %s array, string given in %s on line %d
Catchable fatal error: Argument 2 passed to Stomp::subscribe() must be an array, string given in %s on line %d

View File

@ -14,4 +14,4 @@ $s->unsubscribe('/queue/test', 'string');
--EXPECTF--
Warning: Stomp::unsubscribe(): Destination can not be empty in %s on line %d
Catchable fatal error: Argument 2 passed to Stomp::unsubscribe() must be %s array, string given in %s on line %d
Catchable fatal error: Argument 2 passed to Stomp::unsubscribe() must be an array, string given in %s on line %d

View File

@ -9,7 +9,7 @@ Test stomp::readFrame() - tests functionnality and parameters
<?php
$s = new Stomp();
$s->send('/queue/test-09', 'A test Message');
$s->subscribe('/queue/test-09', array('ack' => 'auto'));
$s->subscribe('/queue/test-09');
var_dump($s->readFrame()->body);
var_dump($s->readFrame());

View File

@ -9,7 +9,7 @@ Test stomp_read_frame() - test functionnality and parameters
<?php
$link = stomp_connect();
stomp_send($link, '/queue/test-09', 'A test Message');
stomp_subscribe($link, '/queue/test-09', array('ack' => 'auto'));
stomp_subscribe($link, '/queue/test-09');
$result = stomp_read_frame($link);
var_dump($result['body']);
var_dump(stomp_read_frame($link));

View File

@ -18,7 +18,7 @@ class customFrame extends stompFrame
$s = new Stomp();
$s->send('/queue/test-09', 'A test Message');
$s->subscribe('/queue/test-09', array('ack' => 'auto'));
$s->subscribe('/queue/test-09');
$frame = $s->readFrame('customFrame');
var_dump(get_class($frame), $frame->body);
?>

Binary file not shown.

Binary file not shown.

View File

@ -1,26 +0,0 @@
--TEST--
Test stomp::readFrame() - test frame stack
--SKIPIF--
<?php
if (!extension_loaded("stomp")) print "skip";
if (!stomp_connect()) print "skip";
?>
--FILE--
<?php
$s = new Stomp();
var_dump($s->subscribe('/queue/test-buffer', array('ack' => 'auto')));
var_dump($s->send('/queue/test-buffer', "Message1", array('receipt' => 'msg-1')));
var_dump($s->send('/queue/test-buffer', "Message2", array('receipt' => 'msg-2')));
var_dump($s->send('/queue/test-buffer', "Message3", array('receipt' => 'msg-3')));
var_dump($s->readFrame()->body);
var_dump($s->readFrame()->body);
var_dump($s->readFrame()->body);
?>
--EXPECTF--
bool(true)
bool(true)
bool(true)
bool(true)
string(8) "Message1"
string(8) "Message2"
string(8) "Message3"

View File

@ -17,7 +17,7 @@ var_dump($s->send('/queue/test-011-commit', 'bar', array('transaction' => 't1'))
// sends a message to the queue and asks for a receipt
$s->send('/queue/test-011-commit', 'bar', array('transaction' => 't2', 'receipt' => 'tptp'));
echo gettype($s->error()) . PHP_EOL;
var_dump($s->error());
// commits a valid transaction
var_dump($s->commit('t1'));
@ -28,15 +28,15 @@ var_dump($s->commit(null));
// commits a non valid transaction (a transaction id that does not exist) and asks for a receipt
$s->commit('t2', array('receipt' => 'commit-key'));
echo gettype($s->error());
var_dump($s->error());
unset($s);
?>
--EXPECTF--
bool(true)
bool(true)
string
string(%d) "Invalid transaction id: %s"
bool(true)
bool(false)
bool(true)
string
string(%d) "Must specify the transaction you are committing"

View File

@ -19,7 +19,7 @@ try {
$stomp->send($queue, $msg);
/* subscribe to messages from the queue 'foo' */
$stomp->subscribe($queue, array('ack' => 'auto'));
$stomp->subscribe($queue);
/* read a frame */
$frame = $stomp->readFrame();