Compare commits
42 Commits
release_1_
...
master
Author | SHA1 | Date |
---|---|---|
|
acf43f619f | |
|
618d2bd4d4 | |
![]() |
eb55496781 | |
![]() |
15a3b9a059 | |
![]() |
50bfb5d8da | |
![]() |
4bd40f20a2 | |
![]() |
bf4539e523 | |
![]() |
2c4b2f8bf1 | |
![]() |
0e38c2452e | |
![]() |
59b74e4142 | |
![]() |
fbacdccd37 | |
![]() |
bdedf5922c | |
![]() |
8b0bbd4db2 | |
![]() |
79a3ca34c4 | |
![]() |
5024fff376 | |
![]() |
8aece537c1 | |
![]() |
ca0de7d66a | |
![]() |
200bd35bb2 | |
![]() |
059566498b | |
![]() |
d37e34917d | |
![]() |
189b8dcfc2 | |
![]() |
026798d7c4 | |
![]() |
aafb968d49 | |
![]() |
c338977f53 | |
![]() |
8e18eb079d | |
![]() |
214d0eb602 | |
![]() |
beb86e70fe | |
![]() |
5ac6a47e5d | |
![]() |
6dff7f7fd1 | |
![]() |
d705083ab3 | |
![]() |
e7feab2950 | |
![]() |
19f47cd105 | |
![]() |
9e0839fff1 | |
![]() |
36c7391402 | |
![]() |
6a0fd1393f | |
![]() |
d4c9f11e07 | |
![]() |
0a6e6b2f5a | |
![]() |
a1619b9aa1 | |
![]() |
520e311c83 | |
![]() |
71caee45bc | |
![]() |
5f7d260c05 | |
![]() |
ce374724b9 |
|
@ -0,0 +1,28 @@
|
|||
.deps
|
||||
.libs/
|
||||
Makefile
|
||||
Makefile.fragments
|
||||
Makefile.global
|
||||
Makefile.objects
|
||||
acinclude.m4
|
||||
aclocal.m4
|
||||
autom4te.cache/
|
||||
config.guess
|
||||
config.h
|
||||
config.h.in
|
||||
config.log
|
||||
config.nice
|
||||
config.status
|
||||
config.sub
|
||||
configure
|
||||
configure.in
|
||||
install-sh
|
||||
libtool
|
||||
ltmain.sh
|
||||
missing
|
||||
mkinstalldirs
|
||||
modules/
|
||||
php_stomp.lo
|
||||
run-tests.php
|
||||
stomp.la
|
||||
stomp.lo
|
|
@ -0,0 +1,68 @@
|
|||
--------------------------------------------------------------------
|
||||
The PHP License, version 3.01
|
||||
Copyright (c) 1999 - 2014 The PHP Group. All rights reserved.
|
||||
--------------------------------------------------------------------
|
||||
|
||||
Redistribution and use in source and binary forms, with or without
|
||||
modification, is permitted provided that the following conditions
|
||||
are met:
|
||||
|
||||
1. Redistributions of source code must retain the above copyright
|
||||
notice, this list of conditions and the following disclaimer.
|
||||
|
||||
2. Redistributions in binary form must reproduce the above copyright
|
||||
notice, this list of conditions and the following disclaimer in
|
||||
the documentation and/or other materials provided with the
|
||||
distribution.
|
||||
|
||||
3. The name "PHP" must not be used to endorse or promote products
|
||||
derived from this software without prior written permission. For
|
||||
written permission, please contact group@php.net.
|
||||
|
||||
4. Products derived from this software may not be called "PHP", nor
|
||||
may "PHP" appear in their name, without prior written permission
|
||||
from group@php.net. You may indicate that your software works in
|
||||
conjunction with PHP by saying "Foo for PHP" instead of calling
|
||||
it "PHP Foo" or "phpfoo"
|
||||
|
||||
5. The PHP Group may publish revised and/or new versions of the
|
||||
license from time to time. Each version will be given a
|
||||
distinguishing version number.
|
||||
Once covered code has been published under a particular version
|
||||
of the license, you may always continue to use it under the terms
|
||||
of that version. You may also choose to use such covered code
|
||||
under the terms of any subsequent version of the license
|
||||
published by the PHP Group. No one other than the PHP Group has
|
||||
the right to modify the terms applicable to covered code created
|
||||
under this License.
|
||||
|
||||
6. Redistributions of any form whatsoever must retain the following
|
||||
acknowledgment:
|
||||
"This product includes PHP software, freely available from
|
||||
<http://www.php.net/software/>".
|
||||
|
||||
THIS SOFTWARE IS PROVIDED BY THE PHP DEVELOPMENT TEAM ``AS IS'' AND
|
||||
ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
|
||||
THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
|
||||
PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE PHP
|
||||
DEVELOPMENT TEAM OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
|
||||
INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
|
||||
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
|
||||
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
|
||||
HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
|
||||
STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
|
||||
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
|
||||
OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
|
||||
--------------------------------------------------------------------
|
||||
|
||||
This software consists of voluntary contributions made by many
|
||||
individuals on behalf of the PHP Group.
|
||||
|
||||
The PHP Group can be contacted via Email at group@php.net.
|
||||
|
||||
For more information on the PHP Group and the PHP project,
|
||||
please see <http://www.php.net>.
|
||||
|
||||
PHP includes the Zend Engine, freely available at
|
||||
<http://www.zend.com>.
|
7
TODO
7
TODO
|
@ -1,8 +1,5 @@
|
|||
- Add the ability to connect to a network of brokers
|
||||
(Cf: http://activemq.apache.org/networks-of-brokers.html)
|
||||
- Add Stomp 1.1 full support
|
||||
|
||||
- Review tests to work on every message broker
|
||||
- ActimeMQ
|
||||
- stompserver (Ruby)
|
||||
- TBD
|
||||
- Verify if we need to call recv with MSG_PEEK for the newline char
|
||||
or if the current implementation is enough (cf bug #59217)
|
||||
|
|
73
package.xml
73
package.xml
|
@ -4,7 +4,7 @@
|
|||
<channel>pecl.php.net</channel>
|
||||
<summary>Stomp client extension</summary>
|
||||
<description>
|
||||
This extension allows php applications to communicate with any Stomp compliant Message Brokers through easy object oriented and procedural interfaces.
|
||||
This extension allows php applications to communicate with any Stomp compliant Message Brokers through easy object oriented and procedural interfaces.
|
||||
</description>
|
||||
<lead>
|
||||
<name>Pierrick Charron</name>
|
||||
|
@ -12,20 +12,22 @@ This extension allows php applications to communicate with any Stomp compliant M
|
|||
<email>pierrick@php.net</email>
|
||||
<active>yes</active>
|
||||
</lead>
|
||||
<date>2012-11-18</date>
|
||||
<version><release>1.0.5</release><api>1.0.5</api></version>
|
||||
<lead>
|
||||
<name>Gennady Feldman</name>
|
||||
<user>gena01</user>
|
||||
<email>gena01@php.net</email>
|
||||
<active>yes</active>
|
||||
</lead>
|
||||
<date>XXXX-XX-XX</date>
|
||||
<version><release>1.0.X</release><api>1.0.X</api></version>
|
||||
<stability><release>stable</release><api>stable</api></stability>
|
||||
<license uri="http://www.php.net/license">PHP License</license>
|
||||
<notes>
|
||||
- Fix memory leak when Stomp can't write the message on the queue. (Pierrick)
|
||||
- Add a buffer for receipts. (Pierrick)
|
||||
- Fixed bug #62831 (Stomp module seems not initializing SSL library first).
|
||||
(Patch by lwhsu at lwhsu dot org)
|
||||
- Fixed bug #59972 (Message body are not binary safe). (Pierrick)
|
||||
</notes>
|
||||
<contents>
|
||||
<dir name="/">
|
||||
<file role="doc" name="CREDITS" />
|
||||
<file role="doc" name="LICENSE" />
|
||||
<file role="src" name="config.m4" />
|
||||
<file role="src" name="config.w32" />
|
||||
<file role="src" name="php_stomp.c" />
|
||||
|
@ -77,10 +79,47 @@ This extension allows php applications to communicate with any Stomp compliant M
|
|||
<providesextension>stomp</providesextension>
|
||||
|
||||
<extsrcrelease>
|
||||
<configureoption default="/usr" name="with-openssl-dir" prompt="OpenSSL install prefix (no to disable SSL support)" />
|
||||
<configureoption default="/usr" name="with-openssl-dir" prompt="OpenSSL install prefix (no to disable SSL support)" />
|
||||
</extsrcrelease>
|
||||
|
||||
<changelog>
|
||||
<release>
|
||||
<version><release>1.0.8</release><api>1.0.8</api></version>
|
||||
<stability><release>stable</release><api>stable</api></stability>
|
||||
<license uri="http://www.php.net/license">PHP License</license>
|
||||
<date>2015-05-18</date>
|
||||
<notes>
|
||||
- Fix perm on source files. (Remi)
|
||||
- Fixing PHP_STOMP_VERSION constant, per Remi's request. (Gennady)
|
||||
</notes>
|
||||
</release>
|
||||
<release>
|
||||
<version><release>1.0.7</release><api>1.0.7</api></version>
|
||||
<stability><release>stable</release><api>stable</api></stability>
|
||||
<license uri="http://www.php.net/license">PHP License</license>
|
||||
<date>2015-05-15</date>
|
||||
<notes>
|
||||
- add LICENSE file as documentation (Remi)
|
||||
- Fixed Windows compilation regression due to new TCP_NODELAY code. (Gennady Feldman)
|
||||
- Fixed bug where error checking was missing after stomp_send(). (Gennady Feldman)
|
||||
</notes>
|
||||
</release>
|
||||
<release>
|
||||
<version><release>1.0.6</release><api>1.0.6</api></version>
|
||||
<stability><release>stable</release><api>stable</api></stability>
|
||||
<license uri="http://www.php.net/license">PHP License</license>
|
||||
<date>2014-12-07</date>
|
||||
<notes>
|
||||
- Add two new ini options stomp.default_username and stomp.default_passowrd (Pierrick)
|
||||
- General performance improvements (Pierrick)
|
||||
- Fix stomp_read_frame when buffered (Pierrick)
|
||||
- Fixed bug #59217 (Connections to RabbitMQ via CLI). (Pierrick).
|
||||
- Fixed bug #59970 (acking a message makes rabbitmq disconnect the server). (Pierrick)
|
||||
- Fixed bug #67170 (Disable Nagle's Algorithm with TCP_NODELAY, it delays sending small messages). (Yarek Tyshchenko)
|
||||
- Fixed bug #68497 (Stomp client doesn't parse ERROR response on CONNECT). (Lorenzo Fontana)
|
||||
- Fixed bug #64671 (Add stomp_nack and Stomp::nack functions). (Pierrick)
|
||||
</notes>
|
||||
</release>
|
||||
<release>
|
||||
<version><release>1.0.5</release><api>1.0.5</api></version>
|
||||
<stability><release>stable</release><api>stable</api></stability>
|
||||
|
@ -118,7 +157,7 @@ This extension allows php applications to communicate with any Stomp compliant M
|
|||
<license uri="http://www.php.net/license">PHP License</license>
|
||||
<date>2010-08-13</date>
|
||||
<notes>
|
||||
- Fixed SSL connection bug introduced in 1.0.1
|
||||
- Fixed SSL connection bug introduced in 1.0.1
|
||||
</notes>
|
||||
</release>
|
||||
<release>
|
||||
|
@ -144,7 +183,7 @@ This extension allows php applications to communicate with any Stomp compliant M
|
|||
- Bump to stable
|
||||
</notes>
|
||||
</release>
|
||||
<release>
|
||||
<release>
|
||||
<version><release>0.4.1</release><api>0.4.1</api></version>
|
||||
<stability><release>beta</release><api>beta</api></stability>
|
||||
<license uri="http://www.php.net/license">PHP License</license>
|
||||
|
@ -168,10 +207,10 @@ This extension allows php applications to communicate with any Stomp compliant M
|
|||
<license uri="http://www.php.net/license">PHP License</license>
|
||||
<date>2009-11-22</date>
|
||||
<notes>
|
||||
- Adds alt class
|
||||
- Adds alt class
|
||||
- Fixed bug #16936 (Module segfaults on readFrame if Frame > STOMP_BUFSIZE)
|
||||
- Fixed bug #16933 (readFrame does not notice when server shuts down)
|
||||
- Fixed bug #16930 (readFrame reports error-frames as "timeout")
|
||||
- Fixed bug #16930 (readFrame reports error-frames as "timeout")
|
||||
</notes>
|
||||
</release>
|
||||
<release>
|
||||
|
@ -193,8 +232,8 @@ This extension allows php applications to communicate with any Stomp compliant M
|
|||
- new stomp_connect_error() function (pierrick)
|
||||
- stomp_begin, stomp_abort and stomp_commit now accept additional headers (pierrick)
|
||||
- new connection timeout and read timeout ini configuration (pierrick)
|
||||
- Fix a memory leak in stomp_read_line (pierrick)
|
||||
- Better set of test (Pierrick and Anis)
|
||||
- Fix a memory leak in stomp_read_line (pierrick)
|
||||
- Better set of test (Pierrick and Anis)
|
||||
</notes>
|
||||
</release>
|
||||
<release>
|
||||
|
@ -204,7 +243,7 @@ This extension allows php applications to communicate with any Stomp compliant M
|
|||
<date>2009-11-01</date>
|
||||
<notes>
|
||||
- Windows build fix (kalle)
|
||||
- Add SSL support (pierrick)
|
||||
- Add SSL support (pierrick)
|
||||
</notes>
|
||||
</release>
|
||||
<release>
|
||||
|
@ -213,7 +252,7 @@ This extension allows php applications to communicate with any Stomp compliant M
|
|||
<license uri="http://www.php.net/license">PHP License</license>
|
||||
<date>2009-10-30</date>
|
||||
<notes>
|
||||
- Initial PECL release. (pierrick)
|
||||
- Initial PECL release. (pierrick)
|
||||
</notes>
|
||||
</release>
|
||||
</changelog>
|
||||
|
|
|
@ -62,7 +62,9 @@
|
|||
SEPARATE_ZVAL(value); \
|
||||
convert_to_string(*value); \
|
||||
} \
|
||||
zend_hash_add(h, string_key, strlen(string_key)+1, Z_STRVAL_PP(value), Z_STRLEN_PP(value)+1, NULL); \
|
||||
if (strcmp(string_key, "content-length") != 0) { \
|
||||
zend_hash_add(h, string_key, strlen(string_key)+1, Z_STRVAL_PP(value), Z_STRLEN_PP(value)+1, NULL); \
|
||||
}\
|
||||
efree(string_key); \
|
||||
} \
|
||||
}
|
||||
|
@ -175,6 +177,17 @@ ZEND_ARG_INFO(0, msg)
|
|||
ZEND_ARG_ARRAY_INFO(0, headers, 1)
|
||||
ZEND_END_ARG_INFO()
|
||||
|
||||
ZEND_BEGIN_ARG_INFO_EX(stomp_nack_args, 0, 0, 2)
|
||||
ZEND_ARG_INFO(0, link)
|
||||
ZEND_ARG_INFO(0, msg)
|
||||
ZEND_ARG_ARRAY_INFO(0, headers, 1)
|
||||
ZEND_END_ARG_INFO()
|
||||
|
||||
ZEND_BEGIN_ARG_INFO_EX(stomp_oop_nack_args, 0, 0, 1)
|
||||
ZEND_ARG_INFO(0, msg)
|
||||
ZEND_ARG_ARRAY_INFO(0, headers, 1)
|
||||
ZEND_END_ARG_INFO()
|
||||
|
||||
ZEND_BEGIN_ARG_INFO_EX(stomp_set_read_timeout_args, 0, 0, 2)
|
||||
ZEND_ARG_INFO(0, link)
|
||||
ZEND_ARG_INFO(0, seconds)
|
||||
|
@ -209,6 +222,7 @@ zend_function_entry stomp_functions[] = {
|
|||
PHP_FE(stomp_commit, stomp_transaction_args)
|
||||
PHP_FE(stomp_abort, stomp_transaction_args)
|
||||
PHP_FE(stomp_ack, stomp_ack_args)
|
||||
PHP_FE(stomp_nack, stomp_nack_args)
|
||||
PHP_FE(stomp_error, stomp_link_only)
|
||||
PHP_FE(stomp_set_read_timeout, stomp_set_read_timeout_args)
|
||||
PHP_FE(stomp_get_read_timeout, stomp_link_only)
|
||||
|
@ -230,6 +244,7 @@ static zend_function_entry stomp_methods[] = {
|
|||
PHP_FALIAS(commit, stomp_commit, stomp_oop_transaction_args)
|
||||
PHP_FALIAS(abort, stomp_abort, stomp_oop_transaction_args)
|
||||
PHP_FALIAS(ack, stomp_ack, stomp_oop_ack_args)
|
||||
PHP_FALIAS(nack, stomp_nack, stomp_oop_nack_args)
|
||||
PHP_FALIAS(error, stomp_error, stomp_no_args)
|
||||
PHP_FALIAS(setReadTimeout, stomp_set_read_timeout, stomp_oop_set_read_timeout_args)
|
||||
PHP_FALIAS(getReadTimeout, stomp_get_read_timeout, stomp_no_args)
|
||||
|
@ -276,6 +291,8 @@ zend_module_entry stomp_module_entry = {
|
|||
|
||||
PHP_INI_BEGIN()
|
||||
STD_PHP_INI_ENTRY("stomp.default_broker", "tcp://localhost:61613", PHP_INI_ALL, OnUpdateString, default_broker, zend_stomp_globals, stomp_globals)
|
||||
STD_PHP_INI_ENTRY("stomp.default_username", "", PHP_INI_ALL, OnUpdateString, default_username, zend_stomp_globals, stomp_globals)
|
||||
STD_PHP_INI_ENTRY("stomp.default_password", "", PHP_INI_ALL, OnUpdateString, default_password, zend_stomp_globals, stomp_globals)
|
||||
STD_PHP_INI_ENTRY("stomp.default_read_timeout_sec", "2", PHP_INI_ALL, OnUpdateLong, read_timeout_sec, zend_stomp_globals, stomp_globals)
|
||||
STD_PHP_INI_ENTRY("stomp.default_read_timeout_usec", "0", PHP_INI_ALL, OnUpdateLong, read_timeout_usec, zend_stomp_globals, stomp_globals)
|
||||
STD_PHP_INI_ENTRY("stomp.default_connection_timeout_sec", "2", PHP_INI_ALL, OnUpdateLong, connection_timeout_sec, zend_stomp_globals, stomp_globals)
|
||||
|
@ -286,6 +303,8 @@ PHP_INI_END()
|
|||
static PHP_GINIT_FUNCTION(stomp)
|
||||
{
|
||||
stomp_globals->default_broker = NULL;
|
||||
stomp_globals->default_username = NULL;
|
||||
stomp_globals->default_password = NULL;
|
||||
stomp_globals->read_timeout_sec = 2;
|
||||
stomp_globals->read_timeout_usec = 0;
|
||||
stomp_globals->connection_timeout_sec = 2;
|
||||
|
@ -512,14 +531,17 @@ PHP_FUNCTION(stomp_connect)
|
|||
|
||||
if (stomp->status) {
|
||||
stomp_frame_t *res;
|
||||
int rres;
|
||||
stomp_frame_t frame = {0};
|
||||
|
||||
INIT_FRAME(frame, "CONNECT");
|
||||
if (username_len == 0) {
|
||||
username = "";
|
||||
if (!username) {
|
||||
username = STOMP_G(default_username);
|
||||
username_len = strlen(username);
|
||||
}
|
||||
if (password_len == 0) {
|
||||
password = "";
|
||||
if (!password) {
|
||||
password = STOMP_G(default_password);
|
||||
password_len = strlen(password);
|
||||
}
|
||||
zend_hash_add(frame.headers, "login", sizeof("login"), username, username_len + 1, NULL);
|
||||
zend_hash_add(frame.headers, "passcode", sizeof("passcode"), password, password_len + 1, NULL);
|
||||
|
@ -528,13 +550,30 @@ PHP_FUNCTION(stomp_connect)
|
|||
FRAME_HEADER_FROM_HASHTABLE(frame.headers, Z_ARRVAL_P(headers));
|
||||
}
|
||||
|
||||
stomp_send(stomp, &frame TSRMLS_CC);
|
||||
rres = stomp_send(stomp, &frame TSRMLS_CC);
|
||||
CLEAR_FRAME(frame);
|
||||
|
||||
if (0 == rres) {
|
||||
zval *excobj = zend_throw_exception_ex(stomp_ce_exception, stomp->errnum TSRMLS_CC, stomp->error);
|
||||
if (stomp->error_details) {
|
||||
zend_update_property_string(stomp_ce_exception, excobj, "details", sizeof("details")-1, stomp->error_details TSRMLS_CC);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
/* Retreive Response */
|
||||
res = stomp_read_frame(stomp);
|
||||
if (NULL == res) {
|
||||
STOMP_ERROR(0, PHP_STOMP_ERR_SERVER_NOT_RESPONDING);
|
||||
} else if (0 == strncmp("ERROR", res->command, sizeof("ERROR") - 1)) {
|
||||
char *error_msg = NULL;
|
||||
if (zend_hash_find(res->headers, "message", sizeof("message"), (void **)&error_msg) == SUCCESS) {
|
||||
zval *excobj = zend_throw_exception_ex(stomp_ce_exception, 0 TSRMLS_CC, error_msg);
|
||||
if (res->body) {
|
||||
zend_update_property_string(stomp_ce_exception, excobj, "details", sizeof("details")-1, (char *) res->body TSRMLS_CC);
|
||||
}
|
||||
|
||||
stomp_free_frame(res);
|
||||
}
|
||||
} else if (0 != strncmp("CONNECTED", res->command, sizeof("CONNECTED")-1)) {
|
||||
if (stomp->error) {
|
||||
STOMP_ERROR_DETAILS(stomp->errnum, stomp->error, stomp->error_details);
|
||||
|
@ -708,6 +747,8 @@ PHP_FUNCTION(stomp_send)
|
|||
CLEAR_FRAME(frame);
|
||||
RETURN_FALSE;
|
||||
}
|
||||
if (frame.body_length > 0 && strnlen(frame.body, frame.body_length) >= frame.body_length)
|
||||
frame.body_length = 0;
|
||||
|
||||
if (stomp_send(stomp, &frame TSRMLS_CC) > 0) {
|
||||
success = stomp_valid_receipt(stomp, &frame);
|
||||
|
@ -1073,10 +1114,9 @@ PHP_FUNCTION(stomp_abort)
|
|||
}
|
||||
/* }}} */
|
||||
|
||||
/* {{{ proto boolean Stomp::ack(mixed msg [, array headers])
|
||||
Acknowledge consumption of a message from a subscription using client acknowledgment */
|
||||
PHP_FUNCTION(stomp_ack)
|
||||
{
|
||||
/* {{{ _php_stomp_acknowledgment
|
||||
*/
|
||||
static void _php_stomp_acknowledgment(INTERNAL_FUNCTION_PARAMETERS, char *cmd) {
|
||||
zval *stomp_object = getThis();
|
||||
zval *msg = NULL, *headers = NULL;
|
||||
stomp_t *stomp = NULL;
|
||||
|
@ -1097,7 +1137,7 @@ PHP_FUNCTION(stomp_ack)
|
|||
ZEND_FETCH_RESOURCE(stomp, stomp_t *, &arg, -1, PHP_STOMP_RES_NAME, le_stomp);
|
||||
}
|
||||
|
||||
INIT_FRAME(frame, "ACK");
|
||||
INIT_FRAME(frame, cmd);
|
||||
|
||||
if (NULL != headers) {
|
||||
FRAME_HEADER_FROM_HASHTABLE(frame.headers, Z_ARRVAL_P(headers));
|
||||
|
@ -1125,6 +1165,22 @@ PHP_FUNCTION(stomp_ack)
|
|||
}
|
||||
/* }}} */
|
||||
|
||||
/* {{{ proto boolean Stomp::ack(mixed msg [, array headers])
|
||||
Acknowledge consumption of a message from a subscription using client acknowledgment */
|
||||
PHP_FUNCTION(stomp_ack)
|
||||
{
|
||||
_php_stomp_acknowledgment(INTERNAL_FUNCTION_PARAM_PASSTHRU, "ACK");
|
||||
}
|
||||
/* }}} */
|
||||
|
||||
/* {{{ proto boolean Stomp::nack(mixed msg [, array headers])
|
||||
Negative Acknowledgment of a message from a subscription */
|
||||
PHP_FUNCTION(stomp_nack)
|
||||
{
|
||||
_php_stomp_acknowledgment(INTERNAL_FUNCTION_PARAM_PASSTHRU, "NACK");
|
||||
}
|
||||
/* }}} */
|
||||
|
||||
/* {{{ proto string Stomp::error()
|
||||
Get the last error message */
|
||||
PHP_FUNCTION(stomp_error)
|
||||
|
|
|
@ -29,11 +29,7 @@ typedef struct _stomp_object {
|
|||
} stomp_object_t;
|
||||
|
||||
#define PHP_STOMP_EXTNAME "Stomp"
|
||||
#define PHP_STOMP_MAJOR_VERSION "1"
|
||||
#define PHP_STOMP_MINOR_VERSION "0"
|
||||
#define PHP_STOMP_PATCH_VERSION "5"
|
||||
#define PHP_STOMP_VERSION_STATUS ""
|
||||
#define PHP_STOMP_VERSION PHP_STOMP_MAJOR_VERSION "." PHP_STOMP_MINOR_VERSION "." PHP_STOMP_PATCH_VERSION PHP_STOMP_VERSION_STATUS
|
||||
#define PHP_STOMP_VERSION "1.0.9-dev"
|
||||
|
||||
#define PHP_STOMP_RES_NAME "stomp connection"
|
||||
|
||||
|
@ -82,6 +78,7 @@ PHP_FUNCTION(stomp_begin);
|
|||
PHP_FUNCTION(stomp_commit);
|
||||
PHP_FUNCTION(stomp_abort);
|
||||
PHP_FUNCTION(stomp_ack);
|
||||
PHP_FUNCTION(stomp_nack);
|
||||
PHP_FUNCTION(stomp_error);
|
||||
PHP_FUNCTION(stomp_set_read_timeout);
|
||||
PHP_FUNCTION(stomp_get_read_timeout);
|
||||
|
@ -97,6 +94,8 @@ ZEND_BEGIN_MODULE_GLOBALS(stomp)
|
|||
long read_timeout_usec;
|
||||
long connection_timeout_sec;
|
||||
long connection_timeout_usec;
|
||||
char *default_username;
|
||||
char *default_password;
|
||||
|
||||
/* Others */
|
||||
long error_no;
|
||||
|
|
427
stomp.c
427
stomp.c
|
@ -23,19 +23,50 @@
|
|||
#endif
|
||||
|
||||
#include "php.h"
|
||||
#include "zend_exceptions.h"
|
||||
#include "ext/standard/php_smart_str.h"
|
||||
#include "stomp.h"
|
||||
#include "php_stomp.h"
|
||||
|
||||
#ifdef HAVE_NETINET_IN_H
|
||||
#include <netinet/tcp.h>
|
||||
#endif
|
||||
#define RETURN_READ_FRAME_FAIL { stomp_free_frame(f); return NULL; }
|
||||
|
||||
ZEND_EXTERN_MODULE_GLOBALS(stomp);
|
||||
extern zend_class_entry *stomp_ce_exception;
|
||||
|
||||
/* {{{ DEBUG */
|
||||
#if PHP_DEBUG
|
||||
static void print_stomp_frame(stomp_frame_t *frame TSRMLS_DC) {
|
||||
php_printf("------ START FRAME ------\n");
|
||||
php_printf("%s\n", frame->command);
|
||||
/* Headers */
|
||||
if (frame->headers) {
|
||||
char *key;
|
||||
ulong pos;
|
||||
zend_hash_internal_pointer_reset(frame->headers);
|
||||
|
||||
while (zend_hash_get_current_key(frame->headers, &key, &pos, 0) == HASH_KEY_IS_STRING) {
|
||||
char *value = NULL;
|
||||
|
||||
php_printf("%s:", key);
|
||||
|
||||
if (zend_hash_get_current_data(frame->headers, (void **)&value) == SUCCESS) {
|
||||
php_printf("%s", value);
|
||||
}
|
||||
|
||||
php_printf("\n");
|
||||
zend_hash_move_forward(frame->headers);
|
||||
}
|
||||
}
|
||||
php_printf("\n%s\n", frame->body);
|
||||
php_printf("------ END FRAME ------\n");
|
||||
}
|
||||
#endif
|
||||
/* }}} */
|
||||
|
||||
/* {{{ stomp_init
|
||||
*/
|
||||
stomp_t *stomp_init()
|
||||
stomp_t *stomp_init()
|
||||
{
|
||||
/* Memory allocation */
|
||||
stomp_t *stomp = (stomp_t *) emalloc(sizeof(stomp_t));
|
||||
|
@ -59,36 +90,37 @@ stomp_t *stomp_init()
|
|||
stomp->ssl_handle = NULL;
|
||||
#endif
|
||||
|
||||
stomp->buffer = NULL;
|
||||
stomp->frame_stack = NULL;
|
||||
stomp->read_buffer.size = 0;
|
||||
return stomp;
|
||||
}
|
||||
/* }}} */
|
||||
|
||||
/* {{{ stomp_frame_buffer_push
|
||||
/* {{{ stomp_frame_stack_push
|
||||
*/
|
||||
void stomp_frame_buffer_push(stomp_frame_cell_t **pcell, stomp_frame_t *frame)
|
||||
static void stomp_frame_stack_push(stomp_frame_stack_t **stack, stomp_frame_t *frame)
|
||||
{
|
||||
stomp_frame_cell_t *cell = (stomp_frame_cell_t *) emalloc(sizeof(stomp_frame_cell_t));
|
||||
stomp_frame_stack_t *cell = (stomp_frame_stack_t *) emalloc(sizeof(stomp_frame_stack_t));
|
||||
cell->frame = frame;
|
||||
cell->next = NULL;
|
||||
|
||||
if (!*pcell) {
|
||||
*pcell = cell;
|
||||
if (!*stack) {
|
||||
*stack = cell;
|
||||
} else {
|
||||
stomp_frame_cell_t *cursor = *pcell;
|
||||
stomp_frame_stack_t *cursor = *stack;
|
||||
while (cursor->next != NULL) cursor = cursor->next;
|
||||
cursor->next = cell;
|
||||
}
|
||||
}
|
||||
/* }}} */
|
||||
|
||||
/* {{{ stomp_frame_buffer_shift
|
||||
/* {{{ stomp_frame_stack_shift
|
||||
*/
|
||||
stomp_frame_t *stomp_frame_buffer_shift(stomp_frame_cell_t **pcell) {
|
||||
static stomp_frame_t *stomp_frame_stack_shift(stomp_frame_stack_t **stack) {
|
||||
stomp_frame_t *frame = NULL;
|
||||
if (*pcell) {
|
||||
stomp_frame_cell_t *cell = *pcell;
|
||||
*pcell = cell->next;
|
||||
if (*stack) {
|
||||
stomp_frame_stack_t *cell = *stack;
|
||||
*stack = cell->next;
|
||||
frame = cell->frame;
|
||||
efree(cell);
|
||||
}
|
||||
|
@ -96,22 +128,25 @@ stomp_frame_t *stomp_frame_buffer_shift(stomp_frame_cell_t **pcell) {
|
|||
}
|
||||
/* }}} */
|
||||
|
||||
/* {{{ stomp_frame_buffer_clear
|
||||
/* {{{ stomp_frame_stack_clear
|
||||
*/
|
||||
void stomp_frame_buffer_clear(stomp_frame_cell_t **pcell) {
|
||||
static void stomp_frame_stack_clear(stomp_frame_stack_t **stack) {
|
||||
stomp_frame_t *frame = NULL;
|
||||
while (frame = stomp_frame_buffer_shift(pcell)) efree(frame);
|
||||
while ((frame = stomp_frame_stack_shift(stack))) efree(frame);
|
||||
}
|
||||
/* }}} */
|
||||
|
||||
/* {{{ stomp_set_error
|
||||
/* {{{ stomp_set_error
|
||||
*/
|
||||
void stomp_set_error(stomp_t *stomp, const char *error, int errnum, const char *details)
|
||||
void stomp_set_error(stomp_t *stomp, const char *error, int errnum, const char *fmt, ...)
|
||||
{
|
||||
va_list ap;
|
||||
int len;
|
||||
|
||||
if (stomp->error != NULL) {
|
||||
efree(stomp->error);
|
||||
stomp->error = NULL;
|
||||
}
|
||||
}
|
||||
if (stomp->error_details != NULL) {
|
||||
efree(stomp->error_details);
|
||||
stomp->error_details = NULL;
|
||||
|
@ -120,20 +155,33 @@ void stomp_set_error(stomp_t *stomp, const char *error, int errnum, const char *
|
|||
if (error != NULL) {
|
||||
stomp->error = estrdup(error);
|
||||
}
|
||||
if (details != NULL) {
|
||||
stomp->error_details = estrdup(details);
|
||||
if (fmt != NULL) {
|
||||
stomp->error_details = emalloc(STOMP_BUFSIZE);
|
||||
if (stomp->error_details == NULL) {
|
||||
return; /* Nothing else can be done */
|
||||
}
|
||||
va_start(ap, fmt);
|
||||
/*
|
||||
* Would've been better to call vasprintf(), but that
|
||||
* function is missing on some platforms...
|
||||
*/
|
||||
len = vsnprintf(stomp->error_details, STOMP_BUFSIZE, fmt, ap);
|
||||
va_end(ap);
|
||||
if (len < STOMP_BUFSIZE) {
|
||||
stomp->error_details = erealloc(stomp->error_details, len+1);
|
||||
}
|
||||
}
|
||||
}
|
||||
/* }}} */
|
||||
|
||||
/* {{{ stomp_writeable
|
||||
/* {{{ stomp_writable
|
||||
*/
|
||||
int stomp_writeable(stomp_t *stomp)
|
||||
int stomp_writable(stomp_t *stomp)
|
||||
{
|
||||
int n;
|
||||
|
||||
n = php_pollfd_for_ms(stomp->fd, POLLOUT, 1000);
|
||||
if (n < 1) {
|
||||
if (n != POLLOUT) {
|
||||
#ifndef PHP_WIN32
|
||||
if (n == 0) {
|
||||
errno = ETIMEDOUT;
|
||||
|
@ -146,13 +194,14 @@ int stomp_writeable(stomp_t *stomp)
|
|||
}
|
||||
/* }}} */
|
||||
|
||||
/* {{{ stomp_connect
|
||||
/* {{{ stomp_connect
|
||||
*/
|
||||
int stomp_connect(stomp_t *stomp, const char *host, unsigned short port TSRMLS_DC)
|
||||
{
|
||||
char error[1024];
|
||||
socklen_t size;
|
||||
struct timeval tv;
|
||||
int flag = 1;
|
||||
|
||||
if (stomp->host != NULL)
|
||||
{
|
||||
|
@ -170,22 +219,28 @@ int stomp_connect(stomp_t *stomp, const char *host, unsigned short port TSRMLS_D
|
|||
stomp->fd = php_network_connect_socket_to_host(stomp->host, stomp->port, SOCK_STREAM, 0, &tv, NULL, NULL, NULL, 0 TSRMLS_CC);
|
||||
if (stomp->fd == -1) {
|
||||
snprintf(error, sizeof(error), "Unable to connect to %s:%ld", stomp->host, stomp->port);
|
||||
stomp_set_error(stomp, error, errno, NULL);
|
||||
stomp_set_error(stomp, error, errno, "%s", strerror(errno));
|
||||
return 0;
|
||||
}
|
||||
|
||||
#ifdef HAVE_NETINET_IN_H
|
||||
setsockopt(stomp->fd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int));
|
||||
#endif
|
||||
|
||||
size = sizeof(stomp->localaddr);
|
||||
memset(&stomp->localaddr, 0, size);
|
||||
if (getsockname(stomp->fd, (struct sockaddr*) &stomp->localaddr, &size) == -1) {
|
||||
snprintf(error, sizeof(error), "getsockname failed: %s (%d)", strerror(errno), errno);
|
||||
stomp_set_error(stomp, error, errno, NULL);
|
||||
return 0;
|
||||
stomp_set_error(stomp, error, errno, NULL);
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (stomp_writeable(stomp)) {
|
||||
if (stomp_writable(stomp)) {
|
||||
#if HAVE_STOMP_SSL
|
||||
if (stomp->options.use_ssl) {
|
||||
SSL_CTX *ctx = SSL_CTX_new(SSLv23_client_method());
|
||||
int ret;
|
||||
|
||||
if (NULL == ctx) {
|
||||
stomp_set_error(stomp, "failed to create the SSL context", 0, NULL);
|
||||
return 0;
|
||||
|
@ -199,20 +254,20 @@ int stomp_connect(stomp_t *stomp, const char *host, unsigned short port TSRMLS_D
|
|||
SSL_CTX_free(ctx);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
SSL_set_fd(stomp->ssl_handle, stomp->fd);
|
||||
|
||||
if (SSL_connect(stomp->ssl_handle) <= 0) {
|
||||
stomp_set_error(stomp, "SSL/TLS handshake failed", 0, NULL);
|
||||
if ((ret = SSL_connect(stomp->ssl_handle)) <= 0) {
|
||||
stomp_set_error(stomp, "SSL/TLS handshake failed", 0, "SSL error %d", SSL_get_error(stomp->ssl_handle, ret));
|
||||
SSL_shutdown(stomp->ssl_handle);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
#endif
|
||||
return 1;
|
||||
} else {
|
||||
snprintf(error, sizeof(error), "Unable to connect to %s:%ld", stomp->host, stomp->port);
|
||||
stomp_set_error(stomp, error, errno, NULL);
|
||||
stomp_set_error(stomp, error, errno, "%s", strerror(errno));
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
@ -246,7 +301,7 @@ void stomp_close(stomp_t *stomp)
|
|||
if (stomp->error_details) {
|
||||
efree(stomp->error_details);
|
||||
}
|
||||
stomp_frame_buffer_clear(&stomp->buffer);
|
||||
stomp_frame_stack_clear(&stomp->frame_stack);
|
||||
efree(stomp);
|
||||
}
|
||||
/* }}} */
|
||||
|
@ -264,7 +319,7 @@ int stomp_send(stomp_t *stomp, stomp_frame_t *frame TSRMLS_DC)
|
|||
/* Headers */
|
||||
if (frame->headers) {
|
||||
|
||||
char *key;
|
||||
char *key;
|
||||
ulong pos;
|
||||
zend_hash_internal_pointer_reset(frame->headers);
|
||||
|
||||
|
@ -285,7 +340,7 @@ int stomp_send(stomp_t *stomp, stomp_frame_t *frame TSRMLS_DC)
|
|||
}
|
||||
|
||||
if (frame->body_length > 0) {
|
||||
smart_str_appends(&buf, "content-length:");
|
||||
smart_str_appendl(&buf, "content-length:", sizeof("content-length:") - 1);
|
||||
smart_str_append_long(&buf, frame->body_length);
|
||||
smart_str_appendc(&buf, '\n');
|
||||
}
|
||||
|
@ -296,35 +351,32 @@ int stomp_send(stomp_t *stomp, stomp_frame_t *frame TSRMLS_DC)
|
|||
smart_str_appendl(&buf, frame->body, frame->body_length > 0 ? frame->body_length : strlen(frame->body));
|
||||
}
|
||||
|
||||
if (!stomp_writeable(stomp)) {
|
||||
char error[1024];
|
||||
snprintf(error, sizeof(error), "Unable to send data");
|
||||
stomp_set_error(stomp, error, errno, NULL);
|
||||
smart_str_appendl(&buf, "\0", sizeof("\0")-1);
|
||||
|
||||
if (!stomp_writable(stomp)) {
|
||||
smart_str_free(&buf);
|
||||
stomp_set_error(stomp, "Unable to send data", errno, "%s", strerror(errno));
|
||||
return 0;
|
||||
}
|
||||
|
||||
#ifdef HAVE_STOMP_SSL
|
||||
if (stomp->options.use_ssl) {
|
||||
if (-1 == SSL_write(stomp->ssl_handle, buf.c, buf.len) || -1 == SSL_write(stomp->ssl_handle, "\0\n", 2)) {
|
||||
char error[1024];
|
||||
snprintf(error, sizeof(error), "Unable to send data");
|
||||
stomp_set_error(stomp, error, errno, NULL);
|
||||
int ret;
|
||||
if (-1 == (ret = SSL_write(stomp->ssl_handle, buf.c, buf.len))) {
|
||||
smart_str_free(&buf);
|
||||
stomp_set_error(stomp, "Unable to send data", errno, "SSL error %d", SSL_get_error(stomp->ssl_handle, ret));
|
||||
return 0;
|
||||
}
|
||||
} else {
|
||||
#endif
|
||||
if (-1 == send(stomp->fd, buf.c, buf.len, 0) || -1 == send(stomp->fd, "\0\n", 2, 0)) {
|
||||
char error[1024];
|
||||
snprintf(error, sizeof(error), "Unable to send data");
|
||||
stomp_set_error(stomp, error, errno, NULL);
|
||||
#endif
|
||||
if (-1 == send(stomp->fd, buf.c, buf.len, 0)) {
|
||||
smart_str_free(&buf);
|
||||
stomp_set_error(stomp, "Unable to send data", errno, "%s", strerror(errno));
|
||||
return 0;
|
||||
}
|
||||
#ifdef HAVE_STOMP_SSL
|
||||
}
|
||||
#endif
|
||||
#endif
|
||||
|
||||
smart_str_free(&buf);
|
||||
|
||||
|
@ -334,10 +386,12 @@ int stomp_send(stomp_t *stomp, stomp_frame_t *frame TSRMLS_DC)
|
|||
|
||||
/* {{{ stomp_recv
|
||||
*/
|
||||
int stomp_recv(stomp_t *stomp, char *msg, size_t length)
|
||||
static int _stomp_recv(stomp_t *stomp, char *msg, const size_t length)
|
||||
{
|
||||
int len;
|
||||
|
||||
stomp_select(stomp);
|
||||
|
||||
#if HAVE_STOMP_SSL
|
||||
if(stomp->options.use_ssl) {
|
||||
len = SSL_read(stomp->ssl_handle, msg, length);
|
||||
|
@ -348,66 +402,142 @@ int stomp_recv(stomp_t *stomp, char *msg, size_t length)
|
|||
}
|
||||
#endif
|
||||
|
||||
if (len == 0) {
|
||||
TSRMLS_FETCH();
|
||||
zend_throw_exception_ex(stomp_ce_exception, errno TSRMLS_CC, "Unexpected EOF while reading from socket");
|
||||
if (len == -1) {
|
||||
#if HAVE_STOMP_SSL
|
||||
if (stomp->options.use_ssl) {
|
||||
stomp_set_error(stomp, "Error reading from socket", errno, "%s. (SSL in use)", strerror(errno));
|
||||
} else {
|
||||
#endif
|
||||
stomp_set_error(stomp, "Error reading from socket", errno, "%s. (SSL not in use)", strerror(errno));
|
||||
#if HAVE_STOMP_SSL
|
||||
}
|
||||
#endif
|
||||
stomp->status = -1;
|
||||
} else if (len == 0) {
|
||||
stomp_set_error(stomp, "Sender closed connection unexpectedly", 0, NULL);
|
||||
stomp->status = -1;
|
||||
}
|
||||
|
||||
return len;
|
||||
}
|
||||
|
||||
int stomp_recv(stomp_t *stomp, char *msg, const size_t length)
|
||||
{
|
||||
if (stomp->read_buffer.size == 0) {
|
||||
if (length >= STOMP_BUFSIZE) {
|
||||
return _stomp_recv(stomp, msg, length);
|
||||
} else {
|
||||
size_t recv_size = _stomp_recv(stomp, stomp->read_buffer.buf, STOMP_BUFSIZE);
|
||||
if (recv_size <= length) {
|
||||
memcpy(msg, stomp->read_buffer.buf, recv_size);
|
||||
return recv_size;
|
||||
} else {
|
||||
memcpy(msg, stomp->read_buffer.buf, length);
|
||||
stomp->read_buffer.pos = stomp->read_buffer.buf + length;
|
||||
stomp->read_buffer.size = recv_size - length;
|
||||
return length;
|
||||
}
|
||||
}
|
||||
} else if (stomp->read_buffer.size >= length) {
|
||||
memcpy(msg, stomp->read_buffer.pos, length);
|
||||
stomp->read_buffer.pos += length;
|
||||
stomp->read_buffer.size -= length;
|
||||
return length;
|
||||
} else {
|
||||
int len = stomp->read_buffer.size;
|
||||
memcpy(msg, stomp->read_buffer.pos, stomp->read_buffer.size);
|
||||
stomp->read_buffer.size = 0;
|
||||
if (stomp_select_ex(stomp, 0, 0)) {
|
||||
return len + stomp_recv(stomp, msg + len, length - len);
|
||||
} else {
|
||||
return len;
|
||||
}
|
||||
}
|
||||
}
|
||||
/* }}} */
|
||||
|
||||
/* {{{ stomp_read_buffer
|
||||
/* {{{ _stomp_read_until
|
||||
*/
|
||||
static int stomp_read_buffer(stomp_t *stomp, char **data)
|
||||
static size_t _stomp_read_until(stomp_t *stomp, char **data, const char delimiter)
|
||||
{
|
||||
int rc = 0;
|
||||
size_t i = 0;
|
||||
size_t bufsize = STOMP_BUFSIZE + 1;
|
||||
char *buffer = (char *) emalloc(STOMP_BUFSIZE + 1);
|
||||
size_t length = 0;
|
||||
size_t bufsize = STOMP_BUFSIZE;
|
||||
char *buffer = (char *) emalloc(STOMP_BUFSIZE);
|
||||
|
||||
while (1) {
|
||||
unsigned int i, found;
|
||||
char *c;
|
||||
found = 0;
|
||||
|
||||
size_t length = 1;
|
||||
rc = stomp_recv(stomp, buffer + i, length);
|
||||
if (rc < 1) {
|
||||
efree(buffer);
|
||||
return -1;
|
||||
// First populate the buffer
|
||||
if (stomp->read_buffer.size == 0) {
|
||||
stomp->read_buffer.size = _stomp_recv(stomp, stomp->read_buffer.buf, STOMP_BUFSIZE);
|
||||
stomp->read_buffer.pos = stomp->read_buffer.buf;
|
||||
}
|
||||
|
||||
if (1 == length) {
|
||||
i++;
|
||||
|
||||
if (buffer[i-1] == 0) {
|
||||
char endline[1];
|
||||
if (1 != stomp_recv(stomp, endline, 1) && '\n' != endline[0]) {
|
||||
efree(buffer);
|
||||
return 0;
|
||||
}
|
||||
// Then search the delimiter
|
||||
c = stomp->read_buffer.pos;
|
||||
for (i = 1; i <= stomp->read_buffer.size ; i++) {
|
||||
if (*c == delimiter) {
|
||||
found = 1;
|
||||
break;
|
||||
} else {
|
||||
c++;
|
||||
}
|
||||
}
|
||||
if (!found) i--;
|
||||
|
||||
if (i >= bufsize) {
|
||||
buffer = (char *) erealloc(buffer, bufsize + STOMP_BUFSIZE);
|
||||
bufsize += STOMP_BUFSIZE;
|
||||
}
|
||||
// Make sure we have enough place in the buffer
|
||||
if ((i+length) >= bufsize) {
|
||||
buffer = (char *) erealloc(buffer, bufsize + STOMP_BUFSIZE);
|
||||
bufsize += STOMP_BUFSIZE;
|
||||
}
|
||||
|
||||
// Copy and update the buffer
|
||||
memcpy(buffer + length, stomp->read_buffer.pos, i);
|
||||
length += i;
|
||||
stomp->read_buffer.pos += i;
|
||||
stomp->read_buffer.size -= i;
|
||||
|
||||
if (found) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (i > 1) {
|
||||
*data = (char *) emalloc(i);
|
||||
if (NULL == *data) {
|
||||
efree(buffer);
|
||||
return -1;
|
||||
}
|
||||
|
||||
memcpy(*data, buffer, i);
|
||||
if (length) {
|
||||
*data = buffer;
|
||||
} else {
|
||||
efree(buffer);
|
||||
*data = NULL;
|
||||
}
|
||||
|
||||
efree(buffer);
|
||||
return length;
|
||||
}
|
||||
/* }}} */
|
||||
|
||||
return i-1;
|
||||
/* {{{ stomp_read_buffer
|
||||
*/
|
||||
static size_t stomp_read_buffer(stomp_t *stomp, char **data)
|
||||
{
|
||||
size_t length = _stomp_read_until(stomp, data, 0);
|
||||
if (stomp_select_ex(stomp, 0, 0)) {
|
||||
char endline[1];
|
||||
if (1 != stomp_recv(stomp, endline, 1) && '\n' != endline[0]) {
|
||||
if (*data) {
|
||||
efree(*data);
|
||||
*data = NULL;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
if (length > 1) {
|
||||
length --;
|
||||
} else if (length) {
|
||||
efree(*data);
|
||||
*data = NULL;
|
||||
length = 0;
|
||||
}
|
||||
return length;
|
||||
}
|
||||
/* }}} */
|
||||
|
||||
|
@ -415,52 +545,16 @@ static int stomp_read_buffer(stomp_t *stomp, char **data)
|
|||
*/
|
||||
static int stomp_read_line(stomp_t *stomp, char **data)
|
||||
{
|
||||
int rc = 0;
|
||||
size_t i = 0;
|
||||
size_t bufsize = STOMP_BUFSIZE + 1;
|
||||
char *buffer = (char *) emalloc(STOMP_BUFSIZE + 1);
|
||||
|
||||
while (1) {
|
||||
|
||||
size_t length = 1;
|
||||
rc = stomp_recv(stomp, buffer + i, length);
|
||||
if (rc < 1) {
|
||||
efree(buffer);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (1 == length) {
|
||||
i++;
|
||||
|
||||
if (buffer[i-1] == '\n') {
|
||||
buffer[i-1] = 0;
|
||||
break;
|
||||
} else if (buffer[i-1] == 0) {
|
||||
efree(buffer);
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (i >= bufsize) {
|
||||
buffer = (char *) erealloc(buffer, bufsize + STOMP_BUFSIZE);
|
||||
bufsize += STOMP_BUFSIZE;
|
||||
}
|
||||
}
|
||||
|
||||
size_t length = _stomp_read_until(stomp, data, '\n');
|
||||
if (length > 1) {
|
||||
(*data)[length - 1] = 0;
|
||||
length--;
|
||||
} else if (length) {
|
||||
efree(*data);
|
||||
*data = NULL;
|
||||
length = 0;
|
||||
}
|
||||
|
||||
if (i > 1) {
|
||||
*data = (char *) emalloc(i);
|
||||
if (NULL == *data) {
|
||||
efree(buffer);
|
||||
return -1;
|
||||
}
|
||||
|
||||
memcpy(*data, buffer, i);
|
||||
}
|
||||
|
||||
efree(buffer);
|
||||
|
||||
return i-1;
|
||||
return length;
|
||||
}
|
||||
/* }}} */
|
||||
|
||||
|
@ -484,16 +578,16 @@ void stomp_free_frame(stomp_frame_t *frame)
|
|||
}
|
||||
/* }}} */
|
||||
|
||||
/* {{{ stomp_read_frame
|
||||
/* {{{ stomp_read_frame
|
||||
*/
|
||||
stomp_frame_t *stomp_read_frame(stomp_t *stomp)
|
||||
stomp_frame_t *stomp_read_frame_ex(stomp_t *stomp, int use_stack)
|
||||
{
|
||||
stomp_frame_t *f = NULL;
|
||||
char *cmd = NULL, *length_str = NULL;
|
||||
int length = 0;
|
||||
|
||||
if (stomp->buffer) {
|
||||
return stomp_frame_buffer_shift(&stomp->buffer);
|
||||
if (use_stack && stomp->frame_stack) {
|
||||
return stomp_frame_stack_shift(&stomp->frame_stack);
|
||||
}
|
||||
|
||||
if (!stomp_select(stomp)) {
|
||||
|
@ -519,20 +613,20 @@ stomp_frame_t *stomp_read_frame(stomp_t *stomp)
|
|||
while (1) {
|
||||
char *p = NULL;
|
||||
length = stomp_read_line(stomp, &p);
|
||||
|
||||
|
||||
if (length < 0) {
|
||||
RETURN_READ_FRAME_FAIL;
|
||||
}
|
||||
|
||||
if (0 == length) {
|
||||
break;
|
||||
} else {
|
||||
} else {
|
||||
char *p2 = NULL;
|
||||
char *key;
|
||||
char *value;
|
||||
|
||||
p2 = strstr(p,":");
|
||||
|
||||
|
||||
if (p2 == NULL) {
|
||||
efree(p);
|
||||
RETURN_READ_FRAME_FAIL;
|
||||
|
@ -553,19 +647,26 @@ stomp_frame_t *stomp_read_frame(stomp_t *stomp)
|
|||
|
||||
/* Check for the content length */
|
||||
if (zend_hash_find(f->headers, "content-length", sizeof("content-length"), (void **)&length_str) == SUCCESS) {
|
||||
int recv_size = 0;
|
||||
char endbuffer[2];
|
||||
length = 2;
|
||||
|
||||
f->body_length = atoi(length_str);
|
||||
f->body = (char *) emalloc(f->body_length);
|
||||
|
||||
if (-1 == stomp_recv(stomp, f->body, f->body_length)) {
|
||||
while (recv_size != f->body_length) {
|
||||
int l = stomp_recv(stomp, f->body + recv_size, f->body_length - recv_size);
|
||||
if (-1 == l) {
|
||||
RETURN_READ_FRAME_FAIL;
|
||||
} else {
|
||||
recv_size += l;
|
||||
}
|
||||
}
|
||||
|
||||
length = stomp_recv(stomp, endbuffer, 2);
|
||||
if (endbuffer[0] != '\0' || ((2 == length) && (endbuffer[1] != '\n'))) {
|
||||
RETURN_READ_FRAME_FAIL;
|
||||
}
|
||||
|
||||
if (length != stomp_recv(stomp, endbuffer, length) || endbuffer[0] != '\0' || endbuffer[1] != '\n') {
|
||||
RETURN_READ_FRAME_FAIL;
|
||||
}
|
||||
} else {
|
||||
f->body_length = stomp_read_buffer(stomp, &f->body);
|
||||
}
|
||||
|
@ -578,14 +679,12 @@ stomp_frame_t *stomp_read_frame(stomp_t *stomp)
|
|||
*/
|
||||
int stomp_valid_receipt(stomp_t *stomp, stomp_frame_t *frame) {
|
||||
int success = 1;
|
||||
char error[1024];
|
||||
char *receipt = NULL;
|
||||
|
||||
if (zend_hash_find(frame->headers, "receipt", sizeof("receipt"), (void **)&receipt) == SUCCESS) {
|
||||
stomp_frame_cell_t *buffer = NULL;
|
||||
success = 0;
|
||||
while (1) {
|
||||
stomp_frame_t *res = stomp_read_frame(stomp);
|
||||
stomp_frame_t *res = stomp_read_frame_ex(stomp, 0);
|
||||
if (res) {
|
||||
if (0 == strncmp("RECEIPT", res->command, sizeof("RECEIPT") - 1)) {
|
||||
char *receipt_id = NULL;
|
||||
|
@ -594,25 +693,21 @@ int stomp_valid_receipt(stomp_t *stomp, stomp_frame_t *frame) {
|
|||
&& !strcmp(receipt, receipt_id)) {
|
||||
success = 1;
|
||||
} else {
|
||||
snprintf(error, sizeof(error), "Unexpected receipt id : %s", receipt_id);
|
||||
stomp_set_error(stomp, error, 0, NULL);
|
||||
stomp_set_error(stomp, "Invalid receipt", 0, "%s", receipt_id);
|
||||
}
|
||||
stomp_free_frame(res);
|
||||
stomp->buffer = buffer;
|
||||
return success;
|
||||
} else if (0 == strncmp("ERROR", res->command, sizeof("ERROR") - 1)) {
|
||||
char *error_msg = NULL;
|
||||
if (zend_hash_find(res->headers, "message", sizeof("message"), (void **)&error_msg) == SUCCESS) {
|
||||
stomp_set_error(stomp, error_msg, 0, res->body);
|
||||
stomp_set_error(stomp, error_msg, 0, "%s", res->body);
|
||||
}
|
||||
stomp_free_frame(res);
|
||||
stomp->buffer = buffer;
|
||||
return success;
|
||||
} else {
|
||||
stomp_frame_buffer_push(&buffer, res);
|
||||
stomp_frame_stack_push(&stomp->frame_stack, res);
|
||||
}
|
||||
} else {
|
||||
stomp->buffer = buffer;
|
||||
return success;
|
||||
}
|
||||
}
|
||||
|
@ -623,28 +718,26 @@ int stomp_valid_receipt(stomp_t *stomp, stomp_frame_t *frame) {
|
|||
|
||||
/* {{{ stomp_select
|
||||
*/
|
||||
int stomp_select(stomp_t *stomp)
|
||||
int stomp_select_ex(stomp_t *stomp, const long int sec, const long int usec)
|
||||
{
|
||||
int n;
|
||||
struct timeval tv;
|
||||
|
||||
if (stomp->buffer) {
|
||||
if (stomp->read_buffer.size || stomp->frame_stack) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
tv.tv_sec = stomp->options.read_timeout_sec;
|
||||
tv.tv_usec = stomp->options.read_timeout_usec;
|
||||
tv.tv_sec = sec;
|
||||
tv.tv_usec = usec;
|
||||
|
||||
n = php_pollfd_for(stomp->fd, PHP_POLLREADABLE, &tv);
|
||||
if (n < 1) {
|
||||
#if !defined(PHP_WIN32) && !(defined(NETWARE) && defined(USE_WINSOCK))
|
||||
if (n == 0) {
|
||||
if (n == 0) {
|
||||
errno = ETIMEDOUT;
|
||||
}
|
||||
#endif
|
||||
}
|
||||
#endif
|
||||
return 0;
|
||||
}
|
||||
|
||||
return 1;
|
||||
}
|
||||
/* }}} */
|
||||
|
|
|
@ -53,10 +53,10 @@ typedef struct _stomp_frame {
|
|||
int body_length;
|
||||
} stomp_frame_t;
|
||||
|
||||
typedef struct _stomp_frame_cell {
|
||||
typedef struct _stomp_frame_stack {
|
||||
stomp_frame_t *frame;
|
||||
struct _stomp_frame_cell *next;
|
||||
} stomp_frame_cell_t;
|
||||
struct _stomp_frame_stack *next;
|
||||
} stomp_frame_stack_t;
|
||||
|
||||
typedef struct _stomp {
|
||||
php_socket_t fd;
|
||||
|
@ -72,18 +72,26 @@ typedef struct _stomp {
|
|||
#if HAVE_STOMP_SSL
|
||||
SSL *ssl_handle;
|
||||
#endif
|
||||
stomp_frame_cell_t *buffer;
|
||||
stomp_frame_stack_t *frame_stack;
|
||||
struct {
|
||||
size_t size;
|
||||
char buf[STOMP_BUFSIZE];
|
||||
char *pos;
|
||||
} read_buffer;
|
||||
} stomp_t;
|
||||
|
||||
stomp_t *stomp_init();
|
||||
int stomp_connect(stomp_t *stomp, const char *host, unsigned short port TSRMLS_DC);
|
||||
void stomp_close(stomp_t *stomp);
|
||||
int stomp_send(stomp_t *connection, stomp_frame_t *frame TSRMLS_DC);
|
||||
stomp_frame_t *stomp_read_frame(stomp_t *connection);
|
||||
stomp_frame_t *stomp_read_frame_ex(stomp_t *connection, int use_stack);
|
||||
int stomp_valid_receipt(stomp_t *connection, stomp_frame_t *frame);
|
||||
int stomp_select(stomp_t *connection);
|
||||
void stomp_set_error(stomp_t *stomp, const char *error, int errnum, const char *details);
|
||||
int stomp_select_ex(stomp_t *connection, const long int sec, const long int usec);
|
||||
void stomp_set_error(stomp_t *stomp, const char *error, int errnum, const char *fmt, ...) ZEND_ATTRIBUTE_PTR_FORMAT(printf, 4, 0);
|
||||
void stomp_free_frame(stomp_frame_t *frame);
|
||||
|
||||
#define stomp_select(s) stomp_select_ex(s, s->options.read_timeout_sec, s->options.read_timeout_sec)
|
||||
#define stomp_read_frame(c) stomp_read_frame_ex(c, 1)
|
||||
#endif /* _STOMP_H_ */
|
||||
|
||||
/*
|
||||
|
|
|
@ -0,0 +1,14 @@
|
|||
--TEST--
|
||||
Test stomp_connect() - Test error on CONNECT
|
||||
--SKIPIF--
|
||||
<?php if (!extension_loaded("stomp")) print "skip"; ?>
|
||||
--FILE--
|
||||
<?php
|
||||
try {
|
||||
$stomp = new Stomp('tcp://localhost', 'anotpresentusername1234');
|
||||
} catch (Exception $e) {
|
||||
var_dump(get_class($e));
|
||||
}
|
||||
?>
|
||||
--EXPECTF--
|
||||
string(14) "StompException"
|
|
@ -0,0 +1,14 @@
|
|||
--TEST--
|
||||
Test stomp::send() - test send with receipt
|
||||
--SKIPIF--
|
||||
<?php
|
||||
if (!extension_loaded("stomp")) print "skip";
|
||||
if (!stomp_connect()) print "skip";
|
||||
?>
|
||||
--FILE--
|
||||
<?php
|
||||
$s = new Stomp();
|
||||
var_dump($s->send('/queue/test-06', 'A real message', array('receipt' => 'message-12345')));
|
||||
?>
|
||||
--EXPECTF--
|
||||
bool(true)
|
|
@ -0,0 +1,26 @@
|
|||
--TEST--
|
||||
Test stomp::readFrame() - test frame stack
|
||||
--SKIPIF--
|
||||
<?php
|
||||
if (!extension_loaded("stomp")) print "skip";
|
||||
if (!stomp_connect()) print "skip";
|
||||
?>
|
||||
--FILE--
|
||||
<?php
|
||||
$s = new Stomp();
|
||||
var_dump($s->subscribe('/queue/test-buffer', array('ack' => 'auto')));
|
||||
var_dump($s->send('/queue/test-buffer', "Message1", array('receipt' => 'msg-1')));
|
||||
var_dump($s->send('/queue/test-buffer', "Message2", array('receipt' => 'msg-2')));
|
||||
var_dump($s->send('/queue/test-buffer', "Message3", array('receipt' => 'msg-3')));
|
||||
var_dump($s->readFrame()->body);
|
||||
var_dump($s->readFrame()->body);
|
||||
var_dump($s->readFrame()->body);
|
||||
?>
|
||||
--EXPECTF--
|
||||
bool(true)
|
||||
bool(true)
|
||||
bool(true)
|
||||
bool(true)
|
||||
string(8) "Message1"
|
||||
string(8) "Message2"
|
||||
string(8) "Message3"
|
|
@ -19,7 +19,7 @@ try {
|
|||
$stomp->send($queue, $msg);
|
||||
|
||||
/* subscribe to messages from the queue 'foo' */
|
||||
$stomp->subscribe($queue);
|
||||
$stomp->subscribe($queue, array('ack' => 'auto'));
|
||||
|
||||
/* read a frame */
|
||||
$frame = $stomp->readFrame();
|
||||
|
|
Loading…
Reference in New Issue