Compare commits

..

80 Commits

Author SHA1 Message Date
Vitaliy Filippov acf43f619f Only add content-length header for messages containing zero byte
Allows OpenMQ/ActiveMQ to correctly map messages to JMS TextMessages
instead of only BytesMessages.

Fixes https://bugs.php.net/bug.php?id=70280
2015-08-18 12:08:17 +03:00
Vitaliy Filippov 618d2bd4d4 Fix compiler warning (assignment makes pointer from integer without a cast) 2015-08-18 12:07:50 +03:00
Gennady Feldman eb55496781 Fixing stomp_writable() check so we can catch when connect failed. 2015-06-05 14:41:04 -04:00
Gennady Feldman 15a3b9a059 1.0.8 => 1.0.9-dev 2015-05-18 18:19:58 -04:00
Gennady Feldman 50bfb5d8da Releasing v1.0.8 2015-05-18 18:17:11 -04:00
Gennady Feldman 4bd40f20a2 Fixing PHP_STOMP_VERSION constant, per Remi's request. 2015-05-18 18:08:25 -04:00
Remi Collet bf4539e523 fix perm on source files 2015-05-16 07:18:45 +02:00
Gennady Feldman 2c4b2f8bf1 1.0.7 => 1.0.8-dev 2015-05-15 17:21:29 -04:00
Gennady Feldman 0e38c2452e Releasing v1.0.7 2015-05-15 17:12:34 -04:00
Gennady Feldman 59b74e4142 Merge branch 'pull-request/6' 2015-05-05 10:20:59 -04:00
Gennady Feldman fbacdccd37 Fix error checking after stomp_send().
This one has been in there for a while and would hang for me in
1.0.6 release if the connection was refused.

Some tests would fail too. This error was also somewhat masked in
previous release. We should abort and throw an exception if send
fails.
2015-05-05 10:17:49 -04:00
Gennady Feldman bdedf5922c Adding #ifdef HAVE_NETINET_IN_H around the new TCP_NODELAY code to fix compilation on Windows and possibly others. 2015-02-25 11:58:32 -05:00
Pierrick Charron 8b0bbd4db2 Improve frame stack and add test 2014-12-09 01:49:39 -05:00
Pierrick Charron 79a3ca34c4 Fix error message on invalid receipt 2014-12-09 01:10:11 -05:00
Remi Collet 5024fff376 add missing LICENSE file (mandatory for downstream) 2014-12-08 06:25:08 +01:00
Pierrick Charron 8aece537c1 1.0.6 => 1.0.7-dev 2014-12-07 21:57:57 -05:00
Pierrick Charron ca0de7d66a Release stomp v1.0.6 2014-12-07 21:53:45 -05:00
Pierrick Charron 200bd35bb2 Fix error message mismatch 2014-12-07 21:15:03 -05:00
Pierrick Charron 059566498b Make stomp_set_error() to be printf like (mi+php at aldan dot algebra dot com) 2014-12-07 18:49:04 -05:00
Pierrick Charron d37e34917d Make sure connection is closed on error 2014-12-07 14:40:23 -05:00
Pierrick Charron 189b8dcfc2 Fixed bug #64671 (Add stomp_nack and Stomp::nack functions). 2014-12-07 13:22:01 -05:00
Pierrick Charron 026798d7c4 Update package.xml for merged #68497 2014-12-07 13:03:31 -05:00
Pierrick Charron aafb968d49 Merge remote-tracking branch 'github-fntlnz/fix-bug-68497' 2014-12-07 13:01:37 -05:00
Pierrick Charron c338977f53 Update package.xml for Bug #67170 2014-12-07 12:55:45 -05:00
Lorenzo Fontana 8e18eb079d Fix Bug #68497 Stomp client doesn't parse ERROR response on CONNECT 2014-12-02 19:28:20 +01:00
Yarek Tyshchenko 214d0eb602 Disable Nagle's Algorithm by adding TCP_NODELAY option to the socket
Signed-off-by: Gennady Feldman <gena01@gmail.com>
2014-12-01 10:43:38 -05:00
Peter Petermann beb86e70fe added fix be marcelog, see #59970 2013-03-17 17:33:02 +01:00
Pierrick Charron 5ac6a47e5d Remove compilation warnings 2012-11-21 08:59:02 -05:00
Pierrick Charron 6dff7f7fd1 Remove useless new line 2012-11-20 21:45:38 -05:00
Pierrick Charron d705083ab3 Merge branch 'read_buffer'
* read_buffer:
  Reimplement read_line and read_buffer
  read buffer implementation on top of stomp_recv
2012-11-20 21:28:31 -05:00
Pierrick Charron e7feab2950 Reimplement read_line and read_buffer 2012-11-20 21:25:46 -05:00
Pierrick Charron 19f47cd105 read buffer implementation on top of stomp_recv 2012-11-20 19:54:00 -05:00
Pierrick Charron 9e0839fff1 Rename few things 2012-11-20 19:12:26 -05:00
Pierrick Charron 36c7391402 Auto ack for this test 2012-11-20 17:34:51 -05:00
Pierrick Charron 6a0fd1393f Package.xml 2012-11-20 17:11:12 -05:00
Pierrick Charron d4c9f11e07 Combine the two sendto call in one 2012-11-20 17:00:43 -05:00
Pierrick Charron 0a6e6b2f5a s#writeable#writable#g 2012-11-19 09:03:22 -05:00
Pierrick Charron a1619b9aa1 Add new note 2012-11-18 20:31:55 -05:00
Pierrick Charron 520e311c83 Fix rabbitmq compatibility 2012-11-18 20:20:23 -05:00
Pierrick Charron 71caee45bc Add two new ini config default_username and default_password 2012-11-18 19:33:12 -05:00
Pierrick Charron 5f7d260c05 Fix stomp_read_frame when buffered 2012-11-18 19:23:27 -05:00
Pierrick Charron ce374724b9 1.0.5 => 1.0.6-dev 2012-11-18 17:35:17 -05:00
Pierrick Charron 5a3c1fe602 Package XML 2012-11-18 17:32:28 -05:00
Pierrick Charron 04fd7131b1 Prepare version number for release 2012-11-18 17:32:21 -05:00
Pierrick Charron acde8ab523 Add tests for binary safety 2012-11-18 17:12:26 -05:00
Pierrick Charron 8e8861ea8f Auto ack received messages for readframe test 2012-11-18 17:08:14 -05:00
Pierrick Charron 7d61983988 Fixed bug #59972 - Make body binary safe 2012-11-18 17:05:32 -05:00
Pierrick Charron 5793a7efee Add a buffer for receipts 2012-11-18 11:09:14 -05:00
Pierrick Charron d83dfa7117 Fix memory leak when Stomp can not write it's message 2012-11-17 17:27:40 -05:00
Pierrick Charron 49ae478103 Fix test to work on all versions of activeMQ 2012-11-17 12:38:54 -05:00
Pierrick Charron b68730beb9 New maintenance version 2012-09-17 14:13:40 +00:00
Pierrick Charron 4886ef7435 Fixed bug #62831 (Stomp module seems not initializing SSL library first). (Patch by lwhsu at lwhsu dot org) 2012-08-16 03:23:42 +00:00
Pierrick Charron 6643708893 Add item in the todo list 2011-09-14 15:50:08 +00:00
Pierrick Charron 43be636313 Make test work with -dev 2011-09-09 19:27:26 +00:00
Pierrick Charron 4afeec47b5 Fix macros 2011-09-01 14:19:38 +00:00
Pierrick Charron e95a759e70 Modify the phpinfo to display either or not the SSL support is enabled 2010-10-16 03:50:50 +00:00
Pierrick Charron df9b30bbb3 Update the php documentation 2010-10-13 03:39:55 +00:00
Pierrick Charron 957f9528dc Tag 1.0.3 and bump trunk version 2010-10-13 03:28:20 +00:00
Pierrick Charron 98ed6eac3c Fixed default read timeout usec 2010-10-13 03:13:56 +00:00
Pierrick Charron 61327e6fa7 Fixed bug #18772 (setTimeout usecs not honored). 2010-10-13 03:13:19 +00:00
Pierrick Charron 90ef82399f Remove useless call to zend_hash_internal_pointer_reset 2010-08-20 05:36:18 +00:00
Pierrick Charron 8655cf0971 Update package.xml 2010-08-13 14:06:56 +00:00
Pierrick Charron 77271afef3 Modify package.xml and version number for release 1.0.2 2010-08-13 13:51:33 +00:00
Pierrick Charron d667481cd7 Coding standards 2010-08-12 03:27:21 +00:00
Pierrick Charron e4b0c389d6 Avoid too many call to php_pollfd_for_ms (this fix SSL connection problem) 2010-08-12 03:21:53 +00:00
Pierrick Charron cc688cd647 tag 1.0.1 release 2010-08-03 13:21:20 +00:00
Pierrick Charron c3d53e5f2e Update tests to match new changes 2010-07-27 06:09:14 +00:00
Pierrick Charron 7d24da3a22 Update package.xml with recent changes 2010-07-27 05:55:15 +00:00
Pierrick Charron 078bac3324 New details property in the StompException class
New method StompException::getDetails()
Add the body content in the Stomp::Error() method
2010-07-27 05:48:59 +00:00
Pierrick Charron ca58640f70 Fix memory leak 2010-07-26 05:11:37 +00:00
Pierrick Charron f444d89b61 TODO list 2010-06-09 20:31:41 +00:00
Pierrick Charron 97ab0bcdd2 Add more specific error messages when receipt is not valid 2010-06-08 03:56:04 +00:00
Pierrick Charron cabdb94a17 Update package.xml 2010-06-07 02:55:28 +00:00
Pierrick Charron 928bc0975e Code cleaning 2010-06-07 02:53:34 +00:00
Pierrick Charron 56c9d3d3f9 Fixed bug #17262 (Server is not responding on win32). 2010-06-07 02:31:30 +00:00
Pierrick Charron 3c59221f67 Remove unused variable 2010-06-07 01:06:18 +00:00
Pierrick Charron 2c77d459c0 Update tests 2010-06-07 00:07:31 +00:00
Pierrick Charron cb81e1651a Update package.xml 2010-06-06 18:31:29 +00:00
Pierrick Charron 994b70f0b6 Make the php_stomp.c compile with the new runtime cache 2010-06-06 18:22:03 +00:00
Pierrick Charron 3b33476692 Add headers param to the constructor to allow things like activemq durable/retroactive client support 2010-03-17 08:28:53 +00:00
25 changed files with 867 additions and 263 deletions

28
.gitignore vendored Normal file
View File

@ -0,0 +1,28 @@
.deps
.libs/
Makefile
Makefile.fragments
Makefile.global
Makefile.objects
acinclude.m4
aclocal.m4
autom4te.cache/
config.guess
config.h
config.h.in
config.log
config.nice
config.status
config.sub
configure
configure.in
install-sh
libtool
ltmain.sh
missing
mkinstalldirs
modules/
php_stomp.lo
run-tests.php
stomp.la
stomp.lo

68
LICENSE Normal file
View File

