Compare commits

..

1 Commits

Author SHA1 Message Date
Pierrick Charron ced3c2904e Tag release 1.0.0 2010-02-12 04:58:52 +00:00
25 changed files with 264 additions and 868 deletions

28
.gitignore vendored
View File

@ -1,28 +0,0 @@
.deps
.libs/
Makefile
Makefile.fragments
Makefile.global
Makefile.objects
acinclude.m4
aclocal.m4
autom4te.cache/
config.guess
config.h
config.h.in
config.log
config.nice
config.status
config.sub
configure
configure.in
install-sh
libtool
ltmain.sh
missing
mkinstalldirs
modules/
php_stomp.lo
run-tests.php
stomp.la
stomp.lo

68
LICENSE
View File

@ -1,68 +0,0 @@
--------------------------------------------------------------------
The PHP License, version 3.01
Copyright (c) 1999 - 2014 The PHP Group. All rights reserved.
--------------------------------------------------------------------
Redistribution and use in source and binary forms, with or without
modification, is permitted provided that the following conditions
are met:
1. Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in
the documentation and/or other materials provided with the
distribution.
3. The name "PHP" must not be used to endorse or promote products
derived from this software without prior written permission. For
written permission, please contact group@php.net.
4. Products derived from this software may not be called "PHP", nor
may "PHP" appear in their name, without prior written permission
from group@php.net. You may indicate that your software works in
conjunction with PHP by saying "Foo for PHP" instead of calling
it "PHP Foo" or "phpfoo"
5. The PHP Group may publish revised and/or new versions of the
license from time to time. Each version will be given a
distinguishing version number.
Once covered code has been published under a particular version
of the license, you may always continue to use it under the terms
of that version. You may also choose to use such covered code
under the terms of any subsequent version of the license
published by the PHP Group. No one other than the PHP Group has
the right to modify the terms applicable to covered code created
under this License.
6. Redistributions of any form whatsoever must retain the following
acknowledgment:
"This product includes PHP software, freely available from
<http://www.php.net/software/>".
THIS SOFTWARE IS PROVIDED BY THE PHP DEVELOPMENT TEAM ``AS IS'' AND
ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE PHP
DEVELOPMENT TEAM OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
OF THE POSSIBILITY OF SUCH DAMAGE.
--------------------------------------------------------------------
This software consists of voluntary contributions made by many
individuals on behalf of the PHP Group.
The PHP Group can be contacted via Email at group@php.net.
For more information on the PHP Group and the PHP project,
please see <http://www.php.net>.
PHP includes the Zend Engine, freely available at
<http://www.zend.com>.

5
TODO
View File

