From f3f64b6b056c866558eef7d9dd1944b92eb03fc1 Mon Sep 17 00:00:00 2001 From: Aidan Skinner Date: Wed, 13 Feb 2008 11:44:34 +0000 Subject: Merged revisions 619974,620011,620014,620016-620017,620479,620481,620566,620584,620619,620622,620831,620854,620861-620862,620889,627128,627133,627141,627153-627155,627157,627171,627187 via svnmerge from https://svn.apache.org/repos/asf/incubator/qpid/trunk ........ r619974 | rhs | 2008-02-08 18:30:04 +0000 (Fri, 08 Feb 2008) | 1 line made xa tests run, and made QpidTestCase more robust ........ r620011 | rhs | 2008-02-08 22:05:50 +0000 (Fri, 08 Feb 2008) | 1 line whitespace cleanup ........ r620014 | aconway | 2008-02-08 22:23:23 +0000 (Fri, 08 Feb 2008) | 7 lines cpp/examples/direct, fanout: Converted listener.cpp to SubscriptionManager. All python/cpp combos run as part of cpp/examples make check. Fixed problems with verify scripts and VPATH builds. ........ r620016 | cctrieloff | 2008-02-08 22:24:33 +0000 (Fri, 08 Feb 2008) | 2 lines yum correction ........ r620017 | aconway | 2008-02-08 22:27:38 +0000 (Fri, 08 Feb 2008) | 10 lines From Ted Ross, https://issues.apache.org/jira/browse/QPID-782 The attached patch makes the following changes: The --load-dir option has been renamed to --module-dir The --no-modules option and been replaced by the --no-module-dir option. This new option suppresses ONLY the loading of modules from the directory. The --no-data-dir option has been added to suppress the use of a data directory. Logging has been added for data directory lock and unlock. ........ r620479 | gsim | 2008-02-11 13:00:45 +0000 (Mon, 11 Feb 2008) | 3 lines Check valid listener (or handler) exist and log error if not. See QPID-783. ........ r620481 | gsim | 2008-02-11 13:10:38 +0000 (Mon, 11 Feb 2008) | 3 lines Added a test (currently disabled) that highlights a deadlock in the client when commands are sent to the broker concurrently with acks (e.g. when the dispatcher thread is running with auto-acking and messages are sent on another thread). ........ r620566 | rhs | 2008-02-11 18:23:12 +0000 (Mon, 11 Feb 2008) | 1 line bumped release for Beta 3 ........ r620584 | rhs | 2008-02-11 19:21:01 +0000 (Mon, 11 Feb 2008) | 1 line fixed computation of ranged acks, fix needed for failing RecoverTest ........ r620619 | aconway | 2008-02-11 21:43:32 +0000 (Mon, 11 Feb 2008) | 1 line Fix errors in verify scripts. ........ r620622 | aconway | 2008-02-11 21:50:17 +0000 (Mon, 11 Feb 2008) | 1 line Remove dependency on sys::Socket for management ID of connections. ........ r620831 | rhs | 2008-02-12 15:44:14 +0000 (Tue, 12 Feb 2008) | 1 line added help text for the clean-results target ........ r620854 | gsim | 2008-02-12 16:35:45 +0000 (Tue, 12 Feb 2008) | 3 lines Explicitly reset shared pointer; brokers destructor not called if started through -d otherwise it seems... ........ r620861 | gsim | 2008-02-12 16:54:05 +0000 (Tue, 12 Feb 2008) | 3 lines Fixed typo ........ r620862 | aconway | 2008-02-12 16:54:42 +0000 (Tue, 12 Feb 2008) | 2 lines Create /var/lib/qpidd correctly. ........ r620889 | aconway | 2008-02-12 18:04:11 +0000 (Tue, 12 Feb 2008) | 1 line Quote all non-printable ASCII characters (not just control characters) ........ r627128 | aconway | 2008-02-12 21:39:55 +0000 (Tue, 12 Feb 2008) | 1 line Create a tar file of verify scripts suitable for untarring into and installed examples directory. ........ r627133 | aconway | 2008-02-12 21:48:52 +0000 (Tue, 12 Feb 2008) | 2 lines Add -g to build flags to get debug info. ........ r627141 | aconway | 2008-02-12 21:57:36 +0000 (Tue, 12 Feb 2008) | 2 lines Fix verify error in Makefile.am ........ r627153 | rhs | 2008-02-12 22:21:20 +0000 (Tue, 12 Feb 2008) | 1 line increased the test timeout ........ r627154 | rhs | 2008-02-12 22:22:20 +0000 (Tue, 12 Feb 2008) | 1 line synchronize access to lastWrite future ........ r627155 | rhs | 2008-02-12 22:23:02 +0000 (Tue, 12 Feb 2008) | 1 line added default getConnection() ........ r627157 | rhs | 2008-02-12 22:26:26 +0000 (Tue, 12 Feb 2008) | 1 line added a test for message send followed by immediate connection close; fixed connection close handshaking ........ r627171 | aconway | 2008-02-12 23:09:06 +0000 (Tue, 12 Feb 2008) | 6 lines Patches from Ted Ross: Fix for bignumber problem in the management console. Fix for broker crash when sessions are closed via management. ........ r627187 | rhs | 2008-02-12 23:52:01 +0000 (Tue, 12 Feb 2008) | 1 line applied patch from rajith to fix reply-to ........ git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/thegreatmerge@627359 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/bin/verify | 10 +- qpid/bin/verify_all | 12 - qpid/cpp/README | 2 +- qpid/cpp/examples/Makefile.am | 69 +- qpid/cpp/examples/examples/direct/listener.cpp | 86 +- qpid/cpp/examples/examples/direct/verify.in | 9 +- .../cpp/examples/examples/direct/verify_cpp_python | 2 +- .../examples/examples/direct/verify_cpp_python.in | 6 +- .../cpp/examples/examples/direct/verify_python_cpp | 2 +- .../examples/examples/direct/verify_python_cpp.in | 9 +- .../examples/examples/fanout/declare_queues.cpp | 17 +- .../examples/examples/fanout/fanout_producer.cpp | 8 +- qpid/cpp/examples/examples/fanout/listener.cpp | 86 +- qpid/cpp/examples/examples/fanout/verify.in | 9 +- .../cpp/examples/examples/fanout/verify_cpp_python | 2 +- .../examples/examples/fanout/verify_cpp_python.in | 6 +- .../cpp/examples/examples/fanout/verify_python_cpp | 4 +- .../examples/examples/fanout/verify_python_cpp.in | 15 + qpid/cpp/examples/examples/pub-sub/verify.in | 2 +- .../examples/examples/pub-sub/verify_cpp_python | 2 +- .../examples/examples/pub-sub/verify_cpp_python.in | 4 +- .../examples/examples/pub-sub/verify_python_cpp | 2 +- .../examples/examples/pub-sub/verify_python_cpp.in | 2 +- qpid/cpp/examples/examples/request-response/verify | 2 +- .../examples/examples/request-response/verify.in | 4 +- .../examples/request-response/verify_cpp_python | 5 +- .../examples/request-response/verify_cpp_python.in | 4 +- .../examples/request-response/verify_python_cpp | 7 +- .../examples/request-response/verify_python_cpp.in | 18 + qpid/cpp/examples/verify_all | 24 + qpid/cpp/qpidc.spec.in | 20 +- qpid/cpp/src/qpid/DataDir.cpp | 10 + qpid/cpp/src/qpid/broker/Broker.cpp | 5 +- qpid/cpp/src/qpid/broker/Broker.h | 1 + qpid/cpp/src/qpid/broker/ConnectionFactory.cpp | 4 +- qpid/cpp/src/qpid/broker/ConnectionFactory.h | 4 +- qpid/cpp/src/qpid/broker/SessionState.cpp | 2 + qpid/cpp/src/qpid/client/Dispatcher.cpp | 15 +- qpid/cpp/src/qpid/log/Statement.cpp | 23 +- qpid/cpp/src/qpid/management/ManagementAgent.cpp | 10 +- qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp | 4 +- .../src/qpid/sys/ConnectionInputHandlerFactory.h | 10 +- qpid/cpp/src/qpidd.cpp | 20 +- qpid/cpp/src/tests/ClientSessionTest.cpp | 48 +- qpid/cpp/src/tests/logging.cpp | 6 +- .../org/apache/qpid/client/AMQDestination.java | 8 + .../java/org/apache/qpidity/nclient/Client.java | 38 + .../client/connection/ConnectionCloseTest.java | 72 + .../apache/qpid/test/unit/xa/AbstractXATest.java | 131 -- .../qpid/test/unit/xa/AbstractXATestCase.java | 131 ++ .../org/apache/qpid/test/unit/xa/QueueTest.java | 643 ++++++++ .../org/apache/qpid/test/unit/xa/QueueTests.java | 643 -------- .../org/apache/qpid/test/unit/xa/TopicTest.java | 1708 ++++++++++++++++++++ .../org/apache/qpid/test/unit/xa/TopicTests.java | 1708 -------------------- .../org/apache/qpid/testutil/QpidTestCase.java | 35 +- qpid/java/common.xml | 7 + .../java/org/apache/qpidity/transport/Session.java | 11 +- .../qpidity/transport/network/mina/MinaSender.java | 15 +- qpid/java/module.xml | 2 +- qpid/python/examples/direct/verify.in | 6 +- qpid/python/examples/fanout/verify.in | 6 +- qpid/python/examples/pubsub/verify.in | 2 +- qpid/python/examples/request-response/verify.in | 4 +- qpid/python/tests_0-10/message.py | 68 +- 64 files changed, 3022 insertions(+), 2828 deletions(-) delete mode 100755 qpid/bin/verify_all create mode 100644 qpid/cpp/examples/examples/fanout/verify_python_cpp.in create mode 100644 qpid/cpp/examples/examples/request-response/verify_python_cpp.in create mode 100755 qpid/cpp/examples/verify_all create mode 100644 qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionCloseTest.java delete mode 100644 qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/AbstractXATest.java create mode 100644 qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/AbstractXATestCase.java create mode 100644 qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/QueueTest.java delete mode 100644 qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/QueueTests.java create mode 100644 qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/TopicTest.java delete mode 100644 qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/TopicTests.java (limited to 'qpid') diff --git a/qpid/bin/verify b/qpid/bin/verify index f35db898cf..844e99b765 100755 --- a/qpid/bin/verify +++ b/qpid/bin/verify @@ -36,11 +36,16 @@ background() { waitfor $out "$pattern" } +name() { + for x in $*; do name="$name `basename $x`"; done + echo $name; +} + outputs() { wait 2> /dev/null # Wait for all backgroud processes to complete rm -f $script.out for f in "$@"; do - { echo "==== $f"; eval "cat $f"; } >> $script.out || fail + { echo "==== `name $f`"; eval "cat $f"; } >> $script.out || fail done } @@ -49,6 +54,7 @@ verify() { if [ -d $1 ]; then dir=$1; script=verify; else dir=`dirname $1`; script=`basename $1`; fi cd $dir || return 1 + rm -f *.out { source ./$script && diff -ac $script.out $script.in ; } || fail test -z "$FAIL" && rm -f *.out return $FAIL @@ -59,7 +65,7 @@ remove_uuid() { sed "s/$HEX\{8\}-$HEX\{4\}-$HEX\{4\}-$HEX\{4\}-$HEX\{12\}//g" $* } remove_uuid64() { - sed 's/[-A-Za-z0-9_]\{22\}==$//' $* + sed 's/[-A-Za-z0-9_]\{22\}==//g' $* } # Start private broker if QPIDD is set. diff --git a/qpid/bin/verify_all b/qpid/bin/verify_all deleted file mode 100755 index 6be460c701..0000000000 --- a/qpid/bin/verify_all +++ /dev/null @@ -1,12 +0,0 @@ -#!/bin/sh -# Find example verify scripts and run them. -# Usage: verify_all search-dir - -if [ `basename $1` = examples ]; then exdirs=$1 -else exdirs=`find $1 -name examples -a -type d`; fi -scripts=`find $exdirs -name verify -o -name verify_cpp_python -o -name verify_cpp_java` -verify=`dirname $0`/verify -for s in $scripts; do $verify $s; done - - - diff --git a/qpid/cpp/README b/qpid/cpp/README index 7c34293007..7de7fd3f98 100644 --- a/qpid/cpp/README +++ b/qpid/cpp/README @@ -60,7 +60,7 @@ all of the above plus: On linux most packages can be installed using your distribution's package management tool. For example on Fedora: - # yum install pkgconfig e2fsprogs boost-devel cppunit-devel openais ruby + # yum install pkgconfig e2fsprogs boost-devel cppunit-devel openais-devel ruby # yum install make gcc-c++ autoconf automake libtool doxygen help2man graphviz # yum install e2fsprogs-devel diff --git a/qpid/cpp/examples/Makefile.am b/qpid/cpp/examples/Makefile.am index fdbea3b878..92625550e2 100644 --- a/qpid/cpp/examples/Makefile.am +++ b/qpid/cpp/examples/Makefile.am @@ -17,7 +17,36 @@ nobase_pkgdata_DATA= \ examples/direct/listener.cpp \ examples/direct/declare_queues.cpp -EXTRA_DIST=$(nobase_pkgdata_DATA) +VERIFY_FILES= verify verify_all \ + examples/request-response/verify \ + examples/request-response/verify.in \ + examples/request-response/verify_cpp_python \ + examples/request-response/verify_cpp_python.in \ + examples/request-response/verify_python_cpp \ + examples/request-response/verify_python_cpp.in \ + examples/fanout/verify \ + examples/fanout/verify.in \ + examples/fanout/verify_cpp_python \ + examples/fanout/verify_cpp_python.in \ + examples/fanout/verify_python_cpp \ + examples/fanout/verify_python_cpp.in \ + examples/pub-sub/verify \ + examples/pub-sub/verify.in \ + examples/pub-sub/verify_cpp_python \ + examples/pub-sub/verify_cpp_python.in \ + examples/pub-sub/verify_python_cpp \ + examples/pub-sub/verify_python_cpp.in \ + examples/direct/verify \ + examples/direct/verify.in \ + examples/direct/verify_cpp_python \ + examples/direct/verify_cpp_python.in \ + examples/direct/verify_python_cpp \ + examples/direct/verify_python_cpp.in + +EXTRA_DIST=$(nobase_pkgdata_DATA) $(VERIFY_FILES) + +verify: + cp $(top_srcdir)/../bin/verify $@ # Note: we don't use normal automake SUBDIRS because the example # makefiles don't understand all the recursive automake targets. @@ -29,31 +58,23 @@ clean-local: abs_top_builddir=@abs_top_builddir@ abs_top_srcdir=@abs_top_srcdir@ -VERIFY=$(top_srcdir)/../bin/verify -PYTHON_EXAMPLES=$(top_srcdir)/../python/examples -EXAMPLES= \ - examples/pub-sub \ - examples/fanout \ - examples/direct \ - examples/request-response \ - $(PYTHON_EXAMPLES)/pubsub \ - $(PYTHON_EXAMPLES)/fanout \ - $(PYTHON_EXAMPLES)/direct \ - $(PYTHON_EXAMPLES)/request-response - -# Build the examples in the source tree. +# Build the examples - copy sources to the build tree in VPATH build. all-local: + test -d examples || cp -R $(srcdir)/examples . cd examples && $(MAKE) CXX="$(CXX)" CXXFLAGS="$(CXXFLAGS) -I../../$(top_srcdir)/src -I../../$(top_srcdir)/src/gen -I../../$(top_builddir)/src/gen -L../../$(top_builddir)/src/.libs -Wl,-rpath,$(abs_top_builddir)/src/.libs" all # Verify the examples in the buid tree. -check-local: all-local - QPID_DATA_DIR= QPIDD=$(top_builddir)/src/qpidd $(VERIFY) $(EXAMPLES) - -# Build and verify the installed examples, then clean up to avoid rpmbuild warnings. -EXAMPLE_FLAGS=-I$(DESTDIR)$(includedir) -L$(DESTDIR)$(libdir) -Wl,-rpath,$(DESTDIR)$(libdir) -EXAMPLE_DIR=$(DESTDIR)$(pkgdatadir)/examples -installcheck-local: - cd $(EXAMPLE_DIR) && $(MAKE) CXX="$(CXX)" CXXFLAGS="$(EXAMPLE_FLAGS)" all - cd $(EXAMPLE_DIR) && QPIDD=$(sbindir)/qpidd $(VERIFY) - cd $(EXAMPLE_DIR) && $(MAKE) clean +check-local: all-local verify + $(srcdir)/verify_all $(abs_top_srcdir) $(abs_top_builddir) + +# TODO: +# create a tarball for testing installed examples. +# installcheck-local to use the tarball on installed example and clean up after. +# Build and verify installed C++ examples, clean up to avoid rpmbuild warnings. +# EXAMPLE_FLAGS=-I$(DESTDIR)$(includedir) -L$(DESTDIR)$(libdir) -Wl,-rpath,$(DESTDIR)$(libdir) +# EXAMPLE_DIR=$(DESTDIR)$(pkgdatadir)/examples/cpp +# installcheck-local: +# cd $(EXAMPLE_DIR) && $(MAKE) CXX="$(CXX)" CXXFLAGS="$(EXAMPLE_FLAGS)" all +# cd $(EXAMPLE_DIR) && QPIDD=$(sbindir)/qpidd $(srcdir)/verify * +# cd $(EXAMPLE_DIR) && $(MAKE) clean diff --git a/qpid/cpp/examples/examples/direct/listener.cpp b/qpid/cpp/examples/examples/direct/listener.cpp index 3f92d189de..91b5123c68 100644 --- a/qpid/cpp/examples/examples/direct/listener.cpp +++ b/qpid/cpp/examples/examples/direct/listener.cpp @@ -20,32 +20,15 @@ */ /** - * listener.cpp: - * - * This program is one of three programs designed to be used - * together. These programs do not specify the exchange type - the - * default exchange type is the direct exchange. - * - * declare_queues.cpp: - * - * Creates a queue on a broker, binding a routing key to route - * messages to that queue. - * - * direct_producer.cpp: - * - * Publishes to a broker, specifying a routing key. - * - * listener.cpp (this program): - * - * Reads from a queue on the broker using a message listener. - * + * listener.cpp: This program reads messages fro a queue on + * the broker using a message listener. */ #include #include #include #include -#include +#include #include #include @@ -56,42 +39,25 @@ using namespace qpid::framing; class Listener : public MessageListener{ -private: - std::string destination_name; - Dispatcher dispatcher; -public: - Listener(Session& session, string destination_name): - destination_name(destination_name), - dispatcher(session) - {}; - - virtual void listen(); - virtual void received(Message& message); - ~Listener() { }; + private: + SubscriptionManager& subscriptions; + public: + Listener(SubscriptionManager& subscriptions); + virtual void received(Message& message); }; - -void Listener::listen() { - std::cout << "Activating listener for: " <1 ? argv[1] : "127.0.0.1"; int port = argc>2 ? atoi(argv[2]) : 5672; @@ -103,25 +69,15 @@ int main(int argc, char** argv) { //--------- Main body of program -------------------------------------------- + SubscriptionManager subscriptions(session); + // Create a listener and subscribe it to the queue named "message_queue" + Listener listener(subscriptions); + subscriptions.subscribe(listener, "message_queue"); + // Deliver messages until the subscription is cancelled + // by Listener::received() + subscriptions.run(); - // Subscribe to the queue, route it to a client destination for - // the listener. (The destination name merely identifies the - // destination in the listener, you can use any name as long as - // you use the same name for the listener). - - session.messageSubscribe(arg::queue="message_queue", arg::destination="listener_destination"); - - //############## - session.messageFlow(arg::destination="listener_destination", arg::unit=0, arg::value=1);//messages ### Define a constant? - session.messageFlow(arg::destination="listener_destination", arg::unit=1, arg::value=0xFFFFFFFF);//bytes ###### Define a constant? - - // Tell the listener to listen to the destination we just - // created above. - - Listener listener(session, "listener_destination"); - listener.listen(); - - //----------------------------------------------------------------------------- + //--------------------------------------------------------------------------- connection.close(); return 0; diff --git a/qpid/cpp/examples/examples/direct/verify.in b/qpid/cpp/examples/examples/direct/verify.in index afe36335c7..d1e95f1151 100644 --- a/qpid/cpp/examples/examples/direct/verify.in +++ b/qpid/cpp/examples/examples/direct/verify.in @@ -1,7 +1,6 @@ -==== ./declare_queues.out -==== ./direct_producer.out -==== ./listener.out -Activating listener for: listener_destination +==== declare_queues.out +==== direct_producer.out +==== listener.out Message: Message 0 Message: Message 1 Message: Message 2 @@ -13,4 +12,4 @@ Message: Message 7 Message: Message 8 Message: Message 9 Message: That's all, folks! -Shutting down listener for listener_destination +Shutting down listener for message_queue diff --git a/qpid/cpp/examples/examples/direct/verify_cpp_python b/qpid/cpp/examples/examples/direct/verify_cpp_python index 5ce3681d90..4dc445ba27 100644 --- a/qpid/cpp/examples/examples/direct/verify_cpp_python +++ b/qpid/cpp/examples/examples/direct/verify_cpp_python @@ -1,4 +1,4 @@ # See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify -py=../../../../python/examples/direct +py=$PYTHON_EXAMPLES/direct clients ./declare_queues ./direct_producer $py/direct_consumer.py outputs ./declare_queues.out ./direct_producer.out $py/direct_consumer.py.out diff --git a/qpid/cpp/examples/examples/direct/verify_cpp_python.in b/qpid/cpp/examples/examples/direct/verify_cpp_python.in index 0952dbe405..1a329be59a 100644 --- a/qpid/cpp/examples/examples/direct/verify_cpp_python.in +++ b/qpid/cpp/examples/examples/direct/verify_cpp_python.in @@ -1,6 +1,6 @@ -==== ./declare_queues.out -==== ./direct_producer.out -==== ../../../../python/examples/direct/direct_consumer.py.out +==== declare_queues.out +==== direct_producer.out +==== direct_consumer.py.out Message 0 Message 1 Message 2 diff --git a/qpid/cpp/examples/examples/direct/verify_python_cpp b/qpid/cpp/examples/examples/direct/verify_python_cpp index 43c5339150..fe4893e120 100644 --- a/qpid/cpp/examples/examples/direct/verify_python_cpp +++ b/qpid/cpp/examples/examples/direct/verify_python_cpp @@ -1,5 +1,5 @@ # See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify -py=../../../../python/examples/direct +py=$PYTHON_EXAMPLES/direct clients $py/declare_queues.py $py/direct_producer.py ./listener outputs $py/declare_queues.py.out $py/direct_producer.py.out ./listener.out diff --git a/qpid/cpp/examples/examples/direct/verify_python_cpp.in b/qpid/cpp/examples/examples/direct/verify_python_cpp.in index b1907e6c3b..6f35255b18 100644 --- a/qpid/cpp/examples/examples/direct/verify_python_cpp.in +++ b/qpid/cpp/examples/examples/direct/verify_python_cpp.in @@ -1,7 +1,6 @@ -==== ../../../../python/examples/direct/declare_queues.py.out -==== ../../../../python/examples/direct/direct_producer.py.out -==== ./listener.out -Activating listener for: listener_destination +==== declare_queues.py.out +==== direct_producer.py.out +==== listener.out Message: message 0 Message: message 1 Message: message 2 @@ -13,4 +12,4 @@ Message: message 7 Message: message 8 Message: message 9 Message: That's all, folks! -Shutting down listener for listener_destination +Shutting down listener for message_queue diff --git a/qpid/cpp/examples/examples/fanout/declare_queues.cpp b/qpid/cpp/examples/examples/fanout/declare_queues.cpp index 6b1eabfeec..c76ed54730 100644 --- a/qpid/cpp/examples/examples/fanout/declare_queues.cpp +++ b/qpid/cpp/examples/examples/fanout/declare_queues.cpp @@ -20,21 +20,16 @@ */ /** - * direct_config_queues.cpp + * declare_queues.cpp (this program): * * This program is one of three programs designed to be used - * together. These programs use the "amq.direct" exchange. + * together. These programs use the "amq.fanout" exchange. * - * direct_config_queues.cpp (this program): - * - * Creates a queue on a broker, binding a routing key to route - * messages to that queue. - * - * direct_publisher.cpp: + * fanout_producer.cpp: * * Publishes to a broker, specifying a routing key. * - * direct_listener.cpp + * listener.cpp * * Reads from a queue on the broker using a message listener. * @@ -65,9 +60,7 @@ int main(int argc, char** argv) { //--------- Main body of program -------------------------------------------- - // Create a queue named "message_queue", and route all messages whose - // routing key is "routing_key to this newly created queue. - + // Create and bind a queue named "message_queue". session.queueDeclare(arg::queue="message_queue"); session.queueBind(arg::queue="message_queue", arg::exchange="amq.fanout"); diff --git a/qpid/cpp/examples/examples/fanout/fanout_producer.cpp b/qpid/cpp/examples/examples/fanout/fanout_producer.cpp index a5904e6731..7521724920 100644 --- a/qpid/cpp/examples/examples/fanout/fanout_producer.cpp +++ b/qpid/cpp/examples/examples/fanout/fanout_producer.cpp @@ -21,22 +21,22 @@ /** - * direct_publisher.cpp: + * fanout_producer.cpp: * * This program is one of three programs designed to be used * together. These programs do not specify the exchange type - the * default exchange type is the direct exchange. * - * direct_config_queues.cpp: + * declare_queues.cpp: * * Creates a queue on a broker, binding a routing key to route * messages to that queue. * - * direct_publisher.cpp (this program): + * fanout_producer.cpp (this program): * * Publishes to a broker, specifying a routing key. * - * direct_listener.cpp + * listener.cpp * * Reads from a queue on the broker using a message listener. * diff --git a/qpid/cpp/examples/examples/fanout/listener.cpp b/qpid/cpp/examples/examples/fanout/listener.cpp index 3f94f73f48..91b5123c68 100644 --- a/qpid/cpp/examples/examples/fanout/listener.cpp +++ b/qpid/cpp/examples/examples/fanout/listener.cpp @@ -20,32 +20,15 @@ */ /** - * direct_listener.cpp: - * - * This program is one of three programs designed to be used - * together. These programs do not specify the exchange type - the - * default exchange type is the direct exchange. - * - * direct_config_queues.cpp: - * - * Creates a queue on a broker, binding a routing key to route - * messages to that queue. - * - * direct_publisher.cpp: - * - * Publishes to a broker, specifying a routing key. - * - * direct_listener.cpp (this program): - * - * Reads from a queue on the broker using a message listener. - * + * listener.cpp: This program reads messages fro a queue on + * the broker using a message listener. */ #include #include #include #include -#include +#include #include #include @@ -56,43 +39,25 @@ using namespace qpid::framing; class Listener : public MessageListener{ -private: - std::string destination_name; - Dispatcher dispatcher; -public: - Listener(Session& session, string destination_name): - destination_name(destination_name), - dispatcher(session) - {}; - - virtual void listen(); - virtual void received(Message& message); - ~Listener() { }; + private: + SubscriptionManager& subscriptions; + public: + Listener(SubscriptionManager& subscriptions); + virtual void received(Message& message); }; - -void Listener::listen() { - std::cout << "Activating listener for: " <1 ? argv[1] : "127.0.0.1"; int port = argc>2 ? atoi(argv[2]) : 5672; @@ -104,24 +69,15 @@ int main(int argc, char** argv) { //--------- Main body of program -------------------------------------------- + SubscriptionManager subscriptions(session); + // Create a listener and subscribe it to the queue named "message_queue" + Listener listener(subscriptions); + subscriptions.subscribe(listener, "message_queue"); + // Deliver messages until the subscription is cancelled + // by Listener::received() + subscriptions.run(); - // Subscribe to the queue, route it to a client destination for - // the listener. (The destination name merely identifies the - // destination in the listener, you can use any name as long as - // you use the same name for the listener). - - session.messageSubscribe(arg::queue="message_queue", arg::destination="listener_destination"); - - session.messageFlow(arg::destination="listener_destination", arg::unit=0, arg::value=1);//messages ### Define a constant? - session.messageFlow(arg::destination="listener_destination", arg::unit=1, arg::value=0xFFFFFFFF);//bytes ###### Define a constant? - - // Tell the listener to listen to the destination we just - // created above. - - Listener listener(session, "listener_destination"); - listener.listen(); - - //----------------------------------------------------------------------------- + //--------------------------------------------------------------------------- connection.close(); return 0; diff --git a/qpid/cpp/examples/examples/fanout/verify.in b/qpid/cpp/examples/examples/fanout/verify.in index 9d1c1445e1..23d08f38dd 100644 --- a/qpid/cpp/examples/examples/fanout/verify.in +++ b/qpid/cpp/examples/examples/fanout/verify.in @@ -1,7 +1,6 @@ -==== ./declare_queues.out -==== ./fanout_producer.out -==== ./listener.out -Activating listener for: listener_destination +==== declare_queues.out +==== fanout_producer.out +==== listener.out Message: Message 0 Message: Message 1 Message: Message 2 @@ -13,4 +12,4 @@ Message: Message 7 Message: Message 8 Message: Message 9 Message: That's all, folks! -Shutting down listener for listener_destination +Shutting down listener for message_queue diff --git a/qpid/cpp/examples/examples/fanout/verify_cpp_python b/qpid/cpp/examples/examples/fanout/verify_cpp_python index 59fa63478c..f53784ef1c 100644 --- a/qpid/cpp/examples/examples/fanout/verify_cpp_python +++ b/qpid/cpp/examples/examples/fanout/verify_cpp_python @@ -1,5 +1,5 @@ # See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify -py=../../../../python/examples/fanout +py=$PYTHON_EXAMPLES/fanout clients ./declare_queues ./fanout_producer $py/fanout_consumer.py outputs ./declare_queues.out ./fanout_producer.out $py/fanout_consumer.py.out diff --git a/qpid/cpp/examples/examples/fanout/verify_cpp_python.in b/qpid/cpp/examples/examples/fanout/verify_cpp_python.in index 141e7c6a84..cb9e52cfcc 100644 --- a/qpid/cpp/examples/examples/fanout/verify_cpp_python.in +++ b/qpid/cpp/examples/examples/fanout/verify_cpp_python.in @@ -1,6 +1,6 @@ -==== ./declare_queues.out -==== ./fanout_producer.out -==== ../../../../python/examples/fanout/fanout_consumer.py.out +==== declare_queues.out +==== fanout_producer.out +==== fanout_consumer.py.out Message 0 Message 1 Message 2 diff --git a/qpid/cpp/examples/examples/fanout/verify_python_cpp b/qpid/cpp/examples/examples/fanout/verify_python_cpp index cef36bd287..00a0727352 100644 --- a/qpid/cpp/examples/examples/fanout/verify_python_cpp +++ b/qpid/cpp/examples/examples/fanout/verify_python_cpp @@ -1,5 +1,5 @@ # See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify -py=../../../../python/examples/fanout -clients $py/declare_queues.py $py/fanout_producer .py ./listener +py=$PYTHON_EXAMPLES/fanout +clients $py/declare_queues.py $py/fanout_producer.py ./listener outputs $py/declare_queues.py.out $py/fanout_producer.py.out ./listener.out diff --git a/qpid/cpp/examples/examples/fanout/verify_python_cpp.in b/qpid/cpp/examples/examples/fanout/verify_python_cpp.in new file mode 100644 index 0000000000..87a1694e6c --- /dev/null +++ b/qpid/cpp/examples/examples/fanout/verify_python_cpp.in @@ -0,0 +1,15 @@ +==== declare_queues.py.out +==== fanout_producer.py.out +==== listener.out +Message: message 0 +Message: message 1 +Message: message 2 +Message: message 3 +Message: message 4 +Message: message 5 +Message: message 6 +Message: message 7 +Message: message 8 +Message: message 9 +Message: That's all, folks! +Shutting down listener for message_queue diff --git a/qpid/cpp/examples/examples/pub-sub/verify.in b/qpid/cpp/examples/examples/pub-sub/verify.in index ac51038a1b..6413c5c788 100644 --- a/qpid/cpp/examples/examples/pub-sub/verify.in +++ b/qpid/cpp/examples/examples/pub-sub/verify.in @@ -1,4 +1,4 @@ -==== ./topic_publisher.out +==== topic_publisher.out ==== topic_listener.out | remove_uuid | sort Declaring queue: europe Declaring queue: news diff --git a/qpid/cpp/examples/examples/pub-sub/verify_cpp_python b/qpid/cpp/examples/examples/pub-sub/verify_cpp_python index ff711a310f..ecc573eed3 100644 --- a/qpid/cpp/examples/examples/pub-sub/verify_cpp_python +++ b/qpid/cpp/examples/examples/pub-sub/verify_cpp_python @@ -1,5 +1,5 @@ # See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify -py=../../../../python/examples/pubsub +py=$PYTHON_EXAMPLES/pubsub background "Queues created" $py/topic_subscriber.py clients ./topic_publisher outputs ./topic_publisher.out "$py/topic_subscriber.py.out | remove_uuid64 | sort" diff --git a/qpid/cpp/examples/examples/pub-sub/verify_cpp_python.in b/qpid/cpp/examples/examples/pub-sub/verify_cpp_python.in index 9e033dc25c..b3c9e750f5 100644 --- a/qpid/cpp/examples/examples/pub-sub/verify_cpp_python.in +++ b/qpid/cpp/examples/examples/pub-sub/verify_cpp_python.in @@ -1,5 +1,5 @@ -==== ./topic_publisher.out -==== ../../../../python/examples/pubsub/topic_subscriber.py.out | remove_uuid64 | sort +==== topic_publisher.out +==== topic_subscriber.py.out | remove_uuid64 | sort Message 0 Message 0 Message 0 diff --git a/qpid/cpp/examples/examples/pub-sub/verify_python_cpp b/qpid/cpp/examples/examples/pub-sub/verify_python_cpp index e9c72c94d7..2ddaad58c2 100644 --- a/qpid/cpp/examples/examples/pub-sub/verify_python_cpp +++ b/qpid/cpp/examples/examples/pub-sub/verify_python_cpp @@ -1,5 +1,5 @@ # See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify -py=../../../../python/examples/pubsub +py=$PYTHON_EXAMPLES/pubsub background "Listening" ./topic_listener clients $py/topic_publisher.py outputs $py/topic_publisher.py.out "topic_listener.out | remove_uuid | sort" diff --git a/qpid/cpp/examples/examples/pub-sub/verify_python_cpp.in b/qpid/cpp/examples/examples/pub-sub/verify_python_cpp.in index 6f92458792..97fccf0a32 100644 --- a/qpid/cpp/examples/examples/pub-sub/verify_python_cpp.in +++ b/qpid/cpp/examples/examples/pub-sub/verify_python_cpp.in @@ -1,4 +1,4 @@ -==== ../../../../python/examples/pubsub/topic_publisher.py.out +==== topic_publisher.py.out ==== topic_listener.out | remove_uuid | sort Declaring queue: europe Declaring queue: news diff --git a/qpid/cpp/examples/examples/request-response/verify b/qpid/cpp/examples/examples/request-response/verify index a8c942b5e6..76007ff8d2 100644 --- a/qpid/cpp/examples/examples/request-response/verify +++ b/qpid/cpp/examples/examples/request-response/verify @@ -2,4 +2,4 @@ background "Waiting" ./server clients ./client kill %% # Must kill the server. -outputs "./client.out | remove_uuid" " server.out | remove_uuid" +outputs "./client.out | remove_uuid" "server.out | remove_uuid" diff --git a/qpid/cpp/examples/examples/request-response/verify.in b/qpid/cpp/examples/examples/request-response/verify.in index 3767fb3f43..7925dc5671 100644 --- a/qpid/cpp/examples/examples/request-response/verify.in +++ b/qpid/cpp/examples/examples/request-response/verify.in @@ -1,4 +1,4 @@ -==== ./client.out | remove_uuid +==== client.out | remove_uuid Activating response queue listener for: client Request: Twas brillig, and the slithy toves Request: Did gire and gymble in the wabe. @@ -10,7 +10,7 @@ Response: DID GIRE AND GYMBLE IN THE WABE. Response: ALL MIMSY WERE THE BOROGROVES, Response: AND THE MOME RATHS OUTGRABE. Shutting down listener for client -==== server.out | remove_uuid +==== server.out | remove_uuid Activating request queue listener for: request Waiting for requests Request: Twas brillig, and the slithy toves (client) diff --git a/qpid/cpp/examples/examples/request-response/verify_cpp_python b/qpid/cpp/examples/examples/request-response/verify_cpp_python index 9470110bc2..9d71d51c37 100644 --- a/qpid/cpp/examples/examples/request-response/verify_cpp_python +++ b/qpid/cpp/examples/examples/request-response/verify_cpp_python @@ -1,6 +1,5 @@ # See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify -py=../../../../python/examples/request-response -background "Request server running" $py/server.py +background "Request server running" $PYTHON_EXAMPLES/request-response/server.py clients ./client kill %% # Must kill the server. -outputs "./client.out | remove_uuid" "$py/server.py.out | remove_uuid64" +outputs "./client.out | remove_uuid" "$PYTHON_EXAMPLES/request-response/server.py.out | remove_uuid64" diff --git a/qpid/cpp/examples/examples/request-response/verify_cpp_python.in b/qpid/cpp/examples/examples/request-response/verify_cpp_python.in index efffc32332..280484bd2a 100644 --- a/qpid/cpp/examples/examples/request-response/verify_cpp_python.in +++ b/qpid/cpp/examples/examples/request-response/verify_cpp_python.in @@ -1,4 +1,4 @@ -==== ./client.out | remove_uuid +==== client.out | remove_uuid Activating response queue listener for: client Request: Twas brillig, and the slithy toves Request: Did gire and gymble in the wabe. @@ -10,6 +10,6 @@ Response: DID GIRE AND GYMBLE IN THE WABE. Response: ALL MIMSY WERE THE BOROGROVES, Response: AND THE MOME RATHS OUTGRABE. Shutting down listener for client -==== ../../../../python/examples/request-response/server.py.out | remove_uuid64 +==== server.py.out | remove_uuid64 Request server running - run your client now. (Times out after 100 seconds ...) diff --git a/qpid/cpp/examples/examples/request-response/verify_python_cpp b/qpid/cpp/examples/examples/request-response/verify_python_cpp index 11f5c4cbc4..9f3f1caaf4 100644 --- a/qpid/cpp/examples/examples/request-response/verify_python_cpp +++ b/qpid/cpp/examples/examples/request-response/verify_python_cpp @@ -1,6 +1,5 @@ # See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify -py=../../../../python/examples/request-response -background "Request server running" ./server -clients $py/client.py +background "Waiting" ./server +clients $PYTHON_EXAMPLES/request-response/client.py kill %% # Must kill the server. -outputs "./client.py.out | remove_uuid64" " server.out | remove_uuid" +outputs "$PYTHON_EXAMPLES/request-response/client.py.out | remove_uuid64" "server.out | remove_uuid64" diff --git a/qpid/cpp/examples/examples/request-response/verify_python_cpp.in b/qpid/cpp/examples/examples/request-response/verify_python_cpp.in new file mode 100644 index 0000000000..7718d54973 --- /dev/null +++ b/qpid/cpp/examples/examples/request-response/verify_python_cpp.in @@ -0,0 +1,18 @@ +==== client.py.out | remove_uuid64 +Request: Twas brilling, and the slithy toves +Request: Did gyre and gimble in the wabe. +Request: All mimsy were the borogroves, +Request: And the mome raths outgrabe. +Messages queue: ReplyTo: +Response: TWAS BRILLING, AND THE SLITHY TOVES +Response: DID GYRE AND GIMBLE IN THE WABE. +Response: ALL MIMSY WERE THE BOROGROVES, +Response: AND THE MOME RATHS OUTGRABE. +No more messages! +==== server.out | remove_uuid64 +Activating request queue listener for: request +Waiting for requests +Request: Twas brilling, and the slithy toves (ReplyTo:) +Request: Did gyre and gimble in the wabe. (ReplyTo:) +Request: All mimsy were the borogroves, (ReplyTo:) +Request: And the mome raths outgrabe. (ReplyTo:) diff --git a/qpid/cpp/examples/verify_all b/qpid/cpp/examples/verify_all new file mode 100755 index 0000000000..e6d75929b2 --- /dev/null +++ b/qpid/cpp/examples/verify_all @@ -0,0 +1,24 @@ +#!/bin/sh +# Verify all C++/python example combinations. +# + +src=$1 ; build=$2 + +verify=./verify +qpidd=$build/src/qpidd +cpp=$build/examples/examples +python=$src/../python + +trap "$qpidd -q" exit +export QPID_PORT=`$qpidd -dp0 --data-dir ""` +export PYTHON_EXAMPLES=$python/examples +export PYTHONPATH=$python:$PYTHONPATH +export AMQP_SPEC=$src/../specs/amqp.0-10-preview.xml + +find="find $cpp" +test -d $PYTHON_EXAMPLES && find="$find $PYTHON_EXAMPLES" +find="$find -name verify" +test -d $PYTHON_EXAMPLES && \ + find="$find -o -name verify_cpp_python -o -name verify_python_cpp" +$verify `$find` + diff --git a/qpid/cpp/qpidc.spec.in b/qpid/cpp/qpidc.spec.in index 45d67dff6e..3892b1ab40 100644 --- a/qpid/cpp/qpidc.spec.in +++ b/qpid/cpp/qpidc.spec.in @@ -5,7 +5,7 @@ Name: @PACKAGE@ Version: @VERSION@ -Release: 18%{?dist} +Release: 22%{?dist} Summary: Libraries for Qpid C++ client applications Group: System Environment/Libraries License: Apache Software License @@ -73,16 +73,16 @@ Qpid broker daemon. %setup -q %build -%configure --disable-static --without-cpg CXXFLAGS="-O3 -DNDEBUG" +%configure --disable-static --without-cpg CXXFLAGS="-g -O3 -DNDEBUG" make %{?_smp_mflags} # Remove this generated perl file, we don't need it and it upsets rpmlint. rm docs/api/html/installdox %install rm -rf %{buildroot} -make install DESTDIR=%{buildroot} -install -Dp -m0755 etc/qpidd %{buildroot}%{_initrddir}/qpidd -install -d -m0755 %{buildroot}%{_localstatedir}/qpidd +make install-strip DESTDIR=%{buildroot} +install -Dp -m0755 etc/qpidd %{buildroot}%{_initrddir}/qpidd +install -d -m0755 %{buildroot}%{_localstatedir}/lib/qpidd rm -f %{buildroot}%_libdir/*.a rm -f %{buildroot}%_libdir/*.la @@ -99,7 +99,6 @@ make check %_libdir/libqpidcommon.so.0.1.0 %_libdir/libqpidclient.so.0 %_libdir/libqpidclient.so.0.1.0 -%_localstatedir/qpidd %config(noreplace) %_sysconfdir/qpidd.conf %files devel @@ -123,6 +122,7 @@ make check %_libdir/libqpidcluster.so.0.1.0 %_sbindir/%{qpidd} %{_initrddir}/%{qpidd} +%_localstatedir/lib/qpidd %doc %_mandir/man1/%{qpidd}.* %files -n %{qpidd}-devel @@ -156,6 +156,14 @@ fi /sbin/ldconfig %changelog +* Tue Feb 12 2008 Alan Conway - 0.2-22 +- Added -g to compile flags for debug symbols. + +* Tue Feb 12 2008 Alan Conway - 0.2-21 +- Create /var/lib/qpidd correctly. + +* Mon Feb 11 2008 Rafael Schloming - 0.2-20 +- bumped for Beta 3 * Mon Jan 21 2008 Gordon Sim - 0.2-18 - bump up rev for recent changes to plugin modules & mgmt diff --git a/qpid/cpp/src/qpid/DataDir.cpp b/qpid/cpp/src/qpid/DataDir.cpp index baf536c109..abf9b061e4 100644 --- a/qpid/cpp/src/qpid/DataDir.cpp +++ b/qpid/cpp/src/qpid/DataDir.cpp @@ -20,6 +20,7 @@ #include "Exception.h" #include "DataDir.h" +#include "qpid/log/Statement.h" #include #include #include @@ -32,7 +33,10 @@ DataDir::DataDir (std::string path) : dirPath (path) { if (!enabled) + { + QPID_LOG (info, "No data directory - Disabling persistent configuration"); return; + } const char *cpath = dirPath.c_str (); struct stat s; @@ -55,14 +59,20 @@ DataDir::DataDir (std::string path) : oss << "Error locking data directory: errno=" << errno; throw Exception (oss.str ()); } + + QPID_LOG (info, "Locked data directory: " << dirPath); } DataDir::~DataDir () { + if (dirPath.empty ()) + return; + std::string lockFile (dirPath); lockFile = lockFile + "/lock"; ::unlink (lockFile.c_str ()); + QPID_LOG (info, "Unlocked data directory: " << dirPath); } } // namespace qpid diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index 1d55db0c0f..117a93b571 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -60,6 +60,7 @@ namespace broker { Broker::Options::Options(const std::string& name) : qpid::Options(name), + noDataDir(0), dataDir("/var/lib/qpidd"), port(DEFAULT_PORT), workerThreads(5), @@ -75,6 +76,8 @@ Broker::Options::Options(const std::string& name) : addOptions() ("data-dir", optValue(dataDir,"DIR"), "Directory to contain persistent data generated by the broker") + ("no-data-dir", optValue(noDataDir), + "Don't use a data directory. No persistent configuration will be loaded or stored") ("port,p", optValue(port,"PORT"), "Tells the broker to listen on PORT") ("worker-threads", optValue(workerThreads, "N"), @@ -103,7 +106,7 @@ const std::string qpid_management("qpid.management"); Broker::Broker(const Broker::Options& conf) : config(conf), store(0), - dataDir(conf.dataDir), + dataDir(conf.noDataDir ? std::string () : conf.dataDir), factory(*this), sessionManager(conf.ack) { diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h index 852cb2da42..68dbf570f0 100644 --- a/qpid/cpp/src/qpid/broker/Broker.h +++ b/qpid/cpp/src/qpid/broker/Broker.h @@ -61,6 +61,7 @@ class Broker : public sys::Runnable, public Plugin::Target, public management::M struct Options : public qpid::Options { Options(const std::string& name="Broker Options"); + bool noDataDir; std::string dataDir; uint16_t port; int workerThreads; diff --git a/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp b/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp index 72df9c44e9..9577853de4 100644 --- a/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp +++ b/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp @@ -36,9 +36,9 @@ ConnectionFactory::~ConnectionFactory() qpid::sys::ConnectionInputHandler* ConnectionFactory::create(qpid::sys::ConnectionOutputHandler* out, - const qpid::sys::Socket& s) + const std::string& id) { - return new Connection(out, broker, s.getPeerAddress()); + return new Connection(out, broker, id); } }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/ConnectionFactory.h b/qpid/cpp/src/qpid/broker/ConnectionFactory.h index 23fba5c1ab..53fb160279 100644 --- a/qpid/cpp/src/qpid/broker/ConnectionFactory.h +++ b/qpid/cpp/src/qpid/broker/ConnectionFactory.h @@ -32,8 +32,8 @@ class ConnectionFactory : public qpid::sys::ConnectionInputHandlerFactory public: ConnectionFactory(Broker& b); - virtual qpid::sys::ConnectionInputHandler* create - (qpid::sys::ConnectionOutputHandler* ctxt, const sys::Socket& s); + virtual qpid::sys::ConnectionInputHandler* + create(qpid::sys::ConnectionOutputHandler* out, const std::string& id); virtual ~ConnectionFactory(); diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp index 1021cca1b1..80fafe0386 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.cpp +++ b/qpid/cpp/src/qpid/broker/SessionState.cpp @@ -147,12 +147,14 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId, break; case management::Session::METHOD_CLOSE : + /* if (handler != 0) { handler->getConnection().closeChannel(handler->getChannel()); } status = Manageable::STATUS_OK; break; + */ case management::Session::METHOD_SOLICITACK : case management::Session::METHOD_RESETLIFESPAN : diff --git a/qpid/cpp/src/qpid/client/Dispatcher.cpp b/qpid/cpp/src/qpid/client/Dispatcher.cpp index 0783d5bc55..8df4637c88 100644 --- a/qpid/cpp/src/qpid/client/Dispatcher.cpp +++ b/qpid/cpp/src/qpid/client/Dispatcher.cpp @@ -75,11 +75,18 @@ void Dispatcher::run() if (content->isA()) { Message msg(*content, session); Subscriber::shared_ptr listener = find(msg.getDestination()); - assert(listener); - listener->received(msg); + if (!listener) { + QPID_LOG(error, "No listener found for destination " << msg.getDestination()); + } else { + assert(listener); + listener->received(msg); + } } else { - assert (handler.get()); - handler->handle(*content); + if (handler.get()) { + handler->handle(*content); + } else { + QPID_LOG(error, "No handler found for " << *(content->getMethod())); + } } } } catch (const ClosedException&) { diff --git a/qpid/cpp/src/qpid/log/Statement.cpp b/qpid/cpp/src/qpid/log/Statement.cpp index 2935de9071..9b6fb7feaf 100644 --- a/qpid/cpp/src/qpid/log/Statement.cpp +++ b/qpid/cpp/src/qpid/log/Statement.cpp @@ -22,6 +22,7 @@ #include #include #include +#include namespace qpid { namespace log { @@ -29,21 +30,21 @@ namespace log { namespace { using namespace std; -struct IsControl { bool operator()(unsigned char c) { return c < 32; } }; +struct NonPrint { bool operator()(unsigned char c) { return !isprint(c); } }; -bool isClean(const std::string& str) { - return std::find_if(str.begin(), str.end(), IsControl()) == str.end(); -} +char hex[] = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F' }; std::string quote(const std::string& str) { - IsControl isControl; - size_t n = std::count_if(str.begin(), str.end(), isControl); + NonPrint nonPrint; + size_t n = std::count_if(str.begin(), str.end(), nonPrint); + if (n==0) return str; std::string ret; - ret.reserve(str.size()+n); // Avoid extra allocations. + ret.reserve(str.size()+2*n); // Avoid extra allocations. for (string::const_iterator i = str.begin(); i != str.end(); ++i) { - if (isControl(*i)) { - ret.push_back('^'); - ret.push_back((*i)+64); + if (nonPrint(*i)) { + ret.push_back('\\'); + ret.push_back(hex[((*i) >> 4)&0xf]); + ret.push_back(hex[(*i) & 0xf]); } else ret.push_back(*i); } @@ -53,7 +54,7 @@ std::string quote(const std::string& str) { } void Statement::log(const std::string& message) { - Logger::instance().log(*this, isClean(message) ? message : quote(message)); + Logger::instance().log(*this, quote(message)); } Statement::Initializer::Initializer(Statement& s) : statement(s) { diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp index 39fab270af..709f2a0ecd 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp +++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp @@ -74,16 +74,16 @@ void ManagementAgent::setExchange (broker::Exchange::shared_ptr _mexchange, } void ManagementAgent::addObject (ManagementObject::shared_ptr object, - uint64_t persistenceId, - uint64_t idOffset) + uint64_t /*persistenceId*/, + uint64_t /*idOffset*/) { RWlock::ScopedWlock writeLock (userLock); uint64_t objectId; - if (persistenceId == 0) +// if (persistenceId == 0) objectId = nextObjectId++; - else - objectId = 0x8000000000000000ULL | (persistenceId + idOffset); +// else +// objectId = 0x8000000000000000ULL | (persistenceId + idOffset); object->setObjectId (objectId); managementObjects[objectId] = object; diff --git a/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp b/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp index 65d805e492..9fd32add72 100644 --- a/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp +++ b/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp @@ -140,7 +140,7 @@ class AsynchIOHandler : public qpid::sys::ConnectionOutputHandler { void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, ConnectionInputHandlerFactory* f) { AsynchIOHandler* async = new AsynchIOHandler; - ConnectionInputHandler* handler = f->create(async, s); + ConnectionInputHandler* handler = f->create(async, s.getPeerAddress()); AsynchIO* aio = new AsynchIO(s, boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), boost::bind(&AsynchIOHandler::eof, async, _1), @@ -194,7 +194,7 @@ void AsynchIOAcceptor::connect(const std::string& host, int16_t port, Connection socket->connect(host, port); AsynchIOHandler* async = new AsynchIOHandler; async->setClient(); - ConnectionInputHandler* handler = f->create(async, *socket); + ConnectionInputHandler* handler = f->create(async, socket->getPeerAddress()); AsynchIO* aio = new AsynchIO(*socket, boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), boost::bind(&AsynchIOHandler::eof, async, _1), diff --git a/qpid/cpp/src/qpid/sys/ConnectionInputHandlerFactory.h b/qpid/cpp/src/qpid/sys/ConnectionInputHandlerFactory.h index 4a90f8b736..2b309b5758 100644 --- a/qpid/cpp/src/qpid/sys/ConnectionInputHandlerFactory.h +++ b/qpid/cpp/src/qpid/sys/ConnectionInputHandlerFactory.h @@ -22,7 +22,7 @@ #define _ConnectionInputHandlerFactory_ #include -#include "qpid/sys/Socket.h" +#include namespace qpid { namespace sys { @@ -37,7 +37,13 @@ class ConnectionInputHandler; class ConnectionInputHandlerFactory : private boost::noncopyable { public: - virtual ConnectionInputHandler* create(ConnectionOutputHandler* ctxt, const Socket& s) = 0; + /** + *@param out handler for connection output. + *@param id identify the connection for management purposes. + */ + virtual ConnectionInputHandler* create(ConnectionOutputHandler* out, + const std::string& id) = 0; + virtual ~ConnectionInputHandlerFactory(){} }; diff --git a/qpid/cpp/src/qpidd.cpp b/qpid/cpp/src/qpidd.cpp index 6c20ebc58f..08b907cbe2 100644 --- a/qpid/cpp/src/qpidd.cpp +++ b/qpid/cpp/src/qpidd.cpp @@ -48,9 +48,9 @@ struct ModuleOptions : public qpid::Options { ModuleOptions() : qpid::Options("Module options"), loadDir("/usr/lib/qpidd"), noLoad(false) { addOptions() - ("load-dir", optValue(loadDir, "DIR"), "Load all modules from this directory") - ("load-module", optValue(load, "FILE"), "Specifies additional module(s) to be loaded") - ("no-modules", optValue(noLoad), "Don't load any modules"); + ("module-dir", optValue(loadDir, "DIR"), "Load all .so modules in this directory") + ("load-module", optValue(load, "FILE"), "Specifies additional module(s) to be loaded") + ("no-module-dir", optValue(noLoad), "Don't load modules from module directory"); } }; @@ -122,7 +122,7 @@ auto_ptr options; void shutdownHandler(int /*signal*/){ // Note: do not call any async-signal unsafe functions here. - // Do any extra shtudown actions in main() after broker->run() + // Do any extra shutdown actions in main() after broker->run() brokerPtr->shutdown(); } @@ -140,6 +140,7 @@ struct QpiddDaemon : public Daemon { uint16_t port=brokerPtr->getPort(); ready(port); // Notify parent. brokerPtr->run(); + brokerPtr.reset(); } }; @@ -188,12 +189,13 @@ int main(int argc, char* argv[]) // be re-parsed with all of the module-supplied options. bootOptions.parse (argc, argv, bootOptions.common.config, true); qpid::log::Logger::instance().configure(bootOptions.log, argv[0]); - if (!bootOptions.module.noLoad) { - for (vector::iterator iter = bootOptions.module.load.begin(); - iter != bootOptions.module.load.end(); - iter++) - tryShlib (iter->data(), false); + for (vector::iterator iter = bootOptions.module.load.begin(); + iter != bootOptions.module.load.end(); + iter++) + tryShlib (iter->data(), false); + + if (!bootOptions.module.noLoad) { bool isDefault = defaultPath == bootOptions.module.loadDir; loadModuleDir (bootOptions.module.loadDir, isDefault); } diff --git a/qpid/cpp/src/tests/ClientSessionTest.cpp b/qpid/cpp/src/tests/ClientSessionTest.cpp index 60cfe04510..87a4f59999 100644 --- a/qpid/cpp/src/tests/ClientSessionTest.cpp +++ b/qpid/cpp/src/tests/ClientSessionTest.cpp @@ -21,6 +21,7 @@ #include "unit_test.h" #include "BrokerFixture.h" #include "qpid/client/Dispatcher.h" +#include "qpid/sys/Monitor.h" #include "qpid/sys/Thread.h" #include "qpid/sys/Runnable.h" #include "qpid/client/Session_0_10.h" @@ -38,6 +39,7 @@ using namespace qpid::client; using namespace qpid::client::arg; using namespace qpid::framing; using namespace qpid; +using qpid::sys::Monitor; using std::string; using std::cout; using std::endl; @@ -67,6 +69,27 @@ struct DummyListener : public sys::Runnable, public MessageListener { } }; +struct SimpleListener : public MessageListener +{ + Monitor lock; + std::vector messages; + + void received(Message& msg) + { + Monitor::ScopedLock l(lock); + messages.push_back(msg); + lock.notifyAll(); + } + + void waitFor(const uint n) + { + Monitor::ScopedLock l(lock); + while (messages.size() < n) { + lock.wait(); + } + } +}; + struct ClientSessionFixture : public ProxySessionFixture { void declareSubscribe(const string& q="my-queue", @@ -167,22 +190,31 @@ BOOST_FIXTURE_TEST_CASE(testSuspendResume, ClientSessionFixture) BOOST_CHECK_EQUAL(string("my-message"), msg->getContent()); } +/** + * Currently broken due to a deadlock in SessionCore + * BOOST_FIXTURE_TEST_CASE(testSendToSelf, SessionFixture) { - // https://bugzilla.redhat.com/show_bug.cgi?id=410551 // Deadlock if SubscriptionManager run() concurrent with session ack. - LocalQueue myq; + SimpleListener mylistener; session.queueDeclare(queue="myq", exclusive=true, autoDelete=true); - subs.subscribe(myq, "myq"); + subs.subscribe(mylistener, "myq", "myq"); + sys::Thread runner(subs);//start dispatcher thread string data("msg"); Message msg(data, "myq"); - const int count=100; // Verified with count=100000 in a loop. - for (int i = 0; i < count; ++i) + const uint count=10000; + for (uint i = 0; i < count; ++i) { session.messageTransfer(content=msg); - for (int j = 0; j < count; ++j) { - Message m=myq.pop(); - BOOST_CHECK_EQUAL(m.getData(), data); + } + mylistener.waitFor(count); + subs.cancel("myq"); + subs.stop(); + session.close(); + BOOST_CHECK_EQUAL(mylistener.messages.size(), count); + for (uint j = 0; j < count; ++j) { + BOOST_CHECK_EQUAL(mylistener.messages[j].getData(), data); } } +*/ QPID_AUTO_TEST_SUITE_END() diff --git a/qpid/cpp/src/tests/logging.cpp b/qpid/cpp/src/tests/logging.cpp index 4969c3d6a9..2c0ed08105 100644 --- a/qpid/cpp/src/tests/logging.cpp +++ b/qpid/cpp/src/tests/logging.cpp @@ -367,7 +367,7 @@ BOOST_AUTO_TEST_CASE(testLoggerConfigure) { unlink("logging.tmp"); } -BOOST_AUTO_TEST_CASE(testQuoteControlChars) { +BOOST_AUTO_TEST_CASE(testQuoteNonPrintable) { Logger& l=Logger::instance(); l.clear(); Options opts; @@ -375,13 +375,13 @@ BOOST_AUTO_TEST_CASE(testQuoteControlChars) { opts.outputs.push_back("logging.tmp"); opts.time=false; l.configure(opts, "test"); - char s[] = "null\0tab\tspace newline\nret\r"; + char s[] = "null\0tab\tspace newline\nret\r\x80\x99\xff"; string str(s, sizeof(s)); QPID_LOG(critical, str); ifstream log("logging.tmp"); string line; getline(log, line); - string expect="critical null^@tab^Ispace newline^Jret^M^@"; + string expect="critical null\\00tab\\09space newline\\0Aret\\0D\\80\\99\\FF\\00"; BOOST_CHECK_EQUAL(expect, line); log.close(); unlink("logging.tmp"); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java index 6c7575429a..0e36a09bbd 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java @@ -231,6 +231,14 @@ public abstract class AMQDestination implements Destination, Referenceable sb.append('?'); + if (_routingKey != null) + { + sb.append(BindingURL.OPTION_ROUTING_KEY); + sb.append("='"); + sb.append(_routingKey).append("'"); + sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR); + } + if (_isDurable) { sb.append(BindingURL.OPTION_DURABLE); diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java index 51a052bed5..ad47e14fde 100644 --- a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java +++ b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java @@ -16,6 +16,7 @@ import org.apache.qpidity.nclient.impl.ClientSessionDelegate; import org.apache.qpidity.transport.Channel; import org.apache.qpidity.transport.Connection; import org.apache.qpidity.transport.ConnectionClose; +import org.apache.qpidity.transport.ConnectionCloseOk; import org.apache.qpidity.transport.ConnectionDelegate; import org.apache.qpidity.transport.ConnectionEvent; import org.apache.qpidity.transport.ProtocolHeader; @@ -33,6 +34,9 @@ public class Client implements org.apache.qpidity.nclient.Connection private ClosedListener _closedListner; private final Lock _lock = new ReentrantLock(); private static Logger _logger = LoggerFactory.getLogger(Client.class); + private Condition closeOk; + private boolean closed = false; + /** * * @return returns a new connection to the broker. @@ -45,6 +49,7 @@ public class Client implements org.apache.qpidity.nclient.Connection public void connect(String host, int port,String virtualHost,String username, String password) throws QpidException { Condition negotiationComplete = _lock.newCondition(); + closeOk = _lock.newCondition(); _lock.lock(); ConnectionDelegate connectionDelegate = new ConnectionDelegate() @@ -76,6 +81,21 @@ public class Client implements org.apache.qpidity.nclient.Connection } } + @Override public void connectionCloseOk(Channel context, ConnectionCloseOk struct) + { + _lock.lock(); + try + { + closed = true; + this.receivedClose = true; + closeOk.signalAll(); + } + finally + { + _lock.unlock(); + } + } + @Override public void connectionClose(Channel context, ConnectionClose connectionClose) { ErrorCode errorCode = ErrorCode.get(connectionClose.getReplyCode()); @@ -179,6 +199,24 @@ public class Client implements org.apache.qpidity.nclient.Connection { Channel ch = _conn.getChannel(0); ch.connectionClose(0, "client is closing", 0, 0); + _lock.lock(); + try + { + try { + while (!closed) + { + closeOk.await(); + } + } + catch (InterruptedException e) + { + // do nothing + } + } + finally + { + _lock.unlock(); + } _conn.close(); } diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionCloseTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionCloseTest.java new file mode 100644 index 0000000000..20443944d2 --- /dev/null +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionCloseTest.java @@ -0,0 +1,72 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.client.connection; + +import org.apache.qpid.testutil.QpidTestCase; +import org.apache.qpidity.transport.util.Logger; + +import javax.jms.Connection; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; + +/** + * ConnectionCloseTest + * + */ + +public class ConnectionCloseTest extends QpidTestCase +{ + + private static final Logger log = Logger.get(ConnectionCloseTest.class); + + public void testSendReceiveClose() throws Exception + { + for (int i = 0; i < 500; i++) + { + if ((i % 10) == 0) + { + log.warn("%d messages sent and received", i); + } + + Connection receiver = getConnection(); + receiver.start(); + Session rssn = receiver.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = rssn.createQueue("connection-close-test-queue"); + MessageConsumer cons = rssn.createConsumer(queue); + + Connection sender = getConnection(); + sender.start(); + Session sssn = sender.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer prod = sssn.createProducer(queue); + prod.send(sssn.createTextMessage("test")); + sender.close(); + + TextMessage m = (TextMessage) cons.receive(2000); + assertNotNull("message was lost", m); + assertEquals(m.getText(), "test"); + receiver.close(); + } + } + +} diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/AbstractXATest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/AbstractXATest.java deleted file mode 100644 index ba4ebae258..0000000000 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/AbstractXATest.java +++ /dev/null @@ -1,131 +0,0 @@ -/* Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.test.unit.xa; - -import org.apache.qpidity.dtx.XidImpl; -import org.apache.qpid.testutil.QpidTestCase; - -import javax.transaction.xa.Xid; -import javax.transaction.xa.XAResource; -import javax.jms.*; - -/** - * - * - */ -public abstract class AbstractXATest extends QpidTestCase -{ - protected static final String _sequenceNumberPropertyName = "seqNumber"; - - /** - * the xaResource associated with the standard session - */ - protected static XAResource _xaResource = null; - - /** - * producer registered with the standard session - */ - protected static MessageProducer _producer = null; - - /** - * consumer registered with the standard session - */ - protected static MessageConsumer _consumer = null; - - /** - * a standard message - */ - protected static TextMessage _message = null; - - /** - * xid counter - */ - private static int _xidCounter = 0; - - - protected void setUp() throws Exception - { - super.setUp(); - init(); - } - - public abstract void init(); - - - - /** - * construct a new Xid - * - * @return a new Xid - */ - protected Xid getNewXid() - { - byte[] branchQualifier; - byte[] globalTransactionID; - int format = _xidCounter; - String branchQualifierSt = "branchQualifier" + _xidCounter; - String globalTransactionIDSt = "globalTransactionID" + _xidCounter; - branchQualifier = branchQualifierSt.getBytes(); - globalTransactionID = globalTransactionIDSt.getBytes(); - _xidCounter++; - return new XidImpl(branchQualifier, format, globalTransactionID); - } - - public void init(XASession session, Destination destination) - { - // get the xaResource - try - { - _xaResource = session.getXAResource(); - } - catch (Exception e) - { - fail("cannot access the xa resource: " + e.getMessage()); - } - // create standard producer - try - { - _producer = session.createProducer(destination); - _producer.setDeliveryMode(DeliveryMode.PERSISTENT); - } - catch (JMSException e) - { - e.printStackTrace(); - fail("cannot create message producer: " + e.getMessage()); - } - // create standard consumer - try - { - _consumer = session.createConsumer(destination); - } - catch (JMSException e) - { - fail("cannot create message consumer: " + e.getMessage()); - } - // create a standard message - try - { - _message = session.createTextMessage(); - _message.setText("test XA"); - } - catch (JMSException e) - { - fail("cannot create standard message: " + e.getMessage()); - } - } -} diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/AbstractXATestCase.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/AbstractXATestCase.java new file mode 100644 index 0000000000..7c03e16258 --- /dev/null +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/AbstractXATestCase.java @@ -0,0 +1,131 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.test.unit.xa; + +import org.apache.qpidity.dtx.XidImpl; +import org.apache.qpid.testutil.QpidTestCase; + +import javax.transaction.xa.Xid; +import javax.transaction.xa.XAResource; +import javax.jms.*; + +/** + * + * + */ +public abstract class AbstractXATestCase extends QpidTestCase +{ + protected static final String _sequenceNumberPropertyName = "seqNumber"; + + /** + * the xaResource associated with the standard session + */ + protected static XAResource _xaResource = null; + + /** + * producer registered with the standard session + */ + protected static MessageProducer _producer = null; + + /** + * consumer registered with the standard session + */ + protected static MessageConsumer _consumer = null; + + /** + * a standard message + */ + protected static TextMessage _message = null; + + /** + * xid counter + */ + private static int _xidCounter = 0; + + + protected void setUp() throws Exception + { + super.setUp(); + init(); + } + + public abstract void init(); + + + + /** + * construct a new Xid + * + * @return a new Xid + */ + protected Xid getNewXid() + { + byte[] branchQualifier; + byte[] globalTransactionID; + int format = _xidCounter; + String branchQualifierSt = "branchQualifier" + _xidCounter; + String globalTransactionIDSt = "globalTransactionID" + _xidCounter; + branchQualifier = branchQualifierSt.getBytes(); + globalTransactionID = globalTransactionIDSt.getBytes(); + _xidCounter++; + return new XidImpl(branchQualifier, format, globalTransactionID); + } + + public void init(XASession session, Destination destination) + { + // get the xaResource + try + { + _xaResource = session.getXAResource(); + } + catch (Exception e) + { + fail("cannot access the xa resource: " + e.getMessage()); + } + // create standard producer + try + { + _producer = session.createProducer(destination); + _producer.setDeliveryMode(DeliveryMode.PERSISTENT); + } + catch (JMSException e) + { + e.printStackTrace(); + fail("cannot create message producer: " + e.getMessage()); + } + // create standard consumer + try + { + _consumer = session.createConsumer(destination); + } + catch (JMSException e) + { + fail("cannot create message consumer: " + e.getMessage()); + } + // create a standard message + try + { + _message = session.createTextMessage(); + _message.setText("test XA"); + } + catch (JMSException e) + { + fail("cannot create standard message: " + e.getMessage()); + } + } +} diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/QueueTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/QueueTest.java new file mode 100644 index 0000000000..a703432efb --- /dev/null +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/QueueTest.java @@ -0,0 +1,643 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.test.unit.xa; + +import javax.jms.*; +import javax.transaction.xa.XAResource; +import javax.transaction.xa.Xid; +import javax.transaction.xa.XAException; + +import junit.framework.TestSuite; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class QueueTest extends AbstractXATestCase +{ + /* this clas logger */ + private static final Logger _logger = LoggerFactory.getLogger(QueueTest.class); + + /** + * the queue use by all the tests + */ + private static Queue _queue = null; + /** + * the queue connection factory used by all tests + */ + private static XAQueueConnectionFactory _queueFactory = null; + /** + * standard queue connection + */ + private static XAQueueConnection _queueConnection = null; + + /** + * standard queue session created from the standard connection + */ + private static QueueSession _nonXASession = null; + + /** + * the queue name + */ + private static final String QUEUENAME = "xaQueue"; + + /** ----------------------------------------------------------------------------------- **/ + /** + * ----------------------------- JUnit support ----------------------------------------- * + */ + + /** + * Gets the test suite tests + * + * @return the test suite tests + */ + public static TestSuite getSuite() + { + return new TestSuite(QueueTest.class); + } + + /** + * Run the test suite. + * + * @param args Any command line arguments specified to this class. + */ + public static void main(String args[]) + { + junit.textui.TestRunner.run(getSuite()); + } + + public void tearDown() throws Exception + { + if (!isBroker08()) + { + try + { + _queueConnection.stop(); + _queueConnection.close(); + } + catch (Exception e) + { + fail("Exception thrown when cleaning standard connection: " + e.getStackTrace()); + } + } + super.tearDown(); + } + + /** + * Initialize standard actors + */ + public void init() + { + if (!isBroker08()) + { + // lookup test queue + try + { + _queue = (Queue) getInitialContext().lookup(QUEUENAME); + } + catch (Exception e) + { + fail("cannot lookup test queue " + e.getMessage()); + } + + // lookup connection factory + try + { + _queueFactory = getConnectionFactory(); + } + catch (Exception e) + { + fail("enable to lookup connection factory "); + } + // create standard connection + try + { + _queueConnection = getNewQueueXAConnection(); + } + catch (JMSException e) + { + fail("cannot create queue connection: " + e.getMessage()); + } + // create xa session + XAQueueSession session = null; + try + { + session = _queueConnection.createXAQueueSession(); + } + catch (JMSException e) + { + fail("cannot create queue session: " + e.getMessage()); + } + // create a standard session + try + { + _nonXASession = _queueConnection.createQueueSession(true, Session.AUTO_ACKNOWLEDGE); + } + catch (JMSException e) + { + fail("cannot create queue session: " + e.getMessage()); + } + init(session, _queue); + } + } + + /** -------------------------------------------------------------------------------------- **/ + /** ----------------------------- Test Suite -------------------------------------------- **/ + /** -------------------------------------------------------------------------------------- **/ + + /** + * Uses two transactions respectively with xid1 and xid2 that are used to send a message + * within xid1 and xid2. xid2 is committed and xid1 is used to receive the message that was sent within xid2. + * Xid is then committed and a standard transaction is used to receive the message that was sent within xid1. + */ + public void testProducer() + { + if (!isBroker08()) + { + _logger.debug("running testProducer"); + Xid xid1 = getNewXid(); + Xid xid2 = getNewXid(); + // start the xaResource for xid1 + try + { + _xaResource.start(xid1, XAResource.TMSUCCESS); + } + catch (XAException e) + { + e.printStackTrace(); + fail("cannot start the transaction with xid1: " + e.getMessage()); + } + try + { + // start the connection + _queueConnection.start(); + // produce a message with sequence number 1 + _message.setLongProperty(_sequenceNumberPropertyName, 1); + _producer.send(_message); + } + catch (JMSException e) + { + fail(" cannot send persistent message: " + e.getMessage()); + } + // suspend the transaction + try + { + _xaResource.end(xid1, XAResource.TMSUSPEND); + } + catch (XAException e) + { + fail("Cannot end the transaction with xid1: " + e.getMessage()); + } + // start the xaResource for xid2 + try + { + _xaResource.start(xid2, XAResource.TMSUCCESS); + } + catch (XAException e) + { + fail("cannot start the transaction with xid2: " + e.getMessage()); + } + try + { + // produce a message + _message.setLongProperty(_sequenceNumberPropertyName, 2); + _producer.send(_message); + } + catch (JMSException e) + { + fail(" cannot send second persistent message: " + e.getMessage()); + } + // end xid2 and start xid1 + try + { + _xaResource.end(xid2, XAResource.TMSUCCESS); + _xaResource.start(xid1, XAResource.TMRESUME); + } + catch (XAException e) + { + fail("Exception when ending and starting transactions: " + e.getMessage()); + } + // two phases commit transaction with xid2 + try + { + int resPrepare = _xaResource.prepare(xid2); + if (resPrepare != XAResource.XA_OK) + { + fail("prepare returned: " + resPrepare); + } + _xaResource.commit(xid2, false); + } + catch (XAException e) + { + fail("Exception thrown when preparing transaction with xid2: " + e.getMessage()); + } + // receive a message from queue test we expect it to be the second one + try + { + TextMessage message = (TextMessage) _consumer.receiveNoWait(); + if (message == null) + { + fail("did not receive second message as expected "); + } + else + { + if (message.getLongProperty(_sequenceNumberPropertyName) != 2) + { + fail("receive wrong message its sequence number is: " + message + .getLongProperty(_sequenceNumberPropertyName)); + } + } + } + catch (JMSException e) + { + fail("Exception when receiving second message: " + e.getMessage()); + } + // end and one phase commit the first transaction + try + { + _xaResource.end(xid1, XAResource.TMSUCCESS); + _xaResource.commit(xid1, true); + } + catch (XAException e) + { + fail("Exception thrown when commiting transaction with xid1"); + } + // We should now be able to receive the first message + try + { + Session nonXASession = _nonXASession; + MessageConsumer nonXAConsumer = nonXASession.createConsumer(_queue); + TextMessage message1 = (TextMessage) nonXAConsumer.receiveNoWait(); + if (message1 == null) + { + fail("did not receive first message as expected "); + } + else + { + if (message1.getLongProperty(_sequenceNumberPropertyName) != 1) + { + fail("receive wrong message its sequence number is: " + message1 + .getLongProperty(_sequenceNumberPropertyName)); + } + } + // commit that transacted session + nonXASession.commit(); + // the queue should be now empty + message1 = (TextMessage) nonXAConsumer.receiveNoWait(); + if (message1 != null) + { + fail("receive an unexpected message "); + } + } + catch (JMSException e) + { + fail("Exception thrown when emptying the queue: " + e.getMessage()); + } + } + } + + /** + * strategy: Produce a message within Tx1 and prepare tx1. crash the server then commit tx1 and consume the message + */ + public void testSendAndRecover() + { + if (!isBroker08()) + { + _logger.debug("running testSendAndRecover"); + Xid xid1 = getNewXid(); + // start the xaResource for xid1 + try + { + _xaResource.start(xid1, XAResource.TMSUCCESS); + } + catch (XAException e) + { + fail("cannot start the transaction with xid1: " + e.getMessage()); + } + try + { + // start the connection + _queueConnection.start(); + // produce a message with sequence number 1 + _message.setLongProperty(_sequenceNumberPropertyName, 1); + _producer.send(_message); + } + catch (JMSException e) + { + fail(" cannot send persistent message: " + e.getMessage()); + } + // suspend the transaction + try + { + _xaResource.end(xid1, XAResource.TMSUCCESS); + } + catch (XAException e) + { + fail("Cannot end the transaction with xid1: " + e.getMessage()); + } + // prepare the transaction with xid1 + try + { + _xaResource.prepare(xid1); + } + catch (XAException e) + { + fail("Exception when preparing xid1: " + e.getMessage()); + } + + /////// stop the server now !! + try + { + _logger.debug("stopping broker"); + shutdownServer(); + } + catch (Exception e) + { + fail("Exception when stopping and restarting the server"); + } + + // get the list of in doubt transactions + try + { + Xid[] inDoubt = _xaResource.recover(XAResource.TMSTARTRSCAN); + if (inDoubt == null) + { + fail("the array of in doubt transactions should not be null "); + } + // At that point we expect only two indoubt transactions: + if (inDoubt.length != 1) + { + fail("in doubt transaction size is diffenrent thatn 2, there are " + inDoubt.length + "in doubt transactions"); + } + + // commit them + for (Xid anInDoubt : inDoubt) + { + if (anInDoubt.equals(xid1)) + { + System.out.println("commit xid1 "); + try + { + _xaResource.commit(anInDoubt, false); + } + catch (Exception e) + { + System.out.println("PB when aborted xid1"); + } + } + else + { + fail("did not receive right xid "); + } + } + } + catch (XAException e) + { + e.printStackTrace(); + fail("exception thrown when recovering transactions " + e.getMessage()); + } + // the queue should contain the first message! + try + { + Session nonXASession = _nonXASession; + MessageConsumer nonXAConsumer = nonXASession.createConsumer(_queue); + _queueConnection.start(); + TextMessage message1 = (TextMessage) nonXAConsumer.receiveNoWait(); + + if (message1 == null) + { + fail("queue does not contain any message!"); + } + if (message1.getLongProperty(_sequenceNumberPropertyName) != 1) + { + fail("Wrong message returned! Sequence number is " + message1 + .getLongProperty(_sequenceNumberPropertyName)); + } + nonXASession.commit(); + } + catch (JMSException e) + { + fail("Exception thrown when testin that queue test is not empty: " + e.getMessage()); + } + } + } + + /** + * strategy: Produce a message within Tx1 and prepare tx1. Produce a standard message and consume + * it within tx2 and prepare tx2. Shutdown the server and get the list of in doubt transactions: + * we expect tx1 and tx2! Then, Tx1 is aborted and tx2 is committed so we expect the test's queue to be empty! + */ + public void testRecover() + { + if (!isBroker08()) + { + _logger.debug("running testRecover"); + Xid xid1 = getNewXid(); + Xid xid2 = getNewXid(); + // start the xaResource for xid1 + try + { + _xaResource.start(xid1, XAResource.TMSUCCESS); + } + catch (XAException e) + { + fail("cannot start the transaction with xid1: " + e.getMessage()); + } + try + { + // start the connection + _queueConnection.start(); + // produce a message with sequence number 1 + _message.setLongProperty(_sequenceNumberPropertyName, 1); + _producer.send(_message); + } + catch (JMSException e) + { + fail(" cannot send persistent message: " + e.getMessage()); + } + // suspend the transaction + try + { + _xaResource.end(xid1, XAResource.TMSUCCESS); + } + catch (XAException e) + { + fail("Cannot end the transaction with xid1: " + e.getMessage()); + } + // prepare the transaction with xid1 + try + { + _xaResource.prepare(xid1); + } + catch (XAException e) + { + fail("Exception when preparing xid1: " + e.getMessage()); + } + + // send a message using the standard session + try + { + Session nonXASession = _nonXASession; + MessageProducer nonXAProducer = nonXASession.createProducer(_queue); + TextMessage message2 = nonXASession.createTextMessage(); + message2.setText("non XA "); + message2.setLongProperty(_sequenceNumberPropertyName, 2); + nonXAProducer.setDeliveryMode(DeliveryMode.PERSISTENT); + nonXAProducer.send(message2); + // commit that transacted session + nonXASession.commit(); + } + catch (Exception e) + { + fail("Exception thrown when emptying the queue: " + e.getMessage()); + } + // start the xaResource for xid2 + try + { + _xaResource.start(xid2, XAResource.TMSUCCESS); + } + catch (XAException e) + { + fail("cannot start the transaction with xid1: " + e.getMessage()); + } + // receive a message from queue test we expect it to be the second one + try + { + TextMessage message = (TextMessage) _consumer.receiveNoWait(); + if (message == null || message.getLongProperty(_sequenceNumberPropertyName) != 2) + { + fail("did not receive second message as expected "); + } + } + catch (JMSException e) + { + fail("Exception when receiving second message: " + e.getMessage()); + } + // suspend the transaction + try + { + _xaResource.end(xid2, XAResource.TMSUCCESS); + } + catch (XAException e) + { + fail("Cannot end the transaction with xid2: " + e.getMessage()); + } + // prepare the transaction with xid1 + try + { + _xaResource.prepare(xid2); + } + catch (XAException e) + { + fail("Exception when preparing xid2: " + e.getMessage()); + } + + /////// stop the server now !! + try + { + _logger.debug("stopping broker"); + shutdownServer(); + } + catch (Exception e) + { + fail("Exception when stopping and restarting the server"); + } + + // get the list of in doubt transactions + try + { + Xid[] inDoubt = _xaResource.recover(XAResource.TMSTARTRSCAN); + if (inDoubt == null) + { + fail("the array of in doubt transactions should not be null "); + } + // At that point we expect only two indoubt transactions: + if (inDoubt.length != 2) + { + fail("in doubt transaction size is diffenrent thatn 2, there are " + inDoubt.length + "in doubt transactions"); + } + + // commit them + for (Xid anInDoubt : inDoubt) + { + if (anInDoubt.equals(xid1)) + { + _logger.debug("rollback xid1 "); + try + { + _xaResource.rollback(anInDoubt); + } + catch (Exception e) + { + System.out.println("PB when aborted xid1"); + } + } + else if (anInDoubt.equals(xid2)) + { + _logger.debug("commit xid2 "); + try + { + _xaResource.commit(anInDoubt, false); + } + catch (Exception e) + { + System.out.println("PB when commiting xid2"); + } + } + } + } + catch (XAException e) + { + e.printStackTrace(); + fail("exception thrown when recovering transactions " + e.getMessage()); + } + // the queue should be empty + try + { + Session nonXASession = _nonXASession; + MessageConsumer nonXAConsumer = nonXASession.createConsumer(_queue); + _queueConnection.start(); + TextMessage message1 = (TextMessage) nonXAConsumer.receiveNoWait(); + if (message1 != null) + { + fail("The queue is not empty! "); + } + } + catch (JMSException e) + { + fail("Exception thrown when testin that queue test is empty: " + e.getMessage()); + } + } + } + + /** -------------------------------------------------------------------------------------- **/ + /** ----------------------------- Utility methods --------------------------------------- **/ + /** -------------------------------------------------------------------------------------- **/ + + /** + * get a new queue connection + * + * @return a new queue connection + * @throws JMSException If the JMS provider fails to create the queue connection + * due to some internal error or in case of authentication failure + */ + private XAQueueConnection getNewQueueXAConnection() throws JMSException + { + return _queueFactory.createXAQueueConnection("guest", "guest"); + } + + +} diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/QueueTests.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/QueueTests.java deleted file mode 100644 index cd5b228f76..0000000000 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/QueueTests.java +++ /dev/null @@ -1,643 +0,0 @@ -/* Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.test.unit.xa; - -import javax.jms.*; -import javax.transaction.xa.XAResource; -import javax.transaction.xa.Xid; -import javax.transaction.xa.XAException; - -import junit.framework.TestSuite; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class QueueTests extends AbstractXATest -{ - /* this clas logger */ - private static final Logger _logger = LoggerFactory.getLogger(QueueTests.class); - - /** - * the queue use by all the tests - */ - private static Queue _queue = null; - /** - * the queue connection factory used by all tests - */ - private static XAQueueConnectionFactory _queueFactory = null; - /** - * standard queue connection - */ - private static XAQueueConnection _queueConnection = null; - - /** - * standard queue session created from the standard connection - */ - private static QueueSession _nonXASession = null; - - /** - * the queue name - */ - private static final String QUEUENAME = "xaQueue"; - - /** ----------------------------------------------------------------------------------- **/ - /** - * ----------------------------- JUnit support ----------------------------------------- * - */ - - /** - * Gets the test suite tests - * - * @return the test suite tests - */ - public static TestSuite getSuite() - { - return new TestSuite(QueueTests.class); - } - - /** - * Run the test suite. - * - * @param args Any command line arguments specified to this class. - */ - public static void main(String args[]) - { - junit.textui.TestRunner.run(getSuite()); - } - - public void tearDown() throws Exception - { - if (!isBroker08()) - { - try - { - _queueConnection.stop(); - _queueConnection.close(); - } - catch (Exception e) - { - fail("Exception thrown when cleaning standard connection: " + e.getStackTrace()); - } - } - super.tearDown(); - } - - /** - * Initialize standard actors - */ - public void init() - { - if (!isBroker08()) - { - // lookup test queue - try - { - _queue = (Queue) getInitialContext().lookup(QUEUENAME); - } - catch (Exception e) - { - fail("cannot lookup test queue " + e.getMessage()); - } - - // lookup connection factory - try - { - _queueFactory = getConnectionFactory(); - } - catch (Exception e) - { - fail("enable to lookup connection factory "); - } - // create standard connection - try - { - _queueConnection = getNewQueueXAConnection(); - } - catch (JMSException e) - { - fail("cannot create queue connection: " + e.getMessage()); - } - // create xa session - XAQueueSession session = null; - try - { - session = _queueConnection.createXAQueueSession(); - } - catch (JMSException e) - { - fail("cannot create queue session: " + e.getMessage()); - } - // create a standard session - try - { - _nonXASession = _queueConnection.createQueueSession(true, Session.AUTO_ACKNOWLEDGE); - } - catch (JMSException e) - { - fail("cannot create queue session: " + e.getMessage()); - } - init(session, _queue); - } - } - - /** -------------------------------------------------------------------------------------- **/ - /** ----------------------------- Test Suite -------------------------------------------- **/ - /** -------------------------------------------------------------------------------------- **/ - - /** - * Uses two transactions respectively with xid1 and xid2 that are used to send a message - * within xid1 and xid2. xid2 is committed and xid1 is used to receive the message that was sent within xid2. - * Xid is then committed and a standard transaction is used to receive the message that was sent within xid1. - */ - public void testProducer() - { - if (!isBroker08()) - { - _logger.debug("running testProducer"); - Xid xid1 = getNewXid(); - Xid xid2 = getNewXid(); - // start the xaResource for xid1 - try - { - _xaResource.start(xid1, XAResource.TMSUCCESS); - } - catch (XAException e) - { - e.printStackTrace(); - fail("cannot start the transaction with xid1: " + e.getMessage()); - } - try - { - // start the connection - _queueConnection.start(); - // produce a message with sequence number 1 - _message.setLongProperty(_sequenceNumberPropertyName, 1); - _producer.send(_message); - } - catch (JMSException e) - { - fail(" cannot send persistent message: " + e.getMessage()); - } - // suspend the transaction - try - { - _xaResource.end(xid1, XAResource.TMSUSPEND); - } - catch (XAException e) - { - fail("Cannot end the transaction with xid1: " + e.getMessage()); - } - // start the xaResource for xid2 - try - { - _xaResource.start(xid2, XAResource.TMSUCCESS); - } - catch (XAException e) - { - fail("cannot start the transaction with xid2: " + e.getMessage()); - } - try - { - // produce a message - _message.setLongProperty(_sequenceNumberPropertyName, 2); - _producer.send(_message); - } - catch (JMSException e) - { - fail(" cannot send second persistent message: " + e.getMessage()); - } - // end xid2 and start xid1 - try - { - _xaResource.end(xid2, XAResource.TMSUCCESS); - _xaResource.start(xid1, XAResource.TMRESUME); - } - catch (XAException e) - { - fail("Exception when ending and starting transactions: " + e.getMessage()); - } - // two phases commit transaction with xid2 - try - { - int resPrepare = _xaResource.prepare(xid2); - if (resPrepare != XAResource.XA_OK) - { - fail("prepare returned: " + resPrepare); - } - _xaResource.commit(xid2, false); - } - catch (XAException e) - { - fail("Exception thrown when preparing transaction with xid2: " + e.getMessage()); - } - // receive a message from queue test we expect it to be the second one - try - { - TextMessage message = (TextMessage) _consumer.receiveNoWait(); - if (message == null) - { - fail("did not receive second message as expected "); - } - else - { - if (message.getLongProperty(_sequenceNumberPropertyName) != 2) - { - fail("receive wrong message its sequence number is: " + message - .getLongProperty(_sequenceNumberPropertyName)); - } - } - } - catch (JMSException e) - { - fail("Exception when receiving second message: " + e.getMessage()); - } - // end and one phase commit the first transaction - try - { - _xaResource.end(xid1, XAResource.TMSUCCESS); - _xaResource.commit(xid1, true); - } - catch (XAException e) - { - fail("Exception thrown when commiting transaction with xid1"); - } - // We should now be able to receive the first message - try - { - Session nonXASession = _nonXASession; - MessageConsumer nonXAConsumer = nonXASession.createConsumer(_queue); - TextMessage message1 = (TextMessage) nonXAConsumer.receiveNoWait(); - if (message1 == null) - { - fail("did not receive first message as expected "); - } - else - { - if (message1.getLongProperty(_sequenceNumberPropertyName) != 1) - { - fail("receive wrong message its sequence number is: " + message1 - .getLongProperty(_sequenceNumberPropertyName)); - } - } - // commit that transacted session - nonXASession.commit(); - // the queue should be now empty - message1 = (TextMessage) nonXAConsumer.receiveNoWait(); - if (message1 != null) - { - fail("receive an unexpected message "); - } - } - catch (JMSException e) - { - fail("Exception thrown when emptying the queue: " + e.getMessage()); - } - } - } - - /** - * strategy: Produce a message within Tx1 and prepare tx1. crash the server then commit tx1 and consume the message - */ - public void testSendAndRecover() - { - if (!isBroker08()) - { - _logger.debug("running testSendAndRecover"); - Xid xid1 = getNewXid(); - // start the xaResource for xid1 - try - { - _xaResource.start(xid1, XAResource.TMSUCCESS); - } - catch (XAException e) - { - fail("cannot start the transaction with xid1: " + e.getMessage()); - } - try - { - // start the connection - _queueConnection.start(); - // produce a message with sequence number 1 - _message.setLongProperty(_sequenceNumberPropertyName, 1); - _producer.send(_message); - } - catch (JMSException e) - { - fail(" cannot send persistent message: " + e.getMessage()); - } - // suspend the transaction - try - { - _xaResource.end(xid1, XAResource.TMSUCCESS); - } - catch (XAException e) - { - fail("Cannot end the transaction with xid1: " + e.getMessage()); - } - // prepare the transaction with xid1 - try - { - _xaResource.prepare(xid1); - } - catch (XAException e) - { - fail("Exception when preparing xid1: " + e.getMessage()); - } - - /////// stop the server now !! - try - { - _logger.debug("stopping broker"); - shutdownServer(); - } - catch (Exception e) - { - fail("Exception when stopping and restarting the server"); - } - - // get the list of in doubt transactions - try - { - Xid[] inDoubt = _xaResource.recover(XAResource.TMSTARTRSCAN); - if (inDoubt == null) - { - fail("the array of in doubt transactions should not be null "); - } - // At that point we expect only two indoubt transactions: - if (inDoubt.length != 1) - { - fail("in doubt transaction size is diffenrent thatn 2, there are " + inDoubt.length + "in doubt transactions"); - } - - // commit them - for (Xid anInDoubt : inDoubt) - { - if (anInDoubt.equals(xid1)) - { - System.out.println("commit xid1 "); - try - { - _xaResource.commit(anInDoubt, false); - } - catch (Exception e) - { - System.out.println("PB when aborted xid1"); - } - } - else - { - fail("did not receive right xid "); - } - } - } - catch (XAException e) - { - e.printStackTrace(); - fail("exception thrown when recovering transactions " + e.getMessage()); - } - // the queue should contain the first message! - try - { - Session nonXASession = _nonXASession; - MessageConsumer nonXAConsumer = nonXASession.createConsumer(_queue); - _queueConnection.start(); - TextMessage message1 = (TextMessage) nonXAConsumer.receiveNoWait(); - - if (message1 == null) - { - fail("queue does not contain any message!"); - } - if (message1.getLongProperty(_sequenceNumberPropertyName) != 1) - { - fail("Wrong message returned! Sequence number is " + message1 - .getLongProperty(_sequenceNumberPropertyName)); - } - nonXASession.commit(); - } - catch (JMSException e) - { - fail("Exception thrown when testin that queue test is not empty: " + e.getMessage()); - } - } - } - - /** - * strategy: Produce a message within Tx1 and prepare tx1. Produce a standard message and consume - * it within tx2 and prepare tx2. Shutdown the server and get the list of in doubt transactions: - * we expect tx1 and tx2! Then, Tx1 is aborted and tx2 is committed so we expect the test's queue to be empty! - */ - public void testRecover() - { - if (!isBroker08()) - { - _logger.debug("running testRecover"); - Xid xid1 = getNewXid(); - Xid xid2 = getNewXid(); - // start the xaResource for xid1 - try - { - _xaResource.start(xid1, XAResource.TMSUCCESS); - } - catch (XAException e) - { - fail("cannot start the transaction with xid1: " + e.getMessage()); - } - try - { - // start the connection - _queueConnection.start(); - // produce a message with sequence number 1 - _message.setLongProperty(_sequenceNumberPropertyName, 1); - _producer.send(_message); - } - catch (JMSException e) - { - fail(" cannot send persistent message: " + e.getMessage()); - } - // suspend the transaction - try - { - _xaResource.end(xid1, XAResource.TMSUCCESS); - } - catch (XAException e) - { - fail("Cannot end the transaction with xid1: " + e.getMessage()); - } - // prepare the transaction with xid1 - try - { - _xaResource.prepare(xid1); - } - catch (XAException e) - { - fail("Exception when preparing xid1: " + e.getMessage()); - } - - // send a message using the standard session - try - { - Session nonXASession = _nonXASession; - MessageProducer nonXAProducer = nonXASession.createProducer(_queue); - TextMessage message2 = nonXASession.createTextMessage(); - message2.setText("non XA "); - message2.setLongProperty(_sequenceNumberPropertyName, 2); - nonXAProducer.setDeliveryMode(DeliveryMode.PERSISTENT); - nonXAProducer.send(message2); - // commit that transacted session - nonXASession.commit(); - } - catch (Exception e) - { - fail("Exception thrown when emptying the queue: " + e.getMessage()); - } - // start the xaResource for xid2 - try - { - _xaResource.start(xid2, XAResource.TMSUCCESS); - } - catch (XAException e) - { - fail("cannot start the transaction with xid1: " + e.getMessage()); - } - // receive a message from queue test we expect it to be the second one - try - { - TextMessage message = (TextMessage) _consumer.receiveNoWait(); - if (message == null || message.getLongProperty(_sequenceNumberPropertyName) != 2) - { - fail("did not receive second message as expected "); - } - } - catch (JMSException e) - { - fail("Exception when receiving second message: " + e.getMessage()); - } - // suspend the transaction - try - { - _xaResource.end(xid2, XAResource.TMSUCCESS); - } - catch (XAException e) - { - fail("Cannot end the transaction with xid2: " + e.getMessage()); - } - // prepare the transaction with xid1 - try - { - _xaResource.prepare(xid2); - } - catch (XAException e) - { - fail("Exception when preparing xid2: " + e.getMessage()); - } - - /////// stop the server now !! - try - { - _logger.debug("stopping broker"); - shutdownServer(); - } - catch (Exception e) - { - fail("Exception when stopping and restarting the server"); - } - - // get the list of in doubt transactions - try - { - Xid[] inDoubt = _xaResource.recover(XAResource.TMSTARTRSCAN); - if (inDoubt == null) - { - fail("the array of in doubt transactions should not be null "); - } - // At that point we expect only two indoubt transactions: - if (inDoubt.length != 2) - { - fail("in doubt transaction size is diffenrent thatn 2, there are " + inDoubt.length + "in doubt transactions"); - } - - // commit them - for (Xid anInDoubt : inDoubt) - { - if (anInDoubt.equals(xid1)) - { - _logger.debug("rollback xid1 "); - try - { - _xaResource.rollback(anInDoubt); - } - catch (Exception e) - { - System.out.println("PB when aborted xid1"); - } - } - else if (anInDoubt.equals(xid2)) - { - _logger.debug("commit xid2 "); - try - { - _xaResource.commit(anInDoubt, false); - } - catch (Exception e) - { - System.out.println("PB when commiting xid2"); - } - } - } - } - catch (XAException e) - { - e.printStackTrace(); - fail("exception thrown when recovering transactions " + e.getMessage()); - } - // the queue should be empty - try - { - Session nonXASession = _nonXASession; - MessageConsumer nonXAConsumer = nonXASession.createConsumer(_queue); - _queueConnection.start(); - TextMessage message1 = (TextMessage) nonXAConsumer.receiveNoWait(); - if (message1 != null) - { - fail("The queue is not empty! "); - } - } - catch (JMSException e) - { - fail("Exception thrown when testin that queue test is empty: " + e.getMessage()); - } - } - } - - /** -------------------------------------------------------------------------------------- **/ - /** ----------------------------- Utility methods --------------------------------------- **/ - /** -------------------------------------------------------------------------------------- **/ - - /** - * get a new queue connection - * - * @return a new queue connection - * @throws JMSException If the JMS provider fails to create the queue connection - * due to some internal error or in case of authentication failure - */ - private XAQueueConnection getNewQueueXAConnection() throws JMSException - { - return _queueFactory.createXAQueueConnection("guest", "guest"); - } - - -} diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/TopicTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/TopicTest.java new file mode 100644 index 0000000000..5ea059b166 --- /dev/null +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/TopicTest.java @@ -0,0 +1,1708 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.test.unit.xa; + +import javax.jms.*; +import javax.transaction.xa.XAResource; +import javax.transaction.xa.Xid; +import javax.transaction.xa.XAException; + +import junit.framework.TestSuite; + +import java.util.concurrent.atomic.AtomicBoolean; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + * + */ +public class TopicTest extends AbstractXATestCase +{ + /* this clas logger */ + private static final Logger _logger = LoggerFactory.getLogger(TopicTest.class); + + /** + * the topic use by all the tests + */ + private static Topic _topic = null; + + /** + * the topic connection factory used by all tests + */ + private static XATopicConnectionFactory _topicFactory = null; + + /** + * standard topic connection + */ + private static XATopicConnection _topicConnection = null; + + /** + * standard topic session created from the standard connection + */ + private static XATopicSession _session = null; + + private static TopicSession _nonXASession = null; + + /** + * the topic name + */ + private static final String TOPICNAME = "xaTopic"; + + /** + * Indicate that a listenere has failed + */ + private static boolean _failure = false; + + /** -------------------------------------------------------------------------------------- **/ + /** ----------------------------- JUnit support ----------------------------------------- **/ + /** -------------------------------------------------------------------------------------- **/ + + /** + * Gets the test suite tests + * + * @return the test suite tests + */ + public static TestSuite getSuite() + { + return new TestSuite(TopicTest.class); + } + + /** + * Run the test suite. + * + * @param args Any command line arguments specified to this class. + */ + public static void main(String args[]) + { + junit.textui.TestRunner.run(getSuite()); + } + + public void tearDown() throws Exception + { + if (!isBroker08()) + { + try + { + _topicConnection.stop(); + _topicConnection.close(); + } + catch (Exception e) + { + fail("Exception thrown when cleaning standard connection: " + e.getStackTrace()); + } + } + super.tearDown(); + } + + /** + * Initialize standard actors + */ + public void init() + { + if (!isBroker08()) + { + // lookup test queue + try + { + _topic = (Topic) getInitialContext().lookup(TOPICNAME); + } + catch (Exception e) + { + fail("cannot lookup test topic " + e.getMessage()); + } + // lookup connection factory + try + { + _topicFactory = getConnectionFactory(); + } + catch (Exception e) + { + fail("enable to lookup connection factory "); + } + // create standard connection + try + { + _topicConnection = getNewTopicXAConnection(); + } + catch (JMSException e) + { + fail("cannot create queue connection: " + e.getMessage()); + } + // create standard session + try + { + _session = _topicConnection.createXATopicSession(); + } + catch (JMSException e) + { + fail("cannot create queue session: " + e.getMessage()); + } + // create a standard session + try + { + _nonXASession = _topicConnection.createTopicSession(true, Session.AUTO_ACKNOWLEDGE); + } + catch (JMSException e) + { + e.printStackTrace(); //To change body of catch statement use Options | File Templates. + } + init(_session, _topic); + } + } + + /** -------------------------------------------------------------------------------------- **/ + /** ----------------------------- Test Suite -------------------------------------------- **/ + /** -------------------------------------------------------------------------------------- **/ + + + /** + * Uses two transactions respectively with xid1 and xid2 that are use to send a message + * within xid1 and xid2. xid2 is committed and xid1 is used to receive the message that was sent within xid2. + * Xid is then committed and a standard transaction is used to receive the message that was sent within xid1. + */ + public void testProducer() + { + if (!isBroker08()) + { + _logger.debug("testProducer"); + Xid xid1 = getNewXid(); + Xid xid2 = getNewXid(); + try + { + Session nonXASession = _nonXASession; + MessageConsumer nonXAConsumer = nonXASession.createConsumer(_topic); + _producer.setDeliveryMode(DeliveryMode.PERSISTENT); + // start the xaResource for xid1 + try + { + _logger.debug("starting tx branch xid1"); + _xaResource.start(xid1, XAResource.TMSUCCESS); + } + catch (XAException e) + { + e.printStackTrace(); + fail("cannot start the transaction with xid1: " + e.getMessage()); + } + try + { + // start the connection + _topicConnection.start(); + _logger.debug("produce a message with sequence number 1"); + _message.setLongProperty(_sequenceNumberPropertyName, 1); + _producer.send(_message); + } + catch (JMSException e) + { + fail(" cannot send persistent message: " + e.getMessage()); + } + _logger.debug("suspend the transaction branch xid1"); + try + { + _xaResource.end(xid1, XAResource.TMSUSPEND); + } + catch (XAException e) + { + fail("Cannot end the transaction with xid1: " + e.getMessage()); + } + _logger.debug("start the xaResource for xid2"); + try + { + _xaResource.start(xid2, XAResource.TMSUCCESS); + } + catch (XAException e) + { + fail("cannot start the transaction with xid2: " + e.getMessage()); + } + try + { + _logger.debug("produce a message"); + _message.setLongProperty(_sequenceNumberPropertyName, 2); + _producer.send(_message); + } + catch (JMSException e) + { + fail(" cannot send second persistent message: " + e.getMessage()); + } + _logger.debug("end xid2 and start xid1"); + try + { + _xaResource.end(xid2, XAResource.TMSUCCESS); + _xaResource.start(xid1, XAResource.TMRESUME); + } + catch (XAException e) + { + fail("Exception when ending and starting transactions: " + e.getMessage()); + } + _logger.debug("two phases commit transaction with xid2"); + try + { + int resPrepare = _xaResource.prepare(xid2); + if (resPrepare != XAResource.XA_OK) + { + fail("prepare returned: " + resPrepare); + } + _xaResource.commit(xid2, false); + } + catch (XAException e) + { + fail("Exception thrown when preparing transaction with xid2: " + e.getMessage()); + } + _logger.debug("receiving a message from topic test we expect it to be the second one"); + try + { + TextMessage message = (TextMessage) _consumer.receiveNoWait(); + if (message == null) + { + fail("did not receive second message as expected "); + } + else + { + if (message.getLongProperty(_sequenceNumberPropertyName) != 2) + { + fail("receive wrong message its sequence number is: " + message + .getLongProperty(_sequenceNumberPropertyName)); + } + } + } + catch (JMSException e) + { + fail("Exception when receiving second message: " + e.getMessage()); + } + _logger.debug("end and one phase commit the first transaction"); + try + { + _xaResource.end(xid1, XAResource.TMSUCCESS); + _xaResource.commit(xid1, true); + } + catch (XAException e) + { + fail("Exception thrown when commiting transaction with xid1"); + } + _logger.debug("We should now be able to receive the first and second message"); + try + { + TextMessage message1 = (TextMessage) nonXAConsumer.receiveNoWait(); + if (message1 == null) + { + fail("did not receive first message as expected "); + } + else + { + if (message1.getLongProperty(_sequenceNumberPropertyName) != 2) + { + fail("receive wrong message its sequence number is: " + message1 + .getLongProperty(_sequenceNumberPropertyName)); + } + } + message1 = (TextMessage) nonXAConsumer.receiveNoWait(); + if (message1 == null) + { + fail("did not receive first message as expected "); + } + else + { + if (message1.getLongProperty(_sequenceNumberPropertyName) != 1) + { + fail("receive wrong message its sequence number is: " + message1 + .getLongProperty(_sequenceNumberPropertyName)); + } + } + _logger.debug("commit transacted session"); + nonXASession.commit(); + _logger.debug("Test that the topic is now empty"); + message1 = (TextMessage) nonXAConsumer.receiveNoWait(); + if (message1 != null) + { + fail("receive an unexpected message "); + } + } + catch (JMSException e) + { + fail("Exception thrown when emptying the queue: " + e.getMessage()); + } + } + catch (JMSException e) + { + fail("cannot create standard consumer: " + e.getMessage()); + } + } + } + + + /** + * strategy: Produce a message within Tx1 and commit tx1. consume this message within tx2 and abort tx2. + * Consume the same message within tx3 and commit it. Check that no more message is available. + */ + public void testDurSub() + { + if (!isBroker08()) + { + Xid xid1 = getNewXid(); + Xid xid2 = getNewXid(); + Xid xid3 = getNewXid(); + Xid xid4 = getNewXid(); + String durSubName = "xaSubDurable"; + try + { + TopicSubscriber xaDurSub = _session.createDurableSubscriber(_topic, durSubName); + try + { + _topicConnection.start(); + _logger.debug("start xid1"); + _xaResource.start(xid1, XAResource.TMSUCCESS); + // start the connection + _topicConnection.start(); + _logger.debug("produce a message with sequence number 1"); + _message.setLongProperty(_sequenceNumberPropertyName, 1); + _producer.send(_message); + _logger.debug("2 phases commit xid1"); + _xaResource.end(xid1, XAResource.TMSUCCESS); + if (_xaResource.prepare(xid1) != XAResource.XA_OK) + { + fail("Problem when preparing tx1 "); + } + _xaResource.commit(xid1, false); + } + catch (Exception e) + { + e.printStackTrace(); + fail("Exception when working with xid1: " + e.getMessage()); + } + try + { + _logger.debug("start xid2"); + _xaResource.start(xid2, XAResource.TMSUCCESS); + _logger.debug("receive the previously produced message"); + TextMessage message = (TextMessage) xaDurSub.receiveNoWait(); + if (message == null) + { + fail("no message received "); + } + else if (message.getLongProperty(_sequenceNumberPropertyName) != 1) + { + fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); + } + _logger.debug("rollback xid2"); + boolean rollbackOnFailure = false; + try + { + _xaResource.end(xid2, XAResource.TMFAIL); + } + catch (XAException e) + { + if (e.errorCode != XAException.XA_RBROLLBACK) + { + fail("Exception when working with xid2: " + e.getMessage()); + } + rollbackOnFailure = true; + } + if (!rollbackOnFailure) + { + if (_xaResource.prepare(xid2) != XAResource.XA_OK) + { + fail("Problem when preparing tx2 "); + } + _xaResource.rollback(xid2); + } + } + catch (Exception e) + { + e.printStackTrace(); + fail("Exception when working with xid2: " + e.getMessage()); + } + try + { + _logger.debug("start xid3"); + _xaResource.start(xid3, XAResource.TMSUCCESS); + _logger.debug(" receive the previously aborted consumed message"); + TextMessage message = (TextMessage) xaDurSub.receiveNoWait(); + if (message == null) + { + fail("no message received "); + } + else if (message.getLongProperty(_sequenceNumberPropertyName) != 1) + { + fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); + } + _logger.debug("commit xid3"); + _xaResource.end(xid3, XAResource.TMSUCCESS); + if (_xaResource.prepare(xid3) != XAResource.XA_OK) + { + fail("Problem when preparing tx3 "); + } + _xaResource.commit(xid3, false); + } + catch (Exception e) + { + e.printStackTrace(); + fail("Exception when working with xid3: " + e.getMessage()); + } + try + { + _logger.debug("start xid4"); + _xaResource.start(xid4, XAResource.TMSUCCESS); + _logger.debug("check that topic is empty"); + TextMessage message = (TextMessage) xaDurSub.receiveNoWait(); + if (message != null) + { + fail("An unexpected message was received "); + } + _logger.debug("commit xid4"); + _xaResource.end(xid4, XAResource.TMSUCCESS); + _xaResource.commit(xid4, true); + } + catch (Exception e) + { + e.printStackTrace(); + fail("Exception when working with xid4: " + e.getMessage()); + } + } + catch (Exception e) + { + e.printStackTrace(); + fail("problem when creating dur sub: " + e.getMessage()); + } + finally + { + try + { + _session.unsubscribe(durSubName); + } + catch (JMSException e) + { + e.printStackTrace(); + fail("problem when unsubscribing dur sub: " + e.getMessage()); + } + } + } + } + + /** + * strategy: create a XA durable subscriber dusSub, produce 7 messages with the standard session, + * consume 2 messages respectively with tx1, tx2 and tx3 + * abort tx2, we now expect to receive messages 3 and 4 first! Receive 3 messages within tx1 i.e. 34 and 7! + * commit tx3 + * abort tx1: we now expect that only messages 5 and 6 are definitly consumed! + * start tx4 and consume messages 1 - 4 and 7 + * commit tx4 + * Now the topic should be empty! + */ + public void testMultiMessagesDurSub() + { + if (!isBroker08()) + { + Xid xid1 = getNewXid(); + Xid xid2 = getNewXid(); + Xid xid3 = getNewXid(); + Xid xid4 = getNewXid(); + Xid xid6 = getNewXid(); + String durSubName = "xaSubDurable"; + TextMessage message; + try + { + TopicSubscriber xaDurSub = _session.createDurableSubscriber(_topic, durSubName); + try + { + Session txSession = _nonXASession; + MessageProducer txProducer = txSession.createProducer(_topic); + _logger.debug("produce 10 persistent messages"); + txProducer.setDeliveryMode(DeliveryMode.PERSISTENT); + _topicConnection.start(); + for (int i = 1; i <= 7; i++) + { + _message.setLongProperty(_sequenceNumberPropertyName, i); + txProducer.send(_message); + } + // commit txSession + txSession.commit(); + } + catch (JMSException e) + { + e.printStackTrace(); + fail("Exception thrown when producing messages: " + e.getMessage()); + } + + try + { + _logger.debug(" consume 2 messages respectively with tx1, tx2 and tx3"); + //----- start xid1 + _xaResource.start(xid1, XAResource.TMSUCCESS); + // receive the 2 first messages + for (int i = 1; i <= 2; i++) + { + message = (TextMessage) xaDurSub.receiveNoWait(); + if (message == null) + { + fail("no message received! expected: " + i); + } + else if (message.getLongProperty(_sequenceNumberPropertyName) != i) + { + fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); + } + } + _xaResource.end(xid1, XAResource.TMSUSPEND); + //----- start xid2 + _xaResource.start(xid2, XAResource.TMSUCCESS); + // receive the 2 first messages + for (int i = 3; i <= 4; i++) + { + message = (TextMessage) xaDurSub.receiveNoWait(); + if (message == null) + { + fail("no message received! expected: " + i); + } + else if (message.getLongProperty(_sequenceNumberPropertyName) != i) + { + fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); + } + } + _xaResource.end(xid2, XAResource.TMSUSPEND); + //----- start xid3 + _xaResource.start(xid3, XAResource.TMSUCCESS); + // receive the 2 first messages + for (int i = 5; i <= 6; i++) + { + message = (TextMessage) xaDurSub.receiveNoWait(); + if (message == null) + { + fail("no message received! expected: " + i); + } + else if (message.getLongProperty(_sequenceNumberPropertyName) != i) + { + fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); + } + } + _xaResource.end(xid3, XAResource.TMSUCCESS); + } + catch (Exception e) + { + e.printStackTrace(); + fail("Exception thrown when consumming 6 first messages: " + e.getMessage()); + } + try + { + _logger.debug("abort tx2, we now expect to receive messages 3, 4 and 7"); + _xaResource.start(xid2, XAResource.TMRESUME); + _xaResource.end(xid2, XAResource.TMSUCCESS); + _xaResource.prepare(xid2); + _xaResource.rollback(xid2); + // receive 3 message within tx1: 3, 4 and 7 + _xaResource.start(xid1, XAResource.TMRESUME); + _logger.debug(" 3, 4 and 7"); + for (int i = 1; i <= 3; i++) + { + message = (TextMessage) xaDurSub.receiveNoWait(); + if (message == null) + { + fail("no message received! expected: " + 3); + } + else if (message.getLongProperty(_sequenceNumberPropertyName) <= 2 || 5 == message + .getLongProperty(_sequenceNumberPropertyName) || message + .getLongProperty(_sequenceNumberPropertyName) == 6) + { + fail("wrong sequence number: " + message + .getLongProperty(_sequenceNumberPropertyName)); + } + } + } + catch (Exception e) + { + e.printStackTrace(); + fail("Exception thrown when consumming message: 3, 4 and 7: " + e.getMessage()); + } + + try + { + _xaResource.end(xid1, XAResource.TMSUCCESS); + _logger.debug(" commit tx3"); + _xaResource.commit(xid3, true); + _logger.debug("abort tx1"); + _xaResource.prepare(xid1); + _xaResource.rollback(xid1); + } + catch (XAException e) + { + e.printStackTrace(); + fail("XAException thrown when committing tx3 or aborting tx1: " + e.getMessage()); + } + + try + { + // consume messages 1 - 4 + 7 + //----- start xid1 + _xaResource.start(xid4, XAResource.TMSUCCESS); + for (int i = 1; i <= 5; i++) + { + + message = (TextMessage) xaDurSub.receiveNoWait(); + _logger.debug(" received message: " + message.getLongProperty(_sequenceNumberPropertyName)); + if (message == null) + { + fail("no message received! expected: " + i); + } + else if (message.getLongProperty(_sequenceNumberPropertyName) == 5 || message + .getLongProperty(_sequenceNumberPropertyName) == 6) + { + fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); + } + } + _xaResource.end(xid4, XAResource.TMSUCCESS); + _xaResource.prepare(xid4); + _xaResource.commit(xid4, false); + } + catch (Exception e) + { + e.printStackTrace(); + fail("Exception thrown in last phase: " + e.getMessage()); + } + // now the topic should be empty!! + try + { + // start xid6 + _xaResource.start(xid6, XAResource.TMSUCCESS); + // should now be empty + message = (TextMessage) xaDurSub.receiveNoWait(); + if (message != null) + { + fail("An unexpected message was received " + message + .getLongProperty(_sequenceNumberPropertyName)); + } + // commit xid6 + _xaResource.end(xid6, XAResource.TMSUCCESS); + _xaResource.commit(xid6, true); + } + catch (Exception e) + { + e.printStackTrace(); + fail("Exception when working with xid6: " + e.getMessage()); + } + } + catch (Exception e) + { + e.printStackTrace(); + fail("problem when creating dur sub: " + e.getMessage()); + } + finally + { + try + { + _session.unsubscribe(durSubName); + } + catch (JMSException e) + { + e.printStackTrace(); + fail("problem when unsubscribing dur sub: " + e.getMessage()); + } + } + } + } + + /** + * strategy: create a XA durable subscriber dusSub, produce 10 messages with the standard session, + * consume 2 messages respectively with tx1, tx2 and tx3 + * prepare xid2 and xid3 + * crash the server + * Redo the job for xid1 that has been aborted by server crash + * abort tx2, we now expect to receive messages 3 and 4 first! Receive 3 messages within tx1 i.e. 34 and 7! + * commit tx3 + * abort tx1: we now expect that only messages 5 and 6 are definitly consumed! + * start tx4 and consume messages 1 - 4 + * start tx5 and consume messages 7 - 10 + * abort tx4 + * consume messages 1-4 with tx5 + * commit tx5 + * Now the topic should be empty! + */ + public void testMultiMessagesDurSubCrash() + { + if (!isBroker08()) + { + Xid xid1 = getNewXid(); + Xid xid2 = getNewXid(); + Xid xid3 = getNewXid(); + Xid xid4 = getNewXid(); + Xid xid5 = getNewXid(); + Xid xid6 = getNewXid(); + String durSubName = "xaSubDurable"; + TextMessage message; + try + { + TopicSubscriber xaDurSub = _session.createDurableSubscriber(_topic, durSubName); + try + { + Session txSession = _nonXASession; + MessageProducer txProducer = txSession.createProducer(_topic); + // produce 10 persistent messages + txProducer.setDeliveryMode(DeliveryMode.PERSISTENT); + _topicConnection.start(); + for (int i = 1; i <= 10; i++) + { + _message.setLongProperty(_sequenceNumberPropertyName, i); + txProducer.send(_message); + } + // commit txSession + txSession.commit(); + } + catch (JMSException e) + { + e.printStackTrace(); + fail("Exception thrown when producing messages: " + e.getMessage()); + } + try + { + // consume 2 messages respectively with tx1, tx2 and tx3 + //----- start xid1 + _xaResource.start(xid1, XAResource.TMSUCCESS); + // receive the 2 first messages + for (int i = 1; i <= 2; i++) + { + message = (TextMessage) xaDurSub.receiveNoWait(); + if (message == null) + { + fail("no message received! expected: " + i); + } + else if (message.getLongProperty(_sequenceNumberPropertyName) != i) + { + fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); + } + } + _xaResource.end(xid1, XAResource.TMSUCCESS); + //----- start xid2 + _xaResource.start(xid2, XAResource.TMSUCCESS); + // receive the 2 first messages + for (int i = 3; i <= 4; i++) + { + message = (TextMessage) xaDurSub.receiveNoWait(); + if (message == null) + { + fail("no message received! expected: " + i); + } + else if (message.getLongProperty(_sequenceNumberPropertyName) != i) + { + fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); + } + } + _xaResource.end(xid2, XAResource.TMSUCCESS); + //----- start xid3 + _xaResource.start(xid3, XAResource.TMSUCCESS); + // receive the 2 first messages + for (int i = 5; i <= 6; i++) + { + message = (TextMessage) xaDurSub.receiveNoWait(); + if (message == null) + { + fail("no message received! expected: " + i); + } + else if (message.getLongProperty(_sequenceNumberPropertyName) != i) + { + fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); + } + } + _xaResource.end(xid3, XAResource.TMSUCCESS); + // prepare tx2 and tx3 + + _xaResource.prepare(xid2); + _xaResource.prepare(xid3); + } + catch (Exception e) + { + e.printStackTrace(); + fail("Exception thrown when consumming 6 first messages: " + e.getMessage()); + } + /////// stop the broker now !! + try + { + shutdownServer(); + } + catch (Exception e) + { + fail("Exception when stopping and restarting the server"); + } + // get the list of in doubt transactions + try + { + _topicConnection.start(); + // reconnect to dursub! + xaDurSub = _session.createDurableSubscriber(_topic, durSubName); + Xid[] inDoubt = _xaResource.recover(XAResource.TMSTARTRSCAN); + if (inDoubt == null) + { + fail("the array of in doubt transactions should not be null "); + } + // At that point we expect only two indoubt transactions: + if (inDoubt.length != 2) + { + fail("in doubt transaction size is diffenrent than 2, there are " + inDoubt.length + "in doubt transactions"); + } + } + catch (XAException e) + { + e.printStackTrace(); + fail("exception thrown when recovering transactions " + e.getMessage()); + } + try + { + // xid1 has been aborted redo the job! + // consume 2 messages with tx1 + //----- start xid1 + _xaResource.start(xid1, XAResource.TMSUCCESS); + // receive the 2 first messages + for (int i = 1; i <= 2; i++) + { + message = (TextMessage) xaDurSub.receiveNoWait(); + if (message == null) + { + fail("no message received! expected: " + i); + } + else if (message.getLongProperty(_sequenceNumberPropertyName) != i) + { + fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); + } + } + _xaResource.end(xid1, XAResource.TMSUSPEND); + // abort tx2, we now expect to receive messages 3 and 4 first! + _xaResource.rollback(xid2); + + // receive 3 message within tx1: 3, 4 and 7 + _xaResource.start(xid1, XAResource.TMRESUME); + // receive messages 3, 4 and 7 + message = (TextMessage) xaDurSub.receiveNoWait(); + if (message == null) + { + fail("no message received! expected: " + 3); + } + else if (message.getLongProperty(_sequenceNumberPropertyName) != 3) + { + fail("wrong sequence number: " + message + .getLongProperty(_sequenceNumberPropertyName) + " 3 was expected"); + } + message = (TextMessage) xaDurSub.receiveNoWait(); + if (message == null) + { + fail("no message received! expected: " + 4); + } + else if (message.getLongProperty(_sequenceNumberPropertyName) != 4) + { + fail("wrong sequence number: " + message + .getLongProperty(_sequenceNumberPropertyName) + " 4 was expected"); + } + message = (TextMessage) xaDurSub.receiveNoWait(); + if (message == null) + { + fail("no message received! expected: " + 7); + } + else if (message.getLongProperty(_sequenceNumberPropertyName) != 7) + { + fail("wrong sequence number: " + message + .getLongProperty(_sequenceNumberPropertyName) + " 7 was expected"); + } + } + catch (Exception e) + { + e.printStackTrace(); + fail("Exception thrown when consumming message: 3, 4 and 7: " + e.getMessage()); + } + + try + { + _xaResource.end(xid1, XAResource.TMSUSPEND); + // commit tx3 + _xaResource.commit(xid3, false); + // abort tx1 + _xaResource.prepare(xid1); + _xaResource.rollback(xid1); + } + catch (XAException e) + { + e.printStackTrace(); + fail("XAException thrown when committing tx3 or aborting tx1: " + e.getMessage()); + } + + try + { + // consume messages 1 - 4 + //----- start xid1 + _xaResource.start(xid4, XAResource.TMSUCCESS); + for (int i = 1; i <= 4; i++) + { + message = (TextMessage) xaDurSub.receiveNoWait(); + if (message == null) + { + fail("no message received! expected: " + i); + } + else if (message.getLongProperty(_sequenceNumberPropertyName) != i) + { + fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); + } + } + _xaResource.end(xid4, XAResource.TMSUSPEND); + // consume messages 8 - 10 + _xaResource.start(xid5, XAResource.TMSUCCESS); + for (int i = 7; i <= 10; i++) + { + message = (TextMessage) xaDurSub.receiveNoWait(); + if (message == null) + { + fail("no message received! expected: " + i); + } + else if (message.getLongProperty(_sequenceNumberPropertyName) != i) + { + fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); + } + } + _xaResource.end(xid5, XAResource.TMSUSPEND); + // abort tx4 + _xaResource.prepare(xid4); + _xaResource.rollback(xid4); + // consume messages 1-4 with tx5 + _xaResource.start(xid5, XAResource.TMRESUME); + for (int i = 1; i <= 4; i++) + { + message = (TextMessage) xaDurSub.receiveNoWait(); + if (message == null) + { + fail("no message received! expected: " + i); + } + else if (message.getLongProperty(_sequenceNumberPropertyName) != i) + { + fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); + } + } + _xaResource.end(xid5, XAResource.TMSUSPEND); + // commit tx5 + _xaResource.prepare(xid5); + _xaResource.commit(xid5, false); + } + catch (Exception e) + { + e.printStackTrace(); + fail("Exception thrown in last phase: " + e.getMessage()); + } + // now the topic should be empty!! + try + { + // start xid6 + _xaResource.start(xid6, XAResource.TMSUCCESS); + // should now be empty + message = (TextMessage) xaDurSub.receiveNoWait(); + if (message != null) + { + fail("An unexpected message was received " + message + .getLongProperty(_sequenceNumberPropertyName)); + } + // commit xid6 + _xaResource.end(xid6, XAResource.TMSUSPEND); + _xaResource.commit(xid6, true); + } + catch (Exception e) + { + e.printStackTrace(); + fail("Exception when working with xid6: " + e.getMessage()); + } + } + catch (Exception e) + { + e.printStackTrace(); + fail("problem when creating dur sub: " + e.getMessage()); + } + finally + { + try + { + _session.unsubscribe(durSubName); + } + catch (JMSException e) + { + e.printStackTrace(); + fail("problem when unsubscribing dur sub: " + e.getMessage()); + } + } + } + } + + + /** + * strategy: Produce a message within Tx1 and commit tx1. a durable subscriber then receives that message within tx2 + * that is then prepared. + * Shutdown the server and get the list of in doubt transactions: + * we expect tx2, Tx2 is aborted and the message consumed within tx3 that is committed we then check that the topic is empty. + */ + public void testDurSubCrash() + { + if (!isBroker08()) + { + Xid xid1 = getNewXid(); + Xid xid2 = getNewXid(); + Xid xid3 = getNewXid(); + Xid xid4 = getNewXid(); + String durSubName = "xaSubDurable"; + try + { + TopicSubscriber xaDurSub = _session.createDurableSubscriber(_topic, durSubName); + try + { + _topicConnection.start(); + //----- start xid1 + _xaResource.start(xid1, XAResource.TMSUCCESS); + // start the connection + _topicConnection.start(); + // produce a message with sequence number 1 + _message.setLongProperty(_sequenceNumberPropertyName, 1); + _producer.send(_message); + // commit + _xaResource.end(xid1, XAResource.TMSUSPEND); + if (_xaResource.prepare(xid1) != XAResource.XA_OK) + { + fail("Problem when preparing tx1 "); + } + _xaResource.commit(xid1, false); + } + catch (Exception e) + { + e.printStackTrace(); + fail("Exception when working with xid1: " + e.getMessage()); + } + try + { + // start xid2 + _xaResource.start(xid2, XAResource.TMSUCCESS); + // receive the previously produced message + TextMessage message = (TextMessage) xaDurSub.receiveNoWait(); + if (message == null) + { + fail("no message received "); + } + else if (message.getLongProperty(_sequenceNumberPropertyName) != 1) + { + fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); + } + // prepare xid2 + _xaResource.end(xid2, XAResource.TMSUSPEND); + if (_xaResource.prepare(xid2) != XAResource.XA_OK) + { + fail("Problem when preparing tx2 "); + } + } + catch (Exception e) + { + e.printStackTrace(); + fail("Exception when working with xid2: " + e.getMessage()); + } + + /////// stop the server now !! + try + { + shutdownServer(); + } + catch (Exception e) + { + fail("Exception when stopping and restarting the server"); + } + + // get the list of in doubt transactions + try + { + _topicConnection.start(); + // reconnect to dursub! + xaDurSub = _session.createDurableSubscriber(_topic, durSubName); + Xid[] inDoubt = _xaResource.recover(XAResource.TMSTARTRSCAN); + if (inDoubt == null) + { + fail("the array of in doubt transactions should not be null "); + } + // At that point we expect only two indoubt transactions: + if (inDoubt.length != 1) + { + fail("in doubt transaction size is diffenrent than 2, there are " + inDoubt.length + "in doubt transactions"); + } + + // commit them + for (Xid anInDoubt : inDoubt) + { + if (anInDoubt.equals(xid2)) + { + System.out.println("aborting xid2 "); + try + { + _xaResource.rollback(anInDoubt); + } + catch (Exception e) + { + e.printStackTrace(); + fail("exception when aborting xid2 "); + } + } + else + { + System.out.println("XID2 is not in doubt "); + } + } + } + catch (XAException e) + { + e.printStackTrace(); + fail("exception thrown when recovering transactions " + e.getMessage()); + } + + try + { + // start xid3 + _xaResource.start(xid3, XAResource.TMSUCCESS); + // receive the previously produced message and aborted + TextMessage message = (TextMessage) xaDurSub.receiveNoWait(); + if (message == null) + { + fail("no message received "); + } + else if (message.getLongProperty(_sequenceNumberPropertyName) != 1) + { + fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); + } + // commit xid3 + _xaResource.end(xid3, XAResource.TMSUSPEND); + if (_xaResource.prepare(xid3) != XAResource.XA_OK) + { + fail("Problem when preparing tx3 "); + } + _xaResource.commit(xid3, false); + } + catch (Exception e) + { + e.printStackTrace(); + fail("Exception when working with xid3: " + e.getMessage()); + } + try + { + // start xid4 + _xaResource.start(xid4, XAResource.TMSUCCESS); + // should now be empty + TextMessage message = (TextMessage) xaDurSub.receiveNoWait(); + if (message != null) + { + fail("An unexpected message was received " + message + .getLongProperty(_sequenceNumberPropertyName)); + } + // commit xid4 + _xaResource.end(xid4, XAResource.TMSUSPEND); + _xaResource.commit(xid4, true); + } + catch (Exception e) + { + e.printStackTrace(); + fail("Exception when working with xid4: " + e.getMessage()); + } + } + catch (Exception e) + { + e.printStackTrace(); + fail("problem when creating dur sub: " + e.getMessage()); + } + finally + { + try + { + _session.unsubscribe(durSubName); + } + catch (JMSException e) + { + e.printStackTrace(); + fail("problem when unsubscribing dur sub: " + e.getMessage()); + } + } + } + } + + /** + * strategy: Produce a message within Tx1 and prepare tx1. Shutdown the server and get the list of indoubt transactions: + * we expect tx1, Tx1 is committed so we expect the test topic not to be empty! + */ + public void testRecover() + { + if (!isBroker08()) + { + Xid xid1 = getNewXid(); + String durSubName = "test1"; + TopicSession nonXASession1; + try + { + // create a dummy durable subscriber to be sure that messages are persisted! + nonXASession1 = _nonXASession; + nonXASession1.createDurableSubscriber(_topic, durSubName); + // start the xaResource for xid1 + try + { + _xaResource.start(xid1, XAResource.TMSUCCESS); + } + catch (XAException e) + { + fail("cannot start the transaction with xid1: " + e.getMessage()); + } + try + { + // start the connection + _topicConnection.start(); + // produce a message with sequence number 1 + _message.setLongProperty(_sequenceNumberPropertyName, 1); + _producer.send(_message); + } + catch (JMSException e) + { + fail(" cannot send persistent message: " + e.getMessage()); + } + // suspend the transaction + try + { + _xaResource.end(xid1, XAResource.TMSUCCESS); + } + catch (XAException e) + { + fail("Cannot end the transaction with xid1: " + e.getMessage()); + } + // prepare the transaction with xid1 + try + { + _xaResource.prepare(xid1); + } + catch (XAException e) + { + fail("Exception when preparing xid1: " + e.getMessage()); + } + + /////// stop the server now !! + try + { + shutdownServer(); + } + catch (Exception e) + { + fail("Exception when stopping and restarting the server"); + } + + try + { + MessageConsumer nonXAConsumer = nonXASession1.createDurableSubscriber(_topic, durSubName); + _topicConnection.start(); + // get the list of in doubt transactions + try + { + Xid[] inDoubt = _xaResource.recover(XAResource.TMSTARTRSCAN); + if (inDoubt == null) + { + fail("the array of in doubt transactions should not be null "); + } + // At that point we expect only two indoubt transactions: + if (inDoubt.length != 1) + { + fail("in doubt transaction size is diffenrent thatn 2, there are " + inDoubt.length + "in doubt transactions"); + } + // commit them + for (Xid anInDoubt : inDoubt) + { + if (anInDoubt.equals(xid1)) + { + _logger.debug("committing xid1 "); + try + { + _xaResource.commit(anInDoubt, false); + } + catch (Exception e) + { + _logger.debug("PB when aborted xid1"); + e.printStackTrace(); + fail("exception when committing xid1 "); + } + } + else + { + _logger.debug("XID1 is not in doubt "); + } + } + } + catch (XAException e) + { + e.printStackTrace(); + fail("exception thrown when recovering transactions " + e.getMessage()); + } + _logger.debug("the topic should not be empty"); + TextMessage message1 = (TextMessage) nonXAConsumer.receiveNoWait(); + if (message1 == null) + { + fail("The topic is empty! "); + } + } + catch (JMSException e) + { + fail("Exception thrown when testin that queue test is empty: " + e.getMessage()); + } + } + catch (JMSException e) + { + fail("cannot create dummy durable subscriber: " + e.getMessage()); + } + finally + { + try + { + // unsubscribe the dummy durable subscriber + TopicSession nonXASession = _nonXASession; + nonXASession.unsubscribe(durSubName); + } + catch (JMSException e) + { + fail("cannot unsubscribe durable subscriber: " + e.getMessage()); + } + } + } + } + + /** + * strategy: + * create a standard durable subscriber + * produce 3 messages + * consume the first message with that durable subscriber + * close the standard session that deactivates the durable subscriber + * migrate the durable subscriber to an xa one + * consume the second message with that xa durable subscriber + * close the xa session that deactivates the durable subscriber + * reconnect to the durable subscriber with a standard session + * consume the two remaining messages and check that the topic is empty! + */ + public void testMigrateDurableSubscriber() + { + if (!isBroker08()) + { + Xid xid1 = getNewXid(); + Xid xid2 = getNewXid(); + String durSubName = "DurableSubscriberMigrate"; + try + { + Session stSession = _nonXASession; + MessageProducer producer = stSession.createProducer(_topic); + _logger.debug("Create a standard durable subscriber!"); + TopicSubscriber durSub = stSession.createDurableSubscriber(_topic, durSubName); + TopicSubscriber durSub1 = stSession.createDurableSubscriber(_topic, durSubName + "_second"); + TextMessage message; + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + _topicConnection.start(); + _logger.debug("produce 3 messages"); + for (int i = 1; i <= 3; i++) + { + _message.setLongProperty(_sequenceNumberPropertyName, i); + //producer.send( _message ); + producer.send(_message, DeliveryMode.PERSISTENT, 9 - i, 0); + stSession.commit(); + } + _logger.debug("consume the first message with that durable subscriber"); + message = (TextMessage) durSub.receiveNoWait(); + if (message == null) + { + fail("no message received "); + } + else if (message.getLongProperty(_sequenceNumberPropertyName) != 1) + { + fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); + } + // commit the standard session + stSession.commit(); + _logger.debug("first message consumed "); + // close the session that deactivates the durable subscriber + stSession.close(); + _logger.debug("migrate the durable subscriber to an xa one"); + _xaResource.start(xid1, XAResource.TMSUCCESS); + durSub = _session.createDurableSubscriber(_topic, durSubName); + _logger.debug(" consume the second message with that xa durable subscriber and abort it"); + message = (TextMessage) durSub.receiveNoWait(); + if (message == null) + { + fail("no message received "); + } + else if (message.getLongProperty(_sequenceNumberPropertyName) != 2) + { + fail("wrong sequence number, 2 expected, received: " + message + .getLongProperty(_sequenceNumberPropertyName)); + } + _xaResource.end(xid1, XAResource.TMSUCCESS); + _xaResource.prepare(xid1); + _xaResource.rollback(xid1); + _logger.debug("close the session that deactivates the durable subscriber"); + _session.close(); + _logger.debug("create a new standard session"); + stSession = _topicConnection.createTopicSession(true, 1); + _logger.debug("reconnect to the durable subscriber"); + durSub = stSession.createDurableSubscriber(_topic, durSubName); + durSub1 = stSession.createDurableSubscriber(_topic, durSubName + "_second"); + _logger.debug("Reconnected to durablse subscribers"); + _logger.debug(" consume the 2 remaining messages"); + message = (TextMessage) durSub.receiveNoWait(); + if (message == null) + { + fail("no message received "); + } + else if (message.getLongProperty(_sequenceNumberPropertyName) != 2) + { + fail("wrong sequence number, 2 expected, received: " + message + .getLongProperty(_sequenceNumberPropertyName)); + } + // consume the third message with that xa durable subscriber + message = (TextMessage) durSub.receiveNoWait(); + if (message == null) + { + fail("no message received "); + } + else if (message.getLongProperty(_sequenceNumberPropertyName) != 3) + { + fail("wrong sequence number, 3 expected, received: " + message + .getLongProperty(_sequenceNumberPropertyName)); + } + stSession.commit(); + _logger.debug("the topic should be empty now"); + message = (TextMessage) durSub.receiveNoWait(); + if (message != null) + { + fail("Received unexpected message "); + } + stSession.commit(); + _logger.debug(" use dursub1 to receive all the 3 messages"); + for (int i = 1; i <= 3; i++) + { + message = (TextMessage) durSub1.receiveNoWait(); + if (message == null) + { + _logger.debug("no message received "); + } + else if (message.getLongProperty(_sequenceNumberPropertyName) != i) + { + fail("wrong sequence number, " + i + " expected, received: " + message + .getLongProperty(_sequenceNumberPropertyName)); + } + } + stSession.commit(); + // send a non persistent message to check that all persistent messages are deleted + producer = stSession.createProducer(_topic); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + producer.send(_message); + stSession.commit(); + message = (TextMessage) durSub.receiveNoWait(); + if (message == null) + { + fail("message not received "); + } + message = (TextMessage) durSub1.receiveNoWait(); + if (message == null) + { + fail("message not received "); + } + stSession.commit(); + stSession.close(); + _logger.debug(" now create a standard non transacted session and reconnect to the durable xubscriber"); + TopicConnection stConnection = + _topicConnection; //_topicFactory.createTopicConnection("guest", "guest"); + TopicSession autoAclSession = stConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + TopicPublisher publisher = autoAclSession.createPublisher(_topic); + durSub = autoAclSession.createDurableSubscriber(_topic, durSubName); + stConnection.start(); + // produce 3 persistent messages + for (int i = 1; i <= 3; i++) + { + _message.setLongProperty(_sequenceNumberPropertyName, i); + //producer.send( _message ); + publisher.send(_message, DeliveryMode.PERSISTENT, 9 - i, 0); + } + _logger.debug(" use dursub to receive all the 3 messages"); + for (int i = 1; i <= 3; i++) + { + message = (TextMessage) durSub.receiveNoWait(); + if (message == null) + { + System.out.println("no message received "); + } + else if (message.getLongProperty(_sequenceNumberPropertyName) != i) + { + System.out.println("wrong sequence number, " + i + " expected, received: " + message + .getLongProperty(_sequenceNumberPropertyName)); + } + } + + _logger.debug("now set a message listener"); + AtomicBoolean lock = new AtomicBoolean(true); + reset(); + stConnection.stop(); + durSub.setMessageListener(new TopicListener(1, 3, lock)); + _logger.debug(" produce 3 persistent messages"); + for (int i = 1; i <= 3; i++) + { + _message.setLongProperty(_sequenceNumberPropertyName, i); + //producer.send( _message ); + publisher.send(_message, DeliveryMode.PERSISTENT, 9 - i, 0); + } + // start the connection + stConnection.start(); + while (lock.get()) + { + synchronized (lock) + { + lock.wait(); + } + } + if (getFailureStatus()) + { + fail("problem with message listener"); + } + stConnection.stop(); + durSub.setMessageListener(null); + _logger.debug(" do the same with an xa session"); + // produce 3 persistent messages + for (int i = 1; i <= 3; i++) + { + _message.setLongProperty(_sequenceNumberPropertyName, i); + //producer.send( _message ); + publisher.send(_message, DeliveryMode.PERSISTENT, 9 - i, 0); + } + //stConnection.close(); + + _logger.debug(" migrate the durable subscriber to an xa one"); + _session = _topicConnection.createXATopicSession(); + _xaResource = _session.getXAResource(); + _xaResource.start(xid2, XAResource.TMSUCCESS); + durSub = _session.createDurableSubscriber(_topic, durSubName); + lock = new AtomicBoolean(); + reset(); + _topicConnection.stop(); + durSub.setMessageListener(new TopicListener(1, 3, lock)); + // start the connection + _topicConnection.start(); + while (lock.get()) + { + synchronized (lock) + { + lock.wait(); + } + } + if (getFailureStatus()) + { + fail("problem with XA message listener"); + } + _xaResource.end(xid2, XAResource.TMSUCCESS); + _xaResource.commit(xid2, true); + } + catch (Exception e) + { + e.printStackTrace(); + fail("Exception thrown: " + e.getMessage()); + } + finally + { + try + { + _topicConnection.createXASession().unsubscribe(durSubName); + _topicConnection.createXASession().unsubscribe(durSubName + "_second"); + } + catch (JMSException e) + { + fail("Exception thrown when unsubscribing durable subscriber " + e.getMessage()); + } + } + } + } + + /** -------------------------------------------------------------------------------------- **/ + /** ----------------------------- Utility methods --------------------------------------- **/ + /** -------------------------------------------------------------------------------------- **/ + + /** + * get a new queue connection + * + * @return a new queue connection + * @throws javax.jms.JMSException If the JMS provider fails to create the queue connection + * due to some internal error or in case of authentication failure + */ + private XATopicConnection getNewTopicXAConnection() throws JMSException + { + return _topicFactory.createXATopicConnection("guest", "guest"); + } + + public static void failure() + { + _failure = true; + } + + public static void reset() + { + _failure = false; + } + + public static boolean getFailureStatus() + { + return _failure; + } + + private class TopicListener implements MessageListener + { + private long _counter; + private long _end; + private final AtomicBoolean _lock; + + public TopicListener(long init, long end, AtomicBoolean lock) + { + _counter = init; + _end = end; + _lock = lock; + } + + public void onMessage(Message message) + { + long seq = 0; + try + { + seq = message.getLongProperty(TopicTest._sequenceNumberPropertyName); + } + catch (JMSException e) + { + e.printStackTrace(); + TopicTest.failure(); + _lock.set(false); + synchronized (_lock) + { + _lock.notifyAll(); + } + } + if (seq != _counter) + { + System.out.println("received message " + seq + " expected " + _counter); + TopicTest.failure(); + _lock.set(false); + synchronized (_lock) + { + _lock.notifyAll(); + } + } + _counter++; + if (_counter > _end) + { + _lock.set(false); + synchronized (_lock) + { + _lock.notifyAll(); + } + } + } + } + +} diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/TopicTests.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/TopicTests.java deleted file mode 100644 index 30b3b09449..0000000000 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/TopicTests.java +++ /dev/null @@ -1,1708 +0,0 @@ -/* Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.test.unit.xa; - -import javax.jms.*; -import javax.transaction.xa.XAResource; -import javax.transaction.xa.Xid; -import javax.transaction.xa.XAException; - -import junit.framework.TestSuite; - -import java.util.concurrent.atomic.AtomicBoolean; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * - * - */ -public class TopicTests extends AbstractXATest -{ - /* this clas logger */ - private static final Logger _logger = LoggerFactory.getLogger(TopicTests.class); - - /** - * the topic use by all the tests - */ - private static Topic _topic = null; - - /** - * the topic connection factory used by all tests - */ - private static XATopicConnectionFactory _topicFactory = null; - - /** - * standard topic connection - */ - private static XATopicConnection _topicConnection = null; - - /** - * standard topic session created from the standard connection - */ - private static XATopicSession _session = null; - - private static TopicSession _nonXASession = null; - - /** - * the topic name - */ - private static final String TOPICNAME = "xaTopic"; - - /** - * Indicate that a listenere has failed - */ - private static boolean _failure = false; - - /** -------------------------------------------------------------------------------------- **/ - /** ----------------------------- JUnit support ----------------------------------------- **/ - /** -------------------------------------------------------------------------------------- **/ - - /** - * Gets the test suite tests - * - * @return the test suite tests - */ - public static TestSuite getSuite() - { - return new TestSuite(TopicTests.class); - } - - /** - * Run the test suite. - * - * @param args Any command line arguments specified to this class. - */ - public static void main(String args[]) - { - junit.textui.TestRunner.run(getSuite()); - } - - public void tearDown() throws Exception - { - if (!isBroker08()) - { - try - { - _topicConnection.stop(); - _topicConnection.close(); - } - catch (Exception e) - { - fail("Exception thrown when cleaning standard connection: " + e.getStackTrace()); - } - } - super.tearDown(); - } - - /** - * Initialize standard actors - */ - public void init() - { - if (!isBroker08()) - { - // lookup test queue - try - { - _topic = (Topic) getInitialContext().lookup(TOPICNAME); - } - catch (Exception e) - { - fail("cannot lookup test topic " + e.getMessage()); - } - // lookup connection factory - try - { - _topicFactory = getConnectionFactory(); - } - catch (Exception e) - { - fail("enable to lookup connection factory "); - } - // create standard connection - try - { - _topicConnection = getNewTopicXAConnection(); - } - catch (JMSException e) - { - fail("cannot create queue connection: " + e.getMessage()); - } - // create standard session - try - { - _session = _topicConnection.createXATopicSession(); - } - catch (JMSException e) - { - fail("cannot create queue session: " + e.getMessage()); - } - // create a standard session - try - { - _nonXASession = _topicConnection.createTopicSession(true, Session.AUTO_ACKNOWLEDGE); - } - catch (JMSException e) - { - e.printStackTrace(); //To change body of catch statement use Options | File Templates. - } - init(_session, _topic); - } - } - - /** -------------------------------------------------------------------------------------- **/ - /** ----------------------------- Test Suite -------------------------------------------- **/ - /** -------------------------------------------------------------------------------------- **/ - - - /** - * Uses two transactions respectively with xid1 and xid2 that are use to send a message - * within xid1 and xid2. xid2 is committed and xid1 is used to receive the message that was sent within xid2. - * Xid is then committed and a standard transaction is used to receive the message that was sent within xid1. - */ - public void testProducer() - { - if (!isBroker08()) - { - _logger.debug("testProducer"); - Xid xid1 = getNewXid(); - Xid xid2 = getNewXid(); - try - { - Session nonXASession = _nonXASession; - MessageConsumer nonXAConsumer = nonXASession.createConsumer(_topic); - _producer.setDeliveryMode(DeliveryMode.PERSISTENT); - // start the xaResource for xid1 - try - { - _logger.debug("starting tx branch xid1"); - _xaResource.start(xid1, XAResource.TMSUCCESS); - } - catch (XAException e) - { - e.printStackTrace(); - fail("cannot start the transaction with xid1: " + e.getMessage()); - } - try - { - // start the connection - _topicConnection.start(); - _logger.debug("produce a message with sequence number 1"); - _message.setLongProperty(_sequenceNumberPropertyName, 1); - _producer.send(_message); - } - catch (JMSException e) - { - fail(" cannot send persistent message: " + e.getMessage()); - } - _logger.debug("suspend the transaction branch xid1"); - try - { - _xaResource.end(xid1, XAResource.TMSUSPEND); - } - catch (XAException e) - { - fail("Cannot end the transaction with xid1: " + e.getMessage()); - } - _logger.debug("start the xaResource for xid2"); - try - { - _xaResource.start(xid2, XAResource.TMSUCCESS); - } - catch (XAException e) - { - fail("cannot start the transaction with xid2: " + e.getMessage()); - } - try - { - _logger.debug("produce a message"); - _message.setLongProperty(_sequenceNumberPropertyName, 2); - _producer.send(_message); - } - catch (JMSException e) - { - fail(" cannot send second persistent message: " + e.getMessage()); - } - _logger.debug("end xid2 and start xid1"); - try - { - _xaResource.end(xid2, XAResource.TMSUCCESS); - _xaResource.start(xid1, XAResource.TMRESUME); - } - catch (XAException e) - { - fail("Exception when ending and starting transactions: " + e.getMessage()); - } - _logger.debug("two phases commit transaction with xid2"); - try - { - int resPrepare = _xaResource.prepare(xid2); - if (resPrepare != XAResource.XA_OK) - { - fail("prepare returned: " + resPrepare); - } - _xaResource.commit(xid2, false); - } - catch (XAException e) - { - fail("Exception thrown when preparing transaction with xid2: " + e.getMessage()); - } - _logger.debug("receiving a message from topic test we expect it to be the second one"); - try - { - TextMessage message = (TextMessage) _consumer.receiveNoWait(); - if (message == null) - { - fail("did not receive second message as expected "); - } - else - { - if (message.getLongProperty(_sequenceNumberPropertyName) != 2) - { - fail("receive wrong message its sequence number is: " + message - .getLongProperty(_sequenceNumberPropertyName)); - } - } - } - catch (JMSException e) - { - fail("Exception when receiving second message: " + e.getMessage()); - } - _logger.debug("end and one phase commit the first transaction"); - try - { - _xaResource.end(xid1, XAResource.TMSUCCESS); - _xaResource.commit(xid1, true); - } - catch (XAException e) - { - fail("Exception thrown when commiting transaction with xid1"); - } - _logger.debug("We should now be able to receive the first and second message"); - try - { - TextMessage message1 = (TextMessage) nonXAConsumer.receiveNoWait(); - if (message1 == null) - { - fail("did not receive first message as expected "); - } - else - { - if (message1.getLongProperty(_sequenceNumberPropertyName) != 2) - { - fail("receive wrong message its sequence number is: " + message1 - .getLongProperty(_sequenceNumberPropertyName)); - } - } - message1 = (TextMessage) nonXAConsumer.receiveNoWait(); - if (message1 == null) - { - fail("did not receive first message as expected "); - } - else - { - if (message1.getLongProperty(_sequenceNumberPropertyName) != 1) - { - fail("receive wrong message its sequence number is: " + message1 - .getLongProperty(_sequenceNumberPropertyName)); - } - } - _logger.debug("commit transacted session"); - nonXASession.commit(); - _logger.debug("Test that the topic is now empty"); - message1 = (TextMessage) nonXAConsumer.receiveNoWait(); - if (message1 != null) - { - fail("receive an unexpected message "); - } - } - catch (JMSException e) - { - fail("Exception thrown when emptying the queue: " + e.getMessage()); - } - } - catch (JMSException e) - { - fail("cannot create standard consumer: " + e.getMessage()); - } - } - } - - - /** - * strategy: Produce a message within Tx1 and commit tx1. consume this message within tx2 and abort tx2. - * Consume the same message within tx3 and commit it. Check that no more message is available. - */ - public void testDurSub() - { - if (!isBroker08()) - { - Xid xid1 = getNewXid(); - Xid xid2 = getNewXid(); - Xid xid3 = getNewXid(); - Xid xid4 = getNewXid(); - String durSubName = "xaSubDurable"; - try - { - TopicSubscriber xaDurSub = _session.createDurableSubscriber(_topic, durSubName); - try - { - _topicConnection.start(); - _logger.debug("start xid1"); - _xaResource.start(xid1, XAResource.TMSUCCESS); - // start the connection - _topicConnection.start(); - _logger.debug("produce a message with sequence number 1"); - _message.setLongProperty(_sequenceNumberPropertyName, 1); - _producer.send(_message); - _logger.debug("2 phases commit xid1"); - _xaResource.end(xid1, XAResource.TMSUCCESS); - if (_xaResource.prepare(xid1) != XAResource.XA_OK) - { - fail("Problem when preparing tx1 "); - } - _xaResource.commit(xid1, false); - } - catch (Exception e) - { - e.printStackTrace(); - fail("Exception when working with xid1: " + e.getMessage()); - } - try - { - _logger.debug("start xid2"); - _xaResource.start(xid2, XAResource.TMSUCCESS); - _logger.debug("receive the previously produced message"); - TextMessage message = (TextMessage) xaDurSub.receiveNoWait(); - if (message == null) - { - fail("no message received "); - } - else if (message.getLongProperty(_sequenceNumberPropertyName) != 1) - { - fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); - } - _logger.debug("rollback xid2"); - boolean rollbackOnFailure = false; - try - { - _xaResource.end(xid2, XAResource.TMFAIL); - } - catch (XAException e) - { - if (e.errorCode != XAException.XA_RBROLLBACK) - { - fail("Exception when working with xid2: " + e.getMessage()); - } - rollbackOnFailure = true; - } - if (!rollbackOnFailure) - { - if (_xaResource.prepare(xid2) != XAResource.XA_OK) - { - fail("Problem when preparing tx2 "); - } - _xaResource.rollback(xid2); - } - } - catch (Exception e) - { - e.printStackTrace(); - fail("Exception when working with xid2: " + e.getMessage()); - } - try - { - _logger.debug("start xid3"); - _xaResource.start(xid3, XAResource.TMSUCCESS); - _logger.debug(" receive the previously aborted consumed message"); - TextMessage message = (TextMessage) xaDurSub.receiveNoWait(); - if (message == null) - { - fail("no message received "); - } - else if (message.getLongProperty(_sequenceNumberPropertyName) != 1) - { - fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); - } - _logger.debug("commit xid3"); - _xaResource.end(xid3, XAResource.TMSUCCESS); - if (_xaResource.prepare(xid3) != XAResource.XA_OK) - { - fail("Problem when preparing tx3 "); - } - _xaResource.commit(xid3, false); - } - catch (Exception e) - { - e.printStackTrace(); - fail("Exception when working with xid3: " + e.getMessage()); - } - try - { - _logger.debug("start xid4"); - _xaResource.start(xid4, XAResource.TMSUCCESS); - _logger.debug("check that topic is empty"); - TextMessage message = (TextMessage) xaDurSub.receiveNoWait(); - if (message != null) - { - fail("An unexpected message was received "); - } - _logger.debug("commit xid4"); - _xaResource.end(xid4, XAResource.TMSUCCESS); - _xaResource.commit(xid4, true); - } - catch (Exception e) - { - e.printStackTrace(); - fail("Exception when working with xid4: " + e.getMessage()); - } - } - catch (Exception e) - { - e.printStackTrace(); - fail("problem when creating dur sub: " + e.getMessage()); - } - finally - { - try - { - _session.unsubscribe(durSubName); - } - catch (JMSException e) - { - e.printStackTrace(); - fail("problem when unsubscribing dur sub: " + e.getMessage()); - } - } - } - } - - /** - * strategy: create a XA durable subscriber dusSub, produce 7 messages with the standard session, - * consume 2 messages respectively with tx1, tx2 and tx3 - * abort tx2, we now expect to receive messages 3 and 4 first! Receive 3 messages within tx1 i.e. 34 and 7! - * commit tx3 - * abort tx1: we now expect that only messages 5 and 6 are definitly consumed! - * start tx4 and consume messages 1 - 4 and 7 - * commit tx4 - * Now the topic should be empty! - */ - public void testMultiMessagesDurSub() - { - if (!isBroker08()) - { - Xid xid1 = getNewXid(); - Xid xid2 = getNewXid(); - Xid xid3 = getNewXid(); - Xid xid4 = getNewXid(); - Xid xid6 = getNewXid(); - String durSubName = "xaSubDurable"; - TextMessage message; - try - { - TopicSubscriber xaDurSub = _session.createDurableSubscriber(_topic, durSubName); - try - { - Session txSession = _nonXASession; - MessageProducer txProducer = txSession.createProducer(_topic); - _logger.debug("produce 10 persistent messages"); - txProducer.setDeliveryMode(DeliveryMode.PERSISTENT); - _topicConnection.start(); - for (int i = 1; i <= 7; i++) - { - _message.setLongProperty(_sequenceNumberPropertyName, i); - txProducer.send(_message); - } - // commit txSession - txSession.commit(); - } - catch (JMSException e) - { - e.printStackTrace(); - fail("Exception thrown when producing messages: " + e.getMessage()); - } - - try - { - _logger.debug(" consume 2 messages respectively with tx1, tx2 and tx3"); - //----- start xid1 - _xaResource.start(xid1, XAResource.TMSUCCESS); - // receive the 2 first messages - for (int i = 1; i <= 2; i++) - { - message = (TextMessage) xaDurSub.receiveNoWait(); - if (message == null) - { - fail("no message received! expected: " + i); - } - else if (message.getLongProperty(_sequenceNumberPropertyName) != i) - { - fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); - } - } - _xaResource.end(xid1, XAResource.TMSUSPEND); - //----- start xid2 - _xaResource.start(xid2, XAResource.TMSUCCESS); - // receive the 2 first messages - for (int i = 3; i <= 4; i++) - { - message = (TextMessage) xaDurSub.receiveNoWait(); - if (message == null) - { - fail("no message received! expected: " + i); - } - else if (message.getLongProperty(_sequenceNumberPropertyName) != i) - { - fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); - } - } - _xaResource.end(xid2, XAResource.TMSUSPEND); - //----- start xid3 - _xaResource.start(xid3, XAResource.TMSUCCESS); - // receive the 2 first messages - for (int i = 5; i <= 6; i++) - { - message = (TextMessage) xaDurSub.receiveNoWait(); - if (message == null) - { - fail("no message received! expected: " + i); - } - else if (message.getLongProperty(_sequenceNumberPropertyName) != i) - { - fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); - } - } - _xaResource.end(xid3, XAResource.TMSUCCESS); - } - catch (Exception e) - { - e.printStackTrace(); - fail("Exception thrown when consumming 6 first messages: " + e.getMessage()); - } - try - { - _logger.debug("abort tx2, we now expect to receive messages 3, 4 and 7"); - _xaResource.start(xid2, XAResource.TMRESUME); - _xaResource.end(xid2, XAResource.TMSUCCESS); - _xaResource.prepare(xid2); - _xaResource.rollback(xid2); - // receive 3 message within tx1: 3, 4 and 7 - _xaResource.start(xid1, XAResource.TMRESUME); - _logger.debug(" 3, 4 and 7"); - for (int i = 1; i <= 3; i++) - { - message = (TextMessage) xaDurSub.receiveNoWait(); - if (message == null) - { - fail("no message received! expected: " + 3); - } - else if (message.getLongProperty(_sequenceNumberPropertyName) <= 2 || 5 == message - .getLongProperty(_sequenceNumberPropertyName) || message - .getLongProperty(_sequenceNumberPropertyName) == 6) - { - fail("wrong sequence number: " + message - .getLongProperty(_sequenceNumberPropertyName)); - } - } - } - catch (Exception e) - { - e.printStackTrace(); - fail("Exception thrown when consumming message: 3, 4 and 7: " + e.getMessage()); - } - - try - { - _xaResource.end(xid1, XAResource.TMSUCCESS); - _logger.debug(" commit tx3"); - _xaResource.commit(xid3, true); - _logger.debug("abort tx1"); - _xaResource.prepare(xid1); - _xaResource.rollback(xid1); - } - catch (XAException e) - { - e.printStackTrace(); - fail("XAException thrown when committing tx3 or aborting tx1: " + e.getMessage()); - } - - try - { - // consume messages 1 - 4 + 7 - //----- start xid1 - _xaResource.start(xid4, XAResource.TMSUCCESS); - for (int i = 1; i <= 5; i++) - { - - message = (TextMessage) xaDurSub.receiveNoWait(); - _logger.debug(" received message: " + message.getLongProperty(_sequenceNumberPropertyName)); - if (message == null) - { - fail("no message received! expected: " + i); - } - else if (message.getLongProperty(_sequenceNumberPropertyName) == 5 || message - .getLongProperty(_sequenceNumberPropertyName) == 6) - { - fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); - } - } - _xaResource.end(xid4, XAResource.TMSUCCESS); - _xaResource.prepare(xid4); - _xaResource.commit(xid4, false); - } - catch (Exception e) - { - e.printStackTrace(); - fail("Exception thrown in last phase: " + e.getMessage()); - } - // now the topic should be empty!! - try - { - // start xid6 - _xaResource.start(xid6, XAResource.TMSUCCESS); - // should now be empty - message = (TextMessage) xaDurSub.receiveNoWait(); - if (message != null) - { - fail("An unexpected message was received " + message - .getLongProperty(_sequenceNumberPropertyName)); - } - // commit xid6 - _xaResource.end(xid6, XAResource.TMSUCCESS); - _xaResource.commit(xid6, true); - } - catch (Exception e) - { - e.printStackTrace(); - fail("Exception when working with xid6: " + e.getMessage()); - } - } - catch (Exception e) - { - e.printStackTrace(); - fail("problem when creating dur sub: " + e.getMessage()); - } - finally - { - try - { - _session.unsubscribe(durSubName); - } - catch (JMSException e) - { - e.printStackTrace(); - fail("problem when unsubscribing dur sub: " + e.getMessage()); - } - } - } - } - - /** - * strategy: create a XA durable subscriber dusSub, produce 10 messages with the standard session, - * consume 2 messages respectively with tx1, tx2 and tx3 - * prepare xid2 and xid3 - * crash the server - * Redo the job for xid1 that has been aborted by server crash - * abort tx2, we now expect to receive messages 3 and 4 first! Receive 3 messages within tx1 i.e. 34 and 7! - * commit tx3 - * abort tx1: we now expect that only messages 5 and 6 are definitly consumed! - * start tx4 and consume messages 1 - 4 - * start tx5 and consume messages 7 - 10 - * abort tx4 - * consume messages 1-4 with tx5 - * commit tx5 - * Now the topic should be empty! - */ - public void testMultiMessagesDurSubCrash() - { - if (!isBroker08()) - { - Xid xid1 = getNewXid(); - Xid xid2 = getNewXid(); - Xid xid3 = getNewXid(); - Xid xid4 = getNewXid(); - Xid xid5 = getNewXid(); - Xid xid6 = getNewXid(); - String durSubName = "xaSubDurable"; - TextMessage message; - try - { - TopicSubscriber xaDurSub = _session.createDurableSubscriber(_topic, durSubName); - try - { - Session txSession = _nonXASession; - MessageProducer txProducer = txSession.createProducer(_topic); - // produce 10 persistent messages - txProducer.setDeliveryMode(DeliveryMode.PERSISTENT); - _topicConnection.start(); - for (int i = 1; i <= 10; i++) - { - _message.setLongProperty(_sequenceNumberPropertyName, i); - txProducer.send(_message); - } - // commit txSession - txSession.commit(); - } - catch (JMSException e) - { - e.printStackTrace(); - fail("Exception thrown when producing messages: " + e.getMessage()); - } - try - { - // consume 2 messages respectively with tx1, tx2 and tx3 - //----- start xid1 - _xaResource.start(xid1, XAResource.TMSUCCESS); - // receive the 2 first messages - for (int i = 1; i <= 2; i++) - { - message = (TextMessage) xaDurSub.receiveNoWait(); - if (message == null) - { - fail("no message received! expected: " + i); - } - else if (message.getLongProperty(_sequenceNumberPropertyName) != i) - { - fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); - } - } - _xaResource.end(xid1, XAResource.TMSUCCESS); - //----- start xid2 - _xaResource.start(xid2, XAResource.TMSUCCESS); - // receive the 2 first messages - for (int i = 3; i <= 4; i++) - { - message = (TextMessage) xaDurSub.receiveNoWait(); - if (message == null) - { - fail("no message received! expected: " + i); - } - else if (message.getLongProperty(_sequenceNumberPropertyName) != i) - { - fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); - } - } - _xaResource.end(xid2, XAResource.TMSUCCESS); - //----- start xid3 - _xaResource.start(xid3, XAResource.TMSUCCESS); - // receive the 2 first messages - for (int i = 5; i <= 6; i++) - { - message = (TextMessage) xaDurSub.receiveNoWait(); - if (message == null) - { - fail("no message received! expected: " + i); - } - else if (message.getLongProperty(_sequenceNumberPropertyName) != i) - { - fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); - } - } - _xaResource.end(xid3, XAResource.TMSUCCESS); - // prepare tx2 and tx3 - - _xaResource.prepare(xid2); - _xaResource.prepare(xid3); - } - catch (Exception e) - { - e.printStackTrace(); - fail("Exception thrown when consumming 6 first messages: " + e.getMessage()); - } - /////// stop the broker now !! - try - { - shutdownServer(); - } - catch (Exception e) - { - fail("Exception when stopping and restarting the server"); - } - // get the list of in doubt transactions - try - { - _topicConnection.start(); - // reconnect to dursub! - xaDurSub = _session.createDurableSubscriber(_topic, durSubName); - Xid[] inDoubt = _xaResource.recover(XAResource.TMSTARTRSCAN); - if (inDoubt == null) - { - fail("the array of in doubt transactions should not be null "); - } - // At that point we expect only two indoubt transactions: - if (inDoubt.length != 2) - { - fail("in doubt transaction size is diffenrent than 2, there are " + inDoubt.length + "in doubt transactions"); - } - } - catch (XAException e) - { - e.printStackTrace(); - fail("exception thrown when recovering transactions " + e.getMessage()); - } - try - { - // xid1 has been aborted redo the job! - // consume 2 messages with tx1 - //----- start xid1 - _xaResource.start(xid1, XAResource.TMSUCCESS); - // receive the 2 first messages - for (int i = 1; i <= 2; i++) - { - message = (TextMessage) xaDurSub.receiveNoWait(); - if (message == null) - { - fail("no message received! expected: " + i); - } - else if (message.getLongProperty(_sequenceNumberPropertyName) != i) - { - fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); - } - } - _xaResource.end(xid1, XAResource.TMSUSPEND); - // abort tx2, we now expect to receive messages 3 and 4 first! - _xaResource.rollback(xid2); - - // receive 3 message within tx1: 3, 4 and 7 - _xaResource.start(xid1, XAResource.TMRESUME); - // receive messages 3, 4 and 7 - message = (TextMessage) xaDurSub.receiveNoWait(); - if (message == null) - { - fail("no message received! expected: " + 3); - } - else if (message.getLongProperty(_sequenceNumberPropertyName) != 3) - { - fail("wrong sequence number: " + message - .getLongProperty(_sequenceNumberPropertyName) + " 3 was expected"); - } - message = (TextMessage) xaDurSub.receiveNoWait(); - if (message == null) - { - fail("no message received! expected: " + 4); - } - else if (message.getLongProperty(_sequenceNumberPropertyName) != 4) - { - fail("wrong sequence number: " + message - .getLongProperty(_sequenceNumberPropertyName) + " 4 was expected"); - } - message = (TextMessage) xaDurSub.receiveNoWait(); - if (message == null) - { - fail("no message received! expected: " + 7); - } - else if (message.getLongProperty(_sequenceNumberPropertyName) != 7) - { - fail("wrong sequence number: " + message - .getLongProperty(_sequenceNumberPropertyName) + " 7 was expected"); - } - } - catch (Exception e) - { - e.printStackTrace(); - fail("Exception thrown when consumming message: 3, 4 and 7: " + e.getMessage()); - } - - try - { - _xaResource.end(xid1, XAResource.TMSUSPEND); - // commit tx3 - _xaResource.commit(xid3, false); - // abort tx1 - _xaResource.prepare(xid1); - _xaResource.rollback(xid1); - } - catch (XAException e) - { - e.printStackTrace(); - fail("XAException thrown when committing tx3 or aborting tx1: " + e.getMessage()); - } - - try - { - // consume messages 1 - 4 - //----- start xid1 - _xaResource.start(xid4, XAResource.TMSUCCESS); - for (int i = 1; i <= 4; i++) - { - message = (TextMessage) xaDurSub.receiveNoWait(); - if (message == null) - { - fail("no message received! expected: " + i); - } - else if (message.getLongProperty(_sequenceNumberPropertyName) != i) - { - fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); - } - } - _xaResource.end(xid4, XAResource.TMSUSPEND); - // consume messages 8 - 10 - _xaResource.start(xid5, XAResource.TMSUCCESS); - for (int i = 7; i <= 10; i++) - { - message = (TextMessage) xaDurSub.receiveNoWait(); - if (message == null) - { - fail("no message received! expected: " + i); - } - else if (message.getLongProperty(_sequenceNumberPropertyName) != i) - { - fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); - } - } - _xaResource.end(xid5, XAResource.TMSUSPEND); - // abort tx4 - _xaResource.prepare(xid4); - _xaResource.rollback(xid4); - // consume messages 1-4 with tx5 - _xaResource.start(xid5, XAResource.TMRESUME); - for (int i = 1; i <= 4; i++) - { - message = (TextMessage) xaDurSub.receiveNoWait(); - if (message == null) - { - fail("no message received! expected: " + i); - } - else if (message.getLongProperty(_sequenceNumberPropertyName) != i) - { - fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); - } - } - _xaResource.end(xid5, XAResource.TMSUSPEND); - // commit tx5 - _xaResource.prepare(xid5); - _xaResource.commit(xid5, false); - } - catch (Exception e) - { - e.printStackTrace(); - fail("Exception thrown in last phase: " + e.getMessage()); - } - // now the topic should be empty!! - try - { - // start xid6 - _xaResource.start(xid6, XAResource.TMSUCCESS); - // should now be empty - message = (TextMessage) xaDurSub.receiveNoWait(); - if (message != null) - { - fail("An unexpected message was received " + message - .getLongProperty(_sequenceNumberPropertyName)); - } - // commit xid6 - _xaResource.end(xid6, XAResource.TMSUSPEND); - _xaResource.commit(xid6, true); - } - catch (Exception e) - { - e.printStackTrace(); - fail("Exception when working with xid6: " + e.getMessage()); - } - } - catch (Exception e) - { - e.printStackTrace(); - fail("problem when creating dur sub: " + e.getMessage()); - } - finally - { - try - { - _session.unsubscribe(durSubName); - } - catch (JMSException e) - { - e.printStackTrace(); - fail("problem when unsubscribing dur sub: " + e.getMessage()); - } - } - } - } - - - /** - * strategy: Produce a message within Tx1 and commit tx1. a durable subscriber then receives that message within tx2 - * that is then prepared. - * Shutdown the server and get the list of in doubt transactions: - * we expect tx2, Tx2 is aborted and the message consumed within tx3 that is committed we then check that the topic is empty. - */ - public void testDurSubCrash() - { - if (!isBroker08()) - { - Xid xid1 = getNewXid(); - Xid xid2 = getNewXid(); - Xid xid3 = getNewXid(); - Xid xid4 = getNewXid(); - String durSubName = "xaSubDurable"; - try - { - TopicSubscriber xaDurSub = _session.createDurableSubscriber(_topic, durSubName); - try - { - _topicConnection.start(); - //----- start xid1 - _xaResource.start(xid1, XAResource.TMSUCCESS); - // start the connection - _topicConnection.start(); - // produce a message with sequence number 1 - _message.setLongProperty(_sequenceNumberPropertyName, 1); - _producer.send(_message); - // commit - _xaResource.end(xid1, XAResource.TMSUSPEND); - if (_xaResource.prepare(xid1) != XAResource.XA_OK) - { - fail("Problem when preparing tx1 "); - } - _xaResource.commit(xid1, false); - } - catch (Exception e) - { - e.printStackTrace(); - fail("Exception when working with xid1: " + e.getMessage()); - } - try - { - // start xid2 - _xaResource.start(xid2, XAResource.TMSUCCESS); - // receive the previously produced message - TextMessage message = (TextMessage) xaDurSub.receiveNoWait(); - if (message == null) - { - fail("no message received "); - } - else if (message.getLongProperty(_sequenceNumberPropertyName) != 1) - { - fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); - } - // prepare xid2 - _xaResource.end(xid2, XAResource.TMSUSPEND); - if (_xaResource.prepare(xid2) != XAResource.XA_OK) - { - fail("Problem when preparing tx2 "); - } - } - catch (Exception e) - { - e.printStackTrace(); - fail("Exception when working with xid2: " + e.getMessage()); - } - - /////// stop the server now !! - try - { - shutdownServer(); - } - catch (Exception e) - { - fail("Exception when stopping and restarting the server"); - } - - // get the list of in doubt transactions - try - { - _topicConnection.start(); - // reconnect to dursub! - xaDurSub = _session.createDurableSubscriber(_topic, durSubName); - Xid[] inDoubt = _xaResource.recover(XAResource.TMSTARTRSCAN); - if (inDoubt == null) - { - fail("the array of in doubt transactions should not be null "); - } - // At that point we expect only two indoubt transactions: - if (inDoubt.length != 1) - { - fail("in doubt transaction size is diffenrent than 2, there are " + inDoubt.length + "in doubt transactions"); - } - - // commit them - for (Xid anInDoubt : inDoubt) - { - if (anInDoubt.equals(xid2)) - { - System.out.println("aborting xid2 "); - try - { - _xaResource.rollback(anInDoubt); - } - catch (Exception e) - { - e.printStackTrace(); - fail("exception when aborting xid2 "); - } - } - else - { - System.out.println("XID2 is not in doubt "); - } - } - } - catch (XAException e) - { - e.printStackTrace(); - fail("exception thrown when recovering transactions " + e.getMessage()); - } - - try - { - // start xid3 - _xaResource.start(xid3, XAResource.TMSUCCESS); - // receive the previously produced message and aborted - TextMessage message = (TextMessage) xaDurSub.receiveNoWait(); - if (message == null) - { - fail("no message received "); - } - else if (message.getLongProperty(_sequenceNumberPropertyName) != 1) - { - fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); - } - // commit xid3 - _xaResource.end(xid3, XAResource.TMSUSPEND); - if (_xaResource.prepare(xid3) != XAResource.XA_OK) - { - fail("Problem when preparing tx3 "); - } - _xaResource.commit(xid3, false); - } - catch (Exception e) - { - e.printStackTrace(); - fail("Exception when working with xid3: " + e.getMessage()); - } - try - { - // start xid4 - _xaResource.start(xid4, XAResource.TMSUCCESS); - // should now be empty - TextMessage message = (TextMessage) xaDurSub.receiveNoWait(); - if (message != null) - { - fail("An unexpected message was received " + message - .getLongProperty(_sequenceNumberPropertyName)); - } - // commit xid4 - _xaResource.end(xid4, XAResource.TMSUSPEND); - _xaResource.commit(xid4, true); - } - catch (Exception e) - { - e.printStackTrace(); - fail("Exception when working with xid4: " + e.getMessage()); - } - } - catch (Exception e) - { - e.printStackTrace(); - fail("problem when creating dur sub: " + e.getMessage()); - } - finally - { - try - { - _session.unsubscribe(durSubName); - } - catch (JMSException e) - { - e.printStackTrace(); - fail("problem when unsubscribing dur sub: " + e.getMessage()); - } - } - } - } - - /** - * strategy: Produce a message within Tx1 and prepare tx1. Shutdown the server and get the list of indoubt transactions: - * we expect tx1, Tx1 is committed so we expect the test topic not to be empty! - */ - public void testRecover() - { - if (!isBroker08()) - { - Xid xid1 = getNewXid(); - String durSubName = "test1"; - TopicSession nonXASession1; - try - { - // create a dummy durable subscriber to be sure that messages are persisted! - nonXASession1 = _nonXASession; - nonXASession1.createDurableSubscriber(_topic, durSubName); - // start the xaResource for xid1 - try - { - _xaResource.start(xid1, XAResource.TMSUCCESS); - } - catch (XAException e) - { - fail("cannot start the transaction with xid1: " + e.getMessage()); - } - try - { - // start the connection - _topicConnection.start(); - // produce a message with sequence number 1 - _message.setLongProperty(_sequenceNumberPropertyName, 1); - _producer.send(_message); - } - catch (JMSException e) - { - fail(" cannot send persistent message: " + e.getMessage()); - } - // suspend the transaction - try - { - _xaResource.end(xid1, XAResource.TMSUCCESS); - } - catch (XAException e) - { - fail("Cannot end the transaction with xid1: " + e.getMessage()); - } - // prepare the transaction with xid1 - try - { - _xaResource.prepare(xid1); - } - catch (XAException e) - { - fail("Exception when preparing xid1: " + e.getMessage()); - } - - /////// stop the server now !! - try - { - shutdownServer(); - } - catch (Exception e) - { - fail("Exception when stopping and restarting the server"); - } - - try - { - MessageConsumer nonXAConsumer = nonXASession1.createDurableSubscriber(_topic, durSubName); - _topicConnection.start(); - // get the list of in doubt transactions - try - { - Xid[] inDoubt = _xaResource.recover(XAResource.TMSTARTRSCAN); - if (inDoubt == null) - { - fail("the array of in doubt transactions should not be null "); - } - // At that point we expect only two indoubt transactions: - if (inDoubt.length != 1) - { - fail("in doubt transaction size is diffenrent thatn 2, there are " + inDoubt.length + "in doubt transactions"); - } - // commit them - for (Xid anInDoubt : inDoubt) - { - if (anInDoubt.equals(xid1)) - { - _logger.debug("committing xid1 "); - try - { - _xaResource.commit(anInDoubt, false); - } - catch (Exception e) - { - _logger.debug("PB when aborted xid1"); - e.printStackTrace(); - fail("exception when committing xid1 "); - } - } - else - { - _logger.debug("XID1 is not in doubt "); - } - } - } - catch (XAException e) - { - e.printStackTrace(); - fail("exception thrown when recovering transactions " + e.getMessage()); - } - _logger.debug("the topic should not be empty"); - TextMessage message1 = (TextMessage) nonXAConsumer.receiveNoWait(); - if (message1 == null) - { - fail("The topic is empty! "); - } - } - catch (JMSException e) - { - fail("Exception thrown when testin that queue test is empty: " + e.getMessage()); - } - } - catch (JMSException e) - { - fail("cannot create dummy durable subscriber: " + e.getMessage()); - } - finally - { - try - { - // unsubscribe the dummy durable subscriber - TopicSession nonXASession = _nonXASession; - nonXASession.unsubscribe(durSubName); - } - catch (JMSException e) - { - fail("cannot unsubscribe durable subscriber: " + e.getMessage()); - } - } - } - } - - /** - * strategy: - * create a standard durable subscriber - * produce 3 messages - * consume the first message with that durable subscriber - * close the standard session that deactivates the durable subscriber - * migrate the durable subscriber to an xa one - * consume the second message with that xa durable subscriber - * close the xa session that deactivates the durable subscriber - * reconnect to the durable subscriber with a standard session - * consume the two remaining messages and check that the topic is empty! - */ - public void testMigrateDurableSubscriber() - { - if (!isBroker08()) - { - Xid xid1 = getNewXid(); - Xid xid2 = getNewXid(); - String durSubName = "DurableSubscriberMigrate"; - try - { - Session stSession = _nonXASession; - MessageProducer producer = stSession.createProducer(_topic); - _logger.debug("Create a standard durable subscriber!"); - TopicSubscriber durSub = stSession.createDurableSubscriber(_topic, durSubName); - TopicSubscriber durSub1 = stSession.createDurableSubscriber(_topic, durSubName + "_second"); - TextMessage message; - producer.setDeliveryMode(DeliveryMode.PERSISTENT); - _topicConnection.start(); - _logger.debug("produce 3 messages"); - for (int i = 1; i <= 3; i++) - { - _message.setLongProperty(_sequenceNumberPropertyName, i); - //producer.send( _message ); - producer.send(_message, DeliveryMode.PERSISTENT, 9 - i, 0); - stSession.commit(); - } - _logger.debug("consume the first message with that durable subscriber"); - message = (TextMessage) durSub.receiveNoWait(); - if (message == null) - { - fail("no message received "); - } - else if (message.getLongProperty(_sequenceNumberPropertyName) != 1) - { - fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); - } - // commit the standard session - stSession.commit(); - _logger.debug("first message consumed "); - // close the session that deactivates the durable subscriber - stSession.close(); - _logger.debug("migrate the durable subscriber to an xa one"); - _xaResource.start(xid1, XAResource.TMSUCCESS); - durSub = _session.createDurableSubscriber(_topic, durSubName); - _logger.debug(" consume the second message with that xa durable subscriber and abort it"); - message = (TextMessage) durSub.receiveNoWait(); - if (message == null) - { - fail("no message received "); - } - else if (message.getLongProperty(_sequenceNumberPropertyName) != 2) - { - fail("wrong sequence number, 2 expected, received: " + message - .getLongProperty(_sequenceNumberPropertyName)); - } - _xaResource.end(xid1, XAResource.TMSUCCESS); - _xaResource.prepare(xid1); - _xaResource.rollback(xid1); - _logger.debug("close the session that deactivates the durable subscriber"); - _session.close(); - _logger.debug("create a new standard session"); - stSession = _topicConnection.createTopicSession(true, 1); - _logger.debug("reconnect to the durable subscriber"); - durSub = stSession.createDurableSubscriber(_topic, durSubName); - durSub1 = stSession.createDurableSubscriber(_topic, durSubName + "_second"); - _logger.debug("Reconnected to durablse subscribers"); - _logger.debug(" consume the 2 remaining messages"); - message = (TextMessage) durSub.receiveNoWait(); - if (message == null) - { - fail("no message received "); - } - else if (message.getLongProperty(_sequenceNumberPropertyName) != 2) - { - fail("wrong sequence number, 2 expected, received: " + message - .getLongProperty(_sequenceNumberPropertyName)); - } - // consume the third message with that xa durable subscriber - message = (TextMessage) durSub.receiveNoWait(); - if (message == null) - { - fail("no message received "); - } - else if (message.getLongProperty(_sequenceNumberPropertyName) != 3) - { - fail("wrong sequence number, 3 expected, received: " + message - .getLongProperty(_sequenceNumberPropertyName)); - } - stSession.commit(); - _logger.debug("the topic should be empty now"); - message = (TextMessage) durSub.receiveNoWait(); - if (message != null) - { - fail("Received unexpected message "); - } - stSession.commit(); - _logger.debug(" use dursub1 to receive all the 3 messages"); - for (int i = 1; i <= 3; i++) - { - message = (TextMessage) durSub1.receiveNoWait(); - if (message == null) - { - _logger.debug("no message received "); - } - else if (message.getLongProperty(_sequenceNumberPropertyName) != i) - { - fail("wrong sequence number, " + i + " expected, received: " + message - .getLongProperty(_sequenceNumberPropertyName)); - } - } - stSession.commit(); - // send a non persistent message to check that all persistent messages are deleted - producer = stSession.createProducer(_topic); - producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - producer.send(_message); - stSession.commit(); - message = (TextMessage) durSub.receiveNoWait(); - if (message == null) - { - fail("message not received "); - } - message = (TextMessage) durSub1.receiveNoWait(); - if (message == null) - { - fail("message not received "); - } - stSession.commit(); - stSession.close(); - _logger.debug(" now create a standard non transacted session and reconnect to the durable xubscriber"); - TopicConnection stConnection = - _topicConnection; //_topicFactory.createTopicConnection("guest", "guest"); - TopicSession autoAclSession = stConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); - TopicPublisher publisher = autoAclSession.createPublisher(_topic); - durSub = autoAclSession.createDurableSubscriber(_topic, durSubName); - stConnection.start(); - // produce 3 persistent messages - for (int i = 1; i <= 3; i++) - { - _message.setLongProperty(_sequenceNumberPropertyName, i); - //producer.send( _message ); - publisher.send(_message, DeliveryMode.PERSISTENT, 9 - i, 0); - } - _logger.debug(" use dursub to receive all the 3 messages"); - for (int i = 1; i <= 3; i++) - { - message = (TextMessage) durSub.receiveNoWait(); - if (message == null) - { - System.out.println("no message received "); - } - else if (message.getLongProperty(_sequenceNumberPropertyName) != i) - { - System.out.println("wrong sequence number, " + i + " expected, received: " + message - .getLongProperty(_sequenceNumberPropertyName)); - } - } - - _logger.debug("now set a message listener"); - AtomicBoolean lock = new AtomicBoolean(true); - reset(); - stConnection.stop(); - durSub.setMessageListener(new TopicListener(1, 3, lock)); - _logger.debug(" produce 3 persistent messages"); - for (int i = 1; i <= 3; i++) - { - _message.setLongProperty(_sequenceNumberPropertyName, i); - //producer.send( _message ); - publisher.send(_message, DeliveryMode.PERSISTENT, 9 - i, 0); - } - // start the connection - stConnection.start(); - while (lock.get()) - { - synchronized (lock) - { - lock.wait(); - } - } - if (getFailureStatus()) - { - fail("problem with message listener"); - } - stConnection.stop(); - durSub.setMessageListener(null); - _logger.debug(" do the same with an xa session"); - // produce 3 persistent messages - for (int i = 1; i <= 3; i++) - { - _message.setLongProperty(_sequenceNumberPropertyName, i); - //producer.send( _message ); - publisher.send(_message, DeliveryMode.PERSISTENT, 9 - i, 0); - } - //stConnection.close(); - - _logger.debug(" migrate the durable subscriber to an xa one"); - _session = _topicConnection.createXATopicSession(); - _xaResource = _session.getXAResource(); - _xaResource.start(xid2, XAResource.TMSUCCESS); - durSub = _session.createDurableSubscriber(_topic, durSubName); - lock = new AtomicBoolean(); - reset(); - _topicConnection.stop(); - durSub.setMessageListener(new TopicListener(1, 3, lock)); - // start the connection - _topicConnection.start(); - while (lock.get()) - { - synchronized (lock) - { - lock.wait(); - } - } - if (getFailureStatus()) - { - fail("problem with XA message listener"); - } - _xaResource.end(xid2, XAResource.TMSUCCESS); - _xaResource.commit(xid2, true); - } - catch (Exception e) - { - e.printStackTrace(); - fail("Exception thrown: " + e.getMessage()); - } - finally - { - try - { - _topicConnection.createXASession().unsubscribe(durSubName); - _topicConnection.createXASession().unsubscribe(durSubName + "_second"); - } - catch (JMSException e) - { - fail("Exception thrown when unsubscribing durable subscriber " + e.getMessage()); - } - } - } - } - - /** -------------------------------------------------------------------------------------- **/ - /** ----------------------------- Utility methods --------------------------------------- **/ - /** -------------------------------------------------------------------------------------- **/ - - /** - * get a new queue connection - * - * @return a new queue connection - * @throws javax.jms.JMSException If the JMS provider fails to create the queue connection - * due to some internal error or in case of authentication failure - */ - private XATopicConnection getNewTopicXAConnection() throws JMSException - { - return _topicFactory.createXATopicConnection("guest", "guest"); - } - - public static void failure() - { - _failure = true; - } - - public static void reset() - { - _failure = false; - } - - public static boolean getFailureStatus() - { - return _failure; - } - - private class TopicListener implements MessageListener - { - private long _counter; - private long _end; - private final AtomicBoolean _lock; - - public TopicListener(long init, long end, AtomicBoolean lock) - { - _counter = init; - _end = end; - _lock = lock; - } - - public void onMessage(Message message) - { - long seq = 0; - try - { - seq = message.getLongProperty(TopicTests._sequenceNumberPropertyName); - } - catch (JMSException e) - { - e.printStackTrace(); - TopicTests.failure(); - _lock.set(false); - synchronized (_lock) - { - _lock.notifyAll(); - } - } - if (seq != _counter) - { - System.out.println("received message " + seq + " expected " + _counter); - TopicTests.failure(); - _lock.set(false); - synchronized (_lock) - { - _lock.notifyAll(); - } - } - _counter++; - if (_counter > _end) - { - _lock.set(false); - synchronized (_lock) - { - _lock.notifyAll(); - } - } - } - } - -} diff --git a/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidTestCase.java b/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidTestCase.java index e7c09fca65..084137b38e 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidTestCase.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidTestCase.java @@ -59,16 +59,27 @@ public class QpidTestCase extends TestCase private InitialContext _initialContext; private AMQConnectionFactory _connectionFactory; - protected void setUp() throws Exception + public void runBare() throws Throwable { - super.setUp(); + String name = getClass().getSimpleName() + "." + getName(); + _logger.info("========== start " + name + " =========="); startBroker(); - } - - protected void tearDown() throws Exception - { - stopBroker(); - super.tearDown(); + try + { + super.runBare(); + } + finally + { + try + { + stopBroker(); + } + catch (Exception e) + { + _logger.error("exception stopping broker", e); + } + _logger.info("========== stop " + name + " =========="); + } } public void startBroker() throws Exception @@ -102,7 +113,8 @@ public class QpidTestCase extends TestCase } catch (IOException e) { - _logger.info("redirector", e); + // this seems to happen regularly even when + // exits are normal } } }.start(); @@ -190,6 +202,11 @@ public class QpidTestCase extends TestCase return _connectionFactory; } + public Connection getConnection() throws Exception + { + return getConnection("guest", "guest"); + } + /** * Get a connection (remote or in-VM) * diff --git a/qpid/java/common.xml b/qpid/java/common.xml index 8e44693c63..cfaab2cc84 100644 --- a/qpid/java/common.xml +++ b/qpid/java/common.xml @@ -198,6 +198,13 @@ that modules build root from underneath the project build root: ${build}/<module> + + ant clean-results + + The clean-results target removes all test output from the test + results directory: + + ${build.results} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java index 08adb99c47..9f0af7cfa1 100644 --- a/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java @@ -63,6 +63,7 @@ public class Session extends Invoker private long commandsIn = 0; // completed incoming commands private final RangeSet processed = new RangeSet(); + private long processedMark = -1; private Range syncPoint = null; // outgoing command count @@ -132,25 +133,25 @@ public class Session extends Invoker public void flushProcessed() { - long mark = -1; boolean first = true; RangeSet rest = new RangeSet(); synchronized (processed) { for (Range r: processed) { - if (first) + if (first && r.includes(processedMark)) { - first = false; - mark = r.getUpper(); + processedMark = r.getUpper(); } else { rest.add(r); } + + first = false; } } - executionComplete(mark, rest); + executionComplete(processedMark, rest); } void syncPoint() diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaSender.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaSender.java index 17ae7ea0f7..f0f5731037 100644 --- a/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaSender.java +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaSender.java @@ -51,16 +51,23 @@ public class MinaSender implements Sender { throw new TransportException("attempted to write to a closed socket"); } - lastWrite = session.write(ByteBuffer.wrap(buf)); + + synchronized (this) + { + lastWrite = session.write(ByteBuffer.wrap(buf)); + } } - public void close() + public synchronized void close() { // MINA will sometimes throw away in-progress writes when you // ask it to close - if (lastWrite != null) + synchronized (this) { - lastWrite.join(); + if (lastWrite != null) + { + lastWrite.join(); + } } CloseFuture closed = session.close(); closed.join(); diff --git a/qpid/java/module.xml b/qpid/java/module.xml index d80a2c0f56..af943e3b1f 100644 --- a/qpid/java/module.xml +++ b/qpid/java/module.xml @@ -176,7 +176,7 @@ - + diff --git a/qpid/python/examples/direct/verify.in b/qpid/python/examples/direct/verify.in index e0dca33039..5e691619d9 100644 --- a/qpid/python/examples/direct/verify.in +++ b/qpid/python/examples/direct/verify.in @@ -1,6 +1,6 @@ -==== ./declare_queues.py.out -==== ./direct_producer.py.out -==== ./direct_consumer.py.out +==== declare_queues.py.out +==== direct_producer.py.out +==== direct_consumer.py.out message 0 message 1 message 2 diff --git a/qpid/python/examples/fanout/verify.in b/qpid/python/examples/fanout/verify.in index c625c30773..a5f57f0b4b 100644 --- a/qpid/python/examples/fanout/verify.in +++ b/qpid/python/examples/fanout/verify.in @@ -1,6 +1,6 @@ -==== ./declare_queues.py.out -==== ./fanout_producer.py.out -==== ./fanout_consumer.py.out +==== declare_queues.py.out +==== fanout_producer.py.out +==== fanout_consumer.py.out message 0 message 1 message 2 diff --git a/qpid/python/examples/pubsub/verify.in b/qpid/python/examples/pubsub/verify.in index 19dcf88312..69de08d17c 100644 --- a/qpid/python/examples/pubsub/verify.in +++ b/qpid/python/examples/pubsub/verify.in @@ -1,4 +1,4 @@ -==== ./topic_publisher.py.out +==== topic_publisher.py.out ==== topic_subscriber.py.out | remove_uuid64 | sort message 0 message 0 diff --git a/qpid/python/examples/request-response/verify.in b/qpid/python/examples/request-response/verify.in index c02a423bcb..f681253b3c 100644 --- a/qpid/python/examples/request-response/verify.in +++ b/qpid/python/examples/request-response/verify.in @@ -1,4 +1,4 @@ -==== ./client.py.out | remove_uuid64 +==== client.py.out | remove_uuid64 Request: Twas brilling, and the slithy toves Request: Did gyre and gimble in the wabe. Request: All mimsy were the borogroves, @@ -9,6 +9,6 @@ Response: DID GYRE AND GIMBLE IN THE WABE. Response: ALL MIMSY WERE THE BOROGROVES, Response: AND THE MOME RATHS OUTGRABE. No more messages! -==== server.py.out | remove_uuid64 +==== server.py.out | remove_uuid64 Request server running - run your client now. (Times out after 100 seconds ...) diff --git a/qpid/python/tests_0-10/message.py b/qpid/python/tests_0-10/message.py index 9ec1cc270c..c251e6aca0 100644 --- a/qpid/python/tests_0-10/message.py +++ b/qpid/python/tests_0-10/message.py @@ -6,9 +6,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -47,7 +47,7 @@ class MessageTests(TestBase): msg = included.get(timeout=1) self.assertEqual("consume_no_local", msg.content.body) try: - excluded.get(timeout=1) + excluded.get(timeout=1) self.fail("Received locally published message though no_local=true") except Empty: None @@ -59,9 +59,9 @@ class MessageTests(TestBase): could be left on the queue, possibly never being consumed (this is the case for example in the qpid JMS mapping of topics). This test excercises a Qpid C++ broker hack that - deletes such messages. + deletes such messages. """ - + channel = self.channel #setup: channel.queue_declare(queue="test-queue", exclusive=True, auto_delete=True) @@ -84,7 +84,7 @@ class MessageTests(TestBase): msg = excluded.get(timeout=1) self.assertEqual("foreign", msg.content.body) try: - excluded.get(timeout=1) + excluded.get(timeout=1) self.fail("Received extra message") except Empty: None #check queue is empty @@ -107,7 +107,7 @@ class MessageTests(TestBase): except Closed, e: self.assertChannelException(403, e.args[0]) - #open new channel and cleanup last consumer: + #open new channel and cleanup last consumer: channel = self.client.channel(2) channel.session_open() @@ -173,7 +173,7 @@ class MessageTests(TestBase): msg = myqueue.get(timeout=1) self.assertEqual("One", msg.content.body) try: - msg = myqueue.get(timeout=1) + msg = myqueue.get(timeout=1) self.fail("Got message after cancellation: " + msg) except Empty: None @@ -188,7 +188,7 @@ class MessageTests(TestBase): """ channel = self.channel channel.queue_declare(queue="test-ack-queue", exclusive=True, auto_delete=True) - + self.subscribe(queue="test-ack-queue", destination="consumer_tag", confirm_mode=1) queue = self.client.queue("consumer_tag") @@ -197,13 +197,13 @@ class MessageTests(TestBase): channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="Three")) channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="Four")) channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="Five")) - + msg1 = queue.get(timeout=1) msg2 = queue.get(timeout=1) msg3 = queue.get(timeout=1) msg4 = queue.get(timeout=1) msg5 = queue.get(timeout=1) - + self.assertEqual("One", msg1.content.body) self.assertEqual("Two", msg2.content.body) self.assertEqual("Three", msg3.content.body) @@ -214,10 +214,10 @@ class MessageTests(TestBase): msg4.complete(cumulative=False) channel.message_recover(requeue=False) - + msg3b = queue.get(timeout=1) msg5b = queue.get(timeout=1) - + self.assertEqual("Three", msg3b.content.body) self.assertEqual("Five", msg5b.content.body) @@ -236,7 +236,7 @@ class MessageTests(TestBase): channel.queue_bind(exchange="amq.fanout", queue="queue-a") channel.queue_declare(queue="queue-b", exclusive=True, auto_delete=True) channel.queue_bind(exchange="amq.fanout", queue="queue-b") - + self.subscribe(queue="queue-a", destination="unconfirmed", confirm_mode=1) self.subscribe(queue="queue-b", destination="confirmed", confirm_mode=0) confirmed = self.client.queue("confirmed") @@ -246,10 +246,10 @@ class MessageTests(TestBase): for d in data: channel.message_transfer(destination="amq.fanout", content=Content(body=d)) - for q in [confirmed, unconfirmed]: + for q in [confirmed, unconfirmed]: for d in data: self.assertEqual(d, q.get(timeout=1).content.body) - self.assertEmpty(q) + self.assertEmpty(q) channel.message_recover(requeue=False) @@ -265,7 +265,7 @@ class MessageTests(TestBase): data.remove(msg.content.body) msg.complete(cumulative=False) channel.message_recover(requeue=False) - + def test_recover_requeue(self): """ @@ -273,7 +273,7 @@ class MessageTests(TestBase): """ channel = self.channel channel.queue_declare(queue="test-requeue", exclusive=True, auto_delete=True) - + self.subscribe(queue="test-requeue", destination="consumer_tag", confirm_mode=1) queue = self.client.queue("consumer_tag") @@ -282,13 +282,13 @@ class MessageTests(TestBase): channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Three")) channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Four")) channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Five")) - + msg1 = queue.get(timeout=1) msg2 = queue.get(timeout=1) msg3 = queue.get(timeout=1) msg4 = queue.get(timeout=1) msg5 = queue.get(timeout=1) - + self.assertEqual("One", msg1.content.body) self.assertEqual("Two", msg2.content.body) self.assertEqual("Three", msg3.content.body) @@ -307,10 +307,10 @@ class MessageTests(TestBase): self.subscribe(queue="test-requeue", destination="consumer_tag") queue2 = self.client.queue("consumer_tag") - + msg3b = queue2.get(timeout=1) msg5b = queue2.get(timeout=1) - + self.assertEqual("Three", msg3b.content.body) self.assertEqual("Five", msg5b.content.body) @@ -327,8 +327,8 @@ class MessageTests(TestBase): extra = queue.get(timeout=1) self.fail("Got unexpected message in original queue: " + extra.content.body) except Empty: None - - + + def test_qos_prefetch_count(self): """ Test that the prefetch count specified is honoured @@ -370,7 +370,7 @@ class MessageTests(TestBase): except Empty: None - + def test_qos_prefetch_size(self): """ Test that the prefetch size specified is honoured @@ -448,7 +448,7 @@ class MessageTests(TestBase): #send batch of messages to queue for i in range(1, 11): channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %d" % i)) - + #set message credit to finite amount (less than enough for all messages) channel.message_flow(unit = 0, value = 5, destination = "c") #set infinite byte credit @@ -458,7 +458,7 @@ class MessageTests(TestBase): for i in range(1, 6): self.assertDataEquals(channel, q.get(timeout = 1), "Message %d" % i) self.assertEmpty(q) - + #increase credit again and check more are received for i in range(6, 11): channel.message_flow(unit = 0, value = 1, destination = "c") @@ -512,7 +512,7 @@ class MessageTests(TestBase): #send batch of messages to queue for i in range(1, 11): channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %d" % i)) - + #set message credit to finite amount (less than enough for all messages) channel.message_flow(unit = 0, value = 5, destination = "c") #set infinite byte credit @@ -523,7 +523,7 @@ class MessageTests(TestBase): msg = q.get(timeout = 1) self.assertDataEquals(channel, msg, "Message %d" % i) self.assertEmpty(q) - + #acknowledge messages and check more are received msg.complete(cumulative=True) for i in range(6, 11): @@ -560,7 +560,7 @@ class MessageTests(TestBase): msgs.append(msg) self.assertDataEquals(channel, msg, "abcdefgh") self.assertEmpty(q) - + #ack each message individually and check more are received for i in range(5): msg = msgs.pop() @@ -650,7 +650,7 @@ class MessageTests(TestBase): channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a") queue = self.client.queue("a") first = queue.get(timeout = 1) - for i in range (2, 10): + for i in range (2, 10): self.assertEquals("released message %s" % (i), queue.get(timeout = 1).content.body) last = queue.get(timeout = 1) self.assertEmpty(queue) @@ -672,7 +672,7 @@ class MessageTests(TestBase): channel.message_flow(unit = 0, value = 10, destination = "a") channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a") queue = self.client.queue("a") - for i in range (1, 11): + for i in range (1, 11): self.assertEquals("message %s" % (i), queue.get(timeout = 1).content.body) self.assertEmpty(queue) @@ -683,14 +683,14 @@ class MessageTests(TestBase): self.assertEmpty(queue) def test_subscribe_not_acquired_2(self): - channel = self.channel + channel = self.channel #publish some messages self.queue_declare(queue = "q", exclusive=True, auto_delete=True) for i in range(1, 11): channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "message-%d" % (i))) - #consume some of them + #consume some of them channel.message_subscribe(queue = "q", destination = "a", confirm_mode = 1) channel.message_flow_mode(mode = 0, destination = "a") channel.message_flow(unit = 0, value = 5, destination = "a") -- cgit v1.2.1