@ -0,0 +1,68 @@
--------------------------------------------------------------------
The PHP License, version 3.01
Copyright (c) 1999 - 2014 The PHP Group. All rights reserved.
--------------------------------------------------------------------
Redistribution and use in source and binary forms, with or without
modification, is permitted provided that the following conditions
are met:
1. Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in
the documentation and/or other materials provided with the
distribution.
3. The name "PHP" must not be used to endorse or promote products
derived from this software without prior written permission. For
written permission, please contact group@php.net.
4. Products derived from this software may not be called "PHP", nor
may "PHP" appear in their name, without prior written permission
from group@php.net. You may indicate that your software works in
conjunction with PHP by saying "Foo for PHP" instead of calling
it "PHP Foo" or "phpfoo"
5. The PHP Group may publish revised and/or new versions of the
license from time to time. Each version will be given a
distinguishing version number.
Once covered code has been published under a particular version
of the license, you may always continue to use it under the terms
of that version. You may also choose to use such covered code
under the terms of any subsequent version of the license
published by the PHP Group. No one other than the PHP Group has
the right to modify the terms applicable to covered code created
under this License.
6. Redistributions of any form whatsoever must retain the following
acknowledgment:
"This product includes PHP software, freely available from
<http://www.php.net/software/>".
THIS SOFTWARE IS PROVIDED BY THE PHP DEVELOPMENT TEAM ``AS IS'' AND
ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE PHP
DEVELOPMENT TEAM OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
OF THE POSSIBILITY OF SUCH DAMAGE.
--------------------------------------------------------------------
This software consists of voluntary contributions made by many
individuals on behalf of the PHP Group.
The PHP Group can be contacted via Email at group@php.net.
For more information on the PHP Group and the PHP project,
please see <http://www.php.net>.
PHP includes the Zend Engine, freely available at
<http://www.zend.com>.

5
TODO Normal file
View File