@ -1,5 +0,0 @@
- Add the ability to connect to a network of brokers
(Cf: http://activemq.apache.org/networks-of-brokers.html)
- Add Stomp 1.1 full support
- Verify if we need to call recv with MSG_PEEK for the newline char
or if the current implementation is enough (cf bug #59217)

View File

@ -5,12 +5,11 @@ class Stomp {
/** /**
* Connect to server * Connect to server
* *
* @param string $broker The broker URI * @param string $broker Broker URI
* @param string $username The username * @param string $username The username
* @param string $password The password * @param string $password The password
* @param array $headers additional headers (example: receipt).
*/ */
public function __construct($broker = null, $username = null, $password = null, array $headers = array()) { public function __construct($broker = null, $username = null, $password = null) {
} }
/** /**
@ -34,30 +33,30 @@ class Stomp {
* *
* @param string $destination indicates where to send the message * @param string $destination indicates where to send the message
* @param string|StompFrame $msg message to be sent * @param string|StompFrame $msg message to be sent
* @param array $headers additional headers (example: receipt). * @param array $properties extra properties (example: receipt, transaction)
* @return boolean TRUE on success, or FALSE on failure * @return boolean TRUE on success, or FALSE on failure
*/ */
public function send($destination, $msg, array $headers = array()) { public function send($destination, $msg, array $properties = array()) {
} }
/** /**
* Register to listen to a given destination * Register to listen to a given destination
* *
* @param string $destination indicates which destination to subscribe to * @param string $destination indicates which destination to subscribe to
* @param array $headers additional headers (example: receipt). * @param array $properties extra properties (example: receipt, transaction, id)
* @return boolean TRUE on success, or FALSE on failure * @return boolean TRUE on success, or FALSE on failure
*/ */
public function subscribe($destination, array $headers = array()) { public function subscribe($destination, array $properties = array()) {
} }
/** /**
* Remove an existing subscription * Remove an existing subscription
* *
* @param string $destination indicates which subscription to remove * @param string $destination indicates which subscription to remove
* @param array $headers additional headers (example: receipt). * @param array $properties extra properties (example: receipt, transaction, id)
* @return boolean TRUE on success, or FALSE on failure * @return boolean TRUE on success, or FALSE on failure
*/ */
public function unsubscribe($destination, array $headers = array()) { public function unsubscribe($destination, array $properties = array()) {
} }
/** /**
@ -108,10 +107,10 @@ class Stomp {
* Acknowledge consumption of a message from a subscription using client acknowledgment * Acknowledge consumption of a message from a subscription using client acknowledgment
* *
* @param string|StompFrame $msg message/messageId to be acknowledged * @param string|StompFrame $msg message/messageId to be acknowledged
* @param array $headers additional headers (example: receipt). * @param array $properties extra properties (example: receipt, transaction)
* @return boolean TRUE on success, or FALSE on failure * @return boolean TRUE on success, or FALSE on failure
*/ */
public function ack($msg, array $headers = array()) { public function ack($msg, array $properties = array()) {
} }
/** /**
@ -164,12 +163,4 @@ class StompFrame {
} }
class StompException extends Exception { class StompException extends Exception {
/**
* Get the stomp server error details
*
* @return string
*/
public function getDetails() {
}
} }

View File

@ -14,10 +14,9 @@ function stomp_version() {
* @param string $broker broker URI * @param string $broker broker URI
* @param string $username The username * @param string $username The username
* @param string $password The password * @param string $password The password
* @param array $headers additional headers (example: receipt).
* @return Ressource stomp connection identifier on success, or FALSE on failure * @return Ressource stomp connection identifier on success, or FALSE on failure
*/ */
function stomp_connect($broker = null, $username = null, $password = null, array $headers = array()) { function stomp_connect($broker = null, $username = null, $password = null) {
} }
/** /**
@ -44,10 +43,10 @@ function stomp_close($link) {
* @param ressource $link identifier returned by stomp_connect * @param ressource $link identifier returned by stomp_connect
* @param string $destination indicates where to send the message * @param string $destination indicates where to send the message
* @param string|StompFrame $msg message to be sent * @param string|StompFrame $msg message to be sent
* @param array $headers additional headers (example: receipt). * @param array $properties extra properties (example: receipt, transaction)
* @return boolean TRUE on success, or FALSE on failure * @return boolean TRUE on success, or FALSE on failure
*/ */
function stomp_send($link, $destination, $msg, array $headers = array()) { function stomp_send($link, $destination, $msg, array $properties = array()) {
} }
/** /**
@ -55,10 +54,10 @@ function stomp_send($link, $destination, $msg, array $headers = array()) {
* *
* @param ressource $link identifier returned by stomp_connect * @param ressource $link identifier returned by stomp_connect
* @param string $destination indicates which destination to subscribe to * @param string $destination indicates which destination to subscribe to
* @param array $headers additional headers (example: receipt). * @param array $properties extra properties (example: receipt, transaction, id)
* @return boolean TRUE on success, or FALSE on failure * @return boolean TRUE on success, or FALSE on failure
*/ */
function stomp_subscribe($link, $destination, array $headers = array()) { function stomp_subscribe($link, $destination, array $properties = array()) {
} }
/** /**
@ -66,10 +65,10 @@ function stomp_subscribe($link, $destination, array $headers = array()) {
* *
* @param ressource $link identifier returned by stomp_connect * @param ressource $link identifier returned by stomp_connect
* @param string $destination indicates which subscription to remove * @param string $destination indicates which subscription to remove
* @param array $headers additional headers (example: receipt). * @param array $properties extra properties (example: receipt, transaction, id)
* @return boolean TRUE on success, or FALSE on failure * @return boolean TRUE on success, or FALSE on failure
*/ */
function stomp_unsubscribe($link, $destination, array $headers = array()) { function stomp_unsubscribe($link, $destination, array $properties = array()) {
} }
/** /**
@ -125,10 +124,10 @@ function stomp_abort($link, $transaction_id) {
* *
* @param ressource $link identifier returned by stomp_connect * @param ressource $link identifier returned by stomp_connect
* @param string|StompFrame $msg message/messageId to be acknowledged * @param string|StompFrame $msg message/messageId to be acknowledged
* @param array $headers additional headers (example: receipt). * @param array $properties extra properties (example: receipt, transaction)
* @return boolean TRUE on success, or FALSE on failure * @return boolean TRUE on success, or FALSE on failure
*/ */
function stomp_ack($link, $msg, array $headers = array()) { function stomp_ack($link, $msg, array $properties = array()) {
} }
/** /**

View File

@ -4,7 +4,7 @@
<channel>pecl.php.net</channel> <channel>pecl.php.net</channel>
<summary>Stomp client extension</summary> <summary>Stomp client extension</summary>
<description> <description>
This extension allows php applications to communicate with any Stomp compliant Message Brokers through easy object oriented and procedural interfaces. This extension allows php applications to communicate with any Stomp compliant Message Brokers through easy object oriented and procedural interfaces.
</description> </description>
<lead> <lead>
<name>Pierrick Charron</name> <name>Pierrick Charron</name>
@ -12,22 +12,17 @@ This extension allows php applications to communicate with any Stomp compliant M
<email>pierrick@php.net</email> <email>pierrick@php.net</email>
<active>yes</active> <active>yes</active>
</lead> </lead>
<lead> <date>2010-02-11</date>
<name>Gennady Feldman</name> <version><release>1.0.0</release><api>1.0.0</api></version>
<user>gena01</user>
<email>gena01@php.net</email>
<active>yes</active>
</lead>
<date>XXXX-XX-XX</date>
<version><release>1.0.X</release><api>1.0.X</api></version>
<stability><release>stable</release><api>stable</api></stability> <stability><release>stable</release><api>stable</api></stability>
<license uri="http://www.php.net/license">PHP License</license> <license uri="http://www.php.net/license">PHP License</license>
<notes> <notes>
- Bump to stable
</notes> </notes>
<contents> <contents>
<dir name="/"> <dir name="/">
<file role="doc" name="CREDITS" /> <file role="doc" name="CREDITS" />
<file role="doc" name="LICENSE" />
<file role="src" name="config.m4" /> <file role="src" name="config.m4" />
<file role="src" name="config.w32" /> <file role="src" name="config.w32" />
<file role="src" name="php_stomp.c" /> <file role="src" name="php_stomp.c" />
@ -48,8 +43,6 @@ This extension allows php applications to communicate with any Stomp compliant M
<file role="test" name="tests/009-readFrame/001.phpt" /> <file role="test" name="tests/009-readFrame/001.phpt" />
<file role="test" name="tests/009-readFrame/002.phpt" /> <file role="test" name="tests/009-readFrame/002.phpt" />
<file role="test" name="tests/009-readFrame/003.phpt" /> <file role="test" name="tests/009-readFrame/003.phpt" />
<file role="test" name="tests/009-readFrame/004.phpt" />
<file role="test" name="tests/009-readFrame/005.phpt" />
<file role="test" name="tests/010-timeout/001.phpt" /> <file role="test" name="tests/010-timeout/001.phpt" />
<file role="test" name="tests/010-timeout/002.phpt" /> <file role="test" name="tests/010-timeout/002.phpt" />
<file role="test" name="tests/011-commit/001.phpt" /> <file role="test" name="tests/011-commit/001.phpt" />
@ -79,111 +72,11 @@ This extension allows php applications to communicate with any Stomp compliant M
<providesextension>stomp</providesextension> <providesextension>stomp</providesextension>
<extsrcrelease> <extsrcrelease>
<configureoption default="/usr" name="with-openssl-dir" prompt="OpenSSL install prefix (no to disable SSL support)" /> <configureoption default="/usr" name="with-openssl-dir" prompt="OpenSSL install prefix (no to disable SSL support)" />
</extsrcrelease> </extsrcrelease>
<changelog> <changelog>
<release> <release>
<version><release>1.0.8</release><api>1.0.8</api></version>
<stability><release>stable</release><api>stable</api></stability>
<license uri="http://www.php.net/license">PHP License</license>
<date>2015-05-18</date>
<notes>
- Fix perm on source files. (Remi)
- Fixing PHP_STOMP_VERSION constant, per Remi's request. (Gennady)
</notes>
</release>
<release>
<version><release>1.0.7</release><api>1.0.7</api></version>
<stability><release>stable</release><api>stable</api></stability>
<license uri="http://www.php.net/license">PHP License</license>
<date>2015-05-15</date>
<notes>
- add LICENSE file as documentation (Remi)
- Fixed Windows compilation regression due to new TCP_NODELAY code. (Gennady Feldman)
- Fixed bug where error checking was missing after stomp_send(). (Gennady Feldman)
</notes>
</release>
<release>
<version><release>1.0.6</release><api>1.0.6</api></version>
<stability><release>stable</release><api>stable</api></stability>
<license uri="http://www.php.net/license">PHP License</license>
<date>2014-12-07</date>
<notes>
- Add two new ini options stomp.default_username and stomp.default_passowrd (Pierrick)
- General performance improvements (Pierrick)
- Fix stomp_read_frame when buffered (Pierrick)
- Fixed bug #59217 (Connections to RabbitMQ via CLI). (Pierrick).
- Fixed bug #59970 (acking a message makes rabbitmq disconnect the server). (Pierrick)
- Fixed bug #67170 (Disable Nagle's Algorithm with TCP_NODELAY, it delays sending small messages). (Yarek Tyshchenko)
- Fixed bug #68497 (Stomp client doesn't parse ERROR response on CONNECT). (Lorenzo Fontana)
- Fixed bug #64671 (Add stomp_nack and Stomp::nack functions). (Pierrick)
</notes>
</release>
<release>
<version><release>1.0.5</release><api>1.0.5</api></version>
<stability><release>stable</release><api>stable</api></stability>
<license uri="http://www.php.net/license">PHP License</license>
<date>2012-11-18</date>
<notes>
- Fix memory leak when Stomp can't write the message on the queue. (Pierrick)
- Add a buffer for receipts. (Pierrick)
- Fixed bug #62831 (Stomp module seems not initializing SSL library first).
(Patch by lwhsu at lwhsu dot org)
- Fixed bug #59972 (Message body are not binary safe). (Pierrick)
</notes>
</release>
<release>
<version><release>1.0.4</release><api>1.0.4</api></version>
<stability><release>stable</release><api>stable</api></stability>
<license uri="http://www.php.net/license">PHP License</license>
<date>2012-09-17</date>
<notes>
- Fix compatibility with 5.4
</notes>
</release>
<release>
<version><release>1.0.3</release><api>1.0.3</api></version>
<stability><release>stable</release><api>stable</api></stability>
<license uri="http://www.php.net/license">PHP License</license>
<date>2010-10-12</date>
<notes>
- Fixed bug #18772 (setTimeout usecs not honored)
</notes>
</release>
<release>
<version><release>1.0.2</release><api>1.0.2</api></version>
<stability><release>stable</release><api>stable</api></stability>
<license uri="http://www.php.net/license">PHP License</license>
<date>2010-08-13</date>
<notes>
- Fixed SSL connection bug introduced in 1.0.1
</notes>
</release>
<release>
<version><release>1.0.1</release><api>1.0.1</api></version>
<stability><release>stable</release><api>stable</api></stability>
<license uri="http://www.php.net/license">PHP License</license>
<date>2010-08-03</date>
<notes>
- Add new parameter to the constructor to allow client to send extra informations
- Add zend engine runtime cache support (introduced into trunk)
- Add new details property in the StompException class
- Add new StompException::getDetails() method
- Add the frame body content in the Stomp::Error() method
- Fixed bug #17262 (Server is not responding on win32)
</notes>
</release>
<release>
<version><release>1.0.0</release><api>1.0.0</api></version>
<stability><release>stable</release><api>stable</api></stability>
<license uri="http://www.php.net/license">PHP License</license>
<date>2010-02-11</date>
<notes>
- Bump to stable
</notes>
</release>
<release>
<version><release>0.4.1</release><api>0.4.1</api></version> <version><release>0.4.1</release><api>0.4.1</api></version>
<stability><release>beta</release><api>beta</api></stability> <stability><release>beta</release><api>beta</api></stability>
<license uri="http://www.php.net/license">PHP License</license> <license uri="http://www.php.net/license">PHP License</license>
@ -207,10 +100,10 @@ This extension allows php applications to communicate with any Stomp compliant M
<license uri="http://www.php.net/license">PHP License</license> <license uri="http://www.php.net/license">PHP License</license>
<date>2009-11-22</date> <date>2009-11-22</date>
<notes> <notes>
- Adds alt class - Adds alt class
- Fixed bug #16936 (Module segfaults on readFrame if Frame > STOMP_BUFSIZE) - Fixed bug #16936 (Module segfaults on readFrame if Frame > STOMP_BUFSIZE)
- Fixed bug #16933 (readFrame does not notice when server shuts down) - Fixed bug #16933 (readFrame does not notice when server shuts down)
- Fixed bug #16930 (readFrame reports error-frames as "timeout") - Fixed bug #16930 (readFrame reports error-frames as "timeout")
</notes> </notes>
</release> </release>
<release> <release>
@ -232,8 +125,8 @@ This extension allows php applications to communicate with any Stomp compliant M
- new stomp_connect_error() function (pierrick) - new stomp_connect_error() function (pierrick)
- stomp_begin, stomp_abort and stomp_commit now accept additional headers (pierrick) - stomp_begin, stomp_abort and stomp_commit now accept additional headers (pierrick)
- new connection timeout and read timeout ini configuration (pierrick) - new connection timeout and read timeout ini configuration (pierrick)
- Fix a memory leak in stomp_read_line (pierrick) - Fix a memory leak in stomp_read_line (pierrick)
- Better set of test (Pierrick and Anis) - Better set of test (Pierrick and Anis)
</notes> </notes>
</release> </release>
<release> <release>
@ -243,7 +136,7 @@ This extension allows php applications to communicate with any Stomp compliant M
<date>2009-11-01</date> <date>2009-11-01</date>
<notes> <notes>
- Windows build fix (kalle) - Windows build fix (kalle)
- Add SSL support (pierrick) - Add SSL support (pierrick)
</notes> </notes>
</release> </release>
<release> <release>
@ -252,7 +145,7 @@ This extension allows php applications to communicate with any Stomp compliant M
<license uri="http://www.php.net/license">PHP License</license> <license uri="http://www.php.net/license">PHP License</license>
<date>2009-10-30</date> <date>2009-10-30</date>
<notes> <notes>
- Initial PECL release. (pierrick) - Initial PECL release. (pierrick)
</notes> </notes>
</release> </release>
</changelog> </changelog>

188
php_stomp.c Normal file → Executable file
View File

@ -51,6 +51,7 @@
zval **value = NULL; \ zval **value = NULL; \
char *string_key = NULL; \ char *string_key = NULL; \
ulong num_key; \ ulong num_key; \
zend_hash_internal_pointer_reset(headers_ht); \
for (zend_hash_internal_pointer_reset(headers_ht); \ for (zend_hash_internal_pointer_reset(headers_ht); \
zend_hash_get_current_data(headers_ht, (void **)&value) == SUCCESS; \ zend_hash_get_current_data(headers_ht, (void **)&value) == SUCCESS; \
zend_hash_move_forward(headers_ht)) { \ zend_hash_move_forward(headers_ht)) { \
@ -62,9 +63,7 @@
SEPARATE_ZVAL(value); \ SEPARATE_ZVAL(value); \
convert_to_string(*value); \ convert_to_string(*value); \
} \ } \
if (strcmp(string_key, "content-length") != 0) { \ zend_hash_add(h, string_key, strlen(string_key)+1, Z_STRVAL_PP(value), Z_STRLEN_PP(value)+1, NULL); \
zend_hash_add(h, string_key, strlen(string_key)+1, Z_STRVAL_PP(value), Z_STRLEN_PP(value)+1, NULL); \
}\
efree(string_key); \ efree(string_key); \
} \ } \
} }
@ -73,29 +72,16 @@
zend_hash_destroy(frame.headers); \ zend_hash_destroy(frame.headers); \
efree(frame.headers); efree(frame.headers);
#define STOMP_ERROR(errno, msg) \ #define STOMP_ERROR(errno, msg, ... ) \
STOMP_G(error_no) = errno; \ STOMP_G(error_no) = errno; \
if (STOMP_G(error_msg)) { \ if (STOMP_G(error_msg)) { \
efree(STOMP_G(error_msg)); \ efree(STOMP_G(error_msg)); \
} \ } \
STOMP_G(error_msg) = estrdup(msg); \ STOMP_G(error_msg) = estrdup(msg); \
if (stomp_object) { \ if (stomp_object) { \
zend_throw_exception_ex(stomp_ce_exception, errno TSRMLS_CC, msg); \ zend_throw_exception_ex(stomp_ce_exception, errno TSRMLS_CC, msg, ##__VA_ARGS__); \
} }
#define STOMP_ERROR_DETAILS(errno, msg, details) \
STOMP_G(error_no) = errno; \
if (STOMP_G(error_msg)) { \
efree(STOMP_G(error_msg)); \
} \
STOMP_G(error_msg) = estrdup(msg); \
if (stomp_object) { \
zval *object = zend_throw_exception_ex(stomp_ce_exception, errno TSRMLS_CC, msg); \
if (details) { \
zend_update_property_string(stomp_ce_exception, object, "details", sizeof("details")-1, (char *) details TSRMLS_CC); \
} \
}
static int le_stomp; static int le_stomp;
ZEND_DECLARE_MODULE_GLOBALS(stomp) ZEND_DECLARE_MODULE_GLOBALS(stomp)
@ -115,7 +101,6 @@ ZEND_BEGIN_ARG_INFO_EX(stomp_connect_args, 0, 0, 0)
ZEND_ARG_INFO(0, broker) ZEND_ARG_INFO(0, broker)
ZEND_ARG_INFO(0, username) ZEND_ARG_INFO(0, username)
ZEND_ARG_INFO(0, password) ZEND_ARG_INFO(0, password)
ZEND_ARG_ARRAY_INFO(0, headers, 1)
ZEND_END_ARG_INFO() ZEND_END_ARG_INFO()
ZEND_BEGIN_ARG_INFO_EX(stomp_link_only, 0, 0, 1) ZEND_BEGIN_ARG_INFO_EX(stomp_link_only, 0, 0, 1)
@ -177,17 +162,6 @@ ZEND_ARG_INFO(0, msg)
ZEND_ARG_ARRAY_INFO(0, headers, 1) ZEND_ARG_ARRAY_INFO(0, headers, 1)
ZEND_END_ARG_INFO() ZEND_END_ARG_INFO()
ZEND_BEGIN_ARG_INFO_EX(stomp_nack_args, 0, 0, 2)
ZEND_ARG_INFO(0, link)
ZEND_ARG_INFO(0, msg)
ZEND_ARG_ARRAY_INFO(0, headers, 1)
ZEND_END_ARG_INFO()
ZEND_BEGIN_ARG_INFO_EX(stomp_oop_nack_args, 0, 0, 1)
ZEND_ARG_INFO(0, msg)
ZEND_ARG_ARRAY_INFO(0, headers, 1)
ZEND_END_ARG_INFO()
ZEND_BEGIN_ARG_INFO_EX(stomp_set_read_timeout_args, 0, 0, 2) ZEND_BEGIN_ARG_INFO_EX(stomp_set_read_timeout_args, 0, 0, 2)
ZEND_ARG_INFO(0, link) ZEND_ARG_INFO(0, link)
ZEND_ARG_INFO(0, seconds) ZEND_ARG_INFO(0, seconds)
@ -222,7 +196,6 @@ zend_function_entry stomp_functions[] = {
PHP_FE(stomp_commit, stomp_transaction_args) PHP_FE(stomp_commit, stomp_transaction_args)
PHP_FE(stomp_abort, stomp_transaction_args) PHP_FE(stomp_abort, stomp_transaction_args)
PHP_FE(stomp_ack, stomp_ack_args) PHP_FE(stomp_ack, stomp_ack_args)
PHP_FE(stomp_nack, stomp_nack_args)
PHP_FE(stomp_error, stomp_link_only) PHP_FE(stomp_error, stomp_link_only)
PHP_FE(stomp_set_read_timeout, stomp_set_read_timeout_args) PHP_FE(stomp_set_read_timeout, stomp_set_read_timeout_args)
PHP_FE(stomp_get_read_timeout, stomp_link_only) PHP_FE(stomp_get_read_timeout, stomp_link_only)
@ -244,7 +217,6 @@ static zend_function_entry stomp_methods[] = {
PHP_FALIAS(commit, stomp_commit, stomp_oop_transaction_args) PHP_FALIAS(commit, stomp_commit, stomp_oop_transaction_args)
PHP_FALIAS(abort, stomp_abort, stomp_oop_transaction_args) PHP_FALIAS(abort, stomp_abort, stomp_oop_transaction_args)
PHP_FALIAS(ack, stomp_ack, stomp_oop_ack_args) PHP_FALIAS(ack, stomp_ack, stomp_oop_ack_args)
PHP_FALIAS(nack, stomp_nack, stomp_oop_nack_args)
PHP_FALIAS(error, stomp_error, stomp_no_args) PHP_FALIAS(error, stomp_error, stomp_no_args)
PHP_FALIAS(setReadTimeout, stomp_set_read_timeout, stomp_oop_set_read_timeout_args) PHP_FALIAS(setReadTimeout, stomp_set_read_timeout, stomp_oop_set_read_timeout_args)
PHP_FALIAS(getReadTimeout, stomp_get_read_timeout, stomp_no_args) PHP_FALIAS(getReadTimeout, stomp_get_read_timeout, stomp_no_args)
@ -259,13 +231,6 @@ static zend_function_entry stomp_frame_methods[] = {
}; };
/* }}} */ /* }}} */
/* {{{ stomp_exception_methods[] */
static zend_function_entry stomp_exception_methods[] = {
PHP_ME(stompexception, getDetails, stomp_no_args, ZEND_ACC_PUBLIC)
{NULL, NULL, NULL}
};
/* }}} */
/* {{{ stomp_module_entry */ /* {{{ stomp_module_entry */
zend_module_entry stomp_module_entry = { zend_module_entry stomp_module_entry = {
#if ZEND_MODULE_API_NO >= 20010901 #if ZEND_MODULE_API_NO >= 20010901
@ -291,8 +256,6 @@ zend_module_entry stomp_module_entry = {
PHP_INI_BEGIN() PHP_INI_BEGIN()
STD_PHP_INI_ENTRY("stomp.default_broker", "tcp://localhost:61613", PHP_INI_ALL, OnUpdateString, default_broker, zend_stomp_globals, stomp_globals) STD_PHP_INI_ENTRY("stomp.default_broker", "tcp://localhost:61613", PHP_INI_ALL, OnUpdateString, default_broker, zend_stomp_globals, stomp_globals)
STD_PHP_INI_ENTRY("stomp.default_username", "", PHP_INI_ALL, OnUpdateString, default_username, zend_stomp_globals, stomp_globals)
STD_PHP_INI_ENTRY("stomp.default_password", "", PHP_INI_ALL, OnUpdateString, default_password, zend_stomp_globals, stomp_globals)
STD_PHP_INI_ENTRY("stomp.default_read_timeout_sec", "2", PHP_INI_ALL, OnUpdateLong, read_timeout_sec, zend_stomp_globals, stomp_globals) STD_PHP_INI_ENTRY("stomp.default_read_timeout_sec", "2", PHP_INI_ALL, OnUpdateLong, read_timeout_sec, zend_stomp_globals, stomp_globals)
STD_PHP_INI_ENTRY("stomp.default_read_timeout_usec", "0", PHP_INI_ALL, OnUpdateLong, read_timeout_usec, zend_stomp_globals, stomp_globals) STD_PHP_INI_ENTRY("stomp.default_read_timeout_usec", "0", PHP_INI_ALL, OnUpdateLong, read_timeout_usec, zend_stomp_globals, stomp_globals)
STD_PHP_INI_ENTRY("stomp.default_connection_timeout_sec", "2", PHP_INI_ALL, OnUpdateLong, connection_timeout_sec, zend_stomp_globals, stomp_globals) STD_PHP_INI_ENTRY("stomp.default_connection_timeout_sec", "2", PHP_INI_ALL, OnUpdateLong, connection_timeout_sec, zend_stomp_globals, stomp_globals)
@ -303,15 +266,10 @@ PHP_INI_END()
static PHP_GINIT_FUNCTION(stomp) static PHP_GINIT_FUNCTION(stomp)
{ {
stomp_globals->default_broker = NULL; stomp_globals->default_broker = NULL;
stomp_globals->default_username = NULL;
stomp_globals->default_password = NULL;
stomp_globals->read_timeout_sec = 2; stomp_globals->read_timeout_sec = 2;
stomp_globals->read_timeout_usec = 0; stomp_globals->read_timeout_usec = 0;
stomp_globals->connection_timeout_sec = 2; stomp_globals->connection_timeout_sec = 2;
stomp_globals->connection_timeout_usec = 0; stomp_globals->connection_timeout_usec = 0;
#if HAVE_STOMP_SSL
SSL_library_init();
#endif
} }
/* }}} */ /* }}} */
@ -349,26 +307,17 @@ static void stomp_object_free_storage(stomp_object_t *intern TSRMLS_DC)
} }
#if (PHP_MAJOR_VERSION == 5 && PHP_MINOR_VERSION >= 4) || (PHP_MAJOR_VERSION > 5)
#define PHP_STOMP_RUNTIME_CACHE
#endif
static zend_object_value php_stomp_new(zend_class_entry *ce TSRMLS_DC) static zend_object_value php_stomp_new(zend_class_entry *ce TSRMLS_DC)
{ {
zend_object_value retval; zend_object_value retval;
stomp_object_t *intern; stomp_object_t *intern;
#ifndef PHP_STOMP_RUNTIME_CACHE
zval *tmp; zval *tmp;
#endif
intern = (stomp_object_t *) ecalloc(1, sizeof(stomp_object_t)); intern = (stomp_object_t *) ecalloc(1, sizeof(stomp_object_t));
intern->stomp = NULL; intern->stomp = NULL;
zend_object_std_init(&intern->std, ce TSRMLS_CC); zend_object_std_init(&intern->std, ce TSRMLS_CC);
#ifdef PHP_STOMP_RUNTIME_CACHE
object_properties_init(&intern->std, ce);
#else
zend_hash_copy(intern->std.properties, &ce->default_properties, (copy_ctor_func_t) zval_add_ref, (void *) &tmp, sizeof(zval *)); zend_hash_copy(intern->std.properties, &ce->default_properties, (copy_ctor_func_t) zval_add_ref, (void *) &tmp, sizeof(zval *));
#endif
retval.handle = zend_objects_store_put(intern, (zend_objects_store_dtor_t)zend_objects_destroy_object, (zend_objects_free_object_storage_t) stomp_object_free_storage, NULL TSRMLS_CC); retval.handle = zend_objects_store_put(intern, (zend_objects_store_dtor_t)zend_objects_destroy_object, (zend_objects_free_object_storage_t) stomp_object_free_storage, NULL TSRMLS_CC);
retval.handlers = zend_get_std_object_handlers(); retval.handlers = zend_get_std_object_handlers();
@ -400,12 +349,9 @@ PHP_MINIT_FUNCTION(stomp)
zend_declare_property_null(stomp_ce_frame, "body", sizeof("body")-1, ZEND_ACC_PUBLIC TSRMLS_CC); zend_declare_property_null(stomp_ce_frame, "body", sizeof("body")-1, ZEND_ACC_PUBLIC TSRMLS_CC);
/* Register StompException class */ /* Register StompException class */
INIT_CLASS_ENTRY(ce, PHP_STOMP_EXCEPTION_CLASSNAME, stomp_exception_methods); INIT_CLASS_ENTRY(ce, PHP_STOMP_EXCEPTION_CLASSNAME, NULL);
stomp_ce_exception = zend_register_internal_class_ex(&ce, zend_exception_get_default(TSRMLS_C), NULL TSRMLS_CC); stomp_ce_exception = zend_register_internal_class_ex(&ce, zend_exception_get_default(TSRMLS_C), NULL TSRMLS_CC);
/* Properties */
zend_declare_property_null(stomp_ce_exception, "details", sizeof("details")-1, ZEND_ACC_PRIVATE TSRMLS_CC);
/** Register INI entries **/ /** Register INI entries **/
REGISTER_INI_ENTRIES(); REGISTER_INI_ENTRIES();
@ -451,8 +397,6 @@ PHP_MINFO_FUNCTION(stomp)
php_info_print_table_row(2, "API version", PHP_STOMP_VERSION); php_info_print_table_row(2, "API version", PHP_STOMP_VERSION);
#if HAVE_STOMP_SSL #if HAVE_STOMP_SSL
php_info_print_table_row(2, "SSL Support", "enabled"); php_info_print_table_row(2, "SSL Support", "enabled");
#else
php_info_print_table_row(2, "SSL Support", "disabled");
#endif #endif
php_info_print_table_end(); php_info_print_table_end();
DISPLAY_INI_ENTRIES(); DISPLAY_INI_ENTRIES();
@ -467,22 +411,25 @@ PHP_FUNCTION(stomp_version)
} }
/* }}} */ /* }}} */
/* {{{ proto Stomp::__construct([string broker [, string username [, string password [, array headers]]]]) /* {{{ proto Stomp::__construct([string broker [, string username [, string password]]])
Connect to server */ Connect to server */
PHP_FUNCTION(stomp_connect) PHP_FUNCTION(stomp_connect)
{ {
zval *stomp_object = getThis(); zval *stomp_object = getThis();
zval *headers = NULL;
stomp_t *stomp = NULL; stomp_t *stomp = NULL;
char *broker = NULL, *username = NULL, *password = NULL; char *broker = NULL, *username = NULL, *password = NULL;
int broker_len = 0, username_len = 0, password_len = 0; int broker_len = 0, username_len = 0, password_len = 0;
struct timeval tv;
php_url *url_parts; php_url *url_parts;
#ifdef HAVE_STOMP_SSL #ifdef HAVE_STOMP_SSL
int use_ssl = 0; int use_ssl = 0;
#endif #endif
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "|sssa!", &broker, &broker_len, &username, &username_len, &password, &password_len, &headers) == FAILURE) { tv.tv_sec = 2;
tv.tv_usec = 0;
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "|sss", &broker, &broker_len, &username, &username_len, &password, &password_len) == FAILURE) {
return; return;
} }
@ -531,52 +478,28 @@ PHP_FUNCTION(stomp_connect)
if (stomp->status) { if (stomp->status) {
stomp_frame_t *res; stomp_frame_t *res;
int rres;
stomp_frame_t frame = {0}; stomp_frame_t frame = {0};
INIT_FRAME(frame, "CONNECT"); INIT_FRAME(frame, "CONNECT");
if (!username) { if (username_len == 0) {
username = STOMP_G(default_username); username = "";
username_len = strlen(username);
} }
if (!password) { if (password_len == 0) {
password = STOMP_G(default_password); password = "";
password_len = strlen(password);
} }
zend_hash_add(frame.headers, "login", sizeof("login"), username, username_len + 1, NULL); zend_hash_add(frame.headers, "login", sizeof("login"), username, username_len + 1, NULL);
zend_hash_add(frame.headers, "passcode", sizeof("passcode"), password, password_len + 1, NULL); zend_hash_add(frame.headers, "passcode", sizeof("passcode"), password, password_len + 1, NULL);
if (NULL != headers) { stomp_send(stomp, &frame TSRMLS_CC);
FRAME_HEADER_FROM_HASHTABLE(frame.headers, Z_ARRVAL_P(headers));
}
rres = stomp_send(stomp, &frame TSRMLS_CC);
CLEAR_FRAME(frame); CLEAR_FRAME(frame);
if (0 == rres) {
zval *excobj = zend_throw_exception_ex(stomp_ce_exception, stomp->errnum TSRMLS_CC, stomp->error);
if (stomp->error_details) {
zend_update_property_string(stomp_ce_exception, excobj, "details", sizeof("details")-1, stomp->error_details TSRMLS_CC);
}
return;
}
/* Retreive Response */ /* Retreive Response */
res = stomp_read_frame(stomp); res = stomp_read_frame(stomp);
if (NULL == res) { if (NULL == res) {
STOMP_ERROR(0, PHP_STOMP_ERR_SERVER_NOT_RESPONDING); STOMP_ERROR(0, PHP_STOMP_ERR_SERVER_NOT_RESPONDING);
} else if (0 == strncmp("ERROR", res->command, sizeof("ERROR") - 1)) {
char *error_msg = NULL;
if (zend_hash_find(res->headers, "message", sizeof("message"), (void **)&error_msg) == SUCCESS) {
zval *excobj = zend_throw_exception_ex(stomp_ce_exception, 0 TSRMLS_CC, error_msg);
if (res->body) {
zend_update_property_string(stomp_ce_exception, excobj, "details", sizeof("details")-1, (char *) res->body TSRMLS_CC);
}
stomp_free_frame(res);
}
} else if (0 != strncmp("CONNECTED", res->command, sizeof("CONNECTED")-1)) { } else if (0 != strncmp("CONNECTED", res->command, sizeof("CONNECTED")-1)) {
if (stomp->error) { if (stomp->error) {
STOMP_ERROR_DETAILS(stomp->errnum, stomp->error, stomp->error_details); STOMP_ERROR(stomp->errnum, stomp->error);
} else { } else {
STOMP_ERROR(0, PHP_STOMP_ERR_UNKNOWN); STOMP_ERROR(0, PHP_STOMP_ERR_UNKNOWN);
} }
@ -605,7 +528,7 @@ PHP_FUNCTION(stomp_connect)
} }
} }
} else { } else {
STOMP_ERROR_DETAILS(0, stomp->error, stomp->error_details); STOMP_ERROR(0, stomp->error);
} }
stomp_close(stomp); stomp_close(stomp);
@ -725,7 +648,7 @@ PHP_FUNCTION(stomp_send)
if (Z_TYPE_P(msg) == IS_STRING) { if (Z_TYPE_P(msg) == IS_STRING) {
frame.body = Z_STRVAL_P(msg); frame.body = Z_STRVAL_P(msg);
frame.body_length = Z_STRLEN_P(msg); frame.body_length = -1;
} else if (Z_TYPE_P(msg) == IS_OBJECT && instanceof_function(Z_OBJCE_P(msg), stomp_ce_frame TSRMLS_CC)) { } else if (Z_TYPE_P(msg) == IS_OBJECT && instanceof_function(Z_OBJCE_P(msg), stomp_ce_frame TSRMLS_CC)) {
zval *frame_obj_prop = NULL; zval *frame_obj_prop = NULL;
frame_obj_prop = zend_read_property(stomp_ce_frame, msg, "command", sizeof("command")-1, 1 TSRMLS_CC); frame_obj_prop = zend_read_property(stomp_ce_frame, msg, "command", sizeof("command")-1, 1 TSRMLS_CC);
@ -736,7 +659,7 @@ PHP_FUNCTION(stomp_send)
frame_obj_prop = zend_read_property(stomp_ce_frame, msg, "body", sizeof("body")-1, 1 TSRMLS_CC); frame_obj_prop = zend_read_property(stomp_ce_frame, msg, "body", sizeof("body")-1, 1 TSRMLS_CC);
if (Z_TYPE_P(frame_obj_prop) == IS_STRING) { if (Z_TYPE_P(frame_obj_prop) == IS_STRING) {
frame.body = Z_STRVAL_P(frame_obj_prop); frame.body = Z_STRVAL_P(frame_obj_prop);
frame.body_length = Z_STRLEN_P(frame_obj_prop); frame.body_length = -1;
} }
frame_obj_prop = zend_read_property(stomp_ce_frame, msg, "headers", sizeof("headers")-1, 1 TSRMLS_CC); frame_obj_prop = zend_read_property(stomp_ce_frame, msg, "headers", sizeof("headers")-1, 1 TSRMLS_CC);
if (Z_TYPE_P(frame_obj_prop) == IS_ARRAY) { if (Z_TYPE_P(frame_obj_prop) == IS_ARRAY) {
@ -747,8 +670,6 @@ PHP_FUNCTION(stomp_send)
CLEAR_FRAME(frame); CLEAR_FRAME(frame);
RETURN_FALSE; RETURN_FALSE;
} }
if (frame.body_length > 0 && strnlen(frame.body, frame.body_length) >= frame.body_length)
frame.body_length = 0;
if (stomp_send(stomp, &frame TSRMLS_CC) > 0) { if (stomp_send(stomp, &frame TSRMLS_CC) > 0) {
success = stomp_valid_receipt(stomp, &frame); success = stomp_valid_receipt(stomp, &frame);
@ -891,6 +812,7 @@ PHP_FUNCTION(stomp_read_frame)
zval *stomp_object = getThis(); zval *stomp_object = getThis();
stomp_t *stomp = NULL; stomp_t *stomp = NULL;
stomp_frame_t *res = NULL; stomp_frame_t *res = NULL;
int sel_res = 0;
char *class_name = NULL; char *class_name = NULL;
int class_name_len = 0; int class_name_len = 0;
zend_class_entry *ce = NULL; zend_class_entry *ce = NULL;
@ -920,17 +842,13 @@ PHP_FUNCTION(stomp_read_frame)
} }
if ((res = stomp_read_frame(stomp))) { if ((sel_res = stomp_select(stomp)) > 0 && (res = stomp_read_frame(stomp))) {
zval *headers = NULL; zval *headers = NULL;
if (0 == strncmp("ERROR", res->command, sizeof("ERROR") - 1)) { if (0 == strncmp("ERROR", res->command, sizeof("ERROR") - 1)) {
char *error_msg = NULL; char *error_msg = NULL;
if (zend_hash_find(res->headers, "message", sizeof("message"), (void **)&error_msg) == SUCCESS) { if (zend_hash_find(res->headers, "message", sizeof("message"), (void **)&error_msg) == SUCCESS) {
zval *excobj = zend_throw_exception_ex(stomp_ce_exception, 0 TSRMLS_CC, error_msg); STOMP_ERROR(0, error_msg)
if (res->body) {
zend_update_property_string(stomp_ce_exception, excobj, "details", sizeof("details")-1, (char *) res->body TSRMLS_CC);
}
stomp_free_frame(res); stomp_free_frame(res);
RETURN_FALSE; RETURN_FALSE;
} }
@ -1034,13 +952,16 @@ PHP_FUNCTION(stomp_read_frame)
array_init(return_value); array_init(return_value);
add_assoc_string_ex(return_value, "command", sizeof("command"), res->command, 1); add_assoc_string_ex(return_value, "command", sizeof("command"), res->command, 1);
if (res->body) { if (res->body) {
add_assoc_stringl_ex(return_value, "body", sizeof("body"), res->body, res->body_length, 1); add_assoc_string_ex(return_value, "body", sizeof("body"), res->body, 1);
} }
add_assoc_zval_ex(return_value, "headers", sizeof("headers"), headers); add_assoc_zval_ex(return_value, "headers", sizeof("headers"), headers);
} }
stomp_free_frame(res); stomp_free_frame(res);
} else { } else {
if (sel_res == -1) {
STOMP_ERROR(0, "Error while selecting from socket: %d", errno);
}
RETURN_FALSE; RETURN_FALSE;
} }
} }
@ -1114,9 +1035,10 @@ PHP_FUNCTION(stomp_abort)
} }
/* }}} */ /* }}} */
/* {{{ _php_stomp_acknowledgment /* {{{ proto boolean Stomp::ack(mixed msg [, array headers])
*/ Acknowledge consumption of a message from a subscription using client acknowledgment */
static void _php_stomp_acknowledgment(INTERNAL_FUNCTION_PARAMETERS, char *cmd) { PHP_FUNCTION(stomp_ack)
{
zval *stomp_object = getThis(); zval *stomp_object = getThis();
zval *msg = NULL, *headers = NULL; zval *msg = NULL, *headers = NULL;
stomp_t *stomp = NULL; stomp_t *stomp = NULL;
@ -1137,7 +1059,7 @@ static void _php_stomp_acknowledgment(INTERNAL_FUNCTION_PARAMETERS, char *cmd) {
ZEND_FETCH_RESOURCE(stomp, stomp_t *, &arg, -1, PHP_STOMP_RES_NAME, le_stomp); ZEND_FETCH_RESOURCE(stomp, stomp_t *, &arg, -1, PHP_STOMP_RES_NAME, le_stomp);
} }
INIT_FRAME(frame, cmd); INIT_FRAME(frame, "ACK");
if (NULL != headers) { if (NULL != headers) {
FRAME_HEADER_FROM_HASHTABLE(frame.headers, Z_ARRVAL_P(headers)); FRAME_HEADER_FROM_HASHTABLE(frame.headers, Z_ARRVAL_P(headers));
@ -1165,22 +1087,6 @@ static void _php_stomp_acknowledgment(INTERNAL_FUNCTION_PARAMETERS, char *cmd) {
} }
/* }}} */ /* }}} */
/* {{{ proto boolean Stomp::ack(mixed msg [, array headers])
Acknowledge consumption of a message from a subscription using client acknowledgment */
PHP_FUNCTION(stomp_ack)
{
_php_stomp_acknowledgment(INTERNAL_FUNCTION_PARAM_PASSTHRU, "ACK");
}
/* }}} */
/* {{{ proto boolean Stomp::nack(mixed msg [, array headers])
Negative Acknowledgment of a message from a subscription */
PHP_FUNCTION(stomp_nack)
{
_php_stomp_acknowledgment(INTERNAL_FUNCTION_PARAM_PASSTHRU, "NACK");
}
/* }}} */
/* {{{ proto string Stomp::error() /* {{{ proto string Stomp::error()
Get the last error message */ Get the last error message */
PHP_FUNCTION(stomp_error) PHP_FUNCTION(stomp_error)
@ -1199,15 +1105,7 @@ PHP_FUNCTION(stomp_error)
} }
if (stomp->error) { if (stomp->error) {
if (stomp->error_details) { RETURN_STRING(stomp->error, 1);
char *error_msg = (char *) emalloc(strlen(stomp->error) + strlen(stomp->error_details) + 10);
strcpy(error_msg, stomp->error);
strcat(error_msg, "\n\n");
strcat(error_msg, stomp->error_details);
RETURN_STRING(error_msg, 0);
} else {
RETURN_STRING(stomp->error, 1);
}
} else { } else {
RETURN_FALSE; RETURN_FALSE;
} }
@ -1286,18 +1184,4 @@ PHP_METHOD(stompframe, __construct)
zend_update_property_stringl(stomp_ce_frame, object, "body", sizeof("body")-1, body, body_length TSRMLS_CC); zend_update_property_stringl(stomp_ce_frame, object, "body", sizeof("body")-1, body, body_length TSRMLS_CC);
} }
} }
/* }}} */ /* }}} */
/* {{{ proto string StompException::getDetails()
Get error details */
PHP_METHOD(stompexception, getDetails)
{
zval *object = getThis();
zval *details = NULL;
details = zend_read_property(stomp_ce_exception, object, "details", sizeof("details")-1, 1 TSRMLS_CC);
RETURN_STRINGL(Z_STRVAL_P(details), Z_STRLEN_P(details), 1);
}
/* }}} */

