Compare commits

...

42 Commits

Author SHA1 Message Date
Vitaliy Filippov acf43f619f Only add content-length header for messages containing zero byte
Allows OpenMQ/ActiveMQ to correctly map messages to JMS TextMessages
instead of only BytesMessages.

Fixes https://bugs.php.net/bug.php?id=70280
2015-08-18 12:08:17 +03:00
Vitaliy Filippov 618d2bd4d4 Fix compiler warning (assignment makes pointer from integer without a cast) 2015-08-18 12:07:50 +03:00
Gennady Feldman eb55496781 Fixing stomp_writable() check so we can catch when connect failed. 2015-06-05 14:41:04 -04:00
Gennady Feldman 15a3b9a059 1.0.8 => 1.0.9-dev 2015-05-18 18:19:58 -04:00
Gennady Feldman 50bfb5d8da Releasing v1.0.8 2015-05-18 18:17:11 -04:00
Gennady Feldman 4bd40f20a2 Fixing PHP_STOMP_VERSION constant, per Remi's request. 2015-05-18 18:08:25 -04:00
Remi Collet bf4539e523 fix perm on source files 2015-05-16 07:18:45 +02:00
Gennady Feldman 2c4b2f8bf1 1.0.7 => 1.0.8-dev 2015-05-15 17:21:29 -04:00
Gennady Feldman 0e38c2452e Releasing v1.0.7 2015-05-15 17:12:34 -04:00
Gennady Feldman 59b74e4142 Merge branch 'pull-request/6' 2015-05-05 10:20:59 -04:00
Gennady Feldman fbacdccd37 Fix error checking after stomp_send().
This one has been in there for a while and would hang for me in
1.0.6 release if the connection was refused.

Some tests would fail too. This error was also somewhat masked in
previous release. We should abort and throw an exception if send
fails.
2015-05-05 10:17:49 -04:00
Gennady Feldman bdedf5922c Adding #ifdef HAVE_NETINET_IN_H around the new TCP_NODELAY code to fix compilation on Windows and possibly others. 2015-02-25 11:58:32 -05:00
Pierrick Charron 8b0bbd4db2 Improve frame stack and add test 2014-12-09 01:49:39 -05:00
Pierrick Charron 79a3ca34c4 Fix error message on invalid receipt 2014-12-09 01:10:11 -05:00
Remi Collet 5024fff376 add missing LICENSE file (mandatory for downstream) 2014-12-08 06:25:08 +01:00
Pierrick Charron 8aece537c1 1.0.6 => 1.0.7-dev 2014-12-07 21:57:57 -05:00
Pierrick Charron ca0de7d66a Release stomp v1.0.6 2014-12-07 21:53:45 -05:00
Pierrick Charron 200bd35bb2 Fix error message mismatch 2014-12-07 21:15:03 -05:00
Pierrick Charron 059566498b Make stomp_set_error() to be printf like (mi+php at aldan dot algebra dot com) 2014-12-07 18:49:04 -05:00
Pierrick Charron d37e34917d Make sure connection is closed on error 2014-12-07 14:40:23 -05:00
Pierrick Charron 189b8dcfc2 Fixed bug #64671 (Add stomp_nack and Stomp::nack functions). 2014-12-07 13:22:01 -05:00
Pierrick Charron 026798d7c4 Update package.xml for merged #68497 2014-12-07 13:03:31 -05:00
Pierrick Charron aafb968d49 Merge remote-tracking branch 'github-fntlnz/fix-bug-68497' 2014-12-07 13:01:37 -05:00
Pierrick Charron c338977f53 Update package.xml for Bug #67170 2014-12-07 12:55:45 -05:00
Lorenzo Fontana 8e18eb079d Fix Bug #68497 Stomp client doesn't parse ERROR response on CONNECT 2014-12-02 19:28:20 +01:00
Yarek Tyshchenko 214d0eb602 Disable Nagle's Algorithm by adding TCP_NODELAY option to the socket
Signed-off-by: Gennady Feldman <gena01@gmail.com>
2014-12-01 10:43:38 -05:00
Peter Petermann beb86e70fe added fix be marcelog, see #59970 2013-03-17 17:33:02 +01:00
Pierrick Charron 5ac6a47e5d Remove compilation warnings 2012-11-21 08:59:02 -05:00
Pierrick Charron 6dff7f7fd1 Remove useless new line 2012-11-20 21:45:38 -05:00
Pierrick Charron d705083ab3 Merge branch 'read_buffer'
* read_buffer:
  Reimplement read_line and read_buffer
  read buffer implementation on top of stomp_recv
2012-11-20 21:28:31 -05:00
Pierrick Charron e7feab2950 Reimplement read_line and read_buffer 2012-11-20 21:25:46 -05:00
Pierrick Charron 19f47cd105 read buffer implementation on top of stomp_recv 2012-11-20 19:54:00 -05:00
Pierrick Charron 9e0839fff1 Rename few things 2012-11-20 19:12:26 -05:00
Pierrick Charron 36c7391402 Auto ack for this test 2012-11-20 17:34:51 -05:00
Pierrick Charron 6a0fd1393f Package.xml 2012-11-20 17:11:12 -05:00
Pierrick Charron d4c9f11e07 Combine the two sendto call in one 2012-11-20 17:00:43 -05:00
Pierrick Charron 0a6e6b2f5a s#writeable#writable#g 2012-11-19 09:03:22 -05:00
Pierrick Charron a1619b9aa1 Add new note 2012-11-18 20:31:55 -05:00
Pierrick Charron 520e311c83 Fix rabbitmq compatibility 2012-11-18 20:20:23 -05:00
Pierrick Charron 71caee45bc Add two new ini config default_username and default_password 2012-11-18 19:33:12 -05:00
Pierrick Charron 5f7d260c05 Fix stomp_read_frame when buffered 2012-11-18 19:23:27 -05:00
Pierrick Charron ce374724b9 1.0.5 => 1.0.6-dev 2012-11-18 17:35:17 -05:00
12 changed files with 556 additions and 214 deletions

28
.gitignore vendored Normal file
View File

@ -0,0 +1,28 @@
.deps
.libs/
Makefile
Makefile.fragments
Makefile.global
Makefile.objects
acinclude.m4
aclocal.m4
autom4te.cache/
config.guess
config.h
config.h.in
config.log
config.nice
config.status
config.sub
configure
configure.in
install-sh
libtool
ltmain.sh
missing
mkinstalldirs
modules/
php_stomp.lo
run-tests.php
stomp.la
stomp.lo

68
LICENSE Normal file
View File