@ -0,0 +1,5 @@
- Add the ability to connect to a network of brokers
(Cf: http://activemq.apache.org/networks-of-brokers.html)
- Add Stomp 1.1 full support
- Verify if we need to call recv with MSG_PEEK for the newline char
or if the current implementation is enough (cf bug #59217)

View File

@ -5,11 +5,12 @@ class Stomp {
/** /**
* Connect to server * Connect to server
* *
* @param string $broker Broker URI * @param string $broker The broker URI
* @param string $username The username * @param string $username The username
* @param string $password The password * @param string $password The password
* @param array $headers additional headers (example: receipt).
*/ */
public function __construct($broker = null, $username = null, $password = null) { public function __construct($broker = null, $username = null, $password = null, array $headers = array()) {
} }
/** /**
@ -33,30 +34,30 @@ class Stomp {
* *
* @param string $destination indicates where to send the message * @param string $destination indicates where to send the message
* @param string|StompFrame $msg message to be sent * @param string|StompFrame $msg message to be sent
* @param array $properties extra properties (example: receipt, transaction) * @param array $headers additional headers (example: receipt).
* @return boolean TRUE on success, or FALSE on failure * @return boolean TRUE on success, or FALSE on failure
*/ */
public function send($destination, $msg, array $properties = array()) { public function send($destination, $msg, array $headers = array()) {
} }
/** /**
* Register to listen to a given destination * Register to listen to a given destination
* *
* @param string $destination indicates which destination to subscribe to * @param string $destination indicates which destination to subscribe to
* @param array $properties extra properties (example: receipt, transaction, id) * @param array $headers additional headers (example: receipt).
* @return boolean TRUE on success, or FALSE on failure * @return boolean TRUE on success, or FALSE on failure
*/ */
public function subscribe($destination, array $properties = array()) { public function subscribe($destination, array $headers = array()) {
} }
/** /**
* Remove an existing subscription * Remove an existing subscription
* *
* @param string $destination indicates which subscription to remove * @param string $destination indicates which subscription to remove
* @param array $properties extra properties (example: receipt, transaction, id) * @param array $headers additional headers (example: receipt).
* @return boolean TRUE on success, or FALSE on failure * @return boolean TRUE on success, or FALSE on failure
*/ */
public function unsubscribe($destination, array $properties = array()) { public function unsubscribe($destination, array $headers = array()) {
} }
/** /**
@ -107,10 +108,10 @@ class Stomp {
* Acknowledge consumption of a message from a subscription using client acknowledgment * Acknowledge consumption of a message from a subscription using client acknowledgment
* *
* @param string|StompFrame $msg message/messageId to be acknowledged * @param string|StompFrame $msg message/messageId to be acknowledged
* @param array $properties extra properties (example: receipt, transaction) * @param array $headers additional headers (example: receipt).
* @return boolean TRUE on success, or FALSE on failure * @return boolean TRUE on success, or FALSE on failure
*/ */
public function ack($msg, array $properties = array()) { public function ack($msg, array $headers = array()) {
} }
/** /**
@ -163,4 +164,12 @@ class StompFrame {
} }
class StompException extends Exception { class StompException extends Exception {
/**
* Get the stomp server error details
*
* @return string
*/
public function getDetails() {
}
} }

View File

@ -14,9 +14,10 @@ function stomp_version() {
* @param string $broker broker URI * @param string $broker broker URI
* @param string $username The username * @param string $username The username
* @param string $password The password * @param string $password The password
* @param array $headers additional headers (example: receipt).
* @return Ressource stomp connection identifier on success, or FALSE on failure * @return Ressource stomp connection identifier on success, or FALSE on failure
*/ */
function stomp_connect($broker = null, $username = null, $password = null) { function stomp_connect($broker = null, $username = null, $password = null, array $headers = array()) {
} }
/** /**
@ -43,10 +44,10 @@ function stomp_close($link) {
* @param ressource $link identifier returned by stomp_connect * @param ressource $link identifier returned by stomp_connect
* @param string $destination indicates where to send the message * @param string $destination indicates where to send the message
* @param string|StompFrame $msg message to be sent * @param string|StompFrame $msg message to be sent
* @param array $properties extra properties (example: receipt, transaction) * @param array $headers additional headers (example: receipt).
* @return boolean TRUE on success, or FALSE on failure * @return boolean TRUE on success, or FALSE on failure
*/ */
function stomp_send($link, $destination, $msg, array $properties = array()) { function stomp_send($link, $destination, $msg, array $headers = array()) {
} }
/** /**
@ -54,10 +55,10 @@ function stomp_send($link, $destination, $msg, array $properties = array()) {
* *
* @param ressource $link identifier returned by stomp_connect * @param ressource $link identifier returned by stomp_connect
* @param string $destination indicates which destination to subscribe to * @param string $destination indicates which destination to subscribe to
* @param array $properties extra properties (example: receipt, transaction, id) * @param array $headers additional headers (example: receipt).
* @return boolean TRUE on success, or FALSE on failure * @return boolean TRUE on success, or FALSE on failure
*/ */
function stomp_subscribe($link, $destination, array $properties = array()) { function stomp_subscribe($link, $destination, array $headers = array()) {
} }
/** /**
@ -65,10 +66,10 @@ function stomp_subscribe($link, $destination, array $properties = array()) {
* *
* @param ressource $link identifier returned by stomp_connect * @param ressource $link identifier returned by stomp_connect
* @param string $destination indicates which subscription to remove * @param string $destination indicates which subscription to remove
* @param array $properties extra properties (example: receipt, transaction, id) * @param array $headers additional headers (example: receipt).
* @return boolean TRUE on success, or FALSE on failure * @return boolean TRUE on success, or FALSE on failure
*/ */
function stomp_unsubscribe($link, $destination, array $properties = array()) { function stomp_unsubscribe($link, $destination, array $headers = array()) {
} }
/** /**
@ -124,10 +125,10 @@ function stomp_abort($link, $transaction_id) {
* *
* @param ressource $link identifier returned by stomp_connect * @param ressource $link identifier returned by stomp_connect
* @param string|StompFrame $msg message/messageId to be acknowledged * @param string|StompFrame $msg message/messageId to be acknowledged
* @param array $properties extra properties (example: receipt, transaction) * @param array $headers additional headers (example: receipt).
* @return boolean TRUE on success, or FALSE on failure * @return boolean TRUE on success, or FALSE on failure
*/ */
function stomp_ack($link, $msg, array $properties = array()) { function stomp_ack($link, $msg, array $headers = array()) {
} }
/** /**

View File

@ -12,17 +12,22 @@ This extension allows php applications to communicate with any Stomp compliant M
<email>pierrick@php.net</email> <email>pierrick@php.net</email>
<active>yes</active> <active>yes</active>
</lead> </lead>
<date>2010-02-11</date> <lead>
<version><release>1.0.0</release><api>1.0.0</api></version> <name>Gennady Feldman</name>
<user>gena01</user>
<email>gena01@php.net</email>
<active>yes</active>
</lead>
<date>XXXX-XX-XX</date>
<version><release>1.0.X</release><api>1.0.X</api></version>
<stability><release>stable</release><api>stable</api></stability> <stability><release>stable</release><api>stable</api></stability>
<license uri="http://www.php.net/license">PHP License</license> <license uri="http://www.php.net/license">PHP License</license>
<notes> <notes>
- Bump to stable
</notes> </notes>
<contents> <contents>
<dir name="/"> <dir name="/">
<file role="doc" name="CREDITS" /> <file role="doc" name="CREDITS" />
<file role="doc" name="LICENSE" />
<file role="src" name="config.m4" /> <file role="src" name="config.m4" />
<file role="src" name="config.w32" /> <file role="src" name="config.w32" />
<file role="src" name="php_stomp.c" /> <file role="src" name="php_stomp.c" />
@ -43,6 +48,8 @@ This extension allows php applications to communicate with any Stomp compliant M
<file role="test" name="tests/009-readFrame/001.phpt" /> <file role="test" name="tests/009-readFrame/001.phpt" />
<file role="test" name="tests/009-readFrame/002.phpt" /> <file role="test" name="tests/009-readFrame/002.phpt" />
<file role="test" name="tests/009-readFrame/003.phpt" /> <file role="test" name="tests/009-readFrame/003.phpt" />
<file role="test" name="tests/009-readFrame/004.phpt" />
<file role="test" name="tests/009-readFrame/005.phpt" />
<file role="test" name="tests/010-timeout/001.phpt" /> <file role="test" name="tests/010-timeout/001.phpt" />
<file role="test" name="tests/010-timeout/002.phpt" /> <file role="test" name="tests/010-timeout/002.phpt" />
<file role="test" name="tests/011-commit/001.phpt" /> <file role="test" name="tests/011-commit/001.phpt" />
@ -76,6 +83,106 @@ This extension allows php applications to communicate with any Stomp compliant M
</extsrcrelease> </extsrcrelease>
<changelog> <changelog>
<release>
<version><release>1.0.8</release><api>1.0.8</api></version>
<stability><release>stable</release><api>stable</api></stability>
<license uri="http://www.php.net/license">PHP License</license>
<date>2015-05-18</date>
<notes>
- Fix perm on source files. (Remi)
- Fixing PHP_STOMP_VERSION constant, per Remi's request. (Gennady)
</notes>
</release>
<release>
<version><release>1.0.7</release><api>1.0.7</api></version>
<stability><release>stable</release><api>stable</api></stability>
<license uri="http://www.php.net/license">PHP License</license>
<date>2015-05-15</date>
<notes>
- add LICENSE file as documentation (Remi)
- Fixed Windows compilation regression due to new TCP_NODELAY code. (Gennady Feldman)
- Fixed bug where error checking was missing after stomp_send(). (Gennady Feldman)
</notes>
</release>
<release>
<version><release>1.0.6</release><api>1.0.6</api></version>
<stability><release>stable</release><api>stable</api></stability>
<license uri="http://www.php.net/license">PHP License</license>
<date>2014-12-07</date>
<notes>
- Add two new ini options stomp.default_username and stomp.default_passowrd (Pierrick)
- General performance improvements (Pierrick)
- Fix stomp_read_frame when buffered (Pierrick)
- Fixed bug #59217 (Connections to RabbitMQ via CLI). (Pierrick).
- Fixed bug #59970 (acking a message makes rabbitmq disconnect the server). (Pierrick)
- Fixed bug #67170 (Disable Nagle's Algorithm with TCP_NODELAY, it delays sending small messages). (Yarek Tyshchenko)
- Fixed bug #68497 (Stomp client doesn't parse ERROR response on CONNECT). (Lorenzo Fontana)
- Fixed bug #64671 (Add stomp_nack and Stomp::nack functions). (Pierrick)
</notes>
</release>
<release>
<version><release>1.0.5</release><api>1.0.5</api></version>
<stability><release>stable</release><api>stable</api></stability>
<license uri="http://www.php.net/license">PHP License</license>
<date>2012-11-18</date>
<notes>
- Fix memory leak when Stomp can't write the message on the queue. (Pierrick)
- Add a buffer for receipts. (Pierrick)
- Fixed bug #62831 (Stomp module seems not initializing SSL library first).
(Patch by lwhsu at lwhsu dot org)
- Fixed bug #59972 (Message body are not binary safe). (Pierrick)
</notes>
</release>
<release>
<version><release>1.0.4</release><api>1.0.4</api></version>
<stability><release>stable</release><api>stable</api></stability>
<license uri="http://www.php.net/license">PHP License</license>
<date>2012-09-17</date>
<notes>
- Fix compatibility with 5.4
</notes>
</release>
<release>
<version><release>1.0.3</release><api>1.0.3</api></version>
<stability><release>stable</release><api>stable</api></stability>
<license uri="http://www.php.net/license">PHP License</license>
<date>2010-10-12</date>
<notes>
- Fixed bug #18772 (setTimeout usecs not honored)
</notes>
</release>
<release>
<version><release>1.0.2</release><api>1.0.2</api></version>
<stability><release>stable</release><api>stable</api></stability>
<license uri="http://www.php.net/license">PHP License</license>
<date>2010-08-13</date>
<notes>
- Fixed SSL connection bug introduced in 1.0.1
</notes>
</release>
<release>
<version><release>1.0.1</release><api>1.0.1</api></version>
<stability><release>stable</release><api>stable</api></stability>
<license uri="http://www.php.net/license">PHP License</license>
<date>2010-08-03</date>
<notes>
- Add new parameter to the constructor to allow client to send extra informations
- Add zend engine runtime cache support (introduced into trunk)
- Add new details property in the StompException class
- Add new StompException::getDetails() method
- Add the frame body content in the Stomp::Error() method
- Fixed bug #17262 (Server is not responding on win32)
</notes>
</release>
<release>
<version><release>1.0.0</release><api>1.0.0</api></version>
<stability><release>stable</release><api>stable</api></stability>
<license uri="http://www.php.net/license">PHP License</license>
<date>2010-02-11</date>
<notes>
- Bump to stable
</notes>
</release>
<release> <release>
<version><release>0.4.1</release><api>0.4.1</api></version> <version><release>0.4.1</release><api>0.4.1</api></version>
<stability><release>beta</release><api>beta</api></stability> <stability><release>beta</release><api>beta</api></stability>

178
php_stomp.c Executable file → Normal file
View File

@ -51,7 +51,6 @@
zval **value = NULL; \ zval **value = NULL; \
char *string_key = NULL; \ char *string_key = NULL; \
ulong num_key; \ ulong num_key; \
zend_hash_internal_pointer_reset(headers_ht); \
for (zend_hash_internal_pointer_reset(headers_ht); \ for (zend_hash_internal_pointer_reset(headers_ht); \
zend_hash_get_current_data(headers_ht, (void **)&value) == SUCCESS; \ zend_hash_get_current_data(headers_ht, (void **)&value) == SUCCESS; \
zend_hash_move_forward(headers_ht)) { \ zend_hash_move_forward(headers_ht)) { \
@ -63,7 +62,9 @@
SEPARATE_ZVAL(value); \ SEPARATE_ZVAL(value); \
convert_to_string(*value); \ convert_to_string(*value); \
} \ } \
if (strcmp(string_key, "content-length") != 0) { \
zend_hash_add(h, string_key, strlen(string_key)+1, Z_STRVAL_PP(value), Z_STRLEN_PP(value)+1, NULL); \ zend_hash_add(h, string_key, strlen(string_key)+1, Z_STRVAL_PP(value), Z_STRLEN_PP(value)+1, NULL); \
}\
efree(string_key); \ efree(string_key); \
} \ } \
} }
@ -72,14 +73,27 @@
zend_hash_destroy(frame.headers); \ zend_hash_destroy(frame.headers); \
efree(frame.headers); efree(frame.headers);
#define STOMP_ERROR(errno, msg, ... ) \ #define STOMP_ERROR(errno, msg) \
STOMP_G(error_no) = errno; \ STOMP_G(error_no) = errno; \
if (STOMP_G(error_msg)) { \ if (STOMP_G(error_msg)) { \
efree(STOMP_G(error_msg)); \ efree(STOMP_G(error_msg)); \
} \ } \
STOMP_G(error_msg) = estrdup(msg); \ STOMP_G(error_msg) = estrdup(msg); \
if (stomp_object) { \ if (stomp_object) { \
zend_throw_exception_ex(stomp_ce_exception, errno TSRMLS_CC, msg, ##__VA_ARGS__); \ zend_throw_exception_ex(stomp_ce_exception, errno TSRMLS_CC, msg); \
}
#define STOMP_ERROR_DETAILS(errno, msg, details) \
STOMP_G(error_no) = errno; \
if (STOMP_G(error_msg)) { \
efree(STOMP_G(error_msg)); \
} \
STOMP_G(error_msg) = estrdup(msg); \
if (stomp_object) { \
zval *object = zend_throw_exception_ex(stomp_ce_exception, errno TSRMLS_CC, msg); \
if (details) { \
zend_update_property_string(stomp_ce_exception, object, "details", sizeof("details")-1, (char *) details TSRMLS_CC); \
} \
} }
static int le_stomp; static int le_stomp;
@ -101,6 +115,7 @@ ZEND_BEGIN_ARG_INFO_EX(stomp_connect_args, 0, 0, 0)
ZEND_ARG_INFO(0, broker) ZEND_ARG_INFO(0, broker)
ZEND_ARG_INFO(0, username) ZEND_ARG_INFO(0, username)
ZEND_ARG_INFO(0, password) ZEND_ARG_INFO(0, password)
ZEND_ARG_ARRAY_INFO(0, headers, 1)
ZEND_END_ARG_INFO() ZEND_END_ARG_INFO()
ZEND_BEGIN_ARG_INFO_EX(stomp_link_only, 0, 0, 1) ZEND_BEGIN_ARG_INFO_EX(stomp_link_only, 0, 0, 1)
@ -162,6 +177,17 @@ ZEND_ARG_INFO(0, msg)
ZEND_ARG_ARRAY_INFO(0, headers, 1) ZEND_ARG_ARRAY_INFO(0, headers, 1)
ZEND_END_ARG_INFO() ZEND_END_ARG_INFO()
ZEND_BEGIN_ARG_INFO_EX(stomp_nack_args, 0, 0, 2)
ZEND_ARG_INFO(0, link)
ZEND_ARG_INFO(0, msg)
ZEND_ARG_ARRAY_INFO(0, headers, 1)
ZEND_END_ARG_INFO()
ZEND_BEGIN_ARG_INFO_EX(stomp_oop_nack_args, 0, 0, 1)
ZEND_ARG_INFO(0, msg)
ZEND_ARG_ARRAY_INFO(0, headers, 1)
ZEND_END_ARG_INFO()
ZEND_BEGIN_ARG_INFO_EX(stomp_set_read_timeout_args, 0, 0, 2) ZEND_BEGIN_ARG_INFO_EX(stomp_set_read_timeout_args, 0, 0, 2)
ZEND_ARG_INFO(0, link) ZEND_ARG_INFO(0, link)
ZEND_ARG_INFO(0, seconds) ZEND_ARG_INFO(0, seconds)
@ -196,6 +222,7 @@ zend_function_entry stomp_functions[] = {
PHP_FE(stomp_commit, stomp_transaction_args) PHP_FE(stomp_commit, stomp_transaction_args)
PHP_FE(stomp_abort, stomp_transaction_args) PHP_FE(stomp_abort, stomp_transaction_args)
PHP_FE(stomp_ack, stomp_ack_args) PHP_FE(stomp_ack, stomp_ack_args)
PHP_FE(stomp_nack, stomp_nack_args)
PHP_FE(stomp_error, stomp_link_only) PHP_FE(stomp_error, stomp_link_only)
PHP_FE(stomp_set_read_timeout, stomp_set_read_timeout_args) PHP_FE(stomp_set_read_timeout, stomp_set_read_timeout_args)
PHP_FE(stomp_get_read_timeout, stomp_link_only) PHP_FE(stomp_get_read_timeout, stomp_link_only)
@ -217,6 +244,7 @@ static zend_function_entry stomp_methods[] = {
PHP_FALIAS(commit, stomp_commit, stomp_oop_transaction_args) PHP_FALIAS(commit, stomp_commit, stomp_oop_transaction_args)
PHP_FALIAS(abort, stomp_abort, stomp_oop_transaction_args) PHP_FALIAS(abort, stomp_abort, stomp_oop_transaction_args)
PHP_FALIAS(ack, stomp_ack, stomp_oop_ack_args) PHP_FALIAS(ack, stomp_ack, stomp_oop_ack_args)
PHP_FALIAS(nack, stomp_nack, stomp_oop_nack_args)
PHP_FALIAS(error, stomp_error, stomp_no_args) PHP_FALIAS(error, stomp_error, stomp_no_args)
PHP_FALIAS(setReadTimeout, stomp_set_read_timeout, stomp_oop_set_read_timeout_args) PHP_FALIAS(setReadTimeout, stomp_set_read_timeout, stomp_oop_set_read_timeout_args)
PHP_FALIAS(getReadTimeout, stomp_get_read_timeout, stomp_no_args) PHP_FALIAS(getReadTimeout, stomp_get_read_timeout, stomp_no_args)
@ -231,6 +259,13 @@ static zend_function_entry stomp_frame_methods[] = {
}; };
/* }}} */ /* }}} */
/* {{{ stomp_exception_methods[] */
static zend_function_entry stomp_exception_methods[] = {
PHP_ME(stompexception, getDetails, stomp_no_args, ZEND_ACC_PUBLIC)
{NULL, NULL, NULL}
};
/* }}} */
/* {{{ stomp_module_entry */ /* {{{ stomp_module_entry */
zend_module_entry stomp_module_entry = { zend_module_entry stomp_module_entry = {
#if ZEND_MODULE_API_NO >= 20010901 #if ZEND_MODULE_API_NO >= 20010901
@ -256,6 +291,8 @@ zend_module_entry stomp_module_entry = {
PHP_INI_BEGIN() PHP_INI_BEGIN()
STD_PHP_INI_ENTRY("stomp.default_broker", "tcp://localhost:61613", PHP_INI_ALL, OnUpdateString, default_broker, zend_stomp_globals, stomp_globals) STD_PHP_INI_ENTRY("stomp.default_broker", "tcp://localhost:61613", PHP_INI_ALL, OnUpdateString, default_broker, zend_stomp_globals, stomp_globals)
STD_PHP_INI_ENTRY("stomp.default_username", "", PHP_INI_ALL, OnUpdateString, default_username, zend_stomp_globals, stomp_globals)
STD_PHP_INI_ENTRY("stomp.default_password", "", PHP_INI_ALL, OnUpdateString, default_password, zend_stomp_globals, stomp_globals)
STD_PHP_INI_ENTRY("stomp.default_read_timeout_sec", "2", PHP_INI_ALL, OnUpdateLong, read_timeout_sec, zend_stomp_globals, stomp_globals) STD_PHP_INI_ENTRY("stomp.default_read_timeout_sec", "2", PHP_INI_ALL, OnUpdateLong, read_timeout_sec, zend_stomp_globals, stomp_globals)
STD_PHP_INI_ENTRY("stomp.default_read_timeout_usec", "0", PHP_INI_ALL, OnUpdateLong, read_timeout_usec, zend_stomp_globals, stomp_globals) STD_PHP_INI_ENTRY("stomp.default_read_timeout_usec", "0", PHP_INI_ALL, OnUpdateLong, read_timeout_usec, zend_stomp_globals, stomp_globals)
STD_PHP_INI_ENTRY("stomp.default_connection_timeout_sec", "2", PHP_INI_ALL, OnUpdateLong, connection_timeout_sec, zend_stomp_globals, stomp_globals) STD_PHP_INI_ENTRY("stomp.default_connection_timeout_sec", "2", PHP_INI_ALL, OnUpdateLong, connection_timeout_sec, zend_stomp_globals, stomp_globals)
@ -266,10 +303,15 @@ PHP_INI_END()
static PHP_GINIT_FUNCTION(stomp) static PHP_GINIT_FUNCTION(stomp)
{ {
stomp_globals->default_broker = NULL; stomp_globals->default_broker = NULL;
stomp_globals->default_username = NULL;
stomp_globals->default_password = NULL;
stomp_globals->read_timeout_sec = 2; stomp_globals->read_timeout_sec = 2;
stomp_globals->read_timeout_usec = 0; stomp_globals->read_timeout_usec = 0;
stomp_globals->connection_timeout_sec = 2; stomp_globals->connection_timeout_sec = 2;
stomp_globals->connection_timeout_usec = 0; stomp_globals->connection_timeout_usec = 0;
#if HAVE_STOMP_SSL
SSL_library_init();
#endif
} }
/* }}} */ /* }}} */
@ -307,17 +349,26 @@ static void stomp_object_free_storage(stomp_object_t *intern TSRMLS_DC)
} }
#if (PHP_MAJOR_VERSION == 5 && PHP_MINOR_VERSION >= 4) || (PHP_MAJOR_VERSION > 5)
#define PHP_STOMP_RUNTIME_CACHE
#endif
static zend_object_value php_stomp_new(zend_class_entry *ce TSRMLS_DC) static zend_object_value php_stomp_new(zend_class_entry *ce TSRMLS_DC)
{ {
zend_object_value retval; zend_object_value retval;
stomp_object_t *intern; stomp_object_t *intern;
#ifndef PHP_STOMP_RUNTIME_CACHE
zval *tmp; zval *tmp;
#endif
intern = (stomp_object_t *) ecalloc(1, sizeof(stomp_object_t)); intern = (stomp_object_t *) ecalloc(1, sizeof(stomp_object_t));
intern->stomp = NULL; intern->stomp = NULL;
zend_object_std_init(&intern->std, ce TSRMLS_CC); zend_object_std_init(&intern->std, ce TSRMLS_CC);
#ifdef PHP_STOMP_RUNTIME_CACHE
object_properties_init(&intern->std, ce);
#else
zend_hash_copy(intern->std.properties, &ce->default_properties, (copy_ctor_func_t) zval_add_ref, (void *) &tmp, sizeof(zval *)); zend_hash_copy(intern->std.properties, &ce->default_properties, (copy_ctor_func_t) zval_add_ref, (void *) &tmp, sizeof(zval *));
#endif
retval.handle = zend_objects_store_put(intern, (zend_objects_store_dtor_t)zend_objects_destroy_object, (zend_objects_free_object_storage_t) stomp_object_free_storage, NULL TSRMLS_CC); retval.handle = zend_objects_store_put(intern, (zend_objects_store_dtor_t)zend_objects_destroy_object, (zend_objects_free_object_storage_t) stomp_object_free_storage, NULL TSRMLS_CC);
retval.handlers = zend_get_std_object_handlers(); retval.handlers = zend_get_std_object_handlers();
@ -349,9 +400,12 @@ PHP_MINIT_FUNCTION(stomp)
zend_declare_property_null(stomp_ce_frame, "body", sizeof("body")-1, ZEND_ACC_PUBLIC TSRMLS_CC); zend_declare_property_null(stomp_ce_frame, "body", sizeof("body")-1, ZEND_ACC_PUBLIC TSRMLS_CC);
/* Register StompException class */ /* Register StompException class */
INIT_CLASS_ENTRY(ce, PHP_STOMP_EXCEPTION_CLASSNAME, NULL); INIT_CLASS_ENTRY(ce, PHP_STOMP_EXCEPTION_CLASSNAME, stomp_exception_methods);
stomp_ce_exception = zend_register_internal_class_ex(&ce, zend_exception_get_default(TSRMLS_C), NULL TSRMLS_CC); stomp_ce_exception = zend_register_internal_class_ex(&ce, zend_exception_get_default(TSRMLS_C), NULL TSRMLS_CC);
/* Properties */
zend_declare_property_null(stomp_ce_exception, "details", sizeof("details")-1, ZEND_ACC_PRIVATE TSRMLS_CC);
/** Register INI entries **/ /** Register INI entries **/
REGISTER_INI_ENTRIES(); REGISTER_INI_ENTRIES();
@ -397,6 +451,8 @@ PHP_MINFO_FUNCTION(stomp)
php_info_print_table_row(2, "API version", PHP_STOMP_VERSION); php_info_print_table_row(2, "API version", PHP_STOMP_VERSION);
#if HAVE_STOMP_SSL #if HAVE_STOMP_SSL
php_info_print_table_row(2, "SSL Support", "enabled"); php_info_print_table_row(2, "SSL Support", "enabled");
#else
php_info_print_table_row(2, "SSL Support", "disabled");
#endif #endif
php_info_print_table_end(); php_info_print_table_end();
DISPLAY_INI_ENTRIES(); DISPLAY_INI_ENTRIES();
@ -411,25 +467,22 @@ PHP_FUNCTION(stomp_version)
} }
/* }}} */ /* }}} */
/* {{{ proto Stomp::__construct([string broker [, string username [, string password]]]) /* {{{ proto Stomp::__construct([string broker [, string username [, string password [, array headers]]]])
Connect to server */ Connect to server */
PHP_FUNCTION(stomp_connect) PHP_FUNCTION(stomp_connect)
{ {
zval *stomp_object = getThis(); zval *stomp_object = getThis();
zval *headers = NULL;
stomp_t *stomp = NULL; stomp_t *stomp = NULL;
char *broker = NULL, *username = NULL, *password = NULL; char *broker = NULL, *username = NULL, *password = NULL;
int broker_len = 0, username_len = 0, password_len = 0; int broker_len = 0, username_len = 0, password_len = 0;
struct timeval tv;
php_url *url_parts; php_url *url_parts;
#ifdef HAVE_STOMP_SSL #ifdef HAVE_STOMP_SSL
int use_ssl = 0; int use_ssl = 0;
#endif #endif
tv.tv_sec = 2; if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "|sssa!", &broker, &broker_len, &username, &username_len, &password, &password_len, &headers) == FAILURE) {
tv.tv_usec = 0;
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "|sss", &broker, &broker_len, &username, &username_len, &password, &password_len) == FAILURE) {
return; return;
} }
@ -478,28 +531,52 @@ PHP_FUNCTION(stomp_connect)
if (stomp->status) { if (stomp->status) {
stomp_frame_t *res; stomp_frame_t *res;
int rres;
stomp_frame_t frame = {0}; stomp_frame_t frame = {0};
INIT_FRAME(frame, "CONNECT"); INIT_FRAME(frame, "CONNECT");
if (username_len == 0) { if (!username) {
username = ""; username = STOMP_G(default_username);
username_len = strlen(username);
} }
if (password_len == 0) { if (!password) {
password = ""; password = STOMP_G(default_password);
password_len = strlen(password);
} }
zend_hash_add(frame.headers, "login", sizeof("login"), username, username_len + 1, NULL); zend_hash_add(frame.headers, "login", sizeof("login"), username, username_len + 1, NULL);
zend_hash_add(frame.headers, "passcode", sizeof("passcode"), password, password_len + 1, NULL); zend_hash_add(frame.headers, "passcode", sizeof("passcode"), password, password_len + 1, NULL);
stomp_send(stomp, &frame TSRMLS_CC); if (NULL != headers) {
FRAME_HEADER_FROM_HASHTABLE(frame.headers, Z_ARRVAL_P(headers));
}
rres = stomp_send(stomp, &frame TSRMLS_CC);
CLEAR_FRAME(frame); CLEAR_FRAME(frame);
if (0 == rres) {
zval *excobj = zend_throw_exception_ex(stomp_ce_exception, stomp->errnum TSRMLS_CC, stomp->error);
if (stomp->error_details) {
zend_update_property_string(stomp_ce_exception, excobj, "details", sizeof("details")-1, stomp->error_details TSRMLS_CC);
}
return;
}
/* Retreive Response */ /* Retreive Response */
res = stomp_read_frame(stomp); res = stomp_read_frame(stomp);
if (NULL == res) { if (NULL == res) {
STOMP_ERROR(0, PHP_STOMP_ERR_SERVER_NOT_RESPONDING); STOMP_ERROR(0, PHP_STOMP_ERR_SERVER_NOT_RESPONDING);
} else if (0 == strncmp("ERROR", res->command, sizeof("ERROR") - 1)) {
char *error_msg = NULL;
if (zend_hash_find(res->headers, "message", sizeof("message"), (void **)&error_msg) == SUCCESS) {
zval *excobj = zend_throw_exception_ex(stomp_ce_exception, 0 TSRMLS_CC, error_msg);
if (res->body) {
zend_update_property_string(stomp_ce_exception, excobj, "details", sizeof("details")-1, (char *) res->body TSRMLS_CC);
}
stomp_free_frame(res);
}
} else if (0 != strncmp("CONNECTED", res->command, sizeof("CONNECTED")-1)) { } else if (0 != strncmp("CONNECTED", res->command, sizeof("CONNECTED")-1)) {
if (stomp->error) { if (stomp->error) {
STOMP_ERROR(stomp->errnum, stomp->error); STOMP_ERROR_DETAILS(stomp->errnum, stomp->error, stomp->error_details);
} else { } else {
STOMP_ERROR(0, PHP_STOMP_ERR_UNKNOWN); STOMP_ERROR(0, PHP_STOMP_ERR_UNKNOWN);
} }
@ -528,7 +605,7 @@ PHP_FUNCTION(stomp_connect)
} }
} }
} else { } else {
STOMP_ERROR(0, stomp->error); STOMP_ERROR_DETAILS(0, stomp->error, stomp->error_details);
} }
stomp_close(stomp); stomp_close(stomp);
@ -648,7 +725,7 @@ PHP_FUNCTION(stomp_send)
if (Z_TYPE_P(msg) == IS_STRING) { if (Z_TYPE_P(msg) == IS_STRING) {
frame.body = Z_STRVAL_P(msg); frame.body = Z_STRVAL_P(msg);
frame.body_length = -1; frame.body_length = Z_STRLEN_P(msg);
} else if (Z_TYPE_P(msg) == IS_OBJECT && instanceof_function(Z_OBJCE_P(msg), stomp_ce_frame TSRMLS_CC)) { } else if (Z_TYPE_P(msg) == IS_OBJECT && instanceof_function(Z_OBJCE_P(msg), stomp_ce_frame TSRMLS_CC)) {
zval *frame_obj_prop = NULL; zval *frame_obj_prop = NULL;
frame_obj_prop = zend_read_property(stomp_ce_frame, msg, "command", sizeof("command")-1, 1 TSRMLS_CC); frame_obj_prop = zend_read_property(stomp_ce_frame, msg, "command", sizeof("command")-1, 1 TSRMLS_CC);
@ -659,7 +736,7 @@ PHP_FUNCTION(stomp_send)
frame_obj_prop = zend_read_property(stomp_ce_frame, msg, "body", sizeof("body")-1, 1 TSRMLS_CC); frame_obj_prop = zend_read_property(stomp_ce_frame, msg, "body", sizeof("body")-1, 1 TSRMLS_CC);
if (Z_TYPE_P(frame_obj_prop) == IS_STRING) { if (Z_TYPE_P(frame_obj_prop) == IS_STRING) {
frame.body = Z_STRVAL_P(frame_obj_prop); frame.body = Z_STRVAL_P(frame_obj_prop);
frame.body_length = -1; frame.body_length = Z_STRLEN_P(frame_obj_prop);
} }
frame_obj_prop = zend_read_property(stomp_ce_frame, msg, "headers", sizeof("headers")-1, 1 TSRMLS_CC); frame_obj_prop = zend_read_property(stomp_ce_frame, msg, "headers", sizeof("headers")-1, 1 TSRMLS_CC);
if (Z_TYPE_P(frame_obj_prop) == IS_ARRAY) { if (Z_TYPE_P(frame_obj_prop) == IS_ARRAY) {
@ -670,6 +747,8 @@ PHP_FUNCTION(stomp_send)
CLEAR_FRAME(frame); CLEAR_FRAME(frame);
RETURN_FALSE; RETURN_FALSE;
} }
if (frame.body_length > 0 && strnlen(frame.body, frame.body_length) >= frame.body_length)
frame.body_length = 0;
if (stomp_send(stomp, &frame TSRMLS_CC) > 0) { if (stomp_send(stomp, &frame TSRMLS_CC) > 0) {
success = stomp_valid_receipt(stomp, &frame); success = stomp_valid_receipt(stomp, &frame);
@ -812,7 +891,6 @@ PHP_FUNCTION(stomp_read_frame)
zval *stomp_object = getThis(); zval *stomp_object = getThis();
stomp_t *stomp = NULL; stomp_t *stomp = NULL;
stomp_frame_t *res = NULL; stomp_frame_t *res = NULL;
int sel_res = 0;
char *class_name = NULL; char *class_name = NULL;
int class_name_len = 0; int class_name_len = 0;
zend_class_entry *ce = NULL; zend_class_entry *ce = NULL;
@ -842,13 +920,17 @@ PHP_FUNCTION(stomp_read_frame)
} }
if ((sel_res = stomp_select(stomp)) > 0 && (res = stomp_read_frame(stomp))) { if ((res = stomp_read_frame(stomp))) {
zval *headers = NULL; zval *headers = NULL;
if (0 == strncmp("ERROR", res->command, sizeof("ERROR") - 1)) { if (0 == strncmp("ERROR", res->command, sizeof("ERROR") - 1)) {
char *error_msg = NULL; char *error_msg = NULL;
if (zend_hash_find(res->headers, "message", sizeof("message"), (void **)&error_msg) == SUCCESS) { if (zend_hash_find(res->headers, "message", sizeof("message"), (void **)&error_msg) == SUCCESS) {
STOMP_ERROR(0, error_msg) zval *excobj = zend_throw_exception_ex(stomp_ce_exception, 0 TSRMLS_CC, error_msg);
if (res->body) {
zend_update_property_string(stomp_ce_exception, excobj, "details", sizeof("details")-1, (char *) res->body TSRMLS_CC);
}
stomp_free_frame(res); stomp_free_frame(res);
RETURN_FALSE; RETURN_FALSE;
} }
@ -952,16 +1034,13 @@ PHP_FUNCTION(stomp_read_frame)
array_init(return_value); array_init(return_value);
add_assoc_string_ex(return_value, "command", sizeof("command"), res->command, 1); add_assoc_string_ex(return_value, "command", sizeof("command"), res->command, 1);
if (res->body) { if (res->body) {
add_assoc_string_ex(return_value, "body", sizeof("body"), res->body, 1); add_assoc_stringl_ex(return_value, "body", sizeof("body"), res->body, res->body_length, 1);
} }
add_assoc_zval_ex(return_value, "headers", sizeof("headers"), headers); add_assoc_zval_ex(return_value, "headers", sizeof("headers"), headers);
} }
stomp_free_frame(res); stomp_free_frame(res);
} else { } else {
if (sel_res == -1) {
STOMP_ERROR(0, "Error while selecting from socket: %d", errno);
}
RETURN_FALSE; RETURN_FALSE;
} }
} }
@ -1035,10 +1114,9 @@ PHP_FUNCTION(stomp_abort)
} }
/* }}} */ /* }}} */
/* {{{ proto boolean Stomp::ack(mixed msg [, array headers]) /* {{{ _php_stomp_acknowledgment
Acknowledge consumption of a message from a subscription using client acknowledgment */ */
PHP_FUNCTION(stomp_ack) static void _php_stomp_acknowledgment(INTERNAL_FUNCTION_PARAMETERS, char *cmd) {
{
zval *stomp_object = getThis(); zval *stomp_object = getThis();
zval *msg = NULL, *headers = NULL; zval *msg = NULL, *headers = NULL;
stomp_t *stomp = NULL; stomp_t *stomp = NULL;
@ -1059,7 +1137,7 @@ PHP_FUNCTION(stomp_ack)
ZEND_FETCH_RESOURCE(stomp, stomp_t *, &arg, -1, PHP_STOMP_RES_NAME, le_stomp); ZEND_FETCH_RESOURCE(stomp, stomp_t *, &arg, -1, PHP_STOMP_RES_NAME, le_stomp);
} }
INIT_FRAME(frame, "ACK"); INIT_FRAME(frame, cmd);
if (NULL != headers) { if (NULL != headers) {
FRAME_HEADER_FROM_HASHTABLE(frame.headers, Z_ARRVAL_P(headers)); FRAME_HEADER_FROM_HASHTABLE(frame.headers, Z_ARRVAL_P(headers));
@ -1087,6 +1165,22 @@ PHP_FUNCTION(stomp_ack)
} }
/* }}} */ /* }}} */
/* {{{ proto boolean Stomp::ack(mixed msg [, array headers])
Acknowledge consumption of a message from a subscription using client acknowledgment */
PHP_FUNCTION(stomp_ack)
{
_php_stomp_acknowledgment(INTERNAL_FUNCTION_PARAM_PASSTHRU, "ACK");
}
/* }}} */
/* {{{ proto boolean Stomp::nack(mixed msg [, array headers])
Negative Acknowledgment of a message from a subscription */
PHP_FUNCTION(stomp_nack)
{
_php_stomp_acknowledgment(INTERNAL_FUNCTION_PARAM_PASSTHRU, "NACK");
}
/* }}} */
/* {{{ proto string Stomp::error() /* {{{ proto string Stomp::error()
Get the last error message */ Get the last error message */
PHP_FUNCTION(stomp_error) PHP_FUNCTION(stomp_error)
@ -1105,7 +1199,15 @@ PHP_FUNCTION(stomp_error)
} }
if (stomp->error) { if (stomp->error) {
if (stomp->error_details) {
char *error_msg = (char *) emalloc(strlen(stomp->error) + strlen(stomp->error_details) + 10);
strcpy(error_msg, stomp->error);
strcat(error_msg, "\n\n");
strcat(error_msg, stomp->error_details);
RETURN_STRING(error_msg, 0);
} else {
RETURN_STRING(stomp->error, 1); RETURN_STRING(stomp->error, 1);
}
} else { } else {
RETURN_FALSE; RETURN_FALSE;
} }
@ -1185,3 +1287,17 @@ PHP_METHOD(stompframe, __construct)
} }
} }
/* }}} */ /* }}} */
/* {{{ proto string StompException::getDetails()
Get error details */
PHP_METHOD(stompexception, getDetails)
{
zval *object = getThis();
zval *details = NULL;
details = zend_read_property(stomp_ce_exception, object, "details", sizeof("details")-1, 1 TSRMLS_CC);
RETURN_STRINGL(Z_STRVAL_P(details), Z_STRLEN_P(details), 1);
}
/* }}} */