View File

@ -29,7 +29,11 @@ typedef struct _stomp_object {
} stomp_object_t; } stomp_object_t;
#define PHP_STOMP_EXTNAME "Stomp" #define PHP_STOMP_EXTNAME "Stomp"
#define PHP_STOMP_VERSION "1.0.9-dev" #define PHP_STOMP_MAJOR_VERSION "1"
#define PHP_STOMP_MINOR_VERSION "0"
#define PHP_STOMP_PATCH_VERSION "0"
#define PHP_STOMP_VERSION_STATUS ""
#define PHP_STOMP_VERSION PHP_STOMP_MAJOR_VERSION "." PHP_STOMP_MINOR_VERSION "." PHP_STOMP_PATCH_VERSION PHP_STOMP_VERSION_STATUS
#define PHP_STOMP_RES_NAME "stomp connection" #define PHP_STOMP_RES_NAME "stomp connection"
@ -78,15 +82,12 @@ PHP_FUNCTION(stomp_begin);
PHP_FUNCTION(stomp_commit); PHP_FUNCTION(stomp_commit);
PHP_FUNCTION(stomp_abort); PHP_FUNCTION(stomp_abort);
PHP_FUNCTION(stomp_ack); PHP_FUNCTION(stomp_ack);
PHP_FUNCTION(stomp_nack);
PHP_FUNCTION(stomp_error); PHP_FUNCTION(stomp_error);
PHP_FUNCTION(stomp_set_read_timeout); PHP_FUNCTION(stomp_set_read_timeout);
PHP_FUNCTION(stomp_get_read_timeout); PHP_FUNCTION(stomp_get_read_timeout);
PHP_METHOD(stompframe, __construct); PHP_METHOD(stompframe, __construct);
PHP_METHOD(stompexception, getDetails);
ZEND_BEGIN_MODULE_GLOBALS(stomp) ZEND_BEGIN_MODULE_GLOBALS(stomp)
/* INI */ /* INI */
char *default_broker; char *default_broker;
@ -94,8 +95,6 @@ ZEND_BEGIN_MODULE_GLOBALS(stomp)
long read_timeout_usec; long read_timeout_usec;
long connection_timeout_sec; long connection_timeout_sec;
long connection_timeout_usec; long connection_timeout_usec;
char *default_username;
char *default_password;
/* Others */ /* Others */
long error_no; long error_no;