@ -0,0 +1,68 @@
--------------------------------------------------------------------
The PHP License, version 3.01
Copyright (c) 1999 - 2014 The PHP Group. All rights reserved.
--------------------------------------------------------------------
Redistribution and use in source and binary forms, with or without
modification, is permitted provided that the following conditions
are met:
1. Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in
the documentation and/or other materials provided with the
distribution.
3. The name "PHP" must not be used to endorse or promote products
derived from this software without prior written permission. For
written permission, please contact group@php.net.
4. Products derived from this software may not be called "PHP", nor
may "PHP" appear in their name, without prior written permission
from group@php.net. You may indicate that your software works in
conjunction with PHP by saying "Foo for PHP" instead of calling
it "PHP Foo" or "phpfoo"
5. The PHP Group may publish revised and/or new versions of the
license from time to time. Each version will be given a
distinguishing version number.
Once covered code has been published under a particular version
of the license, you may always continue to use it under the terms
of that version. You may also choose to use such covered code
under the terms of any subsequent version of the license
published by the PHP Group. No one other than the PHP Group has
the right to modify the terms applicable to covered code created
under this License.
6. Redistributions of any form whatsoever must retain the following
acknowledgment:
"This product includes PHP software, freely available from
<http://www.php.net/software/>".
THIS SOFTWARE IS PROVIDED BY THE PHP DEVELOPMENT TEAM ``AS IS'' AND
ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE PHP
DEVELOPMENT TEAM OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
OF THE POSSIBILITY OF SUCH DAMAGE.
--------------------------------------------------------------------
This software consists of voluntary contributions made by many
individuals on behalf of the PHP Group.
The PHP Group can be contacted via Email at group@php.net.
For more information on the PHP Group and the PHP project,
please see <http://www.php.net>.
PHP includes the Zend Engine, freely available at
<http://www.zend.com>.

7
TODO
View File