View File

@ -29,11 +29,7 @@ typedef struct _stomp_object {
} stomp_object_t; } stomp_object_t;
#define PHP_STOMP_EXTNAME "Stomp" #define PHP_STOMP_EXTNAME "Stomp"
#define PHP_STOMP_MAJOR_VERSION "1" #define PHP_STOMP_VERSION "1.0.9-dev"
#define PHP_STOMP_MINOR_VERSION "0"
#define PHP_STOMP_PATCH_VERSION "0"
#define PHP_STOMP_VERSION_STATUS ""
#define PHP_STOMP_VERSION PHP_STOMP_MAJOR_VERSION "." PHP_STOMP_MINOR_VERSION "." PHP_STOMP_PATCH_VERSION PHP_STOMP_VERSION_STATUS
#define PHP_STOMP_RES_NAME "stomp connection" #define PHP_STOMP_RES_NAME "stomp connection"
@ -82,12 +78,15 @@ PHP_FUNCTION(stomp_begin);
PHP_FUNCTION(stomp_commit); PHP_FUNCTION(stomp_commit);
PHP_FUNCTION(stomp_abort); PHP_FUNCTION(stomp_abort);
PHP_FUNCTION(stomp_ack); PHP_FUNCTION(stomp_ack);
PHP_FUNCTION(stomp_nack);
PHP_FUNCTION(stomp_error); PHP_FUNCTION(stomp_error);
PHP_FUNCTION(stomp_set_read_timeout); PHP_FUNCTION(stomp_set_read_timeout);
PHP_FUNCTION(stomp_get_read_timeout); PHP_FUNCTION(stomp_get_read_timeout);
PHP_METHOD(stompframe, __construct); PHP_METHOD(stompframe, __construct);
PHP_METHOD(stompexception, getDetails);
ZEND_BEGIN_MODULE_GLOBALS(stomp) ZEND_BEGIN_MODULE_GLOBALS(stomp)
/* INI */ /* INI */
char *default_broker; char *default_broker;
@ -95,6 +94,8 @@ ZEND_BEGIN_MODULE_GLOBALS(stomp)
long read_timeout_usec; long read_timeout_usec;
long connection_timeout_sec; long connection_timeout_sec;
long connection_timeout_usec; long connection_timeout_usec;
char *default_username;
char *default_password;
/* Others */ /* Others */
long error_no; long error_no;

