commit 3718df2b8434ece654d3337138e608cfa834017f Author: Pierrick Charron Date: Fri Oct 30 01:28:16 2009 +0000 Stomp extension - Initial import diff --git a/CREDITS b/CREDITS new file mode 100644 index 0000000..fb82648 --- /dev/null +++ b/CREDITS @@ -0,0 +1,2 @@ +stomp +Pierrick Charron diff --git a/EXPERIMENTAL b/EXPERIMENTAL new file mode 100644 index 0000000..204874b --- /dev/null +++ b/EXPERIMENTAL @@ -0,0 +1,2 @@ +this extension is experimental, its functions may change their names or move +to extension all together so do not rely to much on them you have been warned! diff --git a/TODO b/TODO new file mode 100644 index 0000000..f09aa46 --- /dev/null +++ b/TODO @@ -0,0 +1,4 @@ +Stomp ext TODO +============== + +- SSL Support diff --git a/config.m4 b/config.m4 new file mode 100644 index 0000000..60f34ce --- /dev/null +++ b/config.m4 @@ -0,0 +1,10 @@ +dnl $Id$ +dnl config.m4 for extension stomp + +PHP_ARG_ENABLE(stomp, whether to enable stomp support, +Make sure that the comment is aligned: +[ --enable-stomp Enable stomp support]) + +if test "$PHP_STOMP" != "no"; then + PHP_NEW_EXTENSION(stomp, stomp.c php_stomp.c, $ext_shared) +fi diff --git a/config.w32 b/config.w32 new file mode 100644 index 0000000..68d99de --- /dev/null +++ b/config.w32 @@ -0,0 +1,9 @@ +// $Id$ +// vim:ft=javascript + +ARG_ENABLE("stomp", "enable stomp support", "no"); + +if (PHP_STOMP != "no") { + EXTENSION("stomp", "stomp.c php_stomp.c"); +} + diff --git a/doc/classes.php b/doc/classes.php new file mode 100644 index 0000000..59b6bc0 --- /dev/null +++ b/doc/classes.php @@ -0,0 +1,165 @@ +send($queue, $msg); + + $stomp->subscribe($queue); + $frame = $stomp->readFrame(); + if ($frame->body === $msg) { + echo "Worked\n"; + $stomp->ack($frame, array('receipt' => 'message-12345')); + } else { + echo "Failed\n"; + } + + $stomp->disconnect(); +} catch(StompException $e) { + echo $e->getMessage(); +} + diff --git a/examples/procedural.php b/examples/procedural.php new file mode 100644 index 0000000..662ca49 --- /dev/null +++ b/examples/procedural.php @@ -0,0 +1,24 @@ + 't1')); + stomp_commit($stomp, 't1'); + + stomp_subscribe($stomp, $queue); + $frame = stomp_read_frame($stomp); + if ($frame['body'] === $msg) { + echo "Worked\n"; + stomp_ack($stomp, $frame['headers']['message-id']); + } else { + echo "Failed\n"; + } + + stomp_close($stomp); +} + diff --git a/php_stomp.c b/php_stomp.c new file mode 100755 index 0000000..8ab68a5 --- /dev/null +++ b/php_stomp.c @@ -0,0 +1,992 @@ +/* + +----------------------------------------------------------------------+ + | PHP Version 5 | + +----------------------------------------------------------------------+ + | Copyright (c) 1997-2009 The PHP Group | + +----------------------------------------------------------------------+ + | This source file is subject to version 3.01 of the PHP license, | + | that is bundled with this package in the file LICENSE, and is | + | available through the world-wide-web at the following url: | + | http://www.php.net/license/3_01.txt | + | If you did not receive a copy of the PHP license and are unable to | + | obtain it through the world-wide-web, please send a note to | + | license@php.net so we can mail you a copy immediately. | + +----------------------------------------------------------------------+ + | Author: Pierrick Charron | + +----------------------------------------------------------------------+ +*/ + +/* $Id$ */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "php.h" +#include "php_ini.h" +#include "zend_exceptions.h" +#include "ext/standard/info.h" +#include "ext/standard/url.h" +#include "ext/standard/fsock.h" +#include "stomp.h" +#include "php_stomp.h" + +#include "ext/standard/php_smart_str.h" + +#define FETCH_STOMP_OBJECT \ + i_obj = (stomp_object_t *) zend_object_store_get_object(stomp_object TSRMLS_CC); \ + if (!(stomp = i_obj->stomp)) { \ + php_error_docref(NULL TSRMLS_CC, E_WARNING, PHP_STOMP_ERR_NO_CTR); \ + RETURN_FALSE; \ + } + +#define INIT_FRAME_L(frame, cmd, l) \ + frame.command = cmd; \ + frame.command_length = l; \ + ALLOC_HASHTABLE(frame.headers); \ + zend_hash_init(frame.headers, 0, NULL, NULL, 0); + +#define INIT_FRAME(frame, cmd) INIT_FRAME_L(frame, cmd, sizeof(cmd)-1) + +#define FRAME_HEADER_FROM_HASHTABLE(h, p) \ + HashTable *properties_ht = p; \ + zval **value = NULL; \ + char *string_key = NULL; \ + ulong num_key; \ + zend_hash_internal_pointer_reset(properties_ht); \ + for (zend_hash_internal_pointer_reset(properties_ht); \ + zend_hash_get_current_data(properties_ht, (void **)&value) == SUCCESS; \ + zend_hash_move_forward(properties_ht)) { \ + if (zend_hash_get_current_key(properties_ht, &string_key, &num_key, 1) != HASH_KEY_IS_STRING) { \ + php_error_docref(NULL TSRMLS_CC, E_WARNING, "Invalid argument or parameter array"); \ + break; \ + } else { \ + if (Z_TYPE_PP(value) != IS_STRING) { \ + SEPARATE_ZVAL(value); \ + convert_to_string(*value); \ + } \ + zend_hash_add(h, string_key, strlen(string_key)+1, Z_STRVAL_PP(value), Z_STRLEN_PP(value)+1, NULL); \ + efree(string_key); \ + } \ + } + +#define CLEAR_FRAME(frame) \ + zend_hash_destroy(frame.headers); \ + efree(frame.headers); + +#define STOMP_ERROR(errno, msg, ... ) \ + if (stomp_object) { \ + zend_throw_exception_ex(stomp_ce_exception, errno TSRMLS_CC, msg, ##__VA_ARGS__); \ + } else { \ + php_error_docref(NULL TSRMLS_CC, E_WARNING, msg, ##__VA_ARGS__); \ + } + +static int le_stomp; + +ZEND_DECLARE_MODULE_GLOBALS(stomp) +static PHP_GINIT_FUNCTION(stomp); + +/* {{{ stomp_class_entry */ +zend_class_entry *stomp_ce_stomp; +zend_class_entry *stomp_ce_exception; +zend_class_entry *stomp_ce_frame; +/* }}} */ + +/* {{{ arg_info */ +ZEND_BEGIN_ARG_INFO_EX(stomp_no_args, 0, 0, 0) +ZEND_END_ARG_INFO() + +ZEND_BEGIN_ARG_INFO_EX(stomp_connect_args, 0, 0, 0) +ZEND_ARG_INFO(0, broker) +ZEND_ARG_INFO(0, username) +ZEND_ARG_INFO(0, password) +ZEND_END_ARG_INFO() + +ZEND_BEGIN_ARG_INFO_EX(stomp_link_only, 0, 0, 1) +ZEND_ARG_INFO(0, link) +ZEND_END_ARG_INFO() + +ZEND_BEGIN_ARG_INFO_EX(stomp_send_args, 0, 0, 3) +ZEND_ARG_INFO(0, link) +ZEND_ARG_INFO(0, destination) +ZEND_ARG_INFO(0, msg) +ZEND_ARG_ARRAY_INFO(0, properties, 1) +ZEND_END_ARG_INFO() + +ZEND_BEGIN_ARG_INFO_EX(stomp_oop_send_args, 0, 0, 2) +ZEND_ARG_INFO(0, destination) +ZEND_ARG_INFO(0, msg) +ZEND_ARG_ARRAY_INFO(0, properties, 1) +ZEND_END_ARG_INFO() + +ZEND_BEGIN_ARG_INFO_EX(stomp_subscribe_args, 0, 0, 2) +ZEND_ARG_INFO(0, link) +ZEND_ARG_INFO(0, destination) +ZEND_ARG_ARRAY_INFO(0, properties, 1) +ZEND_END_ARG_INFO() + +ZEND_BEGIN_ARG_INFO_EX(stomp_oop_subscribe_args, 0, 0, 1) +ZEND_ARG_INFO(0, destination) +ZEND_ARG_ARRAY_INFO(0, properties, 1) +ZEND_END_ARG_INFO() + +ZEND_BEGIN_ARG_INFO_EX(stomp_transaction_args, 0, 0, 2) +ZEND_ARG_INFO(0, link) +ZEND_ARG_INFO(0, transaction_id) +ZEND_END_ARG_INFO() + +ZEND_BEGIN_ARG_INFO_EX(stomp_oop_transaction_args, 0, 0, 1) +ZEND_ARG_INFO(0, transaction_id) +ZEND_END_ARG_INFO() + +ZEND_BEGIN_ARG_INFO_EX(stomp_ack_args, 0, 0, 2) +ZEND_ARG_INFO(0, link) +ZEND_ARG_INFO(0, msg) +ZEND_ARG_ARRAY_INFO(0, properties, 1) +ZEND_END_ARG_INFO() + +ZEND_BEGIN_ARG_INFO_EX(stomp_oop_ack_args, 0, 0, 1) +ZEND_ARG_INFO(0, msg) +ZEND_ARG_ARRAY_INFO(0, properties, 1) +ZEND_END_ARG_INFO() + +ZEND_BEGIN_ARG_INFO_EX(stomp_set_timeout_args, 0, 0, 2) +ZEND_ARG_INFO(0, link) +ZEND_ARG_INFO(0, seconds) +ZEND_ARG_INFO(0, microseconds) +ZEND_END_ARG_INFO() + +ZEND_BEGIN_ARG_INFO_EX(stomp_oop_set_timeout_args, 0, 0, 1) +ZEND_ARG_INFO(0, seconds) +ZEND_ARG_INFO(0, microseconds) +ZEND_END_ARG_INFO() +/* }}} */ + +/* {{{ stomp_functions */ +zend_function_entry stomp_functions[] = { + PHP_FE(stomp_version, stomp_no_args) + PHP_FE(stomp_connect, stomp_connect_args) + PHP_FE(stomp_get_session_id, stomp_link_only) + PHP_FE(stomp_close, stomp_link_only) + PHP_FE(stomp_send, stomp_send_args) + PHP_FE(stomp_subscribe, stomp_subscribe_args) + PHP_FE(stomp_has_frame, stomp_link_only) + PHP_FE(stomp_read_frame, stomp_link_only) + PHP_FE(stomp_unsubscribe, stomp_subscribe_args) + PHP_FE(stomp_begin, stomp_transaction_args) + PHP_FE(stomp_commit, stomp_transaction_args) + PHP_FE(stomp_abort, stomp_transaction_args) + PHP_FE(stomp_ack, stomp_ack_args) + PHP_FE(stomp_error, stomp_link_only) + PHP_FE(stomp_set_timeout, stomp_set_timeout_args) + PHP_FE(stomp_get_timeout, stomp_link_only) + {NULL, NULL, NULL} +}; +/* }}} */ + +/* {{{ stomp_methods[] */ +static zend_function_entry stomp_methods[] = { + PHP_FALIAS(__construct, stomp_connect, stomp_connect_args) + PHP_FALIAS(getSessionId, stomp_get_session_id, stomp_no_args) + PHP_FALIAS(disconnect, stomp_close, stomp_no_args) + PHP_FALIAS(send, stomp_send, stomp_oop_send_args) + PHP_FALIAS(subscribe, stomp_subscribe, stomp_oop_subscribe_args) + PHP_FALIAS(hasFrame, stomp_has_frame, stomp_no_args) + PHP_FALIAS(readFrame, stomp_read_frame, stomp_no_args) + PHP_FALIAS(unsubscribe, stomp_unsubscribe, stomp_oop_subscribe_args) + PHP_FALIAS(begin, stomp_begin, stomp_oop_transaction_args) + PHP_FALIAS(commit, stomp_commit, stomp_oop_transaction_args) + PHP_FALIAS(abort, stomp_abort, stomp_oop_transaction_args) + PHP_FALIAS(ack, stomp_ack, stomp_oop_ack_args) + PHP_FALIAS(error, stomp_error, stomp_no_args) + PHP_FALIAS(setTimeout, stomp_set_timeout, stomp_oop_set_timeout_args) + PHP_FALIAS(getTimeout, stomp_get_timeout, stomp_no_args) + {NULL, NULL, NULL} +}; +/* }}} */ + +/* {{{ stomp_frame_methods[] */ +static zend_function_entry stomp_frame_methods[] = { + PHP_ME(stompframe, __construct, NULL, ZEND_ACC_PUBLIC) + {NULL, NULL, NULL} +}; +/* }}} */ + +/* {{{ stomp_module_entry */ +zend_module_entry stomp_module_entry = { +#if ZEND_MODULE_API_NO >= 20010901 + STANDARD_MODULE_HEADER, +#endif + PHP_STOMP_EXTNAME, + stomp_functions, + PHP_MINIT(stomp), + PHP_MSHUTDOWN(stomp), + NULL, + NULL, + PHP_MINFO(stomp), +#if ZEND_MODULE_API_NO >= 20010901 + PHP_STOMP_VERSION, +#endif + PHP_MODULE_GLOBALS(stomp), + PHP_GINIT(stomp), + NULL, + NULL, + STANDARD_MODULE_PROPERTIES_EX +}; +/* }}} */ + +PHP_INI_BEGIN() +STD_PHP_INI_ENTRY("stomp.default_broker", "tcp://localhost:61613", PHP_INI_ALL, OnUpdateString, default_broker, zend_stomp_globals, stomp_globals) +STD_PHP_INI_ENTRY("stomp.default_timeout_sec", "2", PHP_INI_ALL, OnUpdateLong, timeout_sec, zend_stomp_globals, stomp_globals) +STD_PHP_INI_ENTRY("stomp.default_timeout_usec", "0", PHP_INI_ALL, OnUpdateLong, timeout_usec, zend_stomp_globals, stomp_globals) +PHP_INI_END() + +/* {{{ PHP_GINIT_FUNCTION */ +static PHP_GINIT_FUNCTION(stomp) +{ + stomp_globals->default_broker = NULL; + stomp_globals->timeout_sec = 2; + stomp_globals->timeout_usec = 0; +} +/* }}} */ + +ZEND_DECLARE_MODULE_GLOBALS(stomp) + +#ifdef COMPILE_DL_STOMP +ZEND_GET_MODULE(stomp) +#endif + +/* {{{ constructor/destructor */ +static void stomp_send_disconnect(stomp_t *stomp TSRMLS_DC) +{ + stomp_frame_t frame = {0}; + INIT_FRAME(frame, "DISCONNECT"); + + stomp_send(stomp, &frame TSRMLS_CC); + CLEAR_FRAME(frame); +} + +static void php_destroy_stomp_res(zend_rsrc_list_entry *rsrc TSRMLS_DC) +{ + stomp_t *stomp = (stomp_t *) rsrc->ptr; + stomp_send_disconnect(stomp TSRMLS_CC); + stomp_close(stomp TSRMLS_CC); +} + +static void stomp_object_free_storage(stomp_object_t *intern TSRMLS_DC) +{ + zend_object_std_dtor(&intern->std TSRMLS_CC); + if (intern->stomp) { + stomp_send_disconnect(intern->stomp TSRMLS_CC); + stomp_close(intern->stomp TSRMLS_CC); + } + efree(intern); +} + + +static zend_object_value php_stomp_new(zend_class_entry *ce TSRMLS_DC) +{ + zend_object_value retval; + stomp_object_t *intern; + zval *tmp; + + intern = (stomp_object_t *) ecalloc(1, sizeof(stomp_object_t)); + intern->stomp = NULL; + + zend_object_std_init(&intern->std, ce TSRMLS_CC); + zend_hash_copy(intern->std.properties, &ce->default_properties, (copy_ctor_func_t) zval_add_ref, (void *) &tmp, sizeof(zval *)); + + retval.handle = zend_objects_store_put(intern, (zend_objects_store_dtor_t)zend_objects_destroy_object, (zend_objects_free_object_storage_t) stomp_object_free_storage, NULL TSRMLS_CC); + retval.handlers = zend_get_std_object_handlers(); + + return retval; +} +/* }}} */ + +/* {{{ PHP_MINIT_FUNCTION */ +PHP_MINIT_FUNCTION(stomp) +{ + zend_class_entry ce; + + /* Ressource */ + le_stomp = zend_register_list_destructors_ex(php_destroy_stomp_res, NULL, PHP_STOMP_RES_NAME, module_number); + + /* Register Stomp class */ + INIT_CLASS_ENTRY(ce, PHP_STOMP_CLASSNAME, stomp_methods); + stomp_ce_stomp = zend_register_internal_class(&ce TSRMLS_CC); + stomp_ce_stomp->create_object = php_stomp_new; + + /* Register StompFrame class */ + INIT_CLASS_ENTRY(ce, PHP_STOMP_FRAME_CLASSNAME, stomp_frame_methods); + stomp_ce_frame = zend_register_internal_class(&ce TSRMLS_CC); + + /* Properties */ + zend_declare_property_null(stomp_ce_frame, "command", sizeof("command")-1, ZEND_ACC_PUBLIC TSRMLS_CC); + zend_declare_property_null(stomp_ce_frame, "headers", sizeof("headers")-1, ZEND_ACC_PUBLIC TSRMLS_CC); + zend_declare_property_null(stomp_ce_frame, "body", sizeof("body")-1, ZEND_ACC_PUBLIC TSRMLS_CC); + + /* Register StompException class */ + INIT_CLASS_ENTRY(ce, PHP_STOMP_EXCEPTION_CLASSNAME, NULL); + stomp_ce_exception = zend_register_internal_class_ex(&ce, zend_exception_get_default(TSRMLS_C), NULL TSRMLS_CC); + + /** Register INI entries **/ + REGISTER_INI_ENTRIES(); + + return SUCCESS; +} +/* }}} */ + +/* {{{ PHP_MSHUTDOWN_FUNCTION */ +PHP_MSHUTDOWN_FUNCTION(stomp) +{ + /* Unregister INI entries */ + UNREGISTER_INI_ENTRIES(); + return SUCCESS; +} +/* }}} */ + +/* {{{ PHP_MINFO_FUNCTION */ +PHP_MINFO_FUNCTION(stomp) +{ + php_info_print_table_start(); + php_info_print_table_header(2, PHP_STOMP_EXTNAME, "enabled"); + php_info_print_table_row(2, "API version", PHP_STOMP_VERSION); + php_info_print_table_end(); + DISPLAY_INI_ENTRIES(); +} +/* }}} */ + +/* {{{ proto string stomp_version() + Get stomp extension version */ +PHP_FUNCTION(stomp_version) +{ + RETURN_STRINGL(PHP_STOMP_VERSION, sizeof(PHP_STOMP_VERSION)-1, 1); +} +/* }}} */ + +/* {{{ proto Stomp::__construct([string broker [, string username [, string password]]]) + Connect to server */ +PHP_FUNCTION(stomp_connect) +{ + zval *stomp_object = getThis(); + stomp_t *stomp = NULL; + char *broker = NULL, *username = NULL, *password = NULL; + int broker_len = 0, username_len = 0, password_len = 0; + struct timeval tv; + php_url *url_parts; + + tv.tv_sec = 2; + tv.tv_usec = 0; + + if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "|sss", &broker, &broker_len, &username, &username_len, &password, &password_len) == FAILURE) { + return; + } + + /* Verify that broker URI */ + if (!broker) { + broker = STOMP_G(default_broker); + } + + url_parts = php_url_parse_ex(broker, strlen(broker)); + + if (!url_parts || !url_parts->host) { + STOMP_ERROR(0, PHP_STOMP_ERR_INVALID_BROKER_URI); + php_url_free(url_parts); + return; + } + + if (url_parts->scheme && strcmp(url_parts->scheme, "tcp") != 0) { + STOMP_ERROR(0, PHP_STOMP_ERR_INVALID_BROKER_URI_SCHEME); + php_url_free(url_parts); + return; + } + + stomp = stomp_new(url_parts->host, url_parts->port ? url_parts->port : 61613, STOMP_G(timeout_sec), STOMP_G(timeout_usec) TSRMLS_CC); + php_url_free(url_parts); + + if ((stomp->status = stomp_connect(stomp TSRMLS_CC))) { + stomp_frame_t *res; + stomp_frame_t frame = {0}; + + INIT_FRAME(frame, "CONNECT"); + if (username_len == 0) { + username = ""; + } + if (password_len == 0) { + password = ""; + } + zend_hash_add(frame.headers, "login", sizeof("login"), username, username_len + 1, NULL); + zend_hash_add(frame.headers, "passcode", sizeof("passcode"), password, password_len + 1, NULL); + + stomp_send(stomp, &frame TSRMLS_CC); + CLEAR_FRAME(frame); + + /* Retreive Response */ + res = stomp_read_frame(stomp); + if (NULL == res) { + STOMP_ERROR(0, PHP_STOMP_ERR_SERVER_NOT_RESPONDING); + } else if (0 != strncmp("CONNECTED", res->command, sizeof("CONNECTED")-1)) { + if (stomp->error) { + STOMP_ERROR(stomp->errnum, "%s", stomp->error); + } else { + STOMP_ERROR(0, PHP_STOMP_ERR_UNKNOWN); + } + } else { + char *key = NULL; + + if (zend_hash_find(res->headers, "session", sizeof("session"), (void **)&key) == SUCCESS) { + if (stomp->session) { + efree(stomp->session); + } + stomp->session = estrdup(key); + } + + frame_destroy(res); + + if (!stomp_object) { + ZEND_REGISTER_RESOURCE(return_value, stomp, le_stomp); + return; + } else { + stomp_object_t *i_obj = (stomp_object_t *) zend_object_store_get_object(stomp_object TSRMLS_CC); + if (i_obj->stomp) { + stomp_close(i_obj->stomp TSRMLS_CC); + } + i_obj->stomp = stomp; + RETURN_TRUE; + } + } + } else { + STOMP_ERROR(0, "%s", stomp->error); + } + + stomp_close(stomp TSRMLS_CC); + RETURN_FALSE; +} +/* }}} */ + +/* {{{ proto string Stomp::getSessionId() + Get the current stomp session ID */ +PHP_FUNCTION(stomp_get_session_id) +{ + zval *stomp_object = getThis(); + stomp_t *stomp = NULL; + if (stomp_object) { + stomp_object_t *i_obj = NULL; + FETCH_STOMP_OBJECT; + } else { + zval *arg = NULL; + if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "r", &arg) == FAILURE) { + return; + } + ZEND_FETCH_RESOURCE(stomp, stomp_t *, &arg, -1, PHP_STOMP_RES_NAME, le_stomp); + } + + if (!stomp) { + php_error_docref(NULL TSRMLS_CC, E_WARNING, PHP_STOMP_ERR_NO_CTR); + RETURN_FALSE; + } + + if (stomp->session) { + RETURN_STRING(stomp->session, 1); + } else { + RETURN_FALSE; + } +} +/* }}} */ + +/* {{{ proto boolean Stomp::disconnect() + Close stomp connection */ +PHP_FUNCTION(stomp_close) +{ + zval *stomp_object = getThis(); + stomp_t *stomp = NULL; + + if (stomp_object) { + stomp_object_t *i_obj = NULL; + FETCH_STOMP_OBJECT; + stomp_send_disconnect(stomp TSRMLS_CC); + stomp_close(stomp TSRMLS_CC); + i_obj->stomp = NULL; + } else { + zval *arg = NULL; + if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "r", &arg) == FAILURE) { + return; + } + ZEND_FETCH_RESOURCE(stomp, stomp_t *, &arg, -1, PHP_STOMP_RES_NAME, le_stomp); + zend_list_delete(Z_RESVAL_P(arg)); + } + + RETURN_TRUE; +} +/* }}} */ + +/* {{{ proto boolean Stomp::send(string destination, mixed msg [, array properties]) + Sends a message to a destination in the messaging system */ +PHP_FUNCTION(stomp_send) +{ + zval *stomp_object = getThis(); + stomp_t *stomp = NULL; + char *destination = NULL; + int destination_length = 0; + zval *msg = NULL, *properties = NULL; + stomp_frame_t frame = {0}; + int success = 0; + + if (stomp_object) { + stomp_object_t *i_obj = NULL; + if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "sz|a!", &destination, &destination_length, &msg, &properties) == FAILURE) { + return; + } + FETCH_STOMP_OBJECT; + } else { + zval *arg = NULL; + if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "rsz|a!", &arg, &destination, &destination_length, &msg, &properties) == FAILURE) { + return; + } + ZEND_FETCH_RESOURCE(stomp, stomp_t *, &arg, -1, PHP_STOMP_RES_NAME, le_stomp); + } + + /* Verify destination */ + if (0 == destination_length) { + STOMP_ERROR(0, PHP_STOMP_ERR_EMPTY_DESTINATION); + RETURN_FALSE; + } + + INIT_FRAME(frame, "SEND"); + + /* Translate a PHP array to a stomp_header array */ + if (NULL != properties) { + FRAME_HEADER_FROM_HASHTABLE(frame.headers, Z_ARRVAL_P(properties)); + } + + /* Add the destination */ + zend_hash_add(frame.headers, "destination", sizeof("destination"), destination, destination_length + 1, NULL); + + if (Z_TYPE_P(msg) == IS_STRING) { + frame.body = Z_STRVAL_P(msg); + frame.body_length = -1; + } else if (Z_TYPE_P(msg) == IS_OBJECT && instanceof_function(Z_OBJCE_P(msg), stomp_ce_frame TSRMLS_CC)) { + zval *frame_obj_prop = NULL; + frame_obj_prop = zend_read_property(stomp_ce_frame, msg, "command", sizeof("command")-1, 1 TSRMLS_CC); + if (Z_TYPE_P(frame_obj_prop) == IS_STRING) { + frame.command = Z_STRVAL_P(frame_obj_prop); + frame.command_length = Z_STRLEN_P(frame_obj_prop); + } + frame_obj_prop = zend_read_property(stomp_ce_frame, msg, "body", sizeof("body")-1, 1 TSRMLS_CC); + if (Z_TYPE_P(frame_obj_prop) == IS_STRING) { + frame.body = Z_STRVAL_P(frame_obj_prop); + frame.body_length = -1; + } + frame_obj_prop = zend_read_property(stomp_ce_frame, msg, "headers", sizeof("headers")-1, 1 TSRMLS_CC); + if (Z_TYPE_P(frame_obj_prop) == IS_ARRAY) { + FRAME_HEADER_FROM_HASHTABLE(frame.headers, Z_ARRVAL_P(frame_obj_prop)); + } + } else { + php_error_docref(NULL TSRMLS_CC, E_ERROR, "Expects parameter %d to be a string or a StompFrame object.", stomp_object?2:3); + } + + if (stomp_send(stomp, &frame TSRMLS_CC) > 0) { + success = stomp_valid_receipt(stomp, &frame); + } + + CLEAR_FRAME(frame); + RETURN_BOOL(success); +} +/* }}} */ + +/* {{{ proto boolean Stomp::subscribe(string destination [, array properties]) + Register to listen to a given destination */ +PHP_FUNCTION(stomp_subscribe) +{ + zval *stomp_object = getThis(); + stomp_t *stomp = NULL; + char *destination = NULL; + int destination_length = 0; + zval *properties = NULL; + stomp_frame_t frame = {0}; + int success = 0; + + if (stomp_object) { + stomp_object_t *i_obj = NULL; + if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "s|a!", &destination, &destination_length, &properties) == FAILURE) { + return; + } + FETCH_STOMP_OBJECT; + } else { + zval *arg = NULL; + if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "rs|a!", &arg, &destination, &destination_length, &properties) == FAILURE) { + return; + } + ZEND_FETCH_RESOURCE(stomp, stomp_t *, &arg, -1, PHP_STOMP_RES_NAME, le_stomp); + } + + /* Verify destination */ + if (0 == destination_length) { + STOMP_ERROR(0, PHP_STOMP_ERR_EMPTY_DESTINATION); + RETURN_FALSE; + } + + INIT_FRAME(frame, "SUBSCRIBE"); + + /* Translate a PHP array to a stomp_header array */ + if (NULL != properties) { + FRAME_HEADER_FROM_HASHTABLE(frame.headers, Z_ARRVAL_P(properties)); + } + + /* Add the destination */ + zend_hash_add(frame.headers, "ack", sizeof("ack"), "client", sizeof("client"), NULL); + zend_hash_add(frame.headers, "destination", sizeof("destination"), destination, destination_length + 1, NULL); + zend_hash_add(frame.headers, "activemq.prefetchSize", sizeof("activemq.prefetchSize"), "1", sizeof("1"), NULL); + + if (stomp_send(stomp, &frame TSRMLS_CC) > 0) { + success = stomp_valid_receipt(stomp, &frame); + } + + CLEAR_FRAME(frame); + RETURN_BOOL(success); +} +/* }}} */ + +/* {{{ proto boolean Stomp::unsubscribe(string destination [, array properties]) + Remove an existing subscription */ +PHP_FUNCTION(stomp_unsubscribe) +{ + zval *stomp_object = getThis(); + stomp_t *stomp = NULL; + char *destination = NULL; + int destination_length = 0; + zval *properties = NULL; + stomp_frame_t frame = {0}; + int success = 0; + + if (stomp_object) { + stomp_object_t *i_obj = NULL; + if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "s|a!", &destination, &destination_length, &properties) == FAILURE) { + return; + } + FETCH_STOMP_OBJECT; + } else { + zval *arg = NULL; + if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "rs|a!", &arg, &destination, &destination_length, &properties) == FAILURE) { + return; + } + ZEND_FETCH_RESOURCE(stomp, stomp_t *, &arg, -1, PHP_STOMP_RES_NAME, le_stomp); + } + + /* Verify destination */ + if (0 == destination_length) { + STOMP_ERROR(0, PHP_STOMP_ERR_EMPTY_DESTINATION); + RETURN_FALSE; + } + + INIT_FRAME(frame, "UNSUBSCRIBE"); + + /* Translate a PHP array to a stomp_header array */ + if (NULL != properties) { + FRAME_HEADER_FROM_HASHTABLE(frame.headers, Z_ARRVAL_P(properties)); + } + + /* Add the destination */ + zend_hash_add(frame.headers, "destination", sizeof("destination"), destination, destination_length + 1, NULL); + + if (stomp_send(stomp, &frame TSRMLS_CC) > 0) { + success = stomp_valid_receipt(stomp, &frame); + } + + CLEAR_FRAME(frame); + RETURN_BOOL(success); +} +/* }}} */ + +/* {{{ proto boolean Stomp::hasFrame() + Indicate whether or not there is a frame ready to read */ +PHP_FUNCTION(stomp_has_frame) +{ + zval *stomp_object = getThis(); + stomp_t *stomp = NULL; + if (stomp_object) { + stomp_object_t *i_obj = NULL; + FETCH_STOMP_OBJECT; + } else { + zval *arg = NULL; + if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "r", &arg) == FAILURE) { + return; + } + ZEND_FETCH_RESOURCE(stomp, stomp_t *, &arg, -1, PHP_STOMP_RES_NAME, le_stomp); + } + + RETURN_BOOL(stomp_select(stomp) > 0); +} +/* }}} */ + +/* {{{ proto StompFrame Stomp::readFrame() + Read the next frame */ +PHP_FUNCTION(stomp_read_frame) +{ + zval *stomp_object = getThis(); + stomp_t *stomp = NULL; + stomp_frame_t *res = NULL; + + if (stomp_object) { + stomp_object_t *i_obj = NULL; + FETCH_STOMP_OBJECT; + } else { + zval *arg = NULL; + if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "r", &arg) == FAILURE) { + return; + } + ZEND_FETCH_RESOURCE(stomp, stomp_t *, &arg, -1, PHP_STOMP_RES_NAME, le_stomp); + } + + if (stomp_select(stomp) > 0 && (res = stomp_read_frame(stomp))) { + zval *headers = NULL; + MAKE_STD_ZVAL(headers); + array_init(headers); + if (res->headers) { + char *key; + ulong pos; + zend_hash_internal_pointer_reset(res->headers); + + while (zend_hash_get_current_key(res->headers, &key, &pos, 0) == HASH_KEY_IS_STRING) { + char *value = NULL; + if (zend_hash_get_current_data(res->headers, (void **)&value) == SUCCESS) { + add_assoc_string(headers, key, value, 1); + } + zend_hash_move_forward(res->headers); + } + } + + if (stomp_object) { + object_init_ex(return_value, stomp_ce_frame); + zend_update_property_stringl(stomp_ce_frame, return_value, "command", sizeof("command")-1, res->command, res->command_length TSRMLS_CC); + if (res->body) { + zend_update_property_stringl(stomp_ce_frame, return_value, "body", sizeof("body")-1, res->body, res->body_length TSRMLS_CC); + } + zend_update_property(stomp_ce_frame, return_value, "headers", sizeof("headers")-1, headers TSRMLS_CC); + zval_ptr_dtor(&headers); + } else { + array_init(return_value); + add_assoc_string_ex(return_value, "command", sizeof("command"), res->command, 1); + if (res->body) { + add_assoc_string_ex(return_value, "body", sizeof("body"), res->body, 1); + } + add_assoc_zval_ex(return_value, "headers", sizeof("headers"), headers); + } + + frame_destroy(res); + } else { + RETURN_FALSE; + } +} +/* }}} */ + +/* {{{ _php_stomp_transaction */ +static void _php_stomp_transaction(INTERNAL_FUNCTION_PARAMETERS, char *cmd) { + zval *stomp_object = getThis(); + stomp_t *stomp = NULL; + char *transaction_id = NULL; + int transaction_id_length = 0; + stomp_frame_t frame = {0}; + int success = 0; + + if (stomp_object) { + stomp_object_t *i_obj = NULL; + if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "|s", &transaction_id, &transaction_id_length) == FAILURE) { + return; + } + FETCH_STOMP_OBJECT; + } else { + zval *arg = NULL; + if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "r|s", &arg, &transaction_id, &transaction_id_length) == FAILURE) { + return; + } + ZEND_FETCH_RESOURCE(stomp, stomp_t *, &arg, -1, PHP_STOMP_RES_NAME, le_stomp); + } + + INIT_FRAME_L(frame, cmd, strlen(cmd)); + + if (transaction_id_length > 0) { + zend_hash_add(frame.headers, "transaction", sizeof("transaction"), transaction_id, transaction_id_length + 1, NULL); + } + + if (stomp_send(stomp, &frame TSRMLS_CC) > 0) { + success = stomp_valid_receipt(stomp, &frame); + } + + CLEAR_FRAME(frame); + RETURN_BOOL(success); +} +/* }}} */ + +/* {{{ proto boolean Stomp::begin([string transactionId]) + Start a transaction */ +PHP_FUNCTION(stomp_begin) +{ + _php_stomp_transaction(INTERNAL_FUNCTION_PARAM_PASSTHRU, "BEGIN"); +} +/* }}} */ + +/* {{{ proto boolean Stomp::commit([string transactionId]) + Commit a transaction in progress */ +PHP_FUNCTION(stomp_commit) +{ + _php_stomp_transaction(INTERNAL_FUNCTION_PARAM_PASSTHRU, "COMMIT"); +} +/* }}} */ + +/* {{{ proto boolean Stomp::abort([string transactionId]) + Rollback a transaction in progress */ +PHP_FUNCTION(stomp_abort) +{ + _php_stomp_transaction(INTERNAL_FUNCTION_PARAM_PASSTHRU, "ABORT"); +} +/* }}} */ + +/* {{{ proto boolean Stomp::ack(mixed msg [, array properties]) + Acknowledge consumption of a message from a subscription using client acknowledgment */ +PHP_FUNCTION(stomp_ack) +{ + zval *stomp_object = getThis(); + zval *msg = NULL, *properties = NULL; + stomp_t *stomp = NULL; + stomp_frame_t frame = {0}; + int success = 0; + + if (stomp_object) { + stomp_object_t *i_obj = NULL; + if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "z|a!", &msg, &properties) == FAILURE) { + return; + } + FETCH_STOMP_OBJECT; + } else { + zval *arg = NULL; + if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "rz|a!", &arg, &msg, &properties) == FAILURE) { + return; + } + ZEND_FETCH_RESOURCE(stomp, stomp_t *, &arg, -1, PHP_STOMP_RES_NAME, le_stomp); + } + + INIT_FRAME(frame, "ACK"); + + if (NULL != properties) { + FRAME_HEADER_FROM_HASHTABLE(frame.headers, Z_ARRVAL_P(properties)); + } + + if (Z_TYPE_P(msg) == IS_STRING) { + zend_hash_add(frame.headers, "message-id", sizeof("message-id"), Z_STRVAL_P(msg), Z_STRLEN_P(msg) + 1, NULL); + } else if (Z_TYPE_P(msg) == IS_OBJECT && instanceof_function(Z_OBJCE_P(msg), stomp_ce_frame TSRMLS_CC)) { + zval *frame_obj_prop = zend_read_property(stomp_ce_frame, msg, "headers", sizeof("headers")-1, 1 TSRMLS_CC); + if (Z_TYPE_P(frame_obj_prop) == IS_ARRAY) { + FRAME_HEADER_FROM_HASHTABLE(frame.headers, Z_ARRVAL_P(frame_obj_prop)); + } + } else { + php_error_docref(NULL TSRMLS_CC, E_ERROR, "Expects parameter %d to be a string or a StompFrame object.", stomp_object?2:3); + } + + if (stomp_send(stomp, &frame TSRMLS_CC) > 0) { + success = stomp_valid_receipt(stomp, &frame); + } + + CLEAR_FRAME(frame); + RETURN_BOOL(success); +} +/* }}} */ + +/* {{{ proto string Stomp::error() + Get the last error message */ +PHP_FUNCTION(stomp_error) +{ + zval *stomp_object = getThis(); + stomp_t *stomp = NULL; + if (stomp_object) { + stomp_object_t *i_obj = NULL; + FETCH_STOMP_OBJECT; + } else { + zval *arg = NULL; + if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "r", &arg) == FAILURE) { + return; + } + ZEND_FETCH_RESOURCE(stomp, stomp_t *, &arg, -1, PHP_STOMP_RES_NAME, le_stomp); + } + + if (stomp->error) { + RETURN_STRING(stomp->error, 1); + } else { + RETURN_FALSE; + } +} +/* }}} */ + +/* {{{ proto void Stomp::setTimeout(int seconds [, int microseconds]) + Set the timeout */ +PHP_FUNCTION(stomp_set_timeout) +{ + zval *stomp_object = getThis(); + stomp_t *stomp = NULL; + long sec, usec; + if (stomp_object) { + stomp_object_t *i_obj = NULL; + if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "l|l", &sec, &usec) == FAILURE) { + return; + } + FETCH_STOMP_OBJECT; + } else { + zval *arg = NULL; + if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "rl|l", &arg, &sec, &usec) == FAILURE) { + return; + } + ZEND_FETCH_RESOURCE(stomp, stomp_t *, &arg, -1, PHP_STOMP_RES_NAME, le_stomp); + } + + stomp->timeout_sec = sec; + stomp->timeout_usec = sec; +} +/* }}} */ + +/* {{{ proto array Stomp::getTimeout() + Get the timeout */ +PHP_FUNCTION(stomp_get_timeout) +{ + zval *stomp_object = getThis(); + stomp_t *stomp = NULL; + if (stomp_object) { + stomp_object_t *i_obj = NULL; + FETCH_STOMP_OBJECT; + } else { + zval *arg = NULL; + if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "r", &arg) == FAILURE) { + return; + } + ZEND_FETCH_RESOURCE(stomp, stomp_t *, &arg, -1, PHP_STOMP_RES_NAME, le_stomp); + } + + array_init(return_value); + add_assoc_long_ex(return_value, "sec", sizeof("sec"), stomp->timeout_sec); + add_assoc_long_ex(return_value, "usec", sizeof("usec"), stomp->timeout_usec); +} +/* }}} */ + +/* {{{ proto void StompFrame::__construct([string command [, array headers [, string body]]]) + Create StompFrame object */ +PHP_METHOD(stompframe, __construct) +{ + zval *object = getThis(); + char *command = NULL, *body = NULL; + int command_length = 0, body_length = -1; + zval *headers = NULL; + + if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "|sa!s", &command, &command_length, &headers, &body, &body_length) == FAILURE) { + return; + } + + if (command_length > 0) { + zend_update_property_stringl(stomp_ce_frame, object, "command", sizeof("command")-1, command, command_length TSRMLS_CC); + } + if (headers) { + zend_update_property(stomp_ce_frame, object, "headers", sizeof("headers")-1, headers TSRMLS_CC); + } + if (body_length > 0) { + zend_update_property_stringl(stomp_ce_frame, object, "body", sizeof("body")-1, body, body_length TSRMLS_CC); + } +} +/* }}} */ diff --git a/php_stomp.h b/php_stomp.h new file mode 100644 index 0000000..5d28d24 --- /dev/null +++ b/php_stomp.h @@ -0,0 +1,107 @@ +/* + +----------------------------------------------------------------------+ + | PHP Version 5 | + +----------------------------------------------------------------------+ + | Copyright (c) 1997-2009 The PHP Group | + +----------------------------------------------------------------------+ + | This source file is subject to version 3.01 of the PHP license, | + | that is bundled with this package in the file LICENSE, and is | + | available through the world-wide-web at the following url: | + | http://www.php.net/license/3_01.txt | + | If you did not receive a copy of the PHP license and are unable to | + | obtain it through the world-wide-web, please send a note to | + | license@php.net so we can mail you a copy immediately. | + +----------------------------------------------------------------------+ + | Author: Pierrick Charron | + +----------------------------------------------------------------------+ +*/ + +/* $Id$ */ + +#ifndef PHP_STOMP_H +#define PHP_STOMP_H + +typedef struct _stomp_object { + zend_object std; + stomp_t *stomp; +} stomp_object_t; + +#define PHP_STOMP_EXTNAME "Stomp" +#define PHP_STOMP_MAJOR_VERSION "0" +#define PHP_STOMP_MINOR_VERSION "1" +#define PHP_STOMP_PATCH_VERSION "0" +#define PHP_STOMP_VERSION PHP_STOMP_MAJOR_VERSION "." PHP_STOMP_MINOR_VERSION "." PHP_STOMP_PATCH_VERSION + +#define PHP_STOMP_RES_NAME "stomp connection" + +#define PHP_STOMP_CLASSNAME "Stomp" +#define PHP_STOMP_FRAME_CLASSNAME "StompFrame" +#define PHP_STOMP_EXCEPTION_CLASSNAME "StompException" + +#define PHP_STOMP_ERR_UNKNOWN "Stomp unknown error" +#define PHP_STOMP_ERR_INVALID_BROKER_URI "Invalid Broker URI" +#define PHP_STOMP_ERR_INVALID_BROKER_URI_SCHEME "Invalid Broker URI scheme" +#define PHP_STOMP_ERR_SERVER_NOT_RESPONDING "Server is not responding" +#define PHP_STOMP_ERR_EMPTY_DESTINATION "Destination can not be empty" +#define PHP_STOMP_ERR_NO_CTR "Stomp constructor was not called" + +extern zend_module_entry stomp_module_entry; +#define phpext_stomp_ptr &stomp_module_entry + +#ifdef PHP_WIN32 +#define PHP_STOMP_API __declspec(dllexport) +#else +#define PHP_STOMP_API +#endif + +#ifdef ZTS +#include "TSRM.h" +#endif + +PHP_MINIT_FUNCTION(stomp); +PHP_MSHUTDOWN_FUNCTION(stomp); +PHP_MINFO_FUNCTION(stomp); + +/* Methods declarations */ +PHP_FUNCTION(stomp_version); +PHP_FUNCTION(stomp_connect); +PHP_FUNCTION(stomp_get_session_id); +PHP_FUNCTION(stomp_close); +PHP_FUNCTION(stomp_send); +PHP_FUNCTION(stomp_subscribe); +PHP_FUNCTION(stomp_has_frame); +PHP_FUNCTION(stomp_read_frame); +PHP_FUNCTION(stomp_unsubscribe); +PHP_FUNCTION(stomp_begin); +PHP_FUNCTION(stomp_commit); +PHP_FUNCTION(stomp_abort); +PHP_FUNCTION(stomp_ack); +PHP_FUNCTION(stomp_error); +PHP_FUNCTION(stomp_set_timeout); +PHP_FUNCTION(stomp_get_timeout); + +PHP_METHOD(stompframe, __construct); + +ZEND_BEGIN_MODULE_GLOBALS(stomp) + char *default_broker; + long timeout_sec; + long timeout_usec; +ZEND_END_MODULE_GLOBALS(stomp) + +#ifdef ZTS +#define STOMP_G(v) TSRMG(stomp_globals_id, zend_stomp_globals *, v) +#else +#define STOMP_G(v) (stomp_globals.v) +#endif + +#endif /* PHP_STOMP_H */ + + +/* + * Local variables: + * tab-width: 4 + * c-basic-offset: 4 + * End: + * vim600: noet sw=4 ts=4 fdm=marker + * vim<600: noet sw=4 ts=4 + */ diff --git a/stomp.c b/stomp.c new file mode 100644 index 0000000..00013e4 --- /dev/null +++ b/stomp.c @@ -0,0 +1,447 @@ +/* + +----------------------------------------------------------------------+ + | PHP Version 5 | + +----------------------------------------------------------------------+ + | Copyright (c) 1997-2009 The PHP Group | + +----------------------------------------------------------------------+ + | This source file is subject to version 3.01 of the PHP license, | + | that is bundled with this package in the file LICENSE, and is | + | available through the world-wide-web at the following url: | + | http://www.php.net/license/3_01.txt | + | If you did not receive a copy of the PHP license and are unable to | + | obtain it through the world-wide-web, please send a note to | + | license@php.net so we can mail you a copy immediately. | + +----------------------------------------------------------------------+ + | Author: Pierrick Charron | + +----------------------------------------------------------------------+ +*/ + +/* $Id$ */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "php.h" +#include "ext/standard/php_smart_str.h" +#include "stomp.h" +#include "php_stomp.h" + +#define RETURN_READ_FRAME_FAIL { frame_destroy(f); return NULL; } + +/* {{{ stomp_new + */ +stomp_t *stomp_new(const char *host, unsigned short port, long timeout_sec, long timeout_usec TSRMLS_DC) +{ + /* Memory allocation for the stomp */ + stomp_t *stomp = (stomp_t *) emalloc(sizeof(stomp_t)); + memset(stomp, 0, sizeof(*stomp)); + + /* Define all values */ + stomp->host = (char *) emalloc(strlen(host) + 1); + memcpy(stomp->host, host, strlen(host)); + stomp->host[strlen(host)] = '\0'; + + stomp->port = port; + stomp->status = 0; + stomp->error = NULL; + stomp->errnum = 0; + stomp->timeout_sec = timeout_sec; + stomp->timeout_usec = timeout_usec; + stomp->session = NULL; + + return stomp; +} +/* }}} */ + +/* {{{ stomp_set_error + */ +void stomp_set_error(stomp_t *stomp, const char *error, int errnum) +{ + if (error != NULL) { + if (stomp->error != NULL) { + efree(stomp->error); + } + stomp->error = estrdup(error); + stomp->errnum = errnum; + } +} +/* }}} */ + +/* {{{ stomp_connect + */ +int stomp_connect(stomp_t *stomp TSRMLS_DC) +{ + char error[1024]; + socklen_t size; + struct timeval tv; + fd_set rfds; + + tv.tv_sec = stomp->timeout_sec; + tv.tv_usec = stomp->timeout_usec; + + stomp->fd = php_network_connect_socket_to_host(stomp->host, stomp->port, SOCK_STREAM, 0, &tv, NULL, NULL, NULL, 0 TSRMLS_CC); + if (stomp->fd == -1) { + snprintf(error, sizeof(error), "Unable to connect to %s:%ld", stomp->host, stomp->port); + stomp_set_error(stomp, error, errno); + return 0; + } + + size = sizeof(stomp->localaddr); + memset(&stomp->localaddr, 0, size); + if (getsockname(stomp->fd, (struct sockaddr*) &stomp->localaddr, &size) == -1) { + snprintf(error, sizeof(error), "getsockname failed: %s (%d)", strerror(errno), errno); + stomp_set_error(stomp, error, errno); + return 0; + } + + tv.tv_sec = 0; + tv.tv_usec = 0; + + FD_ZERO(&rfds); + FD_SET(stomp->fd, &rfds); + + if (select(stomp->fd + 1, NULL, &rfds, NULL, &tv) > 0) { + return 1; + } else { + snprintf(error, sizeof(error), "Unable to connect to %s:%ld", stomp->host, stomp->port); + stomp_set_error(stomp, error, errno); + return 0; + } +} +/* }}} */ + +/* {{{ stomp_close + */ +int stomp_close(stomp_t *stomp TSRMLS_DC) +{ + if (NULL == stomp) { + return 1; + } + + if (stomp->fd != -1) { + closesocket(stomp->fd); + } + if (stomp->host) { + efree(stomp->host); + } + if (stomp->session) { + efree(stomp->session); + } + if (stomp->error) { + efree(stomp->error); + } + + efree(stomp); + return 1; +} +/* }}} */ + +/* {{{ stomp_send + */ +int stomp_send(stomp_t *stomp, stomp_frame_t *frame TSRMLS_DC) +{ + smart_str buf = {0}; + + /* Command */ + smart_str_appends(&buf, frame->command); + smart_str_appendc(&buf, '\n'); + + /* Headers */ + if (frame->headers) { + + char *key; + ulong pos; + zend_hash_internal_pointer_reset(frame->headers); + + while (zend_hash_get_current_key(frame->headers, &key, &pos, 0) == HASH_KEY_IS_STRING) { + char *value = NULL; + + smart_str_appends(&buf, key); + smart_str_appendc(&buf, ':'); + + if (zend_hash_get_current_data(frame->headers, (void **)&value) == SUCCESS) { + smart_str_appends(&buf, value); + } + + smart_str_appendc(&buf, '\n'); + + zend_hash_move_forward(frame->headers); + } + } + + if (frame->body_length > 0) { + smart_str_appends(&buf, "content-length: "); + smart_str_append_long(&buf, frame->body_length); + smart_str_appendc(&buf, '\n'); + } + + smart_str_appendc(&buf, '\n'); + + if (frame->body > 0) { + smart_str_appends(&buf, frame->body); + } + + if (-1 == send(stomp->fd, buf.c, buf.len, 0) || -1 == send(stomp->fd, "\0\n", 2, 0)) { + return 0; + } + + smart_str_free(&buf); + + return 1; +} +/* }}} */ + +/* {{{ stomp_read_buffer + */ +static int stomp_read_buffer(stomp_t *stomp, char **data) +{ + int rc = 0; + size_t i = 0; + char *buffer = (char *) emalloc(STOMP_BUFSIZE + 1); + + while (1) { + + size_t length = 1; + rc = stomp_recv(stomp, buffer + i, length); + if (rc < 1) { + efree(buffer); + return -1; + } + + if (1 == length) { + i++; + + if (buffer[i-1] == 0) { + char endline[1]; + if (1 != stomp_recv(stomp, endline, 1) && '\n' != endline[0]) { + efree(buffer); + return 0; + } + break; + } + + if (i >= sizeof(buffer)) { + buffer = (char *) erealloc(buffer, sizeof(buffer) + STOMP_BUFSIZE); + } + + } + } + + if (i > 1) { + *data = (char *) emalloc(i); + if (NULL == *data) { + efree(buffer); + return -1; + } + + memcpy(*data, buffer, i); + } + + efree(buffer); + + return i-1; +} +/* }}} */ + +/* {{{ stomp_read_line + */ +static int stomp_read_line(stomp_t *stomp, char **data) +{ + int rc = 0; + size_t i = 0; + char *buffer = (char *) emalloc(STOMP_BUFSIZE + 1); + + while (1) { + + size_t length = 1; + rc = stomp_recv(stomp, buffer + i, length); + if (rc < 1) { + efree(buffer); + return -1; + } + + if (1 == length) { + i++; + + if (buffer[i-1] == '\n') { + buffer[i-1] = 0; + break; + } else if (buffer[i-1] == 0) { + efree(buffer); + return 0; + } + + if (i >= sizeof(buffer)) { + buffer = (char *) erealloc(buffer, sizeof(buffer) + STOMP_BUFSIZE); + } + } + + } + + if (i > 1) { + *data = (char *) emalloc(i); + if (NULL == *data) { + efree(buffer); + return -1; + } + + memcpy(*data, buffer, i); + } + + efree(buffer); + + return i-1; +} +/* }}} */ + +/* {{{ frame_destroy + */ +void frame_destroy(stomp_frame_t *frame) +{ + if (frame) { + if (frame->command) { + efree(frame->command); + } + if (frame->body) { + efree(frame->body); + } + if (frame->headers) { + zend_hash_destroy(frame->headers); + efree(frame->headers); + } + efree(frame); + } +} +/* }}} */ + +/* {{{ stomp_read_frame + */ +stomp_frame_t *stomp_read_frame(stomp_t *stomp) +{ + stomp_frame_t *f = NULL; + char *cmd = NULL, *length_str = NULL; + int length = 0; + + INIT_STOMP_FRAME(f); + + if (NULL == f) { + return NULL; + } + + /* Parse the command */ + length = stomp_read_line(stomp, &cmd); + if (length < 1) { + RETURN_READ_FRAME_FAIL; + } + + f->command = cmd; + f->command_length = length; + + /* Parse the header */ + while (1) { + char *p = NULL; + length = stomp_read_line(stomp, &p); + + if (length < 0) { + RETURN_READ_FRAME_FAIL; + } + + if (0 == length) { + break; + } else { + char *p2 = NULL; + char *key; + char *value; + + p2 = strstr(p,":"); + + if (p2 == NULL) { + RETURN_READ_FRAME_FAIL; + } + + /* Null terminate the key */ + *p2=0; + key = p; + + /* The rest if the value. */ + value = p2+1; + + /* Insert key/value into hash table. */ + zend_hash_add(f->headers, key, strlen(key) + 1, value, strlen(value) + 1, NULL); + efree(p); + } + } + + /* Check for the content length */ + if (zend_hash_find(f->headers, "content-length", strlen("content-length"), (void **)&length_str) == SUCCESS) { + char endbuffer[2]; + length = 2; + + f->body_length = atoi(length_str); + f->body = (char *) emalloc(f->body_length); + + if (-1 == stomp_recv(stomp, f->body, f->body_length)) { + RETURN_READ_FRAME_FAIL; + } + + if (length != stomp_recv(stomp, endbuffer, length) || endbuffer[0] != '\0' || endbuffer[1] != '\n') { + RETURN_READ_FRAME_FAIL; + } + } else { + f->body_length = stomp_read_buffer(stomp, &f->body); + } + + if (0 == strncmp("ERROR", f->command, sizeof("ERROR") - 1)) { + char *error_msg = NULL; + if (zend_hash_find(f->headers, "message", sizeof("message"), (void **)&error_msg) == SUCCESS) { + stomp_set_error(stomp, error_msg, 0); + } + frame_destroy(f); + return NULL; + } + + return f; +} +/* }}} */ + +/* {{{ stomp_valid_receipt + */ +int stomp_valid_receipt(stomp_t *stomp, stomp_frame_t *frame) { + int success = 1; + char *receipt = NULL; + if (zend_hash_find(frame->headers, "receipt", sizeof("receipt"), (void **)&receipt) == SUCCESS) { + stomp_frame_t *res = stomp_read_frame(stomp); + success = 0; + if (res) { + if (0 == strncmp("RECEIPT", res->command, sizeof("RECEIPT") - 1)) { + char *receipt_id = NULL; + if (zend_hash_find(res->headers, "receipt-id", sizeof("receipt-id"), (void **)&receipt_id) == SUCCESS + && strlen(receipt) == strlen(receipt_id) + && !strcmp(receipt, receipt_id)) { + success = 1; + } + } + frame_destroy(res); + } + } + return success; +} +/* }}} */ + +/* {{{ stomp_select + */ +int stomp_select(stomp_t *stomp) +{ + struct timeval tv; + fd_set rfds; + + tv.tv_sec = stomp->timeout_sec; + tv.tv_usec = stomp->timeout_usec; + + FD_ZERO(&rfds); + FD_SET(stomp->fd, &rfds); + + return select(stomp->fd + 1, &rfds, NULL, NULL, &tv); +} +/* }}} */ diff --git a/stomp.h b/stomp.h new file mode 100755 index 0000000..272516c --- /dev/null +++ b/stomp.h @@ -0,0 +1,75 @@ +/* + +----------------------------------------------------------------------+ + | PHP Version 5 | + +----------------------------------------------------------------------+ + | Copyright (c) 1997-2009 The PHP Group | + +----------------------------------------------------------------------+ + | This source file is subject to version 3.01 of the PHP license, | + | that is bundled with this package in the file LICENSE, and is | + | available through the world-wide-web at the following url: | + | http://www.php.net/license/3_01.txt | + | If you did not receive a copy of the PHP license and are unable to | + | obtain it through the world-wide-web, please send a note to | + | license@php.net so we can mail you a copy immediately. | + +----------------------------------------------------------------------+ + | Author: Pierrick Charron | + +----------------------------------------------------------------------+ +*/ + +/* $Id$ */ + +#ifndef _STOMP_H_ +#define _STOMP_H_ + +#include "php_network.h" + +#define STOMP_BUFSIZE 4096 + +#define INIT_STOMP_FRAME(f) \ + f = (stomp_frame_t *) emalloc(sizeof(stomp_frame_t)); \ + f->command = NULL; f->body = NULL; \ + ALLOC_HASHTABLE(f->headers); \ + zend_hash_init(f->headers, 0, NULL, NULL, 0); + +#define stomp_recv(c,b,l) recv((c)->fd, b, l, 0) + +typedef struct _stomp { + php_socket_t fd; + php_sockaddr_storage localaddr; + char *host; + unsigned short port; + int status; + char *error; + int errnum; + long timeout_sec; + long timeout_usec; + char *session; +} stomp_t; + +typedef struct _stomp_frame { + char *command; + int command_length; + HashTable *headers; + char *body; + int body_length; +} stomp_frame_t; + +stomp_t *stomp_new(const char *host, unsigned short port, long timeout_sec, long timeout_usec TSRMLS_DC); +int stomp_connect(stomp_t *stomp TSRMLS_DC); +int stomp_close(stomp_t *stomp TSRMLS_DC); +int stomp_send(stomp_t *connection, stomp_frame_t *frame TSRMLS_DC); +stomp_frame_t *stomp_read_frame(stomp_t *connection); +int stomp_valid_receipt(stomp_t *connection, stomp_frame_t *frame); +int stomp_select(stomp_t *connection); +void stomp_set_error(stomp_t *stomp, const char *error, int errnum); +void frame_destroy(stomp_frame_t *frame); +#endif /* _STOMP_H_ */ + +/* + * Local variables: + * tab-width: 4 + * c-basic-offset: 4 + * End: + * vim600: noet sw=4 ts=4 fdm=marker + * vim<600: noet sw=4 ts=4 + */ diff --git a/tests/001-stomp.phpt b/tests/001-stomp.phpt new file mode 100644 index 0000000..5a37654 --- /dev/null +++ b/tests/001-stomp.phpt @@ -0,0 +1,52 @@ +--TEST-- +Check stomp +--SKIPIF-- + +--FILE-- +' . $msg['body'] . PHP_EOL; + var_dump(stomp_ack($stomp, $msg['headers']['message-id'])); + var_dump(stomp_read_frame($stomp)); + var_dump(stomp_unsubscribe($stomp, $queue)); + var_dump(stomp_close($stomp)); +} + +echo PHP_EOL; + +$stomp = new Stomp('tcp://localhost:61613'); +try { + var_dump($stomp->send($queue, 'test')); + var_dump($stomp->subscribe($queue)); + $msg = $stomp->readFrame(); + echo $msg->command . '=>' . $msg->body . PHP_EOL; + var_dump($stomp->ack($msg->headers['message-id'])); + var_dump($stomp->readFrame()); + var_dump($stomp->unsubscribe($queue)); + var_dump($stomp->disconnect()); +} catch(StompException $e) { +} + +?> +--EXPECT-- +bool(true) +bool(true) +MESSAGE=>test +bool(true) +bool(false) +bool(true) +bool(true) + +bool(true) +bool(true) +MESSAGE=>test +bool(true) +bool(false) +bool(true) +bool(true) diff --git a/tests/002-version.phpt b/tests/002-version.phpt new file mode 100644 index 0000000..b4b4aa0 --- /dev/null +++ b/tests/002-version.phpt @@ -0,0 +1,10 @@ +--TEST-- +Check stomp_version +--SKIPIF-- + +--FILE-- + +--EXPECTREGEX-- +[0-9]\.[0-9]\.[0-9] diff --git a/tests/003-connect/001.phpt b/tests/003-connect/001.phpt new file mode 100644 index 0000000..6058eeb --- /dev/null +++ b/tests/003-connect/001.phpt @@ -0,0 +1,19 @@ +--TEST-- +Check stom_connect +--SKIPIF-- + +--FILE-- + +--EXPECTF-- +Warning: stomp_connect(): Invalid Broker URI in %s on line %d + +Warning: stomp_connect(): Invalid Broker URI in %s on line %d + +Warning: stomp_connect(): Invalid Broker URI in %s on line %d + +Warning: stomp_connect(): Invalid Broker URI scheme in %s on line %d diff --git a/tests/004-getSessionId/001.phpt b/tests/004-getSessionId/001.phpt new file mode 100644 index 0000000..1a82367 --- /dev/null +++ b/tests/004-getSessionId/001.phpt @@ -0,0 +1,14 @@ +--TEST-- +Check stomp_get_session_id +--SKIPIF-- + +--FILE-- + +--EXPECTF-- +%s diff --git a/tests/005-close/001.phpt b/tests/005-close/001.phpt new file mode 100644 index 0000000..8473542 --- /dev/null +++ b/tests/005-close/001.phpt @@ -0,0 +1,14 @@ +--TEST-- +Check stomp_close +--SKIPIF-- + +--FILE-- + +--EXPECT-- +close diff --git a/tests/006-send/001.phpt b/tests/006-send/001.phpt new file mode 100644 index 0000000..f6e806f --- /dev/null +++ b/tests/006-send/001.phpt @@ -0,0 +1,21 @@ +--TEST-- +Check stomp_send +--SKIPIF-- + +--FILE-- +send('', array()); +} catch(StompException $e) { + echo $e->getMessage(); +} + +$s->send('/queue/test', array()); +?> +--EXPECTF-- +Destination can not be empty +Fatal error: Stomp::send(): Expects parameter 2 to be a string or a StompFrame object. in %s on line %d diff --git a/tests/007-subscribe/001.phpt b/tests/007-subscribe/001.phpt new file mode 100644 index 0000000..74bf414 --- /dev/null +++ b/tests/007-subscribe/001.phpt @@ -0,0 +1,21 @@ +--TEST-- +Check stomp_subscribe +--SKIPIF-- + +--FILE-- +subscribe('', array()); +} catch(StompException $e) { + echo $e->getMessage(); +} + +$s->subscribe('/queue/test', 'string'); +?> +--EXPECTF-- +Destination can not be empty +Warning: Stomp::subscribe() expects parameter 2 to be array, string given in %s on line %d diff --git a/tests/008-unsubscribe/001.phpt b/tests/008-unsubscribe/001.phpt new file mode 100644 index 0000000..58e4d1f --- /dev/null +++ b/tests/008-unsubscribe/001.phpt @@ -0,0 +1,21 @@ +--TEST-- +Check stomp_unsubscribe +--SKIPIF-- + +--FILE-- +unsubscribe('', array()); +} catch(StompException $e) { + echo $e->getMessage(); +} + +$s->unsubscribe('/queue/test', 'string'); +?> +--EXPECTF-- +Destination can not be empty +Warning: Stomp::unsubscribe() expects parameter 2 to be array, string given in %s on line %d