@ -1,8 +1,5 @@
- Add the ability to connect to a network of brokers
(Cf: http://activemq.apache.org/networks-of-brokers.html)
- Add Stomp 1.1 full support
- Review tests to work on every message broker
- ActimeMQ
- stompserver (Ruby)
- TBD
- Verify if we need to call recv with MSG_PEEK for the newline char
or if the current implementation is enough (cf bug #59217)

View File

@ -4,7 +4,7 @@
<channel>pecl.php.net</channel>
<summary>Stomp client extension</summary>
<description>
This extension allows php applications to communicate with any Stomp compliant Message Brokers through easy object oriented and procedural interfaces.
This extension allows php applications to communicate with any Stomp compliant Message Brokers through easy object oriented and procedural interfaces.
</description>
<lead>
<name>Pierrick Charron</name>
@ -12,20 +12,22 @@ This extension allows php applications to communicate with any Stomp compliant M
<email>pierrick@php.net</email>
<active>yes</active>
</lead>
<date>2012-11-18</date>
<version><release>1.0.5</release><api>1.0.5</api></version>
<lead>
<name>Gennady Feldman</name>
<user>gena01</user>
<email>gena01@php.net</email>
<active>yes</active>
</lead>
<date>XXXX-XX-XX</date>
<version><release>1.0.X</release><api>1.0.X</api></version>
<stability><release>stable</release><api>stable</api></stability>
<license uri="http://www.php.net/license">PHP License</license>
<notes>
- Fix memory leak when Stomp can't write the message on the queue. (Pierrick)
- Add a buffer for receipts. (Pierrick)
- Fixed bug #62831 (Stomp module seems not initializing SSL library first).
(Patch by lwhsu at lwhsu dot org)
- Fixed bug #59972 (Message body are not binary safe). (Pierrick)
</notes>
<contents>
<dir name="/">
<file role="doc" name="CREDITS" />
<file role="doc" name="LICENSE" />
<file role="src" name="config.m4" />
<file role="src" name="config.w32" />
<file role="src" name="php_stomp.c" />
@ -77,10 +79,47 @@ This extension allows php applications to communicate with any Stomp compliant M
<providesextension>stomp</providesextension>
<extsrcrelease>
<configureoption default="/usr" name="with-openssl-dir" prompt="OpenSSL install prefix (no to disable SSL support)" />
<configureoption default="/usr" name="with-openssl-dir" prompt="OpenSSL install prefix (no to disable SSL support)" />
</extsrcrelease>
<changelog>
<release>
<version><release>1.0.8</release><api>1.0.8</api></version>
<stability><release>stable</release><api>stable</api></stability>
<license uri="http://www.php.net/license">PHP License</license>
<date>2015-05-18</date>
<notes>
- Fix perm on source files. (Remi)
- Fixing PHP_STOMP_VERSION constant, per Remi's request. (Gennady)
</notes>
</release>
<release>
<version><release>1.0.7</release><api>1.0.7</api></version>
<stability><release>stable</release><api>stable</api></stability>
<license uri="http://www.php.net/license">PHP License</license>
<date>2015-05-15</date>
<notes>
- add LICENSE file as documentation (Remi)
- Fixed Windows compilation regression due to new TCP_NODELAY code. (Gennady Feldman)
- Fixed bug where error checking was missing after stomp_send(). (Gennady Feldman)
</notes>
</release>
<release>
<version><release>1.0.6</release><api>1.0.6</api></version>
<stability><release>stable</release><api>stable</api></stability>
<license uri="http://www.php.net/license">PHP License</license>
<date>2014-12-07</date>
<notes>
- Add two new ini options stomp.default_username and stomp.default_passowrd (Pierrick)
- General performance improvements (Pierrick)
- Fix stomp_read_frame when buffered (Pierrick)
- Fixed bug #59217 (Connections to RabbitMQ via CLI). (Pierrick).
- Fixed bug #59970 (acking a message makes rabbitmq disconnect the server). (Pierrick)
- Fixed bug #67170 (Disable Nagle's Algorithm with TCP_NODELAY, it delays sending small messages). (Yarek Tyshchenko)
- Fixed bug #68497 (Stomp client doesn't parse ERROR response on CONNECT). (Lorenzo Fontana)
- Fixed bug #64671 (Add stomp_nack and Stomp::nack functions). (Pierrick)
</notes>
</release>
<release>
<version><release>1.0.5</release><api>1.0.5</api></version>
<stability><release>stable</release><api>stable</api></stability>
@ -118,7 +157,7 @@ This extension allows php applications to communicate with any Stomp compliant M
<license uri="http://www.php.net/license">PHP License</license>
<date>2010-08-13</date>
<notes>
- Fixed SSL connection bug introduced in 1.0.1
- Fixed SSL connection bug introduced in 1.0.1
</notes>
</release>
<release>
@ -144,7 +183,7 @@ This extension allows php applications to communicate with any Stomp compliant M
- Bump to stable
</notes>
</release>
<release>
<release>
<version><release>0.4.1</release><api>0.4.1</api></version>
<stability><release>beta</release><api>beta</api></stability>
<license uri="http://www.php.net/license">PHP License</license>
@ -168,10 +207,10 @@ This extension allows php applications to communicate with any Stomp compliant M
<license uri="http://www.php.net/license">PHP License</license>
<date>2009-11-22</date>
<notes>
- Adds alt class
- Adds alt class
- Fixed bug #16936 (Module segfaults on readFrame if Frame > STOMP_BUFSIZE)
- Fixed bug #16933 (readFrame does not notice when server shuts down)
- Fixed bug #16930 (readFrame reports error-frames as "timeout")
- Fixed bug #16930 (readFrame reports error-frames as "timeout")
</notes>
</release>
<release>
@ -193,8 +232,8 @@ This extension allows php applications to communicate with any Stomp compliant M
- new stomp_connect_error() function (pierrick)
- stomp_begin, stomp_abort and stomp_commit now accept additional headers (pierrick)
- new connection timeout and read timeout ini configuration (pierrick)
- Fix a memory leak in stomp_read_line (pierrick)
- Better set of test (Pierrick and Anis)
- Fix a memory leak in stomp_read_line (pierrick)
- Better set of test (Pierrick and Anis)
</notes>
</release>
<release>
@ -204,7 +243,7 @@ This extension allows php applications to communicate with any Stomp compliant M
<date>2009-11-01</date>
<notes>
- Windows build fix (kalle)
- Add SSL support (pierrick)
- Add SSL support (pierrick)
</notes>
</release>
<release>
@ -213,7 +252,7 @@ This extension allows php applications to communicate with any Stomp compliant M
<license uri="http://www.php.net/license">PHP License</license>
<date>2009-10-30</date>
<notes>
- Initial PECL release. (pierrick)
- Initial PECL release. (pierrick)
</notes>
</release>
</changelog>

80
php_stomp.c Executable file → Normal file
View File

@ -62,7 +62,9 @@
SEPARATE_ZVAL(value); \
convert_to_string(*value); \
} \
zend_hash_add(h, string_key, strlen(string_key)+1, Z_STRVAL_PP(value), Z_STRLEN_PP(value)+1, NULL); \
if (strcmp(string_key, "content-length") != 0) { \
zend_hash_add(h, string_key, strlen(string_key)+1, Z_STRVAL_PP(value), Z_STRLEN_PP(value)+1, NULL); \
}\
efree(string_key); \
} \
}
@ -175,6 +177,17 @@ ZEND_ARG_INFO(0, msg)
ZEND_ARG_ARRAY_INFO(0, headers, 1)
ZEND_END_ARG_INFO()
ZEND_BEGIN_ARG_INFO_EX(stomp_nack_args, 0, 0, 2)
ZEND_ARG_INFO(0, link)
ZEND_ARG_INFO(0, msg)
ZEND_ARG_ARRAY_INFO(0, headers, 1)
ZEND_END_ARG_INFO()
ZEND_BEGIN_ARG_INFO_EX(stomp_oop_nack_args, 0, 0, 1)
ZEND_ARG_INFO(0, msg)
ZEND_ARG_ARRAY_INFO(0, headers, 1)
ZEND_END_ARG_INFO()
ZEND_BEGIN_ARG_INFO_EX(stomp_set_read_timeout_args, 0, 0, 2)
ZEND_ARG_INFO(0, link)
ZEND_ARG_INFO(0, seconds)
@ -209,6 +222,7 @@ zend_function_entry stomp_functions[] = {
PHP_FE(stomp_commit, stomp_transaction_args)
PHP_FE(stomp_abort, stomp_transaction_args)
PHP_FE(stomp_ack, stomp_ack_args)
PHP_FE(stomp_nack, stomp_nack_args)
PHP_FE(stomp_error, stomp_link_only)
PHP_FE(stomp_set_read_timeout, stomp_set_read_timeout_args)
PHP_FE(stomp_get_read_timeout, stomp_link_only)
@ -230,6 +244,7 @@ static zend_function_entry stomp_methods[] = {
PHP_FALIAS(commit, stomp_commit, stomp_oop_transaction_args)
PHP_FALIAS(abort, stomp_abort, stomp_oop_transaction_args)
PHP_FALIAS(ack, stomp_ack, stomp_oop_ack_args)
PHP_FALIAS(nack, stomp_nack, stomp_oop_nack_args)
PHP_FALIAS(error, stomp_error, stomp_no_args)
PHP_FALIAS(setReadTimeout, stomp_set_read_timeout, stomp_oop_set_read_timeout_args)
PHP_FALIAS(getReadTimeout, stomp_get_read_timeout, stomp_no_args)
@ -276,6 +291,8 @@ zend_module_entry stomp_module_entry = {
PHP_INI_BEGIN()
STD_PHP_INI_ENTRY("stomp.default_broker", "tcp://localhost:61613", PHP_INI_ALL, OnUpdateString, default_broker, zend_stomp_globals, stomp_globals)
STD_PHP_INI_ENTRY("stomp.default_username", "", PHP_INI_ALL, OnUpdateString, default_username, zend_stomp_globals, stomp_globals)
STD_PHP_INI_ENTRY("stomp.default_password", "", PHP_INI_ALL, OnUpdateString, default_password, zend_stomp_globals, stomp_globals)
STD_PHP_INI_ENTRY("stomp.default_read_timeout_sec", "2", PHP_INI_ALL, OnUpdateLong, read_timeout_sec, zend_stomp_globals, stomp_globals)
STD_PHP_INI_ENTRY("stomp.default_read_timeout_usec", "0", PHP_INI_ALL, OnUpdateLong, read_timeout_usec, zend_stomp_globals, stomp_globals)
STD_PHP_INI_ENTRY("stomp.default_connection_timeout_sec", "2", PHP_INI_ALL, OnUpdateLong, connection_timeout_sec, zend_stomp_globals, stomp_globals)
@ -286,6 +303,8 @@ PHP_INI_END()
static PHP_GINIT_FUNCTION(stomp)
{
stomp_globals->default_broker = NULL;
stomp_globals->default_username = NULL;
stomp_globals->default_password = NULL;
stomp_globals->read_timeout_sec = 2;
stomp_globals->read_timeout_usec = 0;
stomp_globals->connection_timeout_sec = 2;
@ -512,14 +531,17 @@ PHP_FUNCTION(stomp_connect)
if (stomp->status) {
stomp_frame_t *res;
int rres;
stomp_frame_t frame = {0};
INIT_FRAME(frame, "CONNECT");
if (username_len == 0) {
username = "";
if (!username) {
username = STOMP_G(default_username);
username_len = strlen(username);
}
if (password_len == 0) {
password = "";
if (!password) {
password = STOMP_G(default_password);
password_len = strlen(password);
}
zend_hash_add(frame.headers, "login", sizeof("login"), username, username_len + 1, NULL);
zend_hash_add(frame.headers, "passcode", sizeof("passcode"), password, password_len + 1, NULL);
@ -528,13 +550,30 @@ PHP_FUNCTION(stomp_connect)
FRAME_HEADER_FROM_HASHTABLE(frame.headers, Z_ARRVAL_P(headers));
}
stomp_send(stomp, &frame TSRMLS_CC);
rres = stomp_send(stomp, &frame TSRMLS_CC);
CLEAR_FRAME(frame);
if (0 == rres) {
zval *excobj = zend_throw_exception_ex(stomp_ce_exception, stomp->errnum TSRMLS_CC, stomp->error);
if (stomp->error_details) {
zend_update_property_string(stomp_ce_exception, excobj, "details", sizeof("details")-1, stomp->error_details TSRMLS_CC);
}
return;
}
/* Retreive Response */
res = stomp_read_frame(stomp);
if (NULL == res) {
STOMP_ERROR(0, PHP_STOMP_ERR_SERVER_NOT_RESPONDING);
} else if (0 == strncmp("ERROR", res->command, sizeof("ERROR") - 1)) {
char *error_msg = NULL;
if (zend_hash_find(res->headers, "message", sizeof("message"), (void **)&error_msg) == SUCCESS) {
zval *excobj = zend_throw_exception_ex(stomp_ce_exception, 0 TSRMLS_CC, error_msg);
if (res->body) {
zend_update_property_string(stomp_ce_exception, excobj, "details", sizeof("details")-1, (char *) res->body TSRMLS_CC);
}
stomp_free_frame(res);
}
} else if (0 != strncmp("CONNECTED", res->command, sizeof("CONNECTED")-1)) {
if (stomp->error) {
STOMP_ERROR_DETAILS(stomp->errnum, stomp->error, stomp->error_details);
@ -708,6 +747,8 @@ PHP_FUNCTION(stomp_send)
CLEAR_FRAME(frame);
RETURN_FALSE;
}
if (frame.body_length > 0 && strnlen(frame.body, frame.body_length) >= frame.body_length)
frame.body_length = 0;
if (stomp_send(stomp, &frame TSRMLS_CC) > 0) {
success = stomp_valid_receipt(stomp, &frame);
@ -1073,10 +1114,9 @@ PHP_FUNCTION(stomp_abort)
}
/* }}} */
/* {{{ proto boolean Stomp::ack(mixed msg [, array headers])
Acknowledge consumption of a message from a subscription using client acknowledgment */
PHP_FUNCTION(stomp_ack)
{
/* {{{ _php_stomp_acknowledgment
*/
static void _php_stomp_acknowledgment(INTERNAL_FUNCTION_PARAMETERS, char *cmd) {
zval *stomp_object = getThis();
zval *msg = NULL, *headers = NULL;
stomp_t *stomp = NULL;
@ -1097,7 +1137,7 @@ PHP_FUNCTION(stomp_ack)
ZEND_FETCH_RESOURCE(stomp, stomp_t *, &arg, -1, PHP_STOMP_RES_NAME, le_stomp);
}
INIT_FRAME(frame, "ACK");
INIT_FRAME(frame, cmd);
if (NULL != headers) {
FRAME_HEADER_FROM_HASHTABLE(frame.headers, Z_ARRVAL_P(headers));
@ -1125,6 +1165,22 @@ PHP_FUNCTION(stomp_ack)
}
/* }}} */
/* {{{ proto boolean Stomp::ack(mixed msg [, array headers])
Acknowledge consumption of a message from a subscription using client acknowledgment */
PHP_FUNCTION(stomp_ack)
{
_php_stomp_acknowledgment(INTERNAL_FUNCTION_PARAM_PASSTHRU, "ACK");
}
/* }}} */
/* {{{ proto boolean Stomp::nack(mixed msg [, array headers])
Negative Acknowledgment of a message from a subscription */
PHP_FUNCTION(stomp_nack)
{
_php_stomp_acknowledgment(INTERNAL_FUNCTION_PARAM_PASSTHRU, "NACK");
}
/* }}} */
/* {{{ proto string Stomp::error()
Get the last error message */
PHP_FUNCTION(stomp_error)

View File

@ -29,11 +29,7 @@ typedef struct _stomp_object {
} stomp_object_t;
#define PHP_STOMP_EXTNAME "Stomp"
#define PHP_STOMP_MAJOR_VERSION "1"
#define PHP_STOMP_MINOR_VERSION "0"
#define PHP_STOMP_PATCH_VERSION "5"
#define PHP_STOMP_VERSION_STATUS ""
#define PHP_STOMP_VERSION PHP_STOMP_MAJOR_VERSION "." PHP_STOMP_MINOR_VERSION "." PHP_STOMP_PATCH_VERSION PHP_STOMP_VERSION_STATUS
#define PHP_STOMP_VERSION "1.0.9-dev"
#define PHP_STOMP_RES_NAME "stomp connection"
@ -82,6 +78,7 @@ PHP_FUNCTION(stomp_begin);
PHP_FUNCTION(stomp_commit);
PHP_FUNCTION(stomp_abort);
PHP_FUNCTION(stomp_ack);
PHP_FUNCTION(stomp_nack);
PHP_FUNCTION(stomp_error);
PHP_FUNCTION(stomp_set_read_timeout);
PHP_FUNCTION(stomp_get_read_timeout);
@ -97,6 +94,8 @@ ZEND_BEGIN_MODULE_GLOBALS(stomp)
long read_timeout_usec;
long connection_timeout_sec;
long connection_timeout_usec;
char *default_username;
char *default_password;
/* Others */
long error_no;

427
stomp.c
View File

@ -23,19 +23,50 @@
#endif
#include "php.h"
#include "zend_exceptions.h"
#include "ext/standard/php_smart_str.h"
#include "stomp.h"
#include "php_stomp.h"
#ifdef HAVE_NETINET_IN_H
#include <netinet/tcp.h>
#endif
#define RETURN_READ_FRAME_FAIL { stomp_free_frame(f); return NULL; }
ZEND_EXTERN_MODULE_GLOBALS(stomp);
extern zend_class_entry *stomp_ce_exception;
/* {{{ DEBUG */
#if PHP_DEBUG
static void print_stomp_frame(stomp_frame_t *frame TSRMLS_DC) {
php_printf("------ START FRAME ------\n");
php_printf("%s\n", frame->command);
/* Headers */
if (frame->headers) {
char *key;
ulong pos;
zend_hash_internal_pointer_reset(frame->headers);
while (zend_hash_get_current_key(frame->headers, &key, &pos, 0) == HASH_KEY_IS_STRING) {
char *value = NULL;
php_printf("%s:", key);
if (zend_hash_get_current_data(frame->headers, (void **)&value) == SUCCESS) {
php_printf("%s", value);
}
php_printf("\n");
zend_hash_move_forward(frame->headers);
}
}
php_printf("\n%s\n", frame->body);
php_printf("------ END FRAME ------\n");
}
#endif
/* }}} */
/* {{{ stomp_init
*/
stomp_t *stomp_init()
stomp_t *stomp_init()
{
/* Memory allocation */
stomp_t *stomp = (stomp_t *) emalloc(sizeof(stomp_t));
@ -59,36 +90,37 @@ stomp_t *stomp_init()
stomp->ssl_handle = NULL;
#endif
stomp->buffer = NULL;
stomp->frame_stack = NULL;
stomp->read_buffer.size = 0;
return stomp;
}
/* }}} */
/* {{{ stomp_frame_buffer_push
/* {{{ stomp_frame_stack_push
*/
void stomp_frame_buffer_push(stomp_frame_cell_t **pcell, stomp_frame_t *frame)
static void stomp_frame_stack_push(stomp_frame_stack_t **stack, stomp_frame_t *frame)
{
stomp_frame_cell_t *cell = (stomp_frame_cell_t *) emalloc(sizeof(stomp_frame_cell_t));
stomp_frame_stack_t *cell = (stomp_frame_stack_t *) emalloc(sizeof(stomp_frame_stack_t));
cell->frame = frame;
cell->next = NULL;
if (!*pcell) {
*pcell = cell;
if (!*stack) {
*stack = cell;
} else {
stomp_frame_cell_t *cursor = *pcell;
stomp_frame_stack_t *cursor = *stack;
while (cursor->next != NULL) cursor = cursor->next;
cursor->next = cell;
}
}
/* }}} */
/* {{{ stomp_frame_buffer_shift
/* {{{ stomp_frame_stack_shift
*/
stomp_frame_t *stomp_frame_buffer_shift(stomp_frame_cell_t **pcell) {
static stomp_frame_t *stomp_frame_stack_shift(stomp_frame_stack_t **stack) {
stomp_frame_t *frame = NULL;
if (*pcell) {
stomp_frame_cell_t *cell = *pcell;
*pcell = cell->next;
if (*stack) {
stomp_frame_stack_t *cell = *stack;
*stack = cell->next;
frame = cell->frame;
efree(cell);
}
@ -96,22 +128,25 @@ stomp_frame_t *stomp_frame_buffer_shift(stomp_frame_cell_t **pcell) {
}
/* }}} */
/* {{{ stomp_frame_buffer_clear
/* {{{ stomp_frame_stack_clear
*/
void stomp_frame_buffer_clear(stomp_frame_cell_t **pcell) {
static void stomp_frame_stack_clear(stomp_frame_stack_t **stack) {
stomp_frame_t *frame = NULL;
while (frame = stomp_frame_buffer_shift(pcell)) efree(frame);
while ((frame = stomp_frame_stack_shift(stack))) efree(frame);
}
/* }}} */
/* {{{ stomp_set_error
/* {{{ stomp_set_error
*/
void stomp_set_error(stomp_t *stomp, const char *error, int errnum, const char *details)
void stomp_set_error(stomp_t *stomp, const char *error, int errnum, const char *fmt, ...)
{
va_list ap;
int len;
if (stomp->error != NULL) {
efree(stomp->error);
stomp->error = NULL;
}
}
if (stomp->error_details != NULL) {
efree(stomp->error_details);
stomp->error_details = NULL;
@ -120,20 +155,33 @@ void stomp_set_error(stomp_t *stomp, const char *error, int errnum, const char *
if (error != NULL) {
stomp->error = estrdup(error);
}
if (details != NULL) {
stomp->error_details = estrdup(details);
if (fmt != NULL) {
stomp->error_details = emalloc(STOMP_BUFSIZE);
if (stomp->error_details == NULL) {
return; /* Nothing else can be done */
}
va_start(ap, fmt);
/*
* Would've been better to call vasprintf(), but that
* function is missing on some platforms...
*/
len = vsnprintf(stomp->error_details, STOMP_BUFSIZE, fmt, ap);
va_end(ap);
if (len < STOMP_BUFSIZE) {
stomp->error_details = erealloc(stomp->error_details, len+1);
}
}
}
/* }}} */
/* {{{ stomp_writeable
/* {{{ stomp_writable
*/
int stomp_writeable(stomp_t *stomp)
int stomp_writable(stomp_t *stomp)
{
int n;
n = php_pollfd_for_ms(stomp->fd, POLLOUT, 1000);
if (n < 1) {
if (n != POLLOUT) {
#ifndef PHP_WIN32
if (n == 0) {
errno = ETIMEDOUT;
@ -146,13 +194,14 @@ int stomp_writeable(stomp_t *stomp)
}
/* }}} */
/* {{{ stomp_connect
/* {{{ stomp_connect
*/
int stomp_connect(stomp_t *stomp, const char *host, unsigned short port TSRMLS_DC)
{
char error[1024];
socklen_t size;
struct timeval tv;
int flag = 1;
if (stomp->host != NULL)
{
@ -170,22 +219,28 @@ int stomp_connect(stomp_t *stomp, const char *host, unsigned short port TSRMLS_D
stomp->fd = php_network_connect_socket_to_host(stomp->host, stomp->port, SOCK_STREAM, 0, &tv, NULL, NULL, NULL, 0 TSRMLS_CC);
if (stomp->fd == -1) {
snprintf(error, sizeof(error), "Unable to connect to %s:%ld", stomp->host, stomp->port);
stomp_set_error(stomp, error, errno, NULL);
stomp_set_error(stomp, error, errno, "%s", strerror(errno));
return 0;
}
#ifdef HAVE_NETINET_IN_H
setsockopt(stomp->fd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int));
#endif
size = sizeof(stomp->localaddr);
memset(&stomp->localaddr, 0, size);
if (getsockname(stomp->fd, (struct sockaddr*) &stomp->localaddr, &size) == -1) {
snprintf(error, sizeof(error), "getsockname failed: %s (%d)", strerror(errno), errno);
stomp_set_error(stomp, error, errno, NULL);
return 0;
stomp_set_error(stomp, error, errno, NULL);
return 0;
}
if (stomp_writeable(stomp)) {
if (stomp_writable(stomp)) {
#if HAVE_STOMP_SSL
if (stomp->options.use_ssl) {
SSL_CTX *ctx = SSL_CTX_new(SSLv23_client_method());
int ret;
if (NULL == ctx) {
stomp_set_error(stomp, "failed to create the SSL context", 0, NULL);
return 0;
@ -199,20 +254,20 @@ int stomp_connect(stomp_t *stomp, const char *host, unsigned short port TSRMLS_D
SSL_CTX_free(ctx);
return 0;
}
SSL_set_fd(stomp->ssl_handle, stomp->fd);
if (SSL_connect(stomp->ssl_handle) <= 0) {
stomp_set_error(stomp, "SSL/TLS handshake failed", 0, NULL);
if ((ret = SSL_connect(stomp->ssl_handle)) <= 0) {
stomp_set_error(stomp, "SSL/TLS handshake failed", 0, "SSL error %d", SSL_get_error(stomp->ssl_handle, ret));
SSL_shutdown(stomp->ssl_handle);
return 0;
}
}
#endif
#endif
return 1;
} else {
snprintf(error, sizeof(error), "Unable to connect to %s:%ld", stomp->host, stomp->port);
stomp_set_error(stomp, error, errno, NULL);
stomp_set_error(stomp, error, errno, "%s", strerror(errno));
return 0;
}
}
@ -246,7 +301,7 @@ void stomp_close(stomp_t *stomp)
if (stomp->error_details) {
efree(stomp->error_details);
}
stomp_frame_buffer_clear(&stomp->buffer);
stomp_frame_stack_clear(&stomp->frame_stack);
efree(stomp);
}
/* }}} */
@ -264,7 +319,7 @@ int stomp_send(stomp_t *stomp, stomp_frame_t *frame TSRMLS_DC)
/* Headers */
if (frame->headers) {
char *key;
char *key;
ulong pos;
zend_hash_internal_pointer_reset(frame->headers);
@ -285,7 +340,7 @@ int stomp_send(stomp_t *stomp, stomp_frame_t *frame TSRMLS_DC)
}
if (frame->body_length > 0) {
smart_str_appends(&buf, "content-length:");
smart_str_appendl(&buf, "content-length:", sizeof("content-length:") - 1);
smart_str_append_long(&buf, frame->body_length);
smart_str_appendc(&buf, '\n');
}
@ -296,35 +351,32 @@ int stomp_send(stomp_t *stomp, stomp_frame_t *frame TSRMLS_DC)
smart_str_appendl(&buf, frame->body, frame->body_length > 0 ? frame->body_length : strlen(frame->body));
}
if (!stomp_writeable(stomp)) {
char error[1024];
snprintf(error, sizeof(error), "Unable to send data");
stomp_set_error(stomp, error, errno, NULL);
smart_str_appendl(&buf, "\0", sizeof("\0")-1);
if (!stomp_writable(stomp)) {
smart_str_free(&buf);
stomp_set_error(stomp, "Unable to send data", errno, "%s", strerror(errno));
return 0;
}
#ifdef HAVE_STOMP_SSL
if (stomp->options.use_ssl) {
if (-1 == SSL_write(stomp->ssl_handle, buf.c, buf.len) || -1 == SSL_write(stomp->ssl_handle, "\0\n", 2)) {
char error[1024];
snprintf(error, sizeof(error), "Unable to send data");
stomp_set_error(stomp, error, errno, NULL);
int ret;
if (-1 == (ret = SSL_write(stomp->ssl_handle, buf.c, buf.len))) {
smart_str_free(&buf);
stomp_set_error(stomp, "Unable to send data", errno, "SSL error %d", SSL_get_error(stomp->ssl_handle, ret));
return 0;
}
} else {
#endif
if (-1 == send(stomp->fd, buf.c, buf.len, 0) || -1 == send(stomp->fd, "\0\n", 2, 0)) {
char error[1024];
snprintf(error, sizeof(error), "Unable to send data");
stomp_set_error(stomp, error, errno, NULL);
#endif
if (-1 == send(stomp->fd, buf.c, buf.len, 0)) {
smart_str_free(&buf);
stomp_set_error(stomp, "Unable to send data", errno, "%s", strerror(errno));
return 0;
}
#ifdef HAVE_STOMP_SSL
}
#endif
#endif
smart_str_free(&buf);
@ -334,10 +386,12 @@ int stomp_send(stomp_t *stomp, stomp_frame_t *frame TSRMLS_DC)
/* {{{ stomp_recv
*/
int stomp_recv(stomp_t *stomp, char *msg, size_t length)
static int _stomp_recv(stomp_t *stomp, char *msg, const size_t length)
{
int len;
stomp_select(stomp);
#if HAVE_STOMP_SSL
if(stomp->options.use_ssl) {
len = SSL_read(stomp->ssl_handle, msg, length);
@ -348,66 +402,142 @@ int stomp_recv(stomp_t *stomp, char *msg, size_t length)
}
#endif
if (len == 0) {
TSRMLS_FETCH();
zend_throw_exception_ex(stomp_ce_exception, errno TSRMLS_CC, "Unexpected EOF while reading from socket");
if (len == -1) {
#if HAVE_STOMP_SSL
if (stomp->options.use_ssl) {
stomp_set_error(stomp, "Error reading from socket", errno, "%s. (SSL in use)", strerror(errno));
} else {
#endif
stomp_set_error(stomp, "Error reading from socket", errno, "%s. (SSL not in use)", strerror(errno));
#if HAVE_STOMP_SSL
}
#endif
stomp->status = -1;
} else if (len == 0) {
stomp_set_error(stomp, "Sender closed connection unexpectedly", 0, NULL);
stomp->status = -1;
}
return len;
}
int stomp_recv(stomp_t *stomp, char *msg, const size_t length)
{
if (stomp->read_buffer.size == 0) {
if (length >= STOMP_BUFSIZE) {
return _stomp_recv(stomp, msg, length);
} else {
size_t recv_size = _stomp_recv(stomp, stomp->read_buffer.buf, STOMP_BUFSIZE);
if (recv_size <= length) {
memcpy(msg, stomp->read_buffer.buf, recv_size);
return recv_size;
} else {
memcpy(msg, stomp->read_buffer.buf, length);
stomp->read_buffer.pos = stomp->read_buffer.buf + length;
stomp->read_buffer.size = recv_size - length;
return length;
}
}
} else if (stomp->read_buffer.size >= length) {
memcpy(msg, stomp->read_buffer.pos, length);
stomp->read_buffer.pos += length;
stomp->read_buffer.size -= length;
return length;
} else {
int len = stomp->read_buffer.size;
memcpy(msg, stomp->read_buffer.pos, stomp->read_buffer.size);
stomp->read_buffer.size = 0;
if (stomp_select_ex(stomp, 0, 0)) {
return len + stomp_recv(stomp, msg + len, length - len);
} else {
return len;
}
}
}
/* }}} */
/* {{{ stomp_read_buffer
/* {{{ _stomp_read_until
*/
static int stomp_read_buffer(stomp_t *stomp, char **data)
static size_t _stomp_read_until(stomp_t *stomp, char **data, const char delimiter)
{
int rc = 0;
size_t i = 0;
size_t bufsize = STOMP_BUFSIZE + 1;
char *buffer = (char *) emalloc(STOMP_BUFSIZE + 1);
size_t length = 0;
size_t bufsize = STOMP_BUFSIZE;
char *buffer = (char *) emalloc(STOMP_BUFSIZE);
while (1) {
unsigned int i, found;
char *c;
found = 0;
size_t length = 1;
rc = stomp_recv(stomp, buffer + i, length);
if (rc < 1) {
efree(buffer);
return -1;
// First populate the buffer
if (stomp->read_buffer.size == 0) {
stomp->read_buffer.size = _stomp_recv(stomp, stomp->read_buffer.buf, STOMP_BUFSIZE);
stomp->read_buffer.pos = stomp->read_buffer.buf;
}
if (1 == length) {
i++;
if (buffer[i-1] == 0) {
char endline[1];
if (1 != stomp_recv(stomp, endline, 1) && '\n' != endline[0]) {
efree(buffer);
return 0;
}
// Then search the delimiter
c = stomp->read_buffer.pos;
for (i = 1; i <= stomp->read_buffer.size ; i++) {
if (*c == delimiter) {
found = 1;
break;
} else {
c++;
}
}
if (!found) i--;
if (i >= bufsize) {
buffer = (char *) erealloc(buffer, bufsize + STOMP_BUFSIZE);
bufsize += STOMP_BUFSIZE;
}
// Make sure we have enough place in the buffer
if ((i+length) >= bufsize) {
buffer = (char *) erealloc(buffer, bufsize + STOMP_BUFSIZE);
bufsize += STOMP_BUFSIZE;
}
// Copy and update the buffer
memcpy(buffer + length, stomp->read_buffer.pos, i);
length += i;
stomp->read_buffer.pos += i;
stomp->read_buffer.size -= i;
if (found) {
break;
}
}
if (i > 1) {
*data = (char *) emalloc(i);
if (NULL == *data) {
efree(buffer);
return -1;
}
memcpy(*data, buffer, i);
if (length) {
*data = buffer;
} else {
efree(buffer);
*data = NULL;
}
efree(buffer);
return length;
}
/* }}} */
return i-1;
/* {{{ stomp_read_buffer
*/
static size_t stomp_read_buffer(stomp_t *stomp, char **data)
{
size_t length = _stomp_read_until(stomp, data, 0);
if (stomp_select_ex(stomp, 0, 0)) {
char endline[1];
if (1 != stomp_recv(stomp, endline, 1) && '\n' != endline[0]) {
if (*data) {
efree(*data);
*data = NULL;
}
return 0;
}
}
if (length > 1) {
length --;
} else if (length) {
efree(*data);
*data = NULL;
length = 0;
}
return length;
}
/* }}} */
@ -415,52 +545,16 @@ static int stomp_read_buffer(stomp_t *stomp, char **data)
*/
static int stomp_read_line(stomp_t *stomp, char **data)
{
int rc = 0;
size_t i = 0;
size_t bufsize = STOMP_BUFSIZE + 1;
char *buffer = (char *) emalloc(STOMP_BUFSIZE + 1);
while (1) {
size_t length = 1;
rc = stomp_recv(stomp, buffer + i, length);
if (rc < 1) {
efree(buffer);
return -1;
}
if (1 == length) {
i++;
if (buffer[i-1] == '\n') {
buffer[i-1] = 0;
break;
} else if (buffer[i-1] == 0) {
efree(buffer);
return 0;
}
if (i >= bufsize) {
buffer = (char *) erealloc(buffer, bufsize + STOMP_BUFSIZE);
bufsize += STOMP_BUFSIZE;
}
}
size_t length = _stomp_read_until(stomp, data, '\n');
if (length > 1) {
(*data)[length - 1] = 0;
length--;
} else if (length) {
efree(*data);
*data = NULL;
length = 0;
}
if (i > 1) {
*data = (char *) emalloc(i);
if (NULL == *data) {
efree(buffer);
return -1;
}
memcpy(*data, buffer, i);
}
efree(buffer);
return i-1;
return length;
}
/* }}} */
@ -484,16 +578,16 @@ void stomp_free_frame(stomp_frame_t *frame)
}
/* }}} */
/* {{{ stomp_read_frame
/* {{{ stomp_read_frame
*/
stomp_frame_t *stomp_read_frame(stomp_t *stomp)
stomp_frame_t *stomp_read_frame_ex(stomp_t *stomp, int use_stack)
{
stomp_frame_t *f = NULL;
char *cmd = NULL, *length_str = NULL;
int length = 0;
if (stomp->buffer) {
return stomp_frame_buffer_shift(&stomp->buffer);
if (use_stack && stomp->frame_stack) {
return stomp_frame_stack_shift(&stomp->frame_stack);
}
if (!stomp_select(stomp)) {
@ -519,20 +613,20 @@ stomp_frame_t *stomp_read_frame(stomp_t *stomp)
while (1) {
char *p = NULL;
length = stomp_read_line(stomp, &p);
if (length < 0) {
RETURN_READ_FRAME_FAIL;
}
if (0 == length) {
break;
} else {
} else {
char *p2 = NULL;
char *key;
char *value;
p2 = strstr(p,":");
if (p2 == NULL) {
efree(p);
RETURN_READ_FRAME_FAIL;
@ -553,19 +647,26 @@ stomp_frame_t *stomp_read_frame(stomp_t *stomp)
/* Check for the content length */
if (zend_hash_find(f->headers, "content-length", sizeof("content-length"), (void **)&length_str) == SUCCESS) {
int recv_size = 0;
char endbuffer[2];
length = 2;
f->body_length = atoi(length_str);
f->body = (char *) emalloc(f->body_length);
if (-1 == stomp_recv(stomp, f->body, f->body_length)) {
while (recv_size != f->body_length) {
int l = stomp_recv(stomp, f->body + recv_size, f->body_length - recv_size);
if (-1 == l) {
RETURN_READ_FRAME_FAIL;
} else {
recv_size += l;
}
}
length = stomp_recv(stomp, endbuffer, 2);
if (endbuffer[0] != '\0' || ((2 == length) && (endbuffer[1] != '\n'))) {
RETURN_READ_FRAME_FAIL;
}
if (length != stomp_recv(stomp, endbuffer, length) || endbuffer[0] != '\0' || endbuffer[1] != '\n') {
RETURN_READ_FRAME_FAIL;
}
} else {
f->body_length = stomp_read_buffer(stomp, &f->body);
}
@ -578,14 +679,12 @@ stomp_frame_t *stomp_read_frame(stomp_t *stomp)
*/
int stomp_valid_receipt(stomp_t *stomp, stomp_frame_t *frame) {
int success = 1;
char error[1024];
char *receipt = NULL;
if (zend_hash_find(frame->headers, "receipt", sizeof("receipt"), (void **)&receipt) == SUCCESS) {
stomp_frame_cell_t *buffer = NULL;
success = 0;
while (1) {
stomp_frame_t *res = stomp_read_frame(stomp);
stomp_frame_t *res = stomp_read_frame_ex(stomp, 0);
if (res) {
if (0 == strncmp("RECEIPT", res->command, sizeof("RECEIPT") - 1)) {
char *receipt_id = NULL;
@ -594,25 +693,21 @@ int stomp_valid_receipt(stomp_t *stomp, stomp_frame_t *frame) {
&& !strcmp(receipt, receipt_id)) {
success = 1;
} else {
snprintf(error, sizeof(error), "Unexpected receipt id : %s", receipt_id);
stomp_set_error(stomp, error, 0, NULL);
stomp_set_error(stomp, "Invalid receipt", 0, "%s", receipt_id);
}
stomp_free_frame(res);
stomp->buffer = buffer;
return success;
} else if (0 == strncmp("ERROR", res->command, sizeof("ERROR") - 1)) {
char *error_msg = NULL;
if (zend_hash_find(res->headers, "message", sizeof("message"), (void **)&error_msg) == SUCCESS) {
stomp_set_error(stomp, error_msg, 0, res->body);
stomp_set_error(stomp, error_msg, 0, "%s", res->body);
}
stomp_free_frame(res);
stomp->buffer = buffer;
return success;
} else {
stomp_frame_buffer_push(&buffer, res);
stomp_frame_stack_push(&stomp->frame_stack, res);
}
} else {
stomp->buffer = buffer;
return success;
}
}
@ -623,28 +718,26 @@ int stomp_valid_receipt(stomp_t *stomp, stomp_frame_t *frame) {
/* {{{ stomp_select
*/
int stomp_select(stomp_t *stomp)
int stomp_select_ex(stomp_t *stomp, const long int sec, const long int usec)
{
int n;
struct timeval tv;
if (stomp->buffer) {
if (stomp->read_buffer.size || stomp->frame_stack) {
return 1;
}
tv.tv_sec = stomp->options.read_timeout_sec;
tv.tv_usec = stomp->options.read_timeout_usec;
tv.tv_sec = sec;
tv.tv_usec = usec;
n = php_pollfd_for(stomp->fd, PHP_POLLREADABLE, &tv);
if (n < 1) {
#if !defined(PHP_WIN32) && !(defined(NETWARE) && defined(USE_WINSOCK))
if (n == 0) {
if (n == 0) {
errno = ETIMEDOUT;
}
#endif
}
#endif
return 0;
}
return 1;
}
/* }}} */

22
stomp.h Executable file → Normal file
View File

@ -53,10 +53,10 @@ typedef struct _stomp_frame {
int body_length;
} stomp_frame_t;
typedef struct _stomp_frame_cell {
typedef struct _stomp_frame_stack {
stomp_frame_t *frame;
struct _stomp_frame_cell *next;
} stomp_frame_cell_t;
struct _stomp_frame_stack *next;
} stomp_frame_stack_t;
typedef struct _stomp {
php_socket_t fd;
@ -72,18 +72,26 @@ typedef struct _stomp {
#if HAVE_STOMP_SSL
SSL *ssl_handle;
#endif
stomp_frame_cell_t *buffer;
stomp_frame_stack_t *frame_stack;
struct {
size_t size;
char buf[STOMP_BUFSIZE];
char *pos;
} read_buffer;
} stomp_t;
stomp_t *stomp_init();
int stomp_connect(stomp_t *stomp, const char *host, unsigned short port TSRMLS_DC);
void stomp_close(stomp_t *stomp);
int stomp_send(stomp_t *connection, stomp_frame_t *frame TSRMLS_DC);
stomp_frame_t *stomp_read_frame(stomp_t *connection);
stomp_frame_t *stomp_read_frame_ex(stomp_t *connection, int use_stack);
int stomp_valid_receipt(stomp_t *connection, stomp_frame_t *frame);
int stomp_select(stomp_t *connection);
void stomp_set_error(stomp_t *stomp, const char *error, int errnum, const char *details);
int stomp_select_ex(stomp_t *connection, const long int sec, const long int usec);
void stomp_set_error(stomp_t *stomp, const char *error, int errnum, const char *fmt, ...) ZEND_ATTRIBUTE_PTR_FORMAT(printf, 4, 0);
void stomp_free_frame(stomp_frame_t *frame);
#define stomp_select(s) stomp_select_ex(s, s->options.read_timeout_sec, s->options.read_timeout_sec)
#define stomp_read_frame(c) stomp_read_frame_ex(c, 1)
#endif /* _STOMP_H_ */
/*

View File

@ -0,0 +1,14 @@
--TEST--
Test stomp_connect() - Test error on CONNECT
--SKIPIF--
<?php if (!extension_loaded("stomp")) print "skip"; ?>
--FILE--
<?php
try {
$stomp = new Stomp('tcp://localhost', 'anotpresentusername1234');
} catch (Exception $e) {
var_dump(get_class($e));
}
?>
--EXPECTF--
string(14) "StompException"

14
tests/006-send/003.phpt Normal file
View File

@ -0,0 +1,14 @@
--TEST--
Test stomp::send() - test send with receipt
--SKIPIF--
<?php
if (!extension_loaded("stomp")) print "skip";
if (!stomp_connect()) print "skip";
?>
--FILE--
<?php
$s = new Stomp();
var_dump($s->send('/queue/test-06', 'A real message', array('receipt' => 'message-12345')));
?>
--EXPECTF--
bool(true)

View File

@ -0,0 +1,26 @@
--TEST--
Test stomp::readFrame() - test frame stack
--SKIPIF--
<?php
if (!extension_loaded("stomp")) print "skip";
if (!stomp_connect()) print "skip";
?>
--FILE--
<?php
$s = new Stomp();
var_dump($s->subscribe('/queue/test-buffer', array('ack' => 'auto')));
var_dump($s->send('/queue/test-buffer', "Message1", array('receipt' => 'msg-1')));
var_dump($s->send('/queue/test-buffer', "Message2", array('receipt' => 'msg-2')));
var_dump($s->send('/queue/test-buffer', "Message3", array('receipt' => 'msg-3')));
var_dump($s->readFrame()->body);
var_dump($s->readFrame()->body);
var_dump($s->readFrame()->body);
?>
--EXPECTF--
bool(true)
bool(true)
bool(true)
bool(true)
string(8) "Message1"
string(8) "Message2"
string(8) "Message3"

View File

@ -19,7 +19,7 @@ try {
$stomp->send($queue, $msg);
/* subscribe to messages from the queue 'foo' */
$stomp->subscribe($queue);
$stomp->subscribe($queue, array('ack' => 'auto'));
/* read a frame */
$frame = $stomp->readFrame();