489
stomp.c
View File

@ -23,16 +23,47 @@
#endif #endif
#include "php.h" #include "php.h"
#include "zend_exceptions.h"
#include "ext/standard/php_smart_str.h" #include "ext/standard/php_smart_str.h"
#include "stomp.h" #include "stomp.h"
#include "php_stomp.h" #include "php_stomp.h"
#ifdef HAVE_NETINET_IN_H
#include <netinet/tcp.h>
#endif
#define RETURN_READ_FRAME_FAIL { stomp_free_frame(f); return NULL; } #define RETURN_READ_FRAME_FAIL { stomp_free_frame(f); return NULL; }
ZEND_EXTERN_MODULE_GLOBALS(stomp); ZEND_EXTERN_MODULE_GLOBALS(stomp);
extern zend_class_entry *stomp_ce_exception; extern zend_class_entry *stomp_ce_exception;
/* {{{ DEBUG */
#if PHP_DEBUG
static void print_stomp_frame(stomp_frame_t *frame TSRMLS_DC) {
php_printf("------ START FRAME ------\n");
php_printf("%s\n", frame->command);
/* Headers */
if (frame->headers) {
char *key;
ulong pos;
zend_hash_internal_pointer_reset(frame->headers);
while (zend_hash_get_current_key(frame->headers, &key, &pos, 0) == HASH_KEY_IS_STRING) {
char *value = NULL;
php_printf("%s:", key);
if (zend_hash_get_current_data(frame->headers, (void **)&value) == SUCCESS) {
php_printf("%s", value);
}
php_printf("\n");
zend_hash_move_forward(frame->headers);
}
}
php_printf("\n%s\n", frame->body);
php_printf("------ END FRAME ------\n");
}
#endif
/* }}} */
/* {{{ stomp_init /* {{{ stomp_init
*/ */
stomp_t *stomp_init() stomp_t *stomp_init()
@ -46,33 +77,120 @@ stomp_t *stomp_init()
stomp->port = 0; stomp->port = 0;
stomp->status = 0; stomp->status = 0;
stomp->error = NULL; stomp->error = NULL;
stomp->error_details = NULL;
stomp->errnum = 0; stomp->errnum = 0;
stomp->session = NULL; stomp->session = NULL;
stomp->options.connect_timeout_sec = 2; stomp->options.connect_timeout_sec = 2;
stomp->options.connect_timeout_usec = 0; stomp->options.connect_timeout_usec = 0;
stomp->options.read_timeout_sec = 2; stomp->options.read_timeout_sec = 2;
stomp->options.read_timeout_usec = 2; stomp->options.read_timeout_usec = 0;
#if HAVE_STOMP_SSL #if HAVE_STOMP_SSL
stomp->options.use_ssl = 0; stomp->options.use_ssl = 0;
stomp->ssl_handle = NULL; stomp->ssl_handle = NULL;
#endif #endif
stomp->frame_stack = NULL;
stomp->read_buffer.size = 0;
return stomp; return stomp;
} }
/* }}} */ /* }}} */
/* {{{ stomp_frame_stack_push
*/
static void stomp_frame_stack_push(stomp_frame_stack_t **stack, stomp_frame_t *frame)
{
stomp_frame_stack_t *cell = (stomp_frame_stack_t *) emalloc(sizeof(stomp_frame_stack_t));
cell->frame = frame;
cell->next = NULL;
if (!*stack) {
*stack = cell;
} else {
stomp_frame_stack_t *cursor = *stack;
while (cursor->next != NULL) cursor = cursor->next;
cursor->next = cell;
}
}
/* }}} */
/* {{{ stomp_frame_stack_shift
*/
static stomp_frame_t *stomp_frame_stack_shift(stomp_frame_stack_t **stack) {
stomp_frame_t *frame = NULL;
if (*stack) {
stomp_frame_stack_t *cell = *stack;
*stack = cell->next;
frame = cell->frame;
efree(cell);
}
return frame;
}
/* }}} */
/* {{{ stomp_frame_stack_clear
*/
static void stomp_frame_stack_clear(stomp_frame_stack_t **stack) {
stomp_frame_t *frame = NULL;
while ((frame = stomp_frame_stack_shift(stack))) efree(frame);
}
/* }}} */
/* {{{ stomp_set_error /* {{{ stomp_set_error
*/ */
void stomp_set_error(stomp_t *stomp, const char *error, int errnum) void stomp_set_error(stomp_t *stomp, const char *error, int errnum, const char *fmt, ...)
{ {
if (error != NULL) { va_list ap;
int len;
if (stomp->error != NULL) { if (stomp->error != NULL) {
efree(stomp->error); efree(stomp->error);
stomp->error = NULL;
}
if (stomp->error_details != NULL) {
efree(stomp->error_details);
stomp->error_details = NULL;
} }
stomp->error = estrdup(error);
stomp->errnum = errnum; stomp->errnum = errnum;
if (error != NULL) {
stomp->error = estrdup(error);
} }
if (fmt != NULL) {
stomp->error_details = emalloc(STOMP_BUFSIZE);
if (stomp->error_details == NULL) {
return; /* Nothing else can be done */
}
va_start(ap, fmt);
/*
* Would've been better to call vasprintf(), but that
* function is missing on some platforms...
*/
len = vsnprintf(stomp->error_details, STOMP_BUFSIZE, fmt, ap);
va_end(ap);
if (len < STOMP_BUFSIZE) {
stomp->error_details = erealloc(stomp->error_details, len+1);
}
}
}
/* }}} */
/* {{{ stomp_writable
*/
int stomp_writable(stomp_t *stomp)
{
int n;
n = php_pollfd_for_ms(stomp->fd, POLLOUT, 1000);
if (n != POLLOUT) {
#ifndef PHP_WIN32
if (n == 0) {
errno = ETIMEDOUT;
}
#endif
return 0;
}
return 1;
} }
/* }}} */ /* }}} */
@ -83,7 +201,7 @@ int stomp_connect(stomp_t *stomp, const char *host, unsigned short port TSRMLS_D
char error[1024]; char error[1024];
socklen_t size; socklen_t size;
struct timeval tv; struct timeval tv;
fd_set rfds; int flag = 1;
if (stomp->host != NULL) if (stomp->host != NULL)
{ {
@ -101,30 +219,30 @@ int stomp_connect(stomp_t *stomp, const char *host, unsigned short port TSRMLS_D
stomp->fd = php_network_connect_socket_to_host(stomp->host, stomp->port, SOCK_STREAM, 0, &tv, NULL, NULL, NULL, 0 TSRMLS_CC); stomp->fd = php_network_connect_socket_to_host(stomp->host, stomp->port, SOCK_STREAM, 0, &tv, NULL, NULL, NULL, 0 TSRMLS_CC);
if (stomp->fd == -1) { if (stomp->fd == -1) {
snprintf(error, sizeof(error), "Unable to connect to %s:%ld", stomp->host, stomp->port); snprintf(error, sizeof(error), "Unable to connect to %s:%ld", stomp->host, stomp->port);
stomp_set_error(stomp, error, errno); stomp_set_error(stomp, error, errno, "%s", strerror(errno));
return 0; return 0;
} }
#ifdef HAVE_NETINET_IN_H
setsockopt(stomp->fd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int));
#endif
size = sizeof(stomp->localaddr); size = sizeof(stomp->localaddr);
memset(&stomp->localaddr, 0, size); memset(&stomp->localaddr, 0, size);
if (getsockname(stomp->fd, (struct sockaddr*) &stomp->localaddr, &size) == -1) { if (getsockname(stomp->fd, (struct sockaddr*) &stomp->localaddr, &size) == -1) {
snprintf(error, sizeof(error), "getsockname failed: %s (%d)", strerror(errno), errno); snprintf(error, sizeof(error), "getsockname failed: %s (%d)", strerror(errno), errno);
stomp_set_error(stomp, error, errno); stomp_set_error(stomp, error, errno, NULL);
return 0; return 0;
} }
tv.tv_sec = 0; if (stomp_writable(stomp)) {
tv.tv_usec = 0;
FD_ZERO(&rfds);
FD_SET(stomp->fd, &rfds);
if (select(stomp->fd + 1, NULL, &rfds, NULL, &tv) > 0) {
#if HAVE_STOMP_SSL #if HAVE_STOMP_SSL
if (stomp->options.use_ssl) { if (stomp->options.use_ssl) {
SSL_CTX *ctx = SSL_CTX_new(SSLv23_client_method()); SSL_CTX *ctx = SSL_CTX_new(SSLv23_client_method());
int ret;
if (NULL == ctx) { if (NULL == ctx) {
stomp_set_error(stomp, "failed to create the SSL context", 0); stomp_set_error(stomp, "failed to create the SSL context", 0, NULL);
return 0; return 0;
} }
@ -132,15 +250,15 @@ int stomp_connect(stomp_t *stomp, const char *host, unsigned short port TSRMLS_D
stomp->ssl_handle = SSL_new(ctx); stomp->ssl_handle = SSL_new(ctx);
if (stomp->ssl_handle == NULL) { if (stomp->ssl_handle == NULL) {
stomp_set_error(stomp, "failed to create the SSL handle", 0); stomp_set_error(stomp, "failed to create the SSL handle", 0, NULL);
SSL_CTX_free(ctx); SSL_CTX_free(ctx);
return 0; return 0;
} }
SSL_set_fd(stomp->ssl_handle, stomp->fd); SSL_set_fd(stomp->ssl_handle, stomp->fd);
if (SSL_connect(stomp->ssl_handle) <= 0) { if ((ret = SSL_connect(stomp->ssl_handle)) <= 0) {
stomp_set_error(stomp, "SSL/TLS handshake failed", 0); stomp_set_error(stomp, "SSL/TLS handshake failed", 0, "SSL error %d", SSL_get_error(stomp->ssl_handle, ret));
SSL_shutdown(stomp->ssl_handle); SSL_shutdown(stomp->ssl_handle);
return 0; return 0;
} }
@ -149,7 +267,7 @@ int stomp_connect(stomp_t *stomp, const char *host, unsigned short port TSRMLS_D
return 1; return 1;
} else { } else {
snprintf(error, sizeof(error), "Unable to connect to %s:%ld", stomp->host, stomp->port); snprintf(error, sizeof(error), "Unable to connect to %s:%ld", stomp->host, stomp->port);
stomp_set_error(stomp, error, errno); stomp_set_error(stomp, error, errno, "%s", strerror(errno));
return 0; return 0;
} }
} }
@ -180,7 +298,10 @@ void stomp_close(stomp_t *stomp)
if (stomp->error) { if (stomp->error) {
efree(stomp->error); efree(stomp->error);
} }
if (stomp->error_details) {
efree(stomp->error_details);
}
stomp_frame_stack_clear(&stomp->frame_stack);
efree(stomp); efree(stomp);
} }
/* }}} */ /* }}} */
@ -219,7 +340,7 @@ int stomp_send(stomp_t *stomp, stomp_frame_t *frame TSRMLS_DC)
} }
if (frame->body_length > 0) { if (frame->body_length > 0) {
smart_str_appends(&buf, "content-length: "); smart_str_appendl(&buf, "content-length:", sizeof("content-length:") - 1);
smart_str_append_long(&buf, frame->body_length); smart_str_append_long(&buf, frame->body_length);
smart_str_appendc(&buf, '\n'); smart_str_appendc(&buf, '\n');
} }
@ -227,23 +348,30 @@ int stomp_send(stomp_t *stomp, stomp_frame_t *frame TSRMLS_DC)
smart_str_appendc(&buf, '\n'); smart_str_appendc(&buf, '\n');
if (frame->body > 0) { if (frame->body > 0) {
smart_str_appends(&buf, frame->body); smart_str_appendl(&buf, frame->body, frame->body_length > 0 ? frame->body_length : strlen(frame->body));
}
smart_str_appendl(&buf, "\0", sizeof("\0")-1);
if (!stomp_writable(stomp)) {
smart_str_free(&buf);
stomp_set_error(stomp, "Unable to send data", errno, "%s", strerror(errno));
return 0;
} }
#ifdef HAVE_STOMP_SSL #ifdef HAVE_STOMP_SSL
if (stomp->options.use_ssl) { if (stomp->options.use_ssl) {
if (-1 == SSL_write(stomp->ssl_handle, buf.c, buf.len) || -1 == SSL_write(stomp->ssl_handle, "\0\n", 2)) { int ret;
char error[1024]; if (-1 == (ret = SSL_write(stomp->ssl_handle, buf.c, buf.len))) {
snprintf(error, sizeof(error), "Unable to send data"); smart_str_free(&buf);
stomp_set_error(stomp, error, errno); stomp_set_error(stomp, "Unable to send data", errno, "SSL error %d", SSL_get_error(stomp->ssl_handle, ret));
return 0; return 0;
} }
} else { } else {
#endif #endif
if (-1 == send(stomp->fd, buf.c, buf.len, 0) || -1 == send(stomp->fd, "\0\n", 2, 0)) { if (-1 == send(stomp->fd, buf.c, buf.len, 0)) {
char error[1024]; smart_str_free(&buf);
snprintf(error, sizeof(error), "Unable to send data"); stomp_set_error(stomp, "Unable to send data", errno, "%s", strerror(errno));
stomp_set_error(stomp, error, errno);
return 0; return 0;
} }
#ifdef HAVE_STOMP_SSL #ifdef HAVE_STOMP_SSL
@ -258,9 +386,12 @@ int stomp_send(stomp_t *stomp, stomp_frame_t *frame TSRMLS_DC)
/* {{{ stomp_recv /* {{{ stomp_recv
*/ */
int stomp_recv(stomp_t *stomp, char *msg, size_t length) static int _stomp_recv(stomp_t *stomp, char *msg, const size_t length)
{ {
int len; int len;
stomp_select(stomp);
#if HAVE_STOMP_SSL #if HAVE_STOMP_SSL
if(stomp->options.use_ssl) { if(stomp->options.use_ssl) {
len = SSL_read(stomp->ssl_handle, msg, length); len = SSL_read(stomp->ssl_handle, msg, length);
@ -271,66 +402,142 @@ int stomp_recv(stomp_t *stomp, char *msg, size_t length)
} }
#endif #endif
if (len == 0) { if (len == -1) {
TSRMLS_FETCH(); #if HAVE_STOMP_SSL
zend_throw_exception_ex(stomp_ce_exception, errno TSRMLS_CC, "Unexpected EOF while reading from socket"); if (stomp->options.use_ssl) {
stomp_set_error(stomp, "Error reading from socket", errno, "%s. (SSL in use)", strerror(errno));
} else {
#endif
stomp_set_error(stomp, "Error reading from socket", errno, "%s. (SSL not in use)", strerror(errno));
#if HAVE_STOMP_SSL
}
#endif
stomp->status = -1;
} else if (len == 0) {
stomp_set_error(stomp, "Sender closed connection unexpectedly", 0, NULL);
stomp->status = -1; stomp->status = -1;
} }
return len; return len;
} }
int stomp_recv(stomp_t *stomp, char *msg, const size_t length)
{
if (stomp->read_buffer.size == 0) {
if (length >= STOMP_BUFSIZE) {
return _stomp_recv(stomp, msg, length);
} else {
size_t recv_size = _stomp_recv(stomp, stomp->read_buffer.buf, STOMP_BUFSIZE);
if (recv_size <= length) {
memcpy(msg, stomp->read_buffer.buf, recv_size);
return recv_size;
} else {
memcpy(msg, stomp->read_buffer.buf, length);
stomp->read_buffer.pos = stomp->read_buffer.buf + length;
stomp->read_buffer.size = recv_size - length;
return length;
}
}
} else if (stomp->read_buffer.size >= length) {
memcpy(msg, stomp->read_buffer.pos, length);
stomp->read_buffer.pos += length;
stomp->read_buffer.size -= length;
return length;
} else {
int len = stomp->read_buffer.size;
memcpy(msg, stomp->read_buffer.pos, stomp->read_buffer.size);
stomp->read_buffer.size = 0;
if (stomp_select_ex(stomp, 0, 0)) {
return len + stomp_recv(stomp, msg + len, length - len);
} else {
return len;
}
}
}
/* }}} */
/* {{{ _stomp_read_until
*/
static size_t _stomp_read_until(stomp_t *stomp, char **data, const char delimiter)
{
size_t length = 0;
size_t bufsize = STOMP_BUFSIZE;
char *buffer = (char *) emalloc(STOMP_BUFSIZE);
while (1) {
unsigned int i, found;
char *c;
found = 0;
// First populate the buffer
if (stomp->read_buffer.size == 0) {
stomp->read_buffer.size = _stomp_recv(stomp, stomp->read_buffer.buf, STOMP_BUFSIZE);
stomp->read_buffer.pos = stomp->read_buffer.buf;
}
// Then search the delimiter
c = stomp->read_buffer.pos;
for (i = 1; i <= stomp->read_buffer.size ; i++) {
if (*c == delimiter) {
found = 1;
break;
} else {
c++;
}
}
if (!found) i--;
// Make sure we have enough place in the buffer
if ((i+length) >= bufsize) {
buffer = (char *) erealloc(buffer, bufsize + STOMP_BUFSIZE);
bufsize += STOMP_BUFSIZE;
}
// Copy and update the buffer
memcpy(buffer + length, stomp->read_buffer.pos, i);
length += i;
stomp->read_buffer.pos += i;
stomp->read_buffer.size -= i;
if (found) {
break;
}
}
if (length) {
*data = buffer;
} else {
efree(buffer);
*data = NULL;
}
return length;
}
/* }}} */ /* }}} */
/* {{{ stomp_read_buffer /* {{{ stomp_read_buffer
*/ */
static int stomp_read_buffer(stomp_t *stomp, char **data) static size_t stomp_read_buffer(stomp_t *stomp, char **data)
{ {
int rc = 0; size_t length = _stomp_read_until(stomp, data, 0);
size_t i = 0; if (stomp_select_ex(stomp, 0, 0)) {
size_t bufsize = STOMP_BUFSIZE + 1;
char *buffer = (char *) emalloc(STOMP_BUFSIZE + 1);
while (1) {
size_t length = 1;
rc = stomp_recv(stomp, buffer + i, length);
if (rc < 1) {
efree(buffer);
return -1;
}
if (1 == length) {
i++;
if (buffer[i-1] == 0) {
char endline[1]; char endline[1];
if (1 != stomp_recv(stomp, endline, 1) && '\n' != endline[0]) { if (1 != stomp_recv(stomp, endline, 1) && '\n' != endline[0]) {
efree(buffer); if (*data) {
efree(*data);
*data = NULL;
}
return 0; return 0;
} }
break;
} }
if (length > 1) {
if (i >= bufsize) { length --;
buffer = (char *) erealloc(buffer, bufsize + STOMP_BUFSIZE); } else if (length) {
bufsize += STOMP_BUFSIZE; efree(*data);
*data = NULL;
length = 0;
} }
return length;
}
}
if (i > 1) {
*data = (char *) emalloc(i);
if (NULL == *data) {
efree(buffer);
return -1;
}
memcpy(*data, buffer, i);
}
efree(buffer);
return i-1;
} }
/* }}} */ /* }}} */
@ -338,52 +545,16 @@ static int stomp_read_buffer(stomp_t *stomp, char **data)
*/ */
static int stomp_read_line(stomp_t *stomp, char **data) static int stomp_read_line(stomp_t *stomp, char **data)
{ {
int rc = 0; size_t length = _stomp_read_until(stomp, data, '\n');
size_t i = 0; if (length > 1) {
size_t bufsize = STOMP_BUFSIZE + 1; (*data)[length - 1] = 0;
char *buffer = (char *) emalloc(STOMP_BUFSIZE + 1); length--;
} else if (length) {
while (1) { efree(*data);
*data = NULL;
size_t length = 1; length = 0;
rc = stomp_recv(stomp, buffer + i, length);
if (rc < 1) {
efree(buffer);
return -1;
} }
return length;
if (1 == length) {
i++;
if (buffer[i-1] == '\n') {
buffer[i-1] = 0;
break;
} else if (buffer[i-1] == 0) {
efree(buffer);
return 0;
}
if (i >= bufsize) {
buffer = (char *) erealloc(buffer, bufsize + STOMP_BUFSIZE);
bufsize += STOMP_BUFSIZE;
}
}
}
if (i > 1) {
*data = (char *) emalloc(i);
if (NULL == *data) {
efree(buffer);
return -1;
}
memcpy(*data, buffer, i);
}
efree(buffer);
return i-1;
} }
/* }}} */ /* }}} */
@ -409,12 +580,20 @@ void stomp_free_frame(stomp_frame_t *frame)
/* {{{ stomp_read_frame /* {{{ stomp_read_frame
*/ */
stomp_frame_t *stomp_read_frame(stomp_t *stomp) stomp_frame_t *stomp_read_frame_ex(stomp_t *stomp, int use_stack)
{ {
stomp_frame_t *f = NULL; stomp_frame_t *f = NULL;
char *cmd = NULL, *length_str = NULL; char *cmd = NULL, *length_str = NULL;
int length = 0; int length = 0;
if (use_stack && stomp->frame_stack) {
return stomp_frame_stack_shift(&stomp->frame_stack);
}
if (!stomp_select(stomp)) {
return NULL;
}
INIT_STOMP_FRAME(f); INIT_STOMP_FRAME(f);
if (NULL == f) { if (NULL == f) {
@ -467,20 +646,27 @@ stomp_frame_t *stomp_read_frame(stomp_t *stomp)
} }
/* Check for the content length */ /* Check for the content length */
if (zend_hash_find(f->headers, "content-length", strlen("content-length"), (void **)&length_str) == SUCCESS) { if (zend_hash_find(f->headers, "content-length", sizeof("content-length"), (void **)&length_str) == SUCCESS) {
int recv_size = 0;
char endbuffer[2]; char endbuffer[2];
length = 2;
f->body_length = atoi(length_str); f->body_length = atoi(length_str);
f->body = (char *) emalloc(f->body_length); f->body = (char *) emalloc(f->body_length);
if (-1 == stomp_recv(stomp, f->body, f->body_length)) { while (recv_size != f->body_length) {
int l = stomp_recv(stomp, f->body + recv_size, f->body_length - recv_size);
if (-1 == l) {
RETURN_READ_FRAME_FAIL;
} else {
recv_size += l;
}
}
length = stomp_recv(stomp, endbuffer, 2);
if (endbuffer[0] != '\0' || ((2 == length) && (endbuffer[1] != '\n'))) {
RETURN_READ_FRAME_FAIL; RETURN_READ_FRAME_FAIL;
} }
if (length != stomp_recv(stomp, endbuffer, length) || endbuffer[0] != '\0' || endbuffer[1] != '\n') {
RETURN_READ_FRAME_FAIL;
}
} else { } else {
f->body_length = stomp_read_buffer(stomp, &f->body); f->body_length = stomp_read_buffer(stomp, &f->body);
} }
@ -494,9 +680,11 @@ stomp_frame_t *stomp_read_frame(stomp_t *stomp)
int stomp_valid_receipt(stomp_t *stomp, stomp_frame_t *frame) { int stomp_valid_receipt(stomp_t *stomp, stomp_frame_t *frame) {
int success = 1; int success = 1;
char *receipt = NULL; char *receipt = NULL;
if (zend_hash_find(frame->headers, "receipt", sizeof("receipt"), (void **)&receipt) == SUCCESS) { if (zend_hash_find(frame->headers, "receipt", sizeof("receipt"), (void **)&receipt) == SUCCESS) {
stomp_frame_t *res = stomp_read_frame(stomp);
success = 0; success = 0;
while (1) {
stomp_frame_t *res = stomp_read_frame_ex(stomp, 0);
if (res) { if (res) {
if (0 == strncmp("RECEIPT", res->command, sizeof("RECEIPT") - 1)) { if (0 == strncmp("RECEIPT", res->command, sizeof("RECEIPT") - 1)) {
char *receipt_id = NULL; char *receipt_id = NULL;
@ -504,14 +692,24 @@ int stomp_valid_receipt(stomp_t *stomp, stomp_frame_t *frame) {
&& strlen(receipt) == strlen(receipt_id) && strlen(receipt) == strlen(receipt_id)
&& !strcmp(receipt, receipt_id)) { && !strcmp(receipt, receipt_id)) {
success = 1; success = 1;
} else {
stomp_set_error(stomp, "Invalid receipt", 0, "%s", receipt_id);
} }
stomp_free_frame(res);
return success;
} else if (0 == strncmp("ERROR", res->command, sizeof("ERROR") - 1)) { } else if (0 == strncmp("ERROR", res->command, sizeof("ERROR") - 1)) {
char *error_msg = NULL; char *error_msg = NULL;
if (zend_hash_find(res->headers, "message", sizeof("message"), (void **)&error_msg) == SUCCESS) { if (zend_hash_find(res->headers, "message", sizeof("message"), (void **)&error_msg) == SUCCESS) {
stomp_set_error(stomp, error_msg, 0); stomp_set_error(stomp, error_msg, 0, "%s", res->body);
}
} }
stomp_free_frame(res); stomp_free_frame(res);
return success;
} else {
stomp_frame_stack_push(&stomp->frame_stack, res);
}
} else {
return success;
}
} }
} }
return success; return success;
@ -520,17 +718,26 @@ int stomp_valid_receipt(stomp_t *stomp, stomp_frame_t *frame) {
/* {{{ stomp_select /* {{{ stomp_select
*/ */
int stomp_select(stomp_t *stomp) int stomp_select_ex(stomp_t *stomp, const long int sec, const long int usec)
{ {
int n;
struct timeval tv; struct timeval tv;
fd_set rfds;
tv.tv_sec = stomp->options.read_timeout_sec; if (stomp->read_buffer.size || stomp->frame_stack) {
tv.tv_usec = stomp->options.read_timeout_usec; return 1;
}
tv.tv_sec = sec;
tv.tv_usec = usec;
FD_ZERO(&rfds); n = php_pollfd_for(stomp->fd, PHP_POLLREADABLE, &tv);
FD_SET(stomp->fd, &rfds); if (n < 1) {
#if !defined(PHP_WIN32) && !(defined(NETWARE) && defined(USE_WINSOCK))
return select(stomp->fd + 1, &rfds, NULL, NULL, &tv); if (n == 0) {
errno = ETIMEDOUT;
}
#endif
return 0;
}
return 1;
} }
/* }}} */ /* }}} */

38
stomp.h Executable file → Normal file
View File

@ -45,6 +45,19 @@ typedef struct _stomp_options {
#endif #endif
} stomp_options_t; } stomp_options_t;
typedef struct _stomp_frame {
char *command;
int command_length;
HashTable *headers;
char *body;
int body_length;
} stomp_frame_t;
typedef struct _stomp_frame_stack {
stomp_frame_t *frame;
struct _stomp_frame_stack *next;
} stomp_frame_stack_t;
typedef struct _stomp { typedef struct _stomp {
php_socket_t fd; php_socket_t fd;
php_sockaddr_storage localaddr; php_sockaddr_storage localaddr;
@ -54,30 +67,31 @@ typedef struct _stomp {
int status; int status;
char *error; char *error;
int errnum; int errnum;
char *error_details;
char *session; char *session;
#if HAVE_STOMP_SSL #if HAVE_STOMP_SSL
SSL *ssl_handle; SSL *ssl_handle;
#endif #endif
stomp_frame_stack_t *frame_stack;
struct {
size_t size;
char buf[STOMP_BUFSIZE];
char *pos;
} read_buffer;
} stomp_t; } stomp_t;
typedef struct _stomp_frame {
char *command;
int command_length;
HashTable *headers;
char *body;
int body_length;
} stomp_frame_t;
stomp_t *stomp_init(); stomp_t *stomp_init();
int stomp_connect(stomp_t *stomp, const char *host, unsigned short port TSRMLS_DC); int stomp_connect(stomp_t *stomp, const char *host, unsigned short port TSRMLS_DC);
void stomp_close(stomp_t *stomp); void stomp_close(stomp_t *stomp);
int stomp_send(stomp_t *connection, stomp_frame_t *frame TSRMLS_DC); int stomp_send(stomp_t *connection, stomp_frame_t *frame TSRMLS_DC);
stomp_frame_t *stomp_read_frame(stomp_t *connection); stomp_frame_t *stomp_read_frame_ex(stomp_t *connection, int use_stack);
int stomp_valid_receipt(stomp_t *connection, stomp_frame_t *frame); int stomp_valid_receipt(stomp_t *connection, stomp_frame_t *frame);
int stomp_select(stomp_t *connection); int stomp_select_ex(stomp_t *connection, const long int sec, const long int usec);
void stomp_set_error(stomp_t *stomp, const char *error, int errnum); void stomp_set_error(stomp_t *stomp, const char *error, int errnum, const char *fmt, ...) ZEND_ATTRIBUTE_PTR_FORMAT(printf, 4, 0);
void stomp_free_frame(stomp_frame_t *frame); void stomp_free_frame(stomp_frame_t *frame);
#define stomp_select(s) stomp_select_ex(s, s->options.read_timeout_sec, s->options.read_timeout_sec)
#define stomp_read_frame(c) stomp_read_frame_ex(c, 1)
#endif /* _STOMP_H_ */ #endif /* _STOMP_H_ */
/* /*

View File

@ -7,4 +7,4 @@ Test stomp_version()
echo stomp_version(); echo stomp_version();
?> ?>
--EXPECTF-- --EXPECTF--
%d.%d.%d %d.%d.%s

View File

@ -0,0 +1,14 @@
--TEST--
Test stomp_connect() - Test error on CONNECT
--SKIPIF--
<?php if (!extension_loaded("stomp")) print "skip"; ?>
--FILE--
<?php
try {
$stomp = new Stomp('tcp://localhost', 'anotpresentusername1234');
} catch (Exception $e) {
var_dump(get_class($e));
}
?>
--EXPECTF--
string(14) "StompException"

View File

@ -14,7 +14,6 @@ stomp_send($link, '/queue/test-06', array());
var_dump(stomp_send($link, '/queue/test-06', '')); var_dump(stomp_send($link, '/queue/test-06', ''));
var_dump(stomp_send($link, '/queue/test-06', 'A realMessage')); var_dump(stomp_send($link, '/queue/test-06', 'A realMessage'));
var_dump(stomp_send($link, '/queue/test-06', 'بياريك شارون')); var_dump(stomp_send($link, '/queue/test-06', 'بياريك شارون'));
var_dump(stomp_send($link, 'بياريك شارون', 'بياريك شارون', array('receipt' => 'message-123')), stomp_error($link));
?> ?>
--EXPECTF-- --EXPECTF--
@ -24,5 +23,3 @@ Warning: stomp_send(): Expects parameter %d to be a string or a StompFrame objec
bool(true) bool(true)
bool(true) bool(true)
bool(true) bool(true)
bool(false)
string(%d) "%s"

View File

@ -14,7 +14,6 @@ $s->send('/queue/test-06', array());
var_dump($s->send('/queue/test-06', '')); var_dump($s->send('/queue/test-06', ''));
var_dump($s->send('/queue/test-06', 'A realMessage')); var_dump($s->send('/queue/test-06', 'A realMessage'));
var_dump($s->send('/queue/test-06', 'بياريك شارون')); var_dump($s->send('/queue/test-06', 'بياريك شارون'));
var_dump($s->send('بياريك شارون', 'بياريك شارون', array('receipt' => 'message-123')), $s->error());
?> ?>
--EXPECTF-- --EXPECTF--
@ -24,5 +23,3 @@ Warning: Stomp::send(): Expects parameter %d to be a string or a StompFrame obje
bool(true) bool(true)
bool(true) bool(true)
bool(true) bool(true)
bool(false)
string(%d) "%s"

14
tests/006-send/003.phpt Normal file
View File

@ -0,0 +1,14 @@
--TEST--
Test stomp::send() - test send with receipt
--SKIPIF--
<?php
if (!extension_loaded("stomp")) print "skip";
if (!stomp_connect()) print "skip";
?>
--FILE--
<?php
$s = new Stomp();
var_dump($s->send('/queue/test-06', 'A real message', array('receipt' => 'message-12345')));
?>
--EXPECTF--
bool(true)

View File

@ -14,4 +14,4 @@ $s->subscribe('/queue/test', 'string');
--EXPECTF-- --EXPECTF--
Warning: Stomp::subscribe(): Destination can not be empty in %s on line %d Warning: Stomp::subscribe(): Destination can not be empty in %s on line %d
Catchable fatal error: Argument 2 passed to Stomp::subscribe() must be an array, string given in %s on line %d Catchable fatal error: Argument 2 passed to Stomp::subscribe() must be %s array, string given in %s on line %d

View File

@ -14,4 +14,4 @@ $s->unsubscribe('/queue/test', 'string');
--EXPECTF-- --EXPECTF--
Warning: Stomp::unsubscribe(): Destination can not be empty in %s on line %d Warning: Stomp::unsubscribe(): Destination can not be empty in %s on line %d
Catchable fatal error: Argument 2 passed to Stomp::unsubscribe() must be an array, string given in %s on line %d Catchable fatal error: Argument 2 passed to Stomp::unsubscribe() must be %s array, string given in %s on line %d

View File

@ -9,7 +9,7 @@ Test stomp::readFrame() - tests functionnality and parameters
<?php <?php
$s = new Stomp(); $s = new Stomp();
$s->send('/queue/test-09', 'A test Message'); $s->send('/queue/test-09', 'A test Message');
$s->subscribe('/queue/test-09'); $s->subscribe('/queue/test-09', array('ack' => 'auto'));
var_dump($s->readFrame()->body); var_dump($s->readFrame()->body);
var_dump($s->readFrame()); var_dump($s->readFrame());

View File

@ -9,7 +9,7 @@ Test stomp_read_frame() - test functionnality and parameters
<?php <?php
$link = stomp_connect(); $link = stomp_connect();
stomp_send($link, '/queue/test-09', 'A test Message'); stomp_send($link, '/queue/test-09', 'A test Message');
stomp_subscribe($link, '/queue/test-09'); stomp_subscribe($link, '/queue/test-09', array('ack' => 'auto'));
$result = stomp_read_frame($link); $result = stomp_read_frame($link);
var_dump($result['body']); var_dump($result['body']);
var_dump(stomp_read_frame($link)); var_dump(stomp_read_frame($link));

View File

@ -18,7 +18,7 @@ class customFrame extends stompFrame
$s = new Stomp(); $s = new Stomp();
$s->send('/queue/test-09', 'A test Message'); $s->send('/queue/test-09', 'A test Message');
$s->subscribe('/queue/test-09'); $s->subscribe('/queue/test-09', array('ack' => 'auto'));
$frame = $s->readFrame('customFrame'); $frame = $s->readFrame('customFrame');
var_dump(get_class($frame), $frame->body); var_dump(get_class($frame), $frame->body);
?> ?>

Binary file not shown.

Binary file not shown.

View File

@ -0,0 +1,26 @@
--TEST--
Test stomp::readFrame() - test frame stack
--SKIPIF--
<?php
if (!extension_loaded("stomp")) print "skip";
if (!stomp_connect()) print "skip";
?>
--FILE--
<?php
$s = new Stomp();
var_dump($s->subscribe('/queue/test-buffer', array('ack' => 'auto')));
var_dump($s->send('/queue/test-buffer', "Message1", array('receipt' => 'msg-1')));
var_dump($s->send('/queue/test-buffer', "Message2", array('receipt' => 'msg-2')));
var_dump($s->send('/queue/test-buffer', "Message3", array('receipt' => 'msg-3')));
var_dump($s->readFrame()->body);
var_dump($s->readFrame()->body);
var_dump($s->readFrame()->body);
?>
--EXPECTF--
bool(true)
bool(true)
bool(true)
bool(true)
string(8) "Message1"
string(8) "Message2"
string(8) "Message3"

View File

@ -17,7 +17,7 @@ var_dump($s->send('/queue/test-011-commit', 'bar', array('transaction' => 't1'))
// sends a message to the queue and asks for a receipt // sends a message to the queue and asks for a receipt
$s->send('/queue/test-011-commit', 'bar', array('transaction' => 't2', 'receipt' => 'tptp')); $s->send('/queue/test-011-commit', 'bar', array('transaction' => 't2', 'receipt' => 'tptp'));
var_dump($s->error()); echo gettype($s->error()) . PHP_EOL;
// commits a valid transaction // commits a valid transaction
var_dump($s->commit('t1')); var_dump($s->commit('t1'));
@ -28,15 +28,15 @@ var_dump($s->commit(null));
// commits a non valid transaction (a transaction id that does not exist) and asks for a receipt // commits a non valid transaction (a transaction id that does not exist) and asks for a receipt
$s->commit('t2', array('receipt' => 'commit-key')); $s->commit('t2', array('receipt' => 'commit-key'));
var_dump($s->error()); echo gettype($s->error());
unset($s); unset($s);
?> ?>
--EXPECTF-- --EXPECTF--
bool(true) bool(true)
bool(true) bool(true)
string(%d) "Invalid transaction id: %s" string
bool(true) bool(true)
bool(false) bool(false)
bool(true) bool(true)
string(%d) "Must specify the transaction you are committing" string

View File

@ -19,7 +19,7 @@ try {
$stomp->send($queue, $msg); $stomp->send($queue, $msg);
/* subscribe to messages from the queue 'foo' */ /* subscribe to messages from the queue 'foo' */
$stomp->subscribe($queue); $stomp->subscribe($queue, array('ack' => 'auto'));
/* read a frame */ /* read a frame */
$frame = $stomp->readFrame(); $frame = $stomp->readFrame();