531
stomp.c
View File

@ -23,50 +23,19 @@
#endif #endif
#include "php.h" #include "php.h"
#include "zend_exceptions.h"
#include "ext/standard/php_smart_str.h" #include "ext/standard/php_smart_str.h"
#include "stomp.h" #include "stomp.h"
#include "php_stomp.h" #include "php_stomp.h"
#ifdef HAVE_NETINET_IN_H
#include <netinet/tcp.h>
#endif
#define RETURN_READ_FRAME_FAIL { stomp_free_frame(f); return NULL; } #define RETURN_READ_FRAME_FAIL { stomp_free_frame(f); return NULL; }
ZEND_EXTERN_MODULE_GLOBALS(stomp); ZEND_EXTERN_MODULE_GLOBALS(stomp);
extern zend_class_entry *stomp_ce_exception; extern zend_class_entry *stomp_ce_exception;
/* {{{ DEBUG */
#if PHP_DEBUG
static void print_stomp_frame(stomp_frame_t *frame TSRMLS_DC) {
php_printf("------ START FRAME ------\n");
php_printf("%s\n", frame->command);
/* Headers */
if (frame->headers) {
char *key;
ulong pos;
zend_hash_internal_pointer_reset(frame->headers);
while (zend_hash_get_current_key(frame->headers, &key, &pos, 0) == HASH_KEY_IS_STRING) {
char *value = NULL;
php_printf("%s:", key);
if (zend_hash_get_current_data(frame->headers, (void **)&value) == SUCCESS) {
php_printf("%s", value);
}
php_printf("\n");
zend_hash_move_forward(frame->headers);
}
}
php_printf("\n%s\n", frame->body);
php_printf("------ END FRAME ------\n");
}
#endif
/* }}} */
/* {{{ stomp_init /* {{{ stomp_init
*/ */
stomp_t *stomp_init() stomp_t *stomp_init()
{ {
/* Memory allocation */ /* Memory allocation */
stomp_t *stomp = (stomp_t *) emalloc(sizeof(stomp_t)); stomp_t *stomp = (stomp_t *) emalloc(sizeof(stomp_t));
@ -77,131 +46,44 @@ stomp_t *stomp_init()
stomp->port = 0; stomp->port = 0;
stomp->status = 0; stomp->status = 0;
stomp->error = NULL; stomp->error = NULL;
stomp->error_details = NULL;
stomp->errnum = 0; stomp->errnum = 0;
stomp->session = NULL; stomp->session = NULL;
stomp->options.connect_timeout_sec = 2; stomp->options.connect_timeout_sec = 2;
stomp->options.connect_timeout_usec = 0; stomp->options.connect_timeout_usec = 0;
stomp->options.read_timeout_sec = 2; stomp->options.read_timeout_sec = 2;
stomp->options.read_timeout_usec = 0; stomp->options.read_timeout_usec = 2;
#if HAVE_STOMP_SSL #if HAVE_STOMP_SSL
stomp->options.use_ssl = 0; stomp->options.use_ssl = 0;
stomp->ssl_handle = NULL; stomp->ssl_handle = NULL;
#endif #endif
stomp->frame_stack = NULL;
stomp->read_buffer.size = 0;
return stomp; return stomp;
} }
/* }}} */ /* }}} */
/* {{{ stomp_frame_stack_push /* {{{ stomp_set_error
*/ */
static void stomp_frame_stack_push(stomp_frame_stack_t **stack, stomp_frame_t *frame) void stomp_set_error(stomp_t *stomp, const char *error, int errnum)
{ {
stomp_frame_stack_t *cell = (stomp_frame_stack_t *) emalloc(sizeof(stomp_frame_stack_t));
cell->frame = frame;
cell->next = NULL;
if (!*stack) {
*stack = cell;
} else {
stomp_frame_stack_t *cursor = *stack;
while (cursor->next != NULL) cursor = cursor->next;
cursor->next = cell;
}
}
/* }}} */
/* {{{ stomp_frame_stack_shift
*/
static stomp_frame_t *stomp_frame_stack_shift(stomp_frame_stack_t **stack) {
stomp_frame_t *frame = NULL;
if (*stack) {
stomp_frame_stack_t *cell = *stack;
*stack = cell->next;
frame = cell->frame;
efree(cell);
}
return frame;
}
/* }}} */
/* {{{ stomp_frame_stack_clear
*/
static void stomp_frame_stack_clear(stomp_frame_stack_t **stack) {
stomp_frame_t *frame = NULL;
while ((frame = stomp_frame_stack_shift(stack))) efree(frame);
}
/* }}} */
/* {{{ stomp_set_error
*/
void stomp_set_error(stomp_t *stomp, const char *error, int errnum, const char *fmt, ...)
{
va_list ap;
int len;
if (stomp->error != NULL) {
efree(stomp->error);
stomp->error = NULL;
}
if (stomp->error_details != NULL) {
efree(stomp->error_details);
stomp->error_details = NULL;
}
stomp->errnum = errnum;
if (error != NULL) { if (error != NULL) {
if (stomp->error != NULL) {
efree(stomp->error);
}
stomp->error = estrdup(error); stomp->error = estrdup(error);
} stomp->errnum = errnum;
if (fmt != NULL) {
stomp->error_details = emalloc(STOMP_BUFSIZE);
if (stomp->error_details == NULL) {
return; /* Nothing else can be done */
}
va_start(ap, fmt);
/*
* Would've been better to call vasprintf(), but that
* function is missing on some platforms...
*/
len = vsnprintf(stomp->error_details, STOMP_BUFSIZE, fmt, ap);
va_end(ap);
if (len < STOMP_BUFSIZE) {
stomp->error_details = erealloc(stomp->error_details, len+1);
}
} }
} }
/* }}} */ /* }}} */
/* {{{ stomp_writable /* {{{ stomp_connect
*/
int stomp_writable(stomp_t *stomp)
{
int n;
n = php_pollfd_for_ms(stomp->fd, POLLOUT, 1000);
if (n != POLLOUT) {
#ifndef PHP_WIN32
if (n == 0) {
errno = ETIMEDOUT;
}
#endif
return 0;
}
return 1;
}
/* }}} */
/* {{{ stomp_connect
*/ */
int stomp_connect(stomp_t *stomp, const char *host, unsigned short port TSRMLS_DC) int stomp_connect(stomp_t *stomp, const char *host, unsigned short port TSRMLS_DC)
{ {
char error[1024]; char error[1024];
socklen_t size; socklen_t size;
struct timeval tv; struct timeval tv;
int flag = 1; fd_set rfds;
if (stomp->host != NULL) if (stomp->host != NULL)
{ {
@ -219,30 +101,30 @@ int stomp_connect(stomp_t *stomp, const char *host, unsigned short port TSRMLS_D
stomp->fd = php_network_connect_socket_to_host(stomp->host, stomp->port, SOCK_STREAM, 0, &tv, NULL, NULL, NULL, 0 TSRMLS_CC); stomp->fd = php_network_connect_socket_to_host(stomp->host, stomp->port, SOCK_STREAM, 0, &tv, NULL, NULL, NULL, 0 TSRMLS_CC);
if (stomp->fd == -1) { if (stomp->fd == -1) {
snprintf(error, sizeof(error), "Unable to connect to %s:%ld", stomp->host, stomp->port); snprintf(error, sizeof(error), "Unable to connect to %s:%ld", stomp->host, stomp->port);
stomp_set_error(stomp, error, errno, "%s", strerror(errno)); stomp_set_error(stomp, error, errno);
return 0; return 0;
} }
#ifdef HAVE_NETINET_IN_H
setsockopt(stomp->fd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int));
#endif
size = sizeof(stomp->localaddr); size = sizeof(stomp->localaddr);
memset(&stomp->localaddr, 0, size); memset(&stomp->localaddr, 0, size);
if (getsockname(stomp->fd, (struct sockaddr*) &stomp->localaddr, &size) == -1) { if (getsockname(stomp->fd, (struct sockaddr*) &stomp->localaddr, &size) == -1) {
snprintf(error, sizeof(error), "getsockname failed: %s (%d)", strerror(errno), errno); snprintf(error, sizeof(error), "getsockname failed: %s (%d)", strerror(errno), errno);
stomp_set_error(stomp, error, errno, NULL); stomp_set_error(stomp, error, errno);
return 0; return 0;
} }
if (stomp_writable(stomp)) { tv.tv_sec = 0;
tv.tv_usec = 0;
FD_ZERO(&rfds);
FD_SET(stomp->fd, &rfds);
if (select(stomp->fd + 1, NULL, &rfds, NULL, &tv) > 0) {
#if HAVE_STOMP_SSL #if HAVE_STOMP_SSL
if (stomp->options.use_ssl) { if (stomp->options.use_ssl) {
SSL_CTX *ctx = SSL_CTX_new(SSLv23_client_method()); SSL_CTX *ctx = SSL_CTX_new(SSLv23_client_method());
int ret;
if (NULL == ctx) { if (NULL == ctx) {
stomp_set_error(stomp, "failed to create the SSL context", 0, NULL); stomp_set_error(stomp, "failed to create the SSL context", 0);
return 0; return 0;
} }
@ -250,24 +132,24 @@ int stomp_connect(stomp_t *stomp, const char *host, unsigned short port TSRMLS_D
stomp->ssl_handle = SSL_new(ctx); stomp->ssl_handle = SSL_new(ctx);
if (stomp->ssl_handle == NULL) { if (stomp->ssl_handle == NULL) {
stomp_set_error(stomp, "failed to create the SSL handle", 0, NULL); stomp_set_error(stomp, "failed to create the SSL handle", 0);
SSL_CTX_free(ctx); SSL_CTX_free(ctx);
return 0; return 0;
} }
SSL_set_fd(stomp->ssl_handle, stomp->fd); SSL_set_fd(stomp->ssl_handle, stomp->fd);
if ((ret = SSL_connect(stomp->ssl_handle)) <= 0) { if (SSL_connect(stomp->ssl_handle) <= 0) {
stomp_set_error(stomp, "SSL/TLS handshake failed", 0, "SSL error %d", SSL_get_error(stomp->ssl_handle, ret)); stomp_set_error(stomp, "SSL/TLS handshake failed", 0);
SSL_shutdown(stomp->ssl_handle); SSL_shutdown(stomp->ssl_handle);
return 0; return 0;
} }
} }
#endif #endif
return 1; return 1;
} else { } else {
snprintf(error, sizeof(error), "Unable to connect to %s:%ld", stomp->host, stomp->port); snprintf(error, sizeof(error), "Unable to connect to %s:%ld", stomp->host, stomp->port);
stomp_set_error(stomp, error, errno, "%s", strerror(errno)); stomp_set_error(stomp, error, errno);
return 0; return 0;
} }
} }
@ -298,10 +180,7 @@ void stomp_close(stomp_t *stomp)
if (stomp->error) { if (stomp->error) {
efree(stomp->error); efree(stomp->error);
} }
if (stomp->error_details) {
efree(stomp->error_details);
}
stomp_frame_stack_clear(&stomp->frame_stack);
efree(stomp); efree(stomp);
} }
/* }}} */ /* }}} */
@ -319,7 +198,7 @@ int stomp_send(stomp_t *stomp, stomp_frame_t *frame TSRMLS_DC)
/* Headers */ /* Headers */
if (frame->headers) { if (frame->headers) {
char *key; char *key;
ulong pos; ulong pos;
zend_hash_internal_pointer_reset(frame->headers); zend_hash_internal_pointer_reset(frame->headers);
@ -340,7 +219,7 @@ int stomp_send(stomp_t *stomp, stomp_frame_t *frame TSRMLS_DC)
} }
if (frame->body_length > 0) { if (frame->body_length > 0) {
smart_str_appendl(&buf, "content-length:", sizeof("content-length:") - 1); smart_str_appends(&buf, "content-length: ");
smart_str_append_long(&buf, frame->body_length); smart_str_append_long(&buf, frame->body_length);
smart_str_appendc(&buf, '\n'); smart_str_appendc(&buf, '\n');
} }
@ -348,35 +227,28 @@ int stomp_send(stomp_t *stomp, stomp_frame_t *frame TSRMLS_DC)
smart_str_appendc(&buf, '\n'); smart_str_appendc(&buf, '\n');
if (frame->body > 0) { if (frame->body > 0) {
smart_str_appendl(&buf, frame->body, frame->body_length > 0 ? frame->body_length : strlen(frame->body)); smart_str_appends(&buf, frame->body);
}
smart_str_appendl(&buf, "\0", sizeof("\0")-1);
if (!stomp_writable(stomp)) {
smart_str_free(&buf);
stomp_set_error(stomp, "Unable to send data", errno, "%s", strerror(errno));
return 0;
} }
#ifdef HAVE_STOMP_SSL #ifdef HAVE_STOMP_SSL
if (stomp->options.use_ssl) { if (stomp->options.use_ssl) {
int ret; if (-1 == SSL_write(stomp->ssl_handle, buf.c, buf.len) || -1 == SSL_write(stomp->ssl_handle, "\0\n", 2)) {
if (-1 == (ret = SSL_write(stomp->ssl_handle, buf.c, buf.len))) { char error[1024];
smart_str_free(&buf); snprintf(error, sizeof(error), "Unable to send data");
stomp_set_error(stomp, "Unable to send data", errno, "SSL error %d", SSL_get_error(stomp->ssl_handle, ret)); stomp_set_error(stomp, error, errno);
return 0; return 0;
} }
} else { } else {
#endif #endif
if (-1 == send(stomp->fd, buf.c, buf.len, 0)) { if (-1 == send(stomp->fd, buf.c, buf.len, 0) || -1 == send(stomp->fd, "\0\n", 2, 0)) {
smart_str_free(&buf); char error[1024];
stomp_set_error(stomp, "Unable to send data", errno, "%s", strerror(errno)); snprintf(error, sizeof(error), "Unable to send data");
stomp_set_error(stomp, error, errno);
return 0; return 0;
} }
#ifdef HAVE_STOMP_SSL #ifdef HAVE_STOMP_SSL
} }
#endif #endif
smart_str_free(&buf); smart_str_free(&buf);
@ -386,12 +258,9 @@ int stomp_send(stomp_t *stomp, stomp_frame_t *frame TSRMLS_DC)
/* {{{ stomp_recv /* {{{ stomp_recv
*/ */
static int _stomp_recv(stomp_t *stomp, char *msg, const size_t length) int stomp_recv(stomp_t *stomp, char *msg, size_t length)
{ {
int len; int len;
stomp_select(stomp);
#if HAVE_STOMP_SSL #if HAVE_STOMP_SSL
if(stomp->options.use_ssl) { if(stomp->options.use_ssl) {
len = SSL_read(stomp->ssl_handle, msg, length); len = SSL_read(stomp->ssl_handle, msg, length);
@ -402,142 +271,66 @@ static int _stomp_recv(stomp_t *stomp, char *msg, const size_t length)
} }
#endif #endif
if (len == -1) { if (len == 0) {
#if HAVE_STOMP_SSL TSRMLS_FETCH();
if (stomp->options.use_ssl) { zend_throw_exception_ex(stomp_ce_exception, errno TSRMLS_CC, "Unexpected EOF while reading from socket");
stomp_set_error(stomp, "Error reading from socket", errno, "%s. (SSL in use)", strerror(errno));
} else {
#endif
stomp_set_error(stomp, "Error reading from socket", errno, "%s. (SSL not in use)", strerror(errno));
#if HAVE_STOMP_SSL
}
#endif
stomp->status = -1;
} else if (len == 0) {
stomp_set_error(stomp, "Sender closed connection unexpectedly", 0, NULL);
stomp->status = -1; stomp->status = -1;
} }
return len; return len;
} }
int stomp_recv(stomp_t *stomp, char *msg, const size_t length)
{
if (stomp->read_buffer.size == 0) {
if (length >= STOMP_BUFSIZE) {
return _stomp_recv(stomp, msg, length);
} else {
size_t recv_size = _stomp_recv(stomp, stomp->read_buffer.buf, STOMP_BUFSIZE);
if (recv_size <= length) {
memcpy(msg, stomp->read_buffer.buf, recv_size);
return recv_size;
} else {
memcpy(msg, stomp->read_buffer.buf, length);
stomp->read_buffer.pos = stomp->read_buffer.buf + length;
stomp->read_buffer.size = recv_size - length;
return length;
}
}
} else if (stomp->read_buffer.size >= length) {
memcpy(msg, stomp->read_buffer.pos, length);
stomp->read_buffer.pos += length;
stomp->read_buffer.size -= length;
return length;
} else {
int len = stomp->read_buffer.size;
memcpy(msg, stomp->read_buffer.pos, stomp->read_buffer.size);
stomp->read_buffer.size = 0;
if (stomp_select_ex(stomp, 0, 0)) {
return len + stomp_recv(stomp, msg + len, length - len);
} else {
return len;
}
}
}
/* }}} */ /* }}} */
/* {{{ _stomp_read_until /* {{{ stomp_read_buffer
*/ */
static size_t _stomp_read_until(stomp_t *stomp, char **data, const char delimiter) static int stomp_read_buffer(stomp_t *stomp, char **data)
{ {
size_t length = 0; int rc = 0;
size_t bufsize = STOMP_BUFSIZE; size_t i = 0;
char *buffer = (char *) emalloc(STOMP_BUFSIZE); size_t bufsize = STOMP_BUFSIZE + 1;
char *buffer = (char *) emalloc(STOMP_BUFSIZE + 1);
while (1) { while (1) {
unsigned int i, found;
char *c;
found = 0;
// First populate the buffer size_t length = 1;
if (stomp->read_buffer.size == 0) { rc = stomp_recv(stomp, buffer + i, length);
stomp->read_buffer.size = _stomp_recv(stomp, stomp->read_buffer.buf, STOMP_BUFSIZE); if (rc < 1) {
stomp->read_buffer.pos = stomp->read_buffer.buf; efree(buffer);
return -1;
} }
// Then search the delimiter if (1 == length) {
c = stomp->read_buffer.pos; i++;
for (i = 1; i <= stomp->read_buffer.size ; i++) {
if (*c == delimiter) { if (buffer[i-1] == 0) {
found = 1; char endline[1];
if (1 != stomp_recv(stomp, endline, 1) && '\n' != endline[0]) {
efree(buffer);
return 0;
}
break; break;
} else {
c++;
} }
}
if (!found) i--;
// Make sure we have enough place in the buffer if (i >= bufsize) {
if ((i+length) >= bufsize) { buffer = (char *) erealloc(buffer, bufsize + STOMP_BUFSIZE);
buffer = (char *) erealloc(buffer, bufsize + STOMP_BUFSIZE); bufsize += STOMP_BUFSIZE;
bufsize += STOMP_BUFSIZE;
}
// Copy and update the buffer
memcpy(buffer + length, stomp->read_buffer.pos, i);
length += i;
stomp->read_buffer.pos += i;
stomp->read_buffer.size -= i;
if (found) {
break;
}
}
if (length) {
*data = buffer;
} else {
efree(buffer);
*data = NULL;
}
return length;
}
/* }}} */
/* {{{ stomp_read_buffer
*/
static size_t stomp_read_buffer(stomp_t *stomp, char **data)
{
size_t length = _stomp_read_until(stomp, data, 0);
if (stomp_select_ex(stomp, 0, 0)) {
char endline[1];
if (1 != stomp_recv(stomp, endline, 1) && '\n' != endline[0]) {
if (*data) {
efree(*data);
*data = NULL;
} }
return 0;
} }
} }
if (length > 1) {
length --; if (i > 1) {
} else if (length) { *data = (char *) emalloc(i);
efree(*data); if (NULL == *data) {
*data = NULL; efree(buffer);
length = 0; return -1;
}
memcpy(*data, buffer, i);
} }
return length;
efree(buffer);
return i-1;
} }
/* }}} */ /* }}} */
@ -545,16 +338,52 @@ static size_t stomp_read_buffer(stomp_t *stomp, char **data)
*/ */
static int stomp_read_line(stomp_t *stomp, char **data) static int stomp_read_line(stomp_t *stomp, char **data)
{ {
size_t length = _stomp_read_until(stomp, data, '\n'); int rc = 0;
if (length > 1) { size_t i = 0;
(*data)[length - 1] = 0; size_t bufsize = STOMP_BUFSIZE + 1;
length--; char *buffer = (char *) emalloc(STOMP_BUFSIZE + 1);
} else if (length) {
efree(*data); while (1) {
*data = NULL;
length = 0; size_t length = 1;
rc = stomp_recv(stomp, buffer + i, length);
if (rc < 1) {
efree(buffer);
return -1;
}
if (1 == length) {
i++;
if (buffer[i-1] == '\n') {
buffer[i-1] = 0;
break;
} else if (buffer[i-1] == 0) {
efree(buffer);
return 0;
}
if (i >= bufsize) {
buffer = (char *) erealloc(buffer, bufsize + STOMP_BUFSIZE);
bufsize += STOMP_BUFSIZE;
}
}
} }
return length;
if (i > 1) {
*data = (char *) emalloc(i);
if (NULL == *data) {
efree(buffer);
return -1;
}
memcpy(*data, buffer, i);
}
efree(buffer);
return i-1;
} }
/* }}} */ /* }}} */
@ -578,22 +407,14 @@ void stomp_free_frame(stomp_frame_t *frame)
} }
/* }}} */ /* }}} */
/* {{{ stomp_read_frame /* {{{ stomp_read_frame
*/ */
stomp_frame_t *stomp_read_frame_ex(stomp_t *stomp, int use_stack) stomp_frame_t *stomp_read_frame(stomp_t *stomp)
{ {
stomp_frame_t *f = NULL; stomp_frame_t *f = NULL;
char *cmd = NULL, *length_str = NULL; char *cmd = NULL, *length_str = NULL;
int length = 0; int length = 0;
if (use_stack && stomp->frame_stack) {
return stomp_frame_stack_shift(&stomp->frame_stack);
}
if (!stomp_select(stomp)) {
return NULL;
}
INIT_STOMP_FRAME(f); INIT_STOMP_FRAME(f);
if (NULL == f) { if (NULL == f) {
@ -613,20 +434,20 @@ stomp_frame_t *stomp_read_frame_ex(stomp_t *stomp, int use_stack)
while (1) { while (1) {
char *p = NULL; char *p = NULL;
length = stomp_read_line(stomp, &p); length = stomp_read_line(stomp, &p);
if (length < 0) { if (length < 0) {
RETURN_READ_FRAME_FAIL; RETURN_READ_FRAME_FAIL;
} }
if (0 == length) { if (0 == length) {
break; break;
} else { } else {
char *p2 = NULL; char *p2 = NULL;
char *key; char *key;
char *value; char *value;
p2 = strstr(p,":"); p2 = strstr(p,":");
if (p2 == NULL) { if (p2 == NULL) {
efree(p); efree(p);
RETURN_READ_FRAME_FAIL; RETURN_READ_FRAME_FAIL;
@ -646,27 +467,20 @@ stomp_frame_t *stomp_read_frame_ex(stomp_t *stomp, int use_stack)
} }
/* Check for the content length */ /* Check for the content length */
if (zend_hash_find(f->headers, "content-length", sizeof("content-length"), (void **)&length_str) == SUCCESS) { if (zend_hash_find(f->headers, "content-length", strlen("content-length"), (void **)&length_str) == SUCCESS) {
int recv_size = 0;
char endbuffer[2]; char endbuffer[2];
length = 2;
f->body_length = atoi(length_str); f->body_length = atoi(length_str);
f->body = (char *) emalloc(f->body_length); f->body = (char *) emalloc(f->body_length);
while (recv_size != f->body_length) { if (-1 == stomp_recv(stomp, f->body, f->body_length)) {
int l = stomp_recv(stomp, f->body + recv_size, f->body_length - recv_size);
if (-1 == l) {
RETURN_READ_FRAME_FAIL;
} else {
recv_size += l;
}
}
length = stomp_recv(stomp, endbuffer, 2);
if (endbuffer[0] != '\0' || ((2 == length) && (endbuffer[1] != '\n'))) {
RETURN_READ_FRAME_FAIL; RETURN_READ_FRAME_FAIL;
} }
if (length != stomp_recv(stomp, endbuffer, length) || endbuffer[0] != '\0' || endbuffer[1] != '\n') {
RETURN_READ_FRAME_FAIL;
}
} else { } else {
f->body_length = stomp_read_buffer(stomp, &f->body); f->body_length = stomp_read_buffer(stomp, &f->body);
} }
@ -680,36 +494,24 @@ stomp_frame_t *stomp_read_frame_ex(stomp_t *stomp, int use_stack)
int stomp_valid_receipt(stomp_t *stomp, stomp_frame_t *frame) { int stomp_valid_receipt(stomp_t *stomp, stomp_frame_t *frame) {
int success = 1; int success = 1;
char *receipt = NULL; char *receipt = NULL;
if (zend_hash_find(frame->headers, "receipt", sizeof("receipt"), (void **)&receipt) == SUCCESS) { if (zend_hash_find(frame->headers, "receipt", sizeof("receipt"), (void **)&receipt) == SUCCESS) {
stomp_frame_t *res = stomp_read_frame(stomp);
success = 0; success = 0;
while (1) { if (res) {
stomp_frame_t *res = stomp_read_frame_ex(stomp, 0); if (0 == strncmp("RECEIPT", res->command, sizeof("RECEIPT") - 1)) {
if (res) { char *receipt_id = NULL;
if (0 == strncmp("RECEIPT", res->command, sizeof("RECEIPT") - 1)) { if (zend_hash_find(res->headers, "receipt-id", sizeof("receipt-id"), (void **)&receipt_id) == SUCCESS
char *receipt_id = NULL; && strlen(receipt) == strlen(receipt_id)
if (zend_hash_find(res->headers, "receipt-id", sizeof("receipt-id"), (void **)&receipt_id) == SUCCESS && !strcmp(receipt, receipt_id)) {
&& strlen(receipt) == strlen(receipt_id) success = 1;
&& !strcmp(receipt, receipt_id)) { }
success = 1; } else if (0 == strncmp("ERROR", res->command, sizeof("ERROR") - 1)) {
} else { char *error_msg = NULL;
stomp_set_error(stomp, "Invalid receipt", 0, "%s", receipt_id); if (zend_hash_find(res->headers, "message", sizeof("message"), (void **)&error_msg) == SUCCESS) {
} stomp_set_error(stomp, error_msg, 0);
stomp_free_frame(res);
return success;
} else if (0 == strncmp("ERROR", res->command, sizeof("ERROR") - 1)) {
char *error_msg = NULL;
if (zend_hash_find(res->headers, "message", sizeof("message"), (void **)&error_msg) == SUCCESS) {
stomp_set_error(stomp, error_msg, 0, "%s", res->body);
}
stomp_free_frame(res);
return success;
} else {
stomp_frame_stack_push(&stomp->frame_stack, res);
} }
} else {
return success;
} }
stomp_free_frame(res);
} }
} }
return success; return success;
@ -718,26 +520,17 @@ int stomp_valid_receipt(stomp_t *stomp, stomp_frame_t *frame) {
/* {{{ stomp_select /* {{{ stomp_select
*/ */
int stomp_select_ex(stomp_t *stomp, const long int sec, const long int usec) int stomp_select(stomp_t *stomp)
{ {
int n;
struct timeval tv; struct timeval tv;
fd_set rfds;
if (stomp->read_buffer.size || stomp->frame_stack) { tv.tv_sec = stomp->options.read_timeout_sec;
return 1; tv.tv_usec = stomp->options.read_timeout_usec;
}
tv.tv_sec = sec;
tv.tv_usec = usec;
n = php_pollfd_for(stomp->fd, PHP_POLLREADABLE, &tv); FD_ZERO(&rfds);
if (n < 1) { FD_SET(stomp->fd, &rfds);
#if !defined(PHP_WIN32) && !(defined(NETWARE) && defined(USE_WINSOCK))
if (n == 0) { return select(stomp->fd + 1, &rfds, NULL, NULL, &tv);
errno = ETIMEDOUT;
}
#endif
return 0;
}
return 1;
} }
/* }}} */ /* }}} */

38
stomp.h Normal file → Executable file
View File

@ -45,19 +45,6 @@ typedef struct _stomp_options {
#endif #endif
} stomp_options_t; } stomp_options_t;
typedef struct _stomp_frame {
char *command;
int command_length;
HashTable *headers;
char *body;
int body_length;
} stomp_frame_t;
typedef struct _stomp_frame_stack {
stomp_frame_t *frame;
struct _stomp_frame_stack *next;
} stomp_frame_stack_t;
typedef struct _stomp { typedef struct _stomp {
php_socket_t fd; php_socket_t fd;
php_sockaddr_storage localaddr; php_sockaddr_storage localaddr;
@ -67,31 +54,30 @@ typedef struct _stomp {
int status; int status;
char *error; char *error;
int errnum; int errnum;
char *error_details;
char *session; char *session;
#if HAVE_STOMP_SSL #if HAVE_STOMP_SSL
SSL *ssl_handle; SSL *ssl_handle;
#endif #endif
stomp_frame_stack_t *frame_stack;
struct {
size_t size;
char buf[STOMP_BUFSIZE];
char *pos;
} read_buffer;
} stomp_t; } stomp_t;
typedef struct _stomp_frame {
char *command;
int command_length;
HashTable *headers;
char *body;
int body_length;
} stomp_frame_t;
stomp_t *stomp_init(); stomp_t *stomp_init();
int stomp_connect(stomp_t *stomp, const char *host, unsigned short port TSRMLS_DC); int stomp_connect(stomp_t *stomp, const char *host, unsigned short port TSRMLS_DC);
void stomp_close(stomp_t *stomp); void stomp_close(stomp_t *stomp);
int stomp_send(stomp_t *connection, stomp_frame_t *frame TSRMLS_DC); int stomp_send(stomp_t *connection, stomp_frame_t *frame TSRMLS_DC);
stomp_frame_t *stomp_read_frame_ex(stomp_t *connection, int use_stack); stomp_frame_t *stomp_read_frame(stomp_t *connection);
int stomp_valid_receipt(stomp_t *connection, stomp_frame_t *frame); int stomp_valid_receipt(stomp_t *connection, stomp_frame_t *frame);
int stomp_select_ex(stomp_t *connection, const long int sec, const long int usec); int stomp_select(stomp_t *connection);
void stomp_set_error(stomp_t *stomp, const char *error, int errnum, const char *fmt, ...) ZEND_ATTRIBUTE_PTR_FORMAT(printf, 4, 0); void stomp_set_error(stomp_t *stomp, const char *error, int errnum);
void stomp_free_frame(stomp_frame_t *frame); void stomp_free_frame(stomp_frame_t *frame);
#define stomp_select(s) stomp_select_ex(s, s->options.read_timeout_sec, s->options.read_timeout_sec)
#define stomp_read_frame(c) stomp_read_frame_ex(c, 1)
#endif /* _STOMP_H_ */ #endif /* _STOMP_H_ */
/* /*

View File

@ -7,4 +7,4 @@ Test stomp_version()
echo stomp_version(); echo stomp_version();
?> ?>
--EXPECTF-- --EXPECTF--
%d.%d.%s %d.%d.%d

View File

@ -1,14 +0,0 @@
--TEST--
Test stomp_connect() - Test error on CONNECT
--SKIPIF--
<?php if (!extension_loaded("stomp")) print "skip"; ?>
--FILE--
<?php
try {
$stomp = new Stomp('tcp://localhost', 'anotpresentusername1234');
} catch (Exception $e) {
var_dump(get_class($e));
}
?>
--EXPECTF--
string(14) "StompException"

View File

@ -14,6 +14,7 @@ stomp_send($link, '/queue/test-06', array());
var_dump(stomp_send($link, '/queue/test-06', '')); var_dump(stomp_send($link, '/queue/test-06', ''));
var_dump(stomp_send($link, '/queue/test-06', 'A realMessage')); var_dump(stomp_send($link, '/queue/test-06', 'A realMessage'));
var_dump(stomp_send($link, '/queue/test-06', 'بياريك شارون')); var_dump(stomp_send($link, '/queue/test-06', 'بياريك شارون'));
var_dump(stomp_send($link, 'بياريك شارون', 'بياريك شارون', array('receipt' => 'message-123')), stomp_error($link));
?> ?>
--EXPECTF-- --EXPECTF--
@ -23,3 +24,5 @@ Warning: stomp_send(): Expects parameter %d to be a string or a StompFrame objec
bool(true) bool(true)
bool(true) bool(true)
bool(true) bool(true)
bool(false)
string(%d) "%s"

View File

@ -14,6 +14,7 @@ $s->send('/queue/test-06', array());
var_dump($s->send('/queue/test-06', '')); var_dump($s->send('/queue/test-06', ''));
var_dump($s->send('/queue/test-06', 'A realMessage')); var_dump($s->send('/queue/test-06', 'A realMessage'));
var_dump($s->send('/queue/test-06', 'بياريك شارون')); var_dump($s->send('/queue/test-06', 'بياريك شارون'));
var_dump($s->send('بياريك شارون', 'بياريك شارون', array('receipt' => 'message-123')), $s->error());
?> ?>
--EXPECTF-- --EXPECTF--
@ -23,3 +24,5 @@ Warning: Stomp::send(): Expects parameter %d to be a string or a StompFrame obje
bool(true) bool(true)
bool(true) bool(true)
bool(true) bool(true)
bool(false)
string(%d) "%s"

View File

@ -1,14 +0,0 @@
--TEST--
Test stomp::send() - test send with receipt
--SKIPIF--
<?php
if (!extension_loaded("stomp")) print "skip";
if (!stomp_connect()) print "skip";
?>
--FILE--
<?php
$s = new Stomp();
var_dump($s->send('/queue/test-06', 'A real message', array('receipt' => 'message-12345')));
?>
--EXPECTF--
bool(true)

View File

@ -14,4 +14,4 @@ $s->subscribe('/queue/test', 'string');
--EXPECTF-- --EXPECTF--
Warning: Stomp::subscribe(): Destination can not be empty in %s on line %d Warning: Stomp::subscribe(): Destination can not be empty in %s on line %d
Catchable fatal error: Argument 2 passed to Stomp::subscribe() must be %s array, string given in %s on line %d Catchable fatal error: Argument 2 passed to Stomp::subscribe() must be an array, string given in %s on line %d

View File

@ -14,4 +14,4 @@ $s->unsubscribe('/queue/test', 'string');
--EXPECTF-- --EXPECTF--
Warning: Stomp::unsubscribe(): Destination can not be empty in %s on line %d Warning: Stomp::unsubscribe(): Destination can not be empty in %s on line %d
Catchable fatal error: Argument 2 passed to Stomp::unsubscribe() must be %s array, string given in %s on line %d Catchable fatal error: Argument 2 passed to Stomp::unsubscribe() must be an array, string given in %s on line %d

View File

@ -9,7 +9,7 @@ Test stomp::readFrame() - tests functionnality and parameters
<?php <?php
$s = new Stomp(); $s = new Stomp();
$s->send('/queue/test-09', 'A test Message'); $s->send('/queue/test-09', 'A test Message');
$s->subscribe('/queue/test-09', array('ack' => 'auto')); $s->subscribe('/queue/test-09');
var_dump($s->readFrame()->body); var_dump($s->readFrame()->body);
var_dump($s->readFrame()); var_dump($s->readFrame());

View File

@ -9,7 +9,7 @@ Test stomp_read_frame() - test functionnality and parameters
<?php <?php
$link = stomp_connect(); $link = stomp_connect();
stomp_send($link, '/queue/test-09', 'A test Message'); stomp_send($link, '/queue/test-09', 'A test Message');
stomp_subscribe($link, '/queue/test-09', array('ack' => 'auto')); stomp_subscribe($link, '/queue/test-09');
$result = stomp_read_frame($link); $result = stomp_read_frame($link);
var_dump($result['body']); var_dump($result['body']);
var_dump(stomp_read_frame($link)); var_dump(stomp_read_frame($link));

View File

@ -18,7 +18,7 @@ class customFrame extends stompFrame
$s = new Stomp(); $s = new Stomp();
$s->send('/queue/test-09', 'A test Message'); $s->send('/queue/test-09', 'A test Message');
$s->subscribe('/queue/test-09', array('ack' => 'auto')); $s->subscribe('/queue/test-09');
$frame = $s->readFrame('customFrame'); $frame = $s->readFrame('customFrame');
var_dump(get_class($frame), $frame->body); var_dump(get_class($frame), $frame->body);
?> ?>

Binary file not shown.

Binary file not shown.

View File

@ -1,26 +0,0 @@
--TEST--
Test stomp::readFrame() - test frame stack
--SKIPIF--
<?php
if (!extension_loaded("stomp")) print "skip";
if (!stomp_connect()) print "skip";
?>
--FILE--
<?php
$s = new Stomp();
var_dump($s->subscribe('/queue/test-buffer', array('ack' => 'auto')));
var_dump($s->send('/queue/test-buffer', "Message1", array('receipt' => 'msg-1')));
var_dump($s->send('/queue/test-buffer', "Message2", array('receipt' => 'msg-2')));
var_dump($s->send('/queue/test-buffer', "Message3", array('receipt' => 'msg-3')));
var_dump($s->readFrame()->body);
var_dump($s->readFrame()->body);
var_dump($s->readFrame()->body);
?>
--EXPECTF--
bool(true)
bool(true)
bool(true)
bool(true)
string(8) "Message1"
string(8) "Message2"
string(8) "Message3"

View File

@ -17,7 +17,7 @@ var_dump($s->send('/queue/test-011-commit', 'bar', array('transaction' => 't1'))
// sends a message to the queue and asks for a receipt // sends a message to the queue and asks for a receipt
$s->send('/queue/test-011-commit', 'bar', array('transaction' => 't2', 'receipt' => 'tptp')); $s->send('/queue/test-011-commit', 'bar', array('transaction' => 't2', 'receipt' => 'tptp'));
echo gettype($s->error()) . PHP_EOL; var_dump($s->error());
// commits a valid transaction // commits a valid transaction
var_dump($s->commit('t1')); var_dump($s->commit('t1'));
@ -28,15 +28,15 @@ var_dump($s->commit(null));
// commits a non valid transaction (a transaction id that does not exist) and asks for a receipt // commits a non valid transaction (a transaction id that does not exist) and asks for a receipt
$s->commit('t2', array('receipt' => 'commit-key')); $s->commit('t2', array('receipt' => 'commit-key'));
echo gettype($s->error()); var_dump($s->error());
unset($s); unset($s);
?> ?>
--EXPECTF-- --EXPECTF--
bool(true) bool(true)
bool(true) bool(true)
string string(%d) "Invalid transaction id: %s"
bool(true) bool(true)
bool(false) bool(false)
bool(true) bool(true)
string string(%d) "Must specify the transaction you are committing"

View File

@ -19,7 +19,7 @@ try {
$stomp->send($queue, $msg); $stomp->send($queue, $msg);
/* subscribe to messages from the queue 'foo' */ /* subscribe to messages from the queue 'foo' */
$stomp->subscribe($queue, array('ack' => 'auto')); $stomp->subscribe($queue);
/* read a frame */ /* read a frame */
$frame = $stomp->readFrame(); $frame = $stomp->readFrame();