diff options
author | Aidan Skinner <aidan@apache.org> | 2008-02-13 11:44:34 +0000 |
---|---|---|
committer | Aidan Skinner <aidan@apache.org> | 2008-02-13 11:44:34 +0000 |
commit | f3f64b6b056c866558eef7d9dd1944b92eb03fc1 (patch) | |
tree | a062d8acd67dafb97c4aa25e1ca5ffc9aaf2936b /qpid | |
parent | eff51b9febb3dc299e206ac339a84f28d5e96531 (diff) | |
download | qpid-python-f3f64b6b056c866558eef7d9dd1944b92eb03fc1.tar.gz |
Merged revisions 619974,620011,620014,620016-620017,620479,620481,620566,620584,620619,620622,620831,620854,620861-620862,620889,627128,627133,627141,627153-627155,627157,627171,627187 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/trunk
........
r619974 | rhs | 2008-02-08 18:30:04 +0000 (Fri, 08 Feb 2008) | 1 line
made xa tests run, and made QpidTestCase more robust
........
r620011 | rhs | 2008-02-08 22:05:50 +0000 (Fri, 08 Feb 2008) | 1 line
whitespace cleanup
........
r620014 | aconway | 2008-02-08 22:23:23 +0000 (Fri, 08 Feb 2008) | 7 lines
cpp/examples/direct, fanout: Converted listener.cpp to SubscriptionManager.
All python/cpp combos run as part of cpp/examples make check.
Fixed problems with verify scripts and VPATH builds.
........
r620016 | cctrieloff | 2008-02-08 22:24:33 +0000 (Fri, 08 Feb 2008) | 2 lines
yum correction
........
r620017 | aconway | 2008-02-08 22:27:38 +0000 (Fri, 08 Feb 2008) | 10 lines
From Ted Ross, https://issues.apache.org/jira/browse/QPID-782
The attached patch makes the following changes:
The --load-dir option has been renamed to --module-dir
The --no-modules option and been replaced by the --no-module-dir option. This new option suppresses ONLY the loading of modules from the directory.
The --no-data-dir option has been added to suppress the use of a data directory.
Logging has been added for data directory lock and unlock.
........
r620479 | gsim | 2008-02-11 13:00:45 +0000 (Mon, 11 Feb 2008) | 3 lines
Check valid listener (or handler) exist and log error if not. See QPID-783.
........
r620481 | gsim | 2008-02-11 13:10:38 +0000 (Mon, 11 Feb 2008) | 3 lines
Added a test (currently disabled) that highlights a deadlock in the client when commands are sent to the broker concurrently with acks (e.g. when the dispatcher thread is running with auto-acking and messages are sent on another thread).
........
r620566 | rhs | 2008-02-11 18:23:12 +0000 (Mon, 11 Feb 2008) | 1 line
bumped release for Beta 3
........
r620584 | rhs | 2008-02-11 19:21:01 +0000 (Mon, 11 Feb 2008) | 1 line
fixed computation of ranged acks, fix needed for failing RecoverTest
........
r620619 | aconway | 2008-02-11 21:43:32 +0000 (Mon, 11 Feb 2008) | 1 line
Fix errors in verify scripts.
........
r620622 | aconway | 2008-02-11 21:50:17 +0000 (Mon, 11 Feb 2008) | 1 line
Remove dependency on sys::Socket for management ID of connections.
........
r620831 | rhs | 2008-02-12 15:44:14 +0000 (Tue, 12 Feb 2008) | 1 line
added help text for the clean-results target
........
r620854 | gsim | 2008-02-12 16:35:45 +0000 (Tue, 12 Feb 2008) | 3 lines
Explicitly reset shared pointer; brokers destructor not called if started through -d otherwise it seems...
........
r620861 | gsim | 2008-02-12 16:54:05 +0000 (Tue, 12 Feb 2008) | 3 lines
Fixed typo
........
r620862 | aconway | 2008-02-12 16:54:42 +0000 (Tue, 12 Feb 2008) | 2 lines
Create /var/lib/qpidd correctly.
........
r620889 | aconway | 2008-02-12 18:04:11 +0000 (Tue, 12 Feb 2008) | 1 line
Quote all non-printable ASCII characters (not just control characters)
........
r627128 | aconway | 2008-02-12 21:39:55 +0000 (Tue, 12 Feb 2008) | 1 line
Create a tar file of verify scripts suitable for untarring into and installed examples directory.
........
r627133 | aconway | 2008-02-12 21:48:52 +0000 (Tue, 12 Feb 2008) | 2 lines
Add -g to build flags to get debug info.
........
r627141 | aconway | 2008-02-12 21:57:36 +0000 (Tue, 12 Feb 2008) | 2 lines
Fix verify error in Makefile.am
........
r627153 | rhs | 2008-02-12 22:21:20 +0000 (Tue, 12 Feb 2008) | 1 line
increased the test timeout
........
r627154 | rhs | 2008-02-12 22:22:20 +0000 (Tue, 12 Feb 2008) | 1 line
synchronize access to lastWrite future
........
r627155 | rhs | 2008-02-12 22:23:02 +0000 (Tue, 12 Feb 2008) | 1 line
added default getConnection()
........
r627157 | rhs | 2008-02-12 22:26:26 +0000 (Tue, 12 Feb 2008) | 1 line
added a test for message send followed by immediate connection close; fixed connection close handshaking
........
r627171 | aconway | 2008-02-12 23:09:06 +0000 (Tue, 12 Feb 2008) | 6 lines
Patches from Ted Ross:
Fix for bignumber problem in the management console.
Fix for broker crash when sessions are closed via management.
........
r627187 | rhs | 2008-02-12 23:52:01 +0000 (Tue, 12 Feb 2008) | 1 line
applied patch from rajith to fix reply-to
........
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/thegreatmerge@627359 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid')
61 files changed, 550 insertions, 356 deletions
diff --git a/qpid/bin/verify b/qpid/bin/verify index f35db898cf..844e99b765 100755 --- a/qpid/bin/verify +++ b/qpid/bin/verify @@ -36,11 +36,16 @@ background() { waitfor $out "$pattern" } +name() { + for x in $*; do name="$name `basename $x`"; done + echo $name; +} + outputs() { wait 2> /dev/null # Wait for all backgroud processes to complete rm -f $script.out for f in "$@"; do - { echo "==== $f"; eval "cat $f"; } >> $script.out || fail + { echo "==== `name $f`"; eval "cat $f"; } >> $script.out || fail done } @@ -49,6 +54,7 @@ verify() { if [ -d $1 ]; then dir=$1; script=verify; else dir=`dirname $1`; script=`basename $1`; fi cd $dir || return 1 + rm -f *.out { source ./$script && diff -ac $script.out $script.in ; } || fail test -z "$FAIL" && rm -f *.out return $FAIL @@ -59,7 +65,7 @@ remove_uuid() { sed "s/$HEX\{8\}-$HEX\{4\}-$HEX\{4\}-$HEX\{4\}-$HEX\{12\}//g" $* } remove_uuid64() { - sed 's/[-A-Za-z0-9_]\{22\}==$//' $* + sed 's/[-A-Za-z0-9_]\{22\}==//g' $* } # Start private broker if QPIDD is set. diff --git a/qpid/bin/verify_all b/qpid/bin/verify_all deleted file mode 100755 index 6be460c701..0000000000 --- a/qpid/bin/verify_all +++ /dev/null @@ -1,12 +0,0 @@ -#!/bin/sh -# Find example verify scripts and run them. -# Usage: verify_all search-dir - -if [ `basename $1` = examples ]; then exdirs=$1 -else exdirs=`find $1 -name examples -a -type d`; fi -scripts=`find $exdirs -name verify -o -name verify_cpp_python -o -name verify_cpp_java` -verify=`dirname $0`/verify -for s in $scripts; do $verify $s; done - - - diff --git a/qpid/cpp/README b/qpid/cpp/README index 7c34293007..7de7fd3f98 100644 --- a/qpid/cpp/README +++ b/qpid/cpp/README @@ -60,7 +60,7 @@ all of the above plus: On linux most packages can be installed using your distribution's package management tool. For example on Fedora: - # yum install pkgconfig e2fsprogs boost-devel cppunit-devel openais ruby + # yum install pkgconfig e2fsprogs boost-devel cppunit-devel openais-devel ruby # yum install make gcc-c++ autoconf automake libtool doxygen help2man graphviz # yum install e2fsprogs-devel diff --git a/qpid/cpp/examples/Makefile.am b/qpid/cpp/examples/Makefile.am index fdbea3b878..92625550e2 100644 --- a/qpid/cpp/examples/Makefile.am +++ b/qpid/cpp/examples/Makefile.am @@ -17,7 +17,36 @@ nobase_pkgdata_DATA= \ examples/direct/listener.cpp \ examples/direct/declare_queues.cpp -EXTRA_DIST=$(nobase_pkgdata_DATA) +VERIFY_FILES= verify verify_all \ + examples/request-response/verify \ + examples/request-response/verify.in \ + examples/request-response/verify_cpp_python \ + examples/request-response/verify_cpp_python.in \ + examples/request-response/verify_python_cpp \ + examples/request-response/verify_python_cpp.in \ + examples/fanout/verify \ + examples/fanout/verify.in \ + examples/fanout/verify_cpp_python \ + examples/fanout/verify_cpp_python.in \ + examples/fanout/verify_python_cpp \ + examples/fanout/verify_python_cpp.in \ + examples/pub-sub/verify \ + examples/pub-sub/verify.in \ + examples/pub-sub/verify_cpp_python \ + examples/pub-sub/verify_cpp_python.in \ + examples/pub-sub/verify_python_cpp \ + examples/pub-sub/verify_python_cpp.in \ + examples/direct/verify \ + examples/direct/verify.in \ + examples/direct/verify_cpp_python \ + examples/direct/verify_cpp_python.in \ + examples/direct/verify_python_cpp \ + examples/direct/verify_python_cpp.in + +EXTRA_DIST=$(nobase_pkgdata_DATA) $(VERIFY_FILES) + +verify: + cp $(top_srcdir)/../bin/verify $@ # Note: we don't use normal automake SUBDIRS because the example # makefiles don't understand all the recursive automake targets. @@ -29,31 +58,23 @@ clean-local: abs_top_builddir=@abs_top_builddir@ abs_top_srcdir=@abs_top_srcdir@ -VERIFY=$(top_srcdir)/../bin/verify -PYTHON_EXAMPLES=$(top_srcdir)/../python/examples -EXAMPLES= \ - examples/pub-sub \ - examples/fanout \ - examples/direct \ - examples/request-response \ - $(PYTHON_EXAMPLES)/pubsub \ - $(PYTHON_EXAMPLES)/fanout \ - $(PYTHON_EXAMPLES)/direct \ - $(PYTHON_EXAMPLES)/request-response - -# Build the examples in the source tree. +# Build the examples - copy sources to the build tree in VPATH build. all-local: + test -d examples || cp -R $(srcdir)/examples . cd examples && $(MAKE) CXX="$(CXX)" CXXFLAGS="$(CXXFLAGS) -I../../$(top_srcdir)/src -I../../$(top_srcdir)/src/gen -I../../$(top_builddir)/src/gen -L../../$(top_builddir)/src/.libs -Wl,-rpath,$(abs_top_builddir)/src/.libs" all # Verify the examples in the buid tree. -check-local: all-local - QPID_DATA_DIR= QPIDD=$(top_builddir)/src/qpidd $(VERIFY) $(EXAMPLES) - -# Build and verify the installed examples, then clean up to avoid rpmbuild warnings. -EXAMPLE_FLAGS=-I$(DESTDIR)$(includedir) -L$(DESTDIR)$(libdir) -Wl,-rpath,$(DESTDIR)$(libdir) -EXAMPLE_DIR=$(DESTDIR)$(pkgdatadir)/examples -installcheck-local: - cd $(EXAMPLE_DIR) && $(MAKE) CXX="$(CXX)" CXXFLAGS="$(EXAMPLE_FLAGS)" all - cd $(EXAMPLE_DIR) && QPIDD=$(sbindir)/qpidd $(VERIFY) - cd $(EXAMPLE_DIR) && $(MAKE) clean +check-local: all-local verify + $(srcdir)/verify_all $(abs_top_srcdir) $(abs_top_builddir) + +# TODO: +# create a tarball for testing installed examples. +# installcheck-local to use the tarball on installed example and clean up after. +# Build and verify installed C++ examples, clean up to avoid rpmbuild warnings. +# EXAMPLE_FLAGS=-I$(DESTDIR)$(includedir) -L$(DESTDIR)$(libdir) -Wl,-rpath,$(DESTDIR)$(libdir) +# EXAMPLE_DIR=$(DESTDIR)$(pkgdatadir)/examples/cpp +# installcheck-local: +# cd $(EXAMPLE_DIR) && $(MAKE) CXX="$(CXX)" CXXFLAGS="$(EXAMPLE_FLAGS)" all +# cd $(EXAMPLE_DIR) && QPIDD=$(sbindir)/qpidd $(srcdir)/verify * +# cd $(EXAMPLE_DIR) && $(MAKE) clean diff --git a/qpid/cpp/examples/examples/direct/listener.cpp b/qpid/cpp/examples/examples/direct/listener.cpp index 3f92d189de..91b5123c68 100644 --- a/qpid/cpp/examples/examples/direct/listener.cpp +++ b/qpid/cpp/examples/examples/direct/listener.cpp @@ -20,32 +20,15 @@ */ /** - * listener.cpp: - * - * This program is one of three programs designed to be used - * together. These programs do not specify the exchange type - the - * default exchange type is the direct exchange. - * - * declare_queues.cpp: - * - * Creates a queue on a broker, binding a routing key to route - * messages to that queue. - * - * direct_producer.cpp: - * - * Publishes to a broker, specifying a routing key. - * - * listener.cpp (this program): - * - * Reads from a queue on the broker using a message listener. - * + * listener.cpp: This program reads messages fro a queue on + * the broker using a message listener. */ #include <qpid/client/Dispatcher.h> #include <qpid/client/Connection.h> #include <qpid/client/Session.h> #include <qpid/client/Message.h> -#include <qpid/client/MessageListener.h> +#include <qpid/client/SubscriptionManager.h> #include <unistd.h> #include <cstdlib> @@ -56,42 +39,25 @@ using namespace qpid::framing; class Listener : public MessageListener{ -private: - std::string destination_name; - Dispatcher dispatcher; -public: - Listener(Session& session, string destination_name): - destination_name(destination_name), - dispatcher(session) - {}; - - virtual void listen(); - virtual void received(Message& message); - ~Listener() { }; + private: + SubscriptionManager& subscriptions; + public: + Listener(SubscriptionManager& subscriptions); + virtual void received(Message& message); }; - -void Listener::listen() { - std::cout << "Activating listener for: " <<destination_name << std::endl; - dispatcher.listen(destination_name, this); - - // The following line gives up control - - dispatcher.run(); -} - +Listener::Listener(SubscriptionManager& subs) : subscriptions(subs) +{} void Listener::received(Message& message) { std::cout << "Message: " << message.getData() << std::endl; - if (message.getData() == "That's all, folks!") { - std::cout << "Shutting down listener for " <<destination_name << std::endl; - dispatcher.stop(); + std::cout << "Shutting down listener for " << message.getDestination() + << std::endl; + subscriptions.cancel(message.getDestination()); } } - - int main(int argc, char** argv) { const char* host = argc>1 ? argv[1] : "127.0.0.1"; int port = argc>2 ? atoi(argv[2]) : 5672; @@ -103,25 +69,15 @@ int main(int argc, char** argv) { //--------- Main body of program -------------------------------------------- + SubscriptionManager subscriptions(session); + // Create a listener and subscribe it to the queue named "message_queue" + Listener listener(subscriptions); + subscriptions.subscribe(listener, "message_queue"); + // Deliver messages until the subscription is cancelled + // by Listener::received() + subscriptions.run(); - // Subscribe to the queue, route it to a client destination for - // the listener. (The destination name merely identifies the - // destination in the listener, you can use any name as long as - // you use the same name for the listener). - - session.messageSubscribe(arg::queue="message_queue", arg::destination="listener_destination"); - - //############## - session.messageFlow(arg::destination="listener_destination", arg::unit=0, arg::value=1);//messages ### Define a constant? - session.messageFlow(arg::destination="listener_destination", arg::unit=1, arg::value=0xFFFFFFFF);//bytes ###### Define a constant? - - // Tell the listener to listen to the destination we just - // created above. - - Listener listener(session, "listener_destination"); - listener.listen(); - - //----------------------------------------------------------------------------- + //--------------------------------------------------------------------------- connection.close(); return 0; diff --git a/qpid/cpp/examples/examples/direct/verify.in b/qpid/cpp/examples/examples/direct/verify.in index afe36335c7..d1e95f1151 100644 --- a/qpid/cpp/examples/examples/direct/verify.in +++ b/qpid/cpp/examples/examples/direct/verify.in @@ -1,7 +1,6 @@ -==== ./declare_queues.out -==== ./direct_producer.out -==== ./listener.out -Activating listener for: listener_destination +==== declare_queues.out +==== direct_producer.out +==== listener.out Message: Message 0 Message: Message 1 Message: Message 2 @@ -13,4 +12,4 @@ Message: Message 7 Message: Message 8 Message: Message 9 Message: That's all, folks! -Shutting down listener for listener_destination +Shutting down listener for message_queue diff --git a/qpid/cpp/examples/examples/direct/verify_cpp_python b/qpid/cpp/examples/examples/direct/verify_cpp_python index 5ce3681d90..4dc445ba27 100644 --- a/qpid/cpp/examples/examples/direct/verify_cpp_python +++ b/qpid/cpp/examples/examples/direct/verify_cpp_python @@ -1,4 +1,4 @@ # See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify -py=../../../../python/examples/direct +py=$PYTHON_EXAMPLES/direct clients ./declare_queues ./direct_producer $py/direct_consumer.py outputs ./declare_queues.out ./direct_producer.out $py/direct_consumer.py.out diff --git a/qpid/cpp/examples/examples/direct/verify_cpp_python.in b/qpid/cpp/examples/examples/direct/verify_cpp_python.in index 0952dbe405..1a329be59a 100644 --- a/qpid/cpp/examples/examples/direct/verify_cpp_python.in +++ b/qpid/cpp/examples/examples/direct/verify_cpp_python.in @@ -1,6 +1,6 @@ -==== ./declare_queues.out -==== ./direct_producer.out -==== ../../../../python/examples/direct/direct_consumer.py.out +==== declare_queues.out +==== direct_producer.out +==== direct_consumer.py.out Message 0 Message 1 Message 2 diff --git a/qpid/cpp/examples/examples/direct/verify_python_cpp b/qpid/cpp/examples/examples/direct/verify_python_cpp index 43c5339150..fe4893e120 100644 --- a/qpid/cpp/examples/examples/direct/verify_python_cpp +++ b/qpid/cpp/examples/examples/direct/verify_python_cpp @@ -1,5 +1,5 @@ # See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify -py=../../../../python/examples/direct +py=$PYTHON_EXAMPLES/direct clients $py/declare_queues.py $py/direct_producer.py ./listener outputs $py/declare_queues.py.out $py/direct_producer.py.out ./listener.out diff --git a/qpid/cpp/examples/examples/direct/verify_python_cpp.in b/qpid/cpp/examples/examples/direct/verify_python_cpp.in index b1907e6c3b..6f35255b18 100644 --- a/qpid/cpp/examples/examples/direct/verify_python_cpp.in +++ b/qpid/cpp/examples/examples/direct/verify_python_cpp.in @@ -1,7 +1,6 @@ -==== ../../../../python/examples/direct/declare_queues.py.out -==== ../../../../python/examples/direct/direct_producer.py.out -==== ./listener.out -Activating listener for: listener_destination +==== declare_queues.py.out +==== direct_producer.py.out +==== listener.out Message: message 0 Message: message 1 Message: message 2 @@ -13,4 +12,4 @@ Message: message 7 Message: message 8 Message: message 9 Message: That's all, folks! -Shutting down listener for listener_destination +Shutting down listener for message_queue diff --git a/qpid/cpp/examples/examples/fanout/declare_queues.cpp b/qpid/cpp/examples/examples/fanout/declare_queues.cpp index 6b1eabfeec..c76ed54730 100644 --- a/qpid/cpp/examples/examples/fanout/declare_queues.cpp +++ b/qpid/cpp/examples/examples/fanout/declare_queues.cpp @@ -20,21 +20,16 @@ */ /** - * direct_config_queues.cpp + * declare_queues.cpp (this program): * * This program is one of three programs designed to be used - * together. These programs use the "amq.direct" exchange. + * together. These programs use the "amq.fanout" exchange. * - * direct_config_queues.cpp (this program): - * - * Creates a queue on a broker, binding a routing key to route - * messages to that queue. - * - * direct_publisher.cpp: + * fanout_producer.cpp: * * Publishes to a broker, specifying a routing key. * - * direct_listener.cpp + * listener.cpp * * Reads from a queue on the broker using a message listener. * @@ -65,9 +60,7 @@ int main(int argc, char** argv) { //--------- Main body of program -------------------------------------------- - // Create a queue named "message_queue", and route all messages whose - // routing key is "routing_key to this newly created queue. - + // Create and bind a queue named "message_queue". session.queueDeclare(arg::queue="message_queue"); session.queueBind(arg::queue="message_queue", arg::exchange="amq.fanout"); diff --git a/qpid/cpp/examples/examples/fanout/fanout_producer.cpp b/qpid/cpp/examples/examples/fanout/fanout_producer.cpp index a5904e6731..7521724920 100644 --- a/qpid/cpp/examples/examples/fanout/fanout_producer.cpp +++ b/qpid/cpp/examples/examples/fanout/fanout_producer.cpp @@ -21,22 +21,22 @@ /** - * direct_publisher.cpp: + * fanout_producer.cpp: * * This program is one of three programs designed to be used * together. These programs do not specify the exchange type - the * default exchange type is the direct exchange. * - * direct_config_queues.cpp: + * declare_queues.cpp: * * Creates a queue on a broker, binding a routing key to route * messages to that queue. * - * direct_publisher.cpp (this program): + * fanout_producer.cpp (this program): * * Publishes to a broker, specifying a routing key. * - * direct_listener.cpp + * listener.cpp * * Reads from a queue on the broker using a message listener. * diff --git a/qpid/cpp/examples/examples/fanout/listener.cpp b/qpid/cpp/examples/examples/fanout/listener.cpp index 3f94f73f48..91b5123c68 100644 --- a/qpid/cpp/examples/examples/fanout/listener.cpp +++ b/qpid/cpp/examples/examples/fanout/listener.cpp @@ -20,32 +20,15 @@ */ /** - * direct_listener.cpp: - * - * This program is one of three programs designed to be used - * together. These programs do not specify the exchange type - the - * default exchange type is the direct exchange. - * - * direct_config_queues.cpp: - * - * Creates a queue on a broker, binding a routing key to route - * messages to that queue. - * - * direct_publisher.cpp: - * - * Publishes to a broker, specifying a routing key. - * - * direct_listener.cpp (this program): - * - * Reads from a queue on the broker using a message listener. - * + * listener.cpp: This program reads messages fro a queue on + * the broker using a message listener. */ #include <qpid/client/Dispatcher.h> #include <qpid/client/Connection.h> #include <qpid/client/Session.h> #include <qpid/client/Message.h> -#include <qpid/client/MessageListener.h> +#include <qpid/client/SubscriptionManager.h> #include <unistd.h> #include <cstdlib> @@ -56,43 +39,25 @@ using namespace qpid::framing; class Listener : public MessageListener{ -private: - std::string destination_name; - Dispatcher dispatcher; -public: - Listener(Session& session, string destination_name): - destination_name(destination_name), - dispatcher(session) - {}; - - virtual void listen(); - virtual void received(Message& message); - ~Listener() { }; + private: + SubscriptionManager& subscriptions; + public: + Listener(SubscriptionManager& subscriptions); + virtual void received(Message& message); }; - -void Listener::listen() { - std::cout << "Activating listener for: " <<destination_name << std::endl; - dispatcher.listen(destination_name, this); - - // ### The following line gives up control - it should be possible - // ### to listen without giving up control! - - dispatcher.run(); -} - +Listener::Listener(SubscriptionManager& subs) : subscriptions(subs) +{} void Listener::received(Message& message) { std::cout << "Message: " << message.getData() << std::endl; - if (message.getData() == "That's all, folks!") { - std::cout << "Shutting down listener for " <<destination_name << std::endl; - dispatcher.stop(); + std::cout << "Shutting down listener for " << message.getDestination() + << std::endl; + subscriptions.cancel(message.getDestination()); } } - - int main(int argc, char** argv) { const char* host = argc>1 ? argv[1] : "127.0.0.1"; int port = argc>2 ? atoi(argv[2]) : 5672; @@ -104,24 +69,15 @@ int main(int argc, char** argv) { //--------- Main body of program -------------------------------------------- + SubscriptionManager subscriptions(session); + // Create a listener and subscribe it to the queue named "message_queue" + Listener listener(subscriptions); + subscriptions.subscribe(listener, "message_queue"); + // Deliver messages until the subscription is cancelled + // by Listener::received() + subscriptions.run(); - // Subscribe to the queue, route it to a client destination for - // the listener. (The destination name merely identifies the - // destination in the listener, you can use any name as long as - // you use the same name for the listener). - - session.messageSubscribe(arg::queue="message_queue", arg::destination="listener_destination"); - - session.messageFlow(arg::destination="listener_destination", arg::unit=0, arg::value=1);//messages ### Define a constant? - session.messageFlow(arg::destination="listener_destination", arg::unit=1, arg::value=0xFFFFFFFF);//bytes ###### Define a constant? - - // Tell the listener to listen to the destination we just - // created above. - - Listener listener(session, "listener_destination"); - listener.listen(); - - //----------------------------------------------------------------------------- + //--------------------------------------------------------------------------- connection.close(); return 0; diff --git a/qpid/cpp/examples/examples/fanout/verify.in b/qpid/cpp/examples/examples/fanout/verify.in index 9d1c1445e1..23d08f38dd 100644 --- a/qpid/cpp/examples/examples/fanout/verify.in +++ b/qpid/cpp/examples/examples/fanout/verify.in @@ -1,7 +1,6 @@ -==== ./declare_queues.out -==== ./fanout_producer.out -==== ./listener.out -Activating listener for: listener_destination +==== declare_queues.out +==== fanout_producer.out +==== listener.out Message: Message 0 Message: Message 1 Message: Message 2 @@ -13,4 +12,4 @@ Message: Message 7 Message: Message 8 Message: Message 9 Message: That's all, folks! -Shutting down listener for listener_destination +Shutting down listener for message_queue diff --git a/qpid/cpp/examples/examples/fanout/verify_cpp_python b/qpid/cpp/examples/examples/fanout/verify_cpp_python index 59fa63478c..f53784ef1c 100644 --- a/qpid/cpp/examples/examples/fanout/verify_cpp_python +++ b/qpid/cpp/examples/examples/fanout/verify_cpp_python @@ -1,5 +1,5 @@ # See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify -py=../../../../python/examples/fanout +py=$PYTHON_EXAMPLES/fanout clients ./declare_queues ./fanout_producer $py/fanout_consumer.py outputs ./declare_queues.out ./fanout_producer.out $py/fanout_consumer.py.out diff --git a/qpid/cpp/examples/examples/fanout/verify_cpp_python.in b/qpid/cpp/examples/examples/fanout/verify_cpp_python.in index 141e7c6a84..cb9e52cfcc 100644 --- a/qpid/cpp/examples/examples/fanout/verify_cpp_python.in +++ b/qpid/cpp/examples/examples/fanout/verify_cpp_python.in @@ -1,6 +1,6 @@ -==== ./declare_queues.out -==== ./fanout_producer.out -==== ../../../../python/examples/fanout/fanout_consumer.py.out +==== declare_queues.out +==== fanout_producer.out +==== fanout_consumer.py.out Message 0 Message 1 Message 2 diff --git a/qpid/cpp/examples/examples/fanout/verify_python_cpp b/qpid/cpp/examples/examples/fanout/verify_python_cpp index cef36bd287..00a0727352 100644 --- a/qpid/cpp/examples/examples/fanout/verify_python_cpp +++ b/qpid/cpp/examples/examples/fanout/verify_python_cpp @@ -1,5 +1,5 @@ # See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify -py=../../../../python/examples/fanout -clients $py/declare_queues.py $py/fanout_producer .py ./listener +py=$PYTHON_EXAMPLES/fanout +clients $py/declare_queues.py $py/fanout_producer.py ./listener outputs $py/declare_queues.py.out $py/fanout_producer.py.out ./listener.out diff --git a/qpid/cpp/examples/examples/fanout/verify_python_cpp.in b/qpid/cpp/examples/examples/fanout/verify_python_cpp.in new file mode 100644 index 0000000000..87a1694e6c --- /dev/null +++ b/qpid/cpp/examples/examples/fanout/verify_python_cpp.in @@ -0,0 +1,15 @@ +==== declare_queues.py.out +==== fanout_producer.py.out +==== listener.out +Message: message 0 +Message: message 1 +Message: message 2 +Message: message 3 +Message: message 4 +Message: message 5 +Message: message 6 +Message: message 7 +Message: message 8 +Message: message 9 +Message: That's all, folks! +Shutting down listener for message_queue diff --git a/qpid/cpp/examples/examples/pub-sub/verify.in b/qpid/cpp/examples/examples/pub-sub/verify.in index ac51038a1b..6413c5c788 100644 --- a/qpid/cpp/examples/examples/pub-sub/verify.in +++ b/qpid/cpp/examples/examples/pub-sub/verify.in @@ -1,4 +1,4 @@ -==== ./topic_publisher.out +==== topic_publisher.out ==== topic_listener.out | remove_uuid | sort Declaring queue: europe Declaring queue: news diff --git a/qpid/cpp/examples/examples/pub-sub/verify_cpp_python b/qpid/cpp/examples/examples/pub-sub/verify_cpp_python index ff711a310f..ecc573eed3 100644 --- a/qpid/cpp/examples/examples/pub-sub/verify_cpp_python +++ b/qpid/cpp/examples/examples/pub-sub/verify_cpp_python @@ -1,5 +1,5 @@ # See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify -py=../../../../python/examples/pubsub +py=$PYTHON_EXAMPLES/pubsub background "Queues created" $py/topic_subscriber.py clients ./topic_publisher outputs ./topic_publisher.out "$py/topic_subscriber.py.out | remove_uuid64 | sort" diff --git a/qpid/cpp/examples/examples/pub-sub/verify_cpp_python.in b/qpid/cpp/examples/examples/pub-sub/verify_cpp_python.in index 9e033dc25c..b3c9e750f5 100644 --- a/qpid/cpp/examples/examples/pub-sub/verify_cpp_python.in +++ b/qpid/cpp/examples/examples/pub-sub/verify_cpp_python.in @@ -1,5 +1,5 @@ -==== ./topic_publisher.out -==== ../../../../python/examples/pubsub/topic_subscriber.py.out | remove_uuid64 | sort +==== topic_publisher.out +==== topic_subscriber.py.out | remove_uuid64 | sort Message 0 Message 0 Message 0 diff --git a/qpid/cpp/examples/examples/pub-sub/verify_python_cpp b/qpid/cpp/examples/examples/pub-sub/verify_python_cpp index e9c72c94d7..2ddaad58c2 100644 --- a/qpid/cpp/examples/examples/pub-sub/verify_python_cpp +++ b/qpid/cpp/examples/examples/pub-sub/verify_python_cpp @@ -1,5 +1,5 @@ # See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify -py=../../../../python/examples/pubsub +py=$PYTHON_EXAMPLES/pubsub background "Listening" ./topic_listener clients $py/topic_publisher.py outputs $py/topic_publisher.py.out "topic_listener.out | remove_uuid | sort" diff --git a/qpid/cpp/examples/examples/pub-sub/verify_python_cpp.in b/qpid/cpp/examples/examples/pub-sub/verify_python_cpp.in index 6f92458792..97fccf0a32 100644 --- a/qpid/cpp/examples/examples/pub-sub/verify_python_cpp.in +++ b/qpid/cpp/examples/examples/pub-sub/verify_python_cpp.in @@ -1,4 +1,4 @@ -==== ../../../../python/examples/pubsub/topic_publisher.py.out +==== topic_publisher.py.out ==== topic_listener.out | remove_uuid | sort Declaring queue: europe Declaring queue: news diff --git a/qpid/cpp/examples/examples/request-response/verify b/qpid/cpp/examples/examples/request-response/verify index a8c942b5e6..76007ff8d2 100644 --- a/qpid/cpp/examples/examples/request-response/verify +++ b/qpid/cpp/examples/examples/request-response/verify @@ -2,4 +2,4 @@ background "Waiting" ./server clients ./client kill %% # Must kill the server. -outputs "./client.out | remove_uuid" " server.out | remove_uuid" +outputs "./client.out | remove_uuid" "server.out | remove_uuid" diff --git a/qpid/cpp/examples/examples/request-response/verify.in b/qpid/cpp/examples/examples/request-response/verify.in index 3767fb3f43..7925dc5671 100644 --- a/qpid/cpp/examples/examples/request-response/verify.in +++ b/qpid/cpp/examples/examples/request-response/verify.in @@ -1,4 +1,4 @@ -==== ./client.out | remove_uuid +==== client.out | remove_uuid Activating response queue listener for: client Request: Twas brillig, and the slithy toves Request: Did gire and gymble in the wabe. @@ -10,7 +10,7 @@ Response: DID GIRE AND GYMBLE IN THE WABE. Response: ALL MIMSY WERE THE BOROGROVES, Response: AND THE MOME RATHS OUTGRABE. Shutting down listener for client -==== server.out | remove_uuid +==== server.out | remove_uuid Activating request queue listener for: request Waiting for requests Request: Twas brillig, and the slithy toves (client) diff --git a/qpid/cpp/examples/examples/request-response/verify_cpp_python b/qpid/cpp/examples/examples/request-response/verify_cpp_python index 9470110bc2..9d71d51c37 100644 --- a/qpid/cpp/examples/examples/request-response/verify_cpp_python +++ b/qpid/cpp/examples/examples/request-response/verify_cpp_python @@ -1,6 +1,5 @@ # See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify -py=../../../../python/examples/request-response -background "Request server running" $py/server.py +background "Request server running" $PYTHON_EXAMPLES/request-response/server.py clients ./client kill %% # Must kill the server. -outputs "./client.out | remove_uuid" "$py/server.py.out | remove_uuid64" +outputs "./client.out | remove_uuid" "$PYTHON_EXAMPLES/request-response/server.py.out | remove_uuid64" diff --git a/qpid/cpp/examples/examples/request-response/verify_cpp_python.in b/qpid/cpp/examples/examples/request-response/verify_cpp_python.in index efffc32332..280484bd2a 100644 --- a/qpid/cpp/examples/examples/request-response/verify_cpp_python.in +++ b/qpid/cpp/examples/examples/request-response/verify_cpp_python.in @@ -1,4 +1,4 @@ -==== ./client.out | remove_uuid +==== client.out | remove_uuid Activating response queue listener for: client Request: Twas brillig, and the slithy toves Request: Did gire and gymble in the wabe. @@ -10,6 +10,6 @@ Response: DID GIRE AND GYMBLE IN THE WABE. Response: ALL MIMSY WERE THE BOROGROVES, Response: AND THE MOME RATHS OUTGRABE. Shutting down listener for client -==== ../../../../python/examples/request-response/server.py.out | remove_uuid64 +==== server.py.out | remove_uuid64 Request server running - run your client now. (Times out after 100 seconds ...) diff --git a/qpid/cpp/examples/examples/request-response/verify_python_cpp b/qpid/cpp/examples/examples/request-response/verify_python_cpp index 11f5c4cbc4..9f3f1caaf4 100644 --- a/qpid/cpp/examples/examples/request-response/verify_python_cpp +++ b/qpid/cpp/examples/examples/request-response/verify_python_cpp @@ -1,6 +1,5 @@ # See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify -py=../../../../python/examples/request-response -background "Request server running" ./server -clients $py/client.py +background "Waiting" ./server +clients $PYTHON_EXAMPLES/request-response/client.py kill %% # Must kill the server. -outputs "./client.py.out | remove_uuid64" " server.out | remove_uuid" +outputs "$PYTHON_EXAMPLES/request-response/client.py.out | remove_uuid64" "server.out | remove_uuid64" diff --git a/qpid/cpp/examples/examples/request-response/verify_python_cpp.in b/qpid/cpp/examples/examples/request-response/verify_python_cpp.in new file mode 100644 index 0000000000..7718d54973 --- /dev/null +++ b/qpid/cpp/examples/examples/request-response/verify_python_cpp.in @@ -0,0 +1,18 @@ +==== client.py.out | remove_uuid64 +Request: Twas brilling, and the slithy toves +Request: Did gyre and gimble in the wabe. +Request: All mimsy were the borogroves, +Request: And the mome raths outgrabe. +Messages queue: ReplyTo: +Response: TWAS BRILLING, AND THE SLITHY TOVES +Response: DID GYRE AND GIMBLE IN THE WABE. +Response: ALL MIMSY WERE THE BOROGROVES, +Response: AND THE MOME RATHS OUTGRABE. +No more messages! +==== server.out | remove_uuid64 +Activating request queue listener for: request +Waiting for requests +Request: Twas brilling, and the slithy toves (ReplyTo:) +Request: Did gyre and gimble in the wabe. (ReplyTo:) +Request: All mimsy were the borogroves, (ReplyTo:) +Request: And the mome raths outgrabe. (ReplyTo:) diff --git a/qpid/cpp/examples/verify_all b/qpid/cpp/examples/verify_all new file mode 100755 index 0000000000..e6d75929b2 --- /dev/null +++ b/qpid/cpp/examples/verify_all @@ -0,0 +1,24 @@ +#!/bin/sh +# Verify all C++/python example combinations. +# + +src=$1 ; build=$2 + +verify=./verify +qpidd=$build/src/qpidd +cpp=$build/examples/examples +python=$src/../python + +trap "$qpidd -q" exit +export QPID_PORT=`$qpidd -dp0 --data-dir ""` +export PYTHON_EXAMPLES=$python/examples +export PYTHONPATH=$python:$PYTHONPATH +export AMQP_SPEC=$src/../specs/amqp.0-10-preview.xml + +find="find $cpp" +test -d $PYTHON_EXAMPLES && find="$find $PYTHON_EXAMPLES" +find="$find -name verify" +test -d $PYTHON_EXAMPLES && \ + find="$find -o -name verify_cpp_python -o -name verify_python_cpp" +$verify `$find` + diff --git a/qpid/cpp/qpidc.spec.in b/qpid/cpp/qpidc.spec.in index 45d67dff6e..3892b1ab40 100644 --- a/qpid/cpp/qpidc.spec.in +++ b/qpid/cpp/qpidc.spec.in @@ -5,7 +5,7 @@ Name: @PACKAGE@ Version: @VERSION@ -Release: 18%{?dist} +Release: 22%{?dist} Summary: Libraries for Qpid C++ client applications Group: System Environment/Libraries License: Apache Software License @@ -73,16 +73,16 @@ Qpid broker daemon. %setup -q %build -%configure --disable-static --without-cpg CXXFLAGS="-O3 -DNDEBUG" +%configure --disable-static --without-cpg CXXFLAGS="-g -O3 -DNDEBUG" make %{?_smp_mflags} # Remove this generated perl file, we don't need it and it upsets rpmlint. rm docs/api/html/installdox %install rm -rf %{buildroot} -make install DESTDIR=%{buildroot} -install -Dp -m0755 etc/qpidd %{buildroot}%{_initrddir}/qpidd -install -d -m0755 %{buildroot}%{_localstatedir}/qpidd +make install-strip DESTDIR=%{buildroot} +install -Dp -m0755 etc/qpidd %{buildroot}%{_initrddir}/qpidd +install -d -m0755 %{buildroot}%{_localstatedir}/lib/qpidd rm -f %{buildroot}%_libdir/*.a rm -f %{buildroot}%_libdir/*.la @@ -99,7 +99,6 @@ make check %_libdir/libqpidcommon.so.0.1.0 %_libdir/libqpidclient.so.0 %_libdir/libqpidclient.so.0.1.0 -%_localstatedir/qpidd %config(noreplace) %_sysconfdir/qpidd.conf %files devel @@ -123,6 +122,7 @@ make check %_libdir/libqpidcluster.so.0.1.0 %_sbindir/%{qpidd} %{_initrddir}/%{qpidd} +%_localstatedir/lib/qpidd %doc %_mandir/man1/%{qpidd}.* %files -n %{qpidd}-devel @@ -156,6 +156,14 @@ fi /sbin/ldconfig %changelog +* Tue Feb 12 2008 Alan Conway <aconway@redhat.com> - 0.2-22 +- Added -g to compile flags for debug symbols. + +* Tue Feb 12 2008 Alan Conway <aconway@redhat.com> - 0.2-21 +- Create /var/lib/qpidd correctly. + +* Mon Feb 11 2008 Rafael Schloming <rafaels@redhat.com> - 0.2-20 +- bumped for Beta 3 * Mon Jan 21 2008 Gordon Sim <gsim@redhat.com> - 0.2-18 - bump up rev for recent changes to plugin modules & mgmt diff --git a/qpid/cpp/src/qpid/DataDir.cpp b/qpid/cpp/src/qpid/DataDir.cpp index baf536c109..abf9b061e4 100644 --- a/qpid/cpp/src/qpid/DataDir.cpp +++ b/qpid/cpp/src/qpid/DataDir.cpp @@ -20,6 +20,7 @@ #include "Exception.h" #include "DataDir.h" +#include "qpid/log/Statement.h" #include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> @@ -32,7 +33,10 @@ DataDir::DataDir (std::string path) : dirPath (path) { if (!enabled) + { + QPID_LOG (info, "No data directory - Disabling persistent configuration"); return; + } const char *cpath = dirPath.c_str (); struct stat s; @@ -55,14 +59,20 @@ DataDir::DataDir (std::string path) : oss << "Error locking data directory: errno=" << errno; throw Exception (oss.str ()); } + + QPID_LOG (info, "Locked data directory: " << dirPath); } DataDir::~DataDir () { + if (dirPath.empty ()) + return; + std::string lockFile (dirPath); lockFile = lockFile + "/lock"; ::unlink (lockFile.c_str ()); + QPID_LOG (info, "Unlocked data directory: " << dirPath); } } // namespace qpid diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index 1d55db0c0f..117a93b571 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -60,6 +60,7 @@ namespace broker { Broker::Options::Options(const std::string& name) : qpid::Options(name), + noDataDir(0), dataDir("/var/lib/qpidd"), port(DEFAULT_PORT), workerThreads(5), @@ -75,6 +76,8 @@ Broker::Options::Options(const std::string& name) : addOptions() ("data-dir", optValue(dataDir,"DIR"), "Directory to contain persistent data generated by the broker") + ("no-data-dir", optValue(noDataDir), + "Don't use a data directory. No persistent configuration will be loaded or stored") ("port,p", optValue(port,"PORT"), "Tells the broker to listen on PORT") ("worker-threads", optValue(workerThreads, "N"), @@ -103,7 +106,7 @@ const std::string qpid_management("qpid.management"); Broker::Broker(const Broker::Options& conf) : config(conf), store(0), - dataDir(conf.dataDir), + dataDir(conf.noDataDir ? std::string () : conf.dataDir), factory(*this), sessionManager(conf.ack) { diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h index 852cb2da42..68dbf570f0 100644 --- a/qpid/cpp/src/qpid/broker/Broker.h +++ b/qpid/cpp/src/qpid/broker/Broker.h @@ -61,6 +61,7 @@ class Broker : public sys::Runnable, public Plugin::Target, public management::M struct Options : public qpid::Options { Options(const std::string& name="Broker Options"); + bool noDataDir; std::string dataDir; uint16_t port; int workerThreads; diff --git a/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp b/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp index 72df9c44e9..9577853de4 100644 --- a/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp +++ b/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp @@ -36,9 +36,9 @@ ConnectionFactory::~ConnectionFactory() qpid::sys::ConnectionInputHandler* ConnectionFactory::create(qpid::sys::ConnectionOutputHandler* out, - const qpid::sys::Socket& s) + const std::string& id) { - return new Connection(out, broker, s.getPeerAddress()); + return new Connection(out, broker, id); } }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/ConnectionFactory.h b/qpid/cpp/src/qpid/broker/ConnectionFactory.h index 23fba5c1ab..53fb160279 100644 --- a/qpid/cpp/src/qpid/broker/ConnectionFactory.h +++ b/qpid/cpp/src/qpid/broker/ConnectionFactory.h @@ -32,8 +32,8 @@ class ConnectionFactory : public qpid::sys::ConnectionInputHandlerFactory public: ConnectionFactory(Broker& b); - virtual qpid::sys::ConnectionInputHandler* create - (qpid::sys::ConnectionOutputHandler* ctxt, const sys::Socket& s); + virtual qpid::sys::ConnectionInputHandler* + create(qpid::sys::ConnectionOutputHandler* out, const std::string& id); virtual ~ConnectionFactory(); diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp index 1021cca1b1..80fafe0386 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.cpp +++ b/qpid/cpp/src/qpid/broker/SessionState.cpp @@ -147,12 +147,14 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId, break; case management::Session::METHOD_CLOSE : + /* if (handler != 0) { handler->getConnection().closeChannel(handler->getChannel()); } status = Manageable::STATUS_OK; break; + */ case management::Session::METHOD_SOLICITACK : case management::Session::METHOD_RESETLIFESPAN : diff --git a/qpid/cpp/src/qpid/client/Dispatcher.cpp b/qpid/cpp/src/qpid/client/Dispatcher.cpp index 0783d5bc55..8df4637c88 100644 --- a/qpid/cpp/src/qpid/client/Dispatcher.cpp +++ b/qpid/cpp/src/qpid/client/Dispatcher.cpp @@ -75,11 +75,18 @@ void Dispatcher::run() if (content->isA<MessageTransferBody>()) { Message msg(*content, session); Subscriber::shared_ptr listener = find(msg.getDestination()); - assert(listener); - listener->received(msg); + if (!listener) { + QPID_LOG(error, "No listener found for destination " << msg.getDestination()); + } else { + assert(listener); + listener->received(msg); + } } else { - assert (handler.get()); - handler->handle(*content); + if (handler.get()) { + handler->handle(*content); + } else { + QPID_LOG(error, "No handler found for " << *(content->getMethod())); + } } } } catch (const ClosedException&) { diff --git a/qpid/cpp/src/qpid/log/Statement.cpp b/qpid/cpp/src/qpid/log/Statement.cpp index 2935de9071..9b6fb7feaf 100644 --- a/qpid/cpp/src/qpid/log/Statement.cpp +++ b/qpid/cpp/src/qpid/log/Statement.cpp @@ -22,6 +22,7 @@ #include <stdexcept> #include <algorithm> #include <syslog.h> +#include <ctype.h> namespace qpid { namespace log { @@ -29,21 +30,21 @@ namespace log { namespace { using namespace std; -struct IsControl { bool operator()(unsigned char c) { return c < 32; } }; +struct NonPrint { bool operator()(unsigned char c) { return !isprint(c); } }; -bool isClean(const std::string& str) { - return std::find_if(str.begin(), str.end(), IsControl()) == str.end(); -} +char hex[] = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F' }; std::string quote(const std::string& str) { - IsControl isControl; - size_t n = std::count_if(str.begin(), str.end(), isControl); + NonPrint nonPrint; + size_t n = std::count_if(str.begin(), str.end(), nonPrint); + if (n==0) return str; std::string ret; - ret.reserve(str.size()+n); // Avoid extra allocations. + ret.reserve(str.size()+2*n); // Avoid extra allocations. for (string::const_iterator i = str.begin(); i != str.end(); ++i) { - if (isControl(*i)) { - ret.push_back('^'); - ret.push_back((*i)+64); + if (nonPrint(*i)) { + ret.push_back('\\'); + ret.push_back(hex[((*i) >> 4)&0xf]); + ret.push_back(hex[(*i) & 0xf]); } else ret.push_back(*i); } @@ -53,7 +54,7 @@ std::string quote(const std::string& str) { } void Statement::log(const std::string& message) { - Logger::instance().log(*this, isClean(message) ? message : quote(message)); + Logger::instance().log(*this, quote(message)); } Statement::Initializer::Initializer(Statement& s) : statement(s) { diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp index 39fab270af..709f2a0ecd 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp +++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp @@ -74,16 +74,16 @@ void ManagementAgent::setExchange (broker::Exchange::shared_ptr _mexchange, } void ManagementAgent::addObject (ManagementObject::shared_ptr object, - uint64_t persistenceId, - uint64_t idOffset) + uint64_t /*persistenceId*/, + uint64_t /*idOffset*/) { RWlock::ScopedWlock writeLock (userLock); uint64_t objectId; - if (persistenceId == 0) +// if (persistenceId == 0) objectId = nextObjectId++; - else - objectId = 0x8000000000000000ULL | (persistenceId + idOffset); +// else +// objectId = 0x8000000000000000ULL | (persistenceId + idOffset); object->setObjectId (objectId); managementObjects[objectId] = object; diff --git a/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp b/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp index 65d805e492..9fd32add72 100644 --- a/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp +++ b/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp @@ -140,7 +140,7 @@ class AsynchIOHandler : public qpid::sys::ConnectionOutputHandler { void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, ConnectionInputHandlerFactory* f) { AsynchIOHandler* async = new AsynchIOHandler; - ConnectionInputHandler* handler = f->create(async, s); + ConnectionInputHandler* handler = f->create(async, s.getPeerAddress()); AsynchIO* aio = new AsynchIO(s, boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), boost::bind(&AsynchIOHandler::eof, async, _1), @@ -194,7 +194,7 @@ void AsynchIOAcceptor::connect(const std::string& host, int16_t port, Connection socket->connect(host, port); AsynchIOHandler* async = new AsynchIOHandler; async->setClient(); - ConnectionInputHandler* handler = f->create(async, *socket); + ConnectionInputHandler* handler = f->create(async, socket->getPeerAddress()); AsynchIO* aio = new AsynchIO(*socket, boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), boost::bind(&AsynchIOHandler::eof, async, _1), diff --git a/qpid/cpp/src/qpid/sys/ConnectionInputHandlerFactory.h b/qpid/cpp/src/qpid/sys/ConnectionInputHandlerFactory.h index 4a90f8b736..2b309b5758 100644 --- a/qpid/cpp/src/qpid/sys/ConnectionInputHandlerFactory.h +++ b/qpid/cpp/src/qpid/sys/ConnectionInputHandlerFactory.h @@ -22,7 +22,7 @@ #define _ConnectionInputHandlerFactory_ #include <boost/noncopyable.hpp> -#include "qpid/sys/Socket.h" +#include <string> namespace qpid { namespace sys { @@ -37,7 +37,13 @@ class ConnectionInputHandler; class ConnectionInputHandlerFactory : private boost::noncopyable { public: - virtual ConnectionInputHandler* create(ConnectionOutputHandler* ctxt, const Socket& s) = 0; + /** + *@param out handler for connection output. + *@param id identify the connection for management purposes. + */ + virtual ConnectionInputHandler* create(ConnectionOutputHandler* out, + const std::string& id) = 0; + virtual ~ConnectionInputHandlerFactory(){} }; diff --git a/qpid/cpp/src/qpidd.cpp b/qpid/cpp/src/qpidd.cpp index 6c20ebc58f..08b907cbe2 100644 --- a/qpid/cpp/src/qpidd.cpp +++ b/qpid/cpp/src/qpidd.cpp @@ -48,9 +48,9 @@ struct ModuleOptions : public qpid::Options { ModuleOptions() : qpid::Options("Module options"), loadDir("/usr/lib/qpidd"), noLoad(false) { addOptions() - ("load-dir", optValue(loadDir, "DIR"), "Load all modules from this directory") - ("load-module", optValue(load, "FILE"), "Specifies additional module(s) to be loaded") - ("no-modules", optValue(noLoad), "Don't load any modules"); + ("module-dir", optValue(loadDir, "DIR"), "Load all .so modules in this directory") + ("load-module", optValue(load, "FILE"), "Specifies additional module(s) to be loaded") + ("no-module-dir", optValue(noLoad), "Don't load modules from module directory"); } }; @@ -122,7 +122,7 @@ auto_ptr<QpiddOptions> options; void shutdownHandler(int /*signal*/){ // Note: do not call any async-signal unsafe functions here. - // Do any extra shtudown actions in main() after broker->run() + // Do any extra shutdown actions in main() after broker->run() brokerPtr->shutdown(); } @@ -140,6 +140,7 @@ struct QpiddDaemon : public Daemon { uint16_t port=brokerPtr->getPort(); ready(port); // Notify parent. brokerPtr->run(); + brokerPtr.reset(); } }; @@ -188,12 +189,13 @@ int main(int argc, char* argv[]) // be re-parsed with all of the module-supplied options. bootOptions.parse (argc, argv, bootOptions.common.config, true); qpid::log::Logger::instance().configure(bootOptions.log, argv[0]); - if (!bootOptions.module.noLoad) { - for (vector<string>::iterator iter = bootOptions.module.load.begin(); - iter != bootOptions.module.load.end(); - iter++) - tryShlib (iter->data(), false); + for (vector<string>::iterator iter = bootOptions.module.load.begin(); + iter != bootOptions.module.load.end(); + iter++) + tryShlib (iter->data(), false); + + if (!bootOptions.module.noLoad) { bool isDefault = defaultPath == bootOptions.module.loadDir; loadModuleDir (bootOptions.module.loadDir, isDefault); } diff --git a/qpid/cpp/src/tests/ClientSessionTest.cpp b/qpid/cpp/src/tests/ClientSessionTest.cpp index 60cfe04510..87a4f59999 100644 --- a/qpid/cpp/src/tests/ClientSessionTest.cpp +++ b/qpid/cpp/src/tests/ClientSessionTest.cpp @@ -21,6 +21,7 @@ #include "unit_test.h" #include "BrokerFixture.h" #include "qpid/client/Dispatcher.h" +#include "qpid/sys/Monitor.h" #include "qpid/sys/Thread.h" #include "qpid/sys/Runnable.h" #include "qpid/client/Session_0_10.h" @@ -38,6 +39,7 @@ using namespace qpid::client; using namespace qpid::client::arg; using namespace qpid::framing; using namespace qpid; +using qpid::sys::Monitor; using std::string; using std::cout; using std::endl; @@ -67,6 +69,27 @@ struct DummyListener : public sys::Runnable, public MessageListener { } }; +struct SimpleListener : public MessageListener +{ + Monitor lock; + std::vector<Message> messages; + + void received(Message& msg) + { + Monitor::ScopedLock l(lock); + messages.push_back(msg); + lock.notifyAll(); + } + + void waitFor(const uint n) + { + Monitor::ScopedLock l(lock); + while (messages.size() < n) { + lock.wait(); + } + } +}; + struct ClientSessionFixture : public ProxySessionFixture { void declareSubscribe(const string& q="my-queue", @@ -167,22 +190,31 @@ BOOST_FIXTURE_TEST_CASE(testSuspendResume, ClientSessionFixture) BOOST_CHECK_EQUAL(string("my-message"), msg->getContent()); } +/** + * Currently broken due to a deadlock in SessionCore + * BOOST_FIXTURE_TEST_CASE(testSendToSelf, SessionFixture) { - // https://bugzilla.redhat.com/show_bug.cgi?id=410551 // Deadlock if SubscriptionManager run() concurrent with session ack. - LocalQueue myq; + SimpleListener mylistener; session.queueDeclare(queue="myq", exclusive=true, autoDelete=true); - subs.subscribe(myq, "myq"); + subs.subscribe(mylistener, "myq", "myq"); + sys::Thread runner(subs);//start dispatcher thread string data("msg"); Message msg(data, "myq"); - const int count=100; // Verified with count=100000 in a loop. - for (int i = 0; i < count; ++i) + const uint count=10000; + for (uint i = 0; i < count; ++i) { session.messageTransfer(content=msg); - for (int j = 0; j < count; ++j) { - Message m=myq.pop(); - BOOST_CHECK_EQUAL(m.getData(), data); + } + mylistener.waitFor(count); + subs.cancel("myq"); + subs.stop(); + session.close(); + BOOST_CHECK_EQUAL(mylistener.messages.size(), count); + for (uint j = 0; j < count; ++j) { + BOOST_CHECK_EQUAL(mylistener.messages[j].getData(), data); } } +*/ QPID_AUTO_TEST_SUITE_END() diff --git a/qpid/cpp/src/tests/logging.cpp b/qpid/cpp/src/tests/logging.cpp index 4969c3d6a9..2c0ed08105 100644 --- a/qpid/cpp/src/tests/logging.cpp +++ b/qpid/cpp/src/tests/logging.cpp @@ -367,7 +367,7 @@ BOOST_AUTO_TEST_CASE(testLoggerConfigure) { unlink("logging.tmp"); } -BOOST_AUTO_TEST_CASE(testQuoteControlChars) { +BOOST_AUTO_TEST_CASE(testQuoteNonPrintable) { Logger& l=Logger::instance(); l.clear(); Options opts; @@ -375,13 +375,13 @@ BOOST_AUTO_TEST_CASE(testQuoteControlChars) { opts.outputs.push_back("logging.tmp"); opts.time=false; l.configure(opts, "test"); - char s[] = "null\0tab\tspace newline\nret\r"; + char s[] = "null\0tab\tspace newline\nret\r\x80\x99\xff"; string str(s, sizeof(s)); QPID_LOG(critical, str); ifstream log("logging.tmp"); string line; getline(log, line); - string expect="critical null^@tab^Ispace newline^Jret^M^@"; + string expect="critical null\\00tab\\09space newline\\0Aret\\0D\\80\\99\\FF\\00"; BOOST_CHECK_EQUAL(expect, line); log.close(); unlink("logging.tmp"); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java index 6c7575429a..0e36a09bbd 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java @@ -231,6 +231,14 @@ public abstract class AMQDestination implements Destination, Referenceable sb.append('?'); + if (_routingKey != null) + { + sb.append(BindingURL.OPTION_ROUTING_KEY); + sb.append("='"); + sb.append(_routingKey).append("'"); + sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR); + } + if (_isDurable) { sb.append(BindingURL.OPTION_DURABLE); diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java index 51a052bed5..ad47e14fde 100644 --- a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java +++ b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java @@ -16,6 +16,7 @@ import org.apache.qpidity.nclient.impl.ClientSessionDelegate; import org.apache.qpidity.transport.Channel; import org.apache.qpidity.transport.Connection; import org.apache.qpidity.transport.ConnectionClose; +import org.apache.qpidity.transport.ConnectionCloseOk; import org.apache.qpidity.transport.ConnectionDelegate; import org.apache.qpidity.transport.ConnectionEvent; import org.apache.qpidity.transport.ProtocolHeader; @@ -33,6 +34,9 @@ public class Client implements org.apache.qpidity.nclient.Connection private ClosedListener _closedListner; private final Lock _lock = new ReentrantLock(); private static Logger _logger = LoggerFactory.getLogger(Client.class); + private Condition closeOk; + private boolean closed = false; + /** * * @return returns a new connection to the broker. @@ -45,6 +49,7 @@ public class Client implements org.apache.qpidity.nclient.Connection public void connect(String host, int port,String virtualHost,String username, String password) throws QpidException { Condition negotiationComplete = _lock.newCondition(); + closeOk = _lock.newCondition(); _lock.lock(); ConnectionDelegate connectionDelegate = new ConnectionDelegate() @@ -76,6 +81,21 @@ public class Client implements org.apache.qpidity.nclient.Connection } } + @Override public void connectionCloseOk(Channel context, ConnectionCloseOk struct) + { + _lock.lock(); + try + { + closed = true; + this.receivedClose = true; + closeOk.signalAll(); + } + finally + { + _lock.unlock(); + } + } + @Override public void connectionClose(Channel context, ConnectionClose connectionClose) { ErrorCode errorCode = ErrorCode.get(connectionClose.getReplyCode()); @@ -179,6 +199,24 @@ public class Client implements org.apache.qpidity.nclient.Connection { Channel ch = _conn.getChannel(0); ch.connectionClose(0, "client is closing", 0, 0); + _lock.lock(); + try + { + try { + while (!closed) + { + closeOk.await(); + } + } + catch (InterruptedException e) + { + // do nothing + } + } + finally + { + _lock.unlock(); + } _conn.close(); } diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionCloseTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionCloseTest.java new file mode 100644 index 0000000000..20443944d2 --- /dev/null +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionCloseTest.java @@ -0,0 +1,72 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.client.connection; + +import org.apache.qpid.testutil.QpidTestCase; +import org.apache.qpidity.transport.util.Logger; + +import javax.jms.Connection; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; + +/** + * ConnectionCloseTest + * + */ + +public class ConnectionCloseTest extends QpidTestCase +{ + + private static final Logger log = Logger.get(ConnectionCloseTest.class); + + public void testSendReceiveClose() throws Exception + { + for (int i = 0; i < 500; i++) + { + if ((i % 10) == 0) + { + log.warn("%d messages sent and received", i); + } + + Connection receiver = getConnection(); + receiver.start(); + Session rssn = receiver.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = rssn.createQueue("connection-close-test-queue"); + MessageConsumer cons = rssn.createConsumer(queue); + + Connection sender = getConnection(); + sender.start(); + Session sssn = sender.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer prod = sssn.createProducer(queue); + prod.send(sssn.createTextMessage("test")); + sender.close(); + + TextMessage m = (TextMessage) cons.receive(2000); + assertNotNull("message was lost", m); + assertEquals(m.getText(), "test"); + receiver.close(); + } + } + +} diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/AbstractXATest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/AbstractXATestCase.java index ba4ebae258..7c03e16258 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/AbstractXATest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/AbstractXATestCase.java @@ -28,7 +28,7 @@ import javax.jms.*; * * */ -public abstract class AbstractXATest extends QpidTestCase +public abstract class AbstractXATestCase extends QpidTestCase { protected static final String _sequenceNumberPropertyName = "seqNumber"; diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/QueueTests.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/QueueTest.java index cd5b228f76..a703432efb 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/QueueTests.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/QueueTest.java @@ -26,10 +26,10 @@ import junit.framework.TestSuite; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class QueueTests extends AbstractXATest +public class QueueTest extends AbstractXATestCase { /* this clas logger */ - private static final Logger _logger = LoggerFactory.getLogger(QueueTests.class); + private static final Logger _logger = LoggerFactory.getLogger(QueueTest.class); /** * the queue use by all the tests @@ -66,7 +66,7 @@ public class QueueTests extends AbstractXATest */ public static TestSuite getSuite() { - return new TestSuite(QueueTests.class); + return new TestSuite(QueueTest.class); } /** diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/TopicTests.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/TopicTest.java index 30b3b09449..5ea059b166 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/TopicTests.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/TopicTest.java @@ -33,10 +33,10 @@ import org.slf4j.LoggerFactory; * * */ -public class TopicTests extends AbstractXATest +public class TopicTest extends AbstractXATestCase { /* this clas logger */ - private static final Logger _logger = LoggerFactory.getLogger(TopicTests.class); + private static final Logger _logger = LoggerFactory.getLogger(TopicTest.class); /** * the topic use by all the tests @@ -81,7 +81,7 @@ public class TopicTests extends AbstractXATest */ public static TestSuite getSuite() { - return new TestSuite(TopicTests.class); + return new TestSuite(TopicTest.class); } /** @@ -1671,12 +1671,12 @@ public class TopicTests extends AbstractXATest long seq = 0; try { - seq = message.getLongProperty(TopicTests._sequenceNumberPropertyName); + seq = message.getLongProperty(TopicTest._sequenceNumberPropertyName); } catch (JMSException e) { e.printStackTrace(); - TopicTests.failure(); + TopicTest.failure(); _lock.set(false); synchronized (_lock) { @@ -1686,7 +1686,7 @@ public class TopicTests extends AbstractXATest if (seq != _counter) { System.out.println("received message " + seq + " expected " + _counter); - TopicTests.failure(); + TopicTest.failure(); _lock.set(false); synchronized (_lock) { diff --git a/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidTestCase.java b/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidTestCase.java index e7c09fca65..084137b38e 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidTestCase.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidTestCase.java @@ -59,16 +59,27 @@ public class QpidTestCase extends TestCase private InitialContext _initialContext; private AMQConnectionFactory _connectionFactory; - protected void setUp() throws Exception + public void runBare() throws Throwable { - super.setUp(); + String name = getClass().getSimpleName() + "." + getName(); + _logger.info("========== start " + name + " =========="); startBroker(); - } - - protected void tearDown() throws Exception - { - stopBroker(); - super.tearDown(); + try + { + super.runBare(); + } + finally + { + try + { + stopBroker(); + } + catch (Exception e) + { + _logger.error("exception stopping broker", e); + } + _logger.info("========== stop " + name + " =========="); + } } public void startBroker() throws Exception @@ -102,7 +113,8 @@ public class QpidTestCase extends TestCase } catch (IOException e) { - _logger.info("redirector", e); + // this seems to happen regularly even when + // exits are normal } } }.start(); @@ -190,6 +202,11 @@ public class QpidTestCase extends TestCase return _connectionFactory; } + public Connection getConnection() throws Exception + { + return getConnection("guest", "guest"); + } + /** * Get a connection (remote or in-VM) * diff --git a/qpid/java/common.xml b/qpid/java/common.xml index 8e44693c63..cfaab2cc84 100644 --- a/qpid/java/common.xml +++ b/qpid/java/common.xml @@ -198,6 +198,13 @@ that modules build root from underneath the project build root: ${build}/<module> + + ant clean-results + + The clean-results target removes all test output from the test + results directory: + + ${build.results} </echo> </target> diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java index 08adb99c47..9f0af7cfa1 100644 --- a/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java @@ -63,6 +63,7 @@ public class Session extends Invoker private long commandsIn = 0; // completed incoming commands private final RangeSet processed = new RangeSet(); + private long processedMark = -1; private Range syncPoint = null; // outgoing command count @@ -132,25 +133,25 @@ public class Session extends Invoker public void flushProcessed() { - long mark = -1; boolean first = true; RangeSet rest = new RangeSet(); synchronized (processed) { for (Range r: processed) { - if (first) + if (first && r.includes(processedMark)) { - first = false; - mark = r.getUpper(); + processedMark = r.getUpper(); } else { rest.add(r); } + + first = false; } } - executionComplete(mark, rest); + executionComplete(processedMark, rest); } void syncPoint() diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaSender.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaSender.java index 17ae7ea0f7..f0f5731037 100644 --- a/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaSender.java +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaSender.java @@ -51,16 +51,23 @@ public class MinaSender implements Sender<java.nio.ByteBuffer> { throw new TransportException("attempted to write to a closed socket"); } - lastWrite = session.write(ByteBuffer.wrap(buf)); + + synchronized (this) + { + lastWrite = session.write(ByteBuffer.wrap(buf)); + } } - public void close() + public synchronized void close() { // MINA will sometimes throw away in-progress writes when you // ask it to close - if (lastWrite != null) + synchronized (this) { - lastWrite.join(); + if (lastWrite != null) + { + lastWrite.join(); + } } CloseFuture closed = session.close(); closed.join(); diff --git a/qpid/java/module.xml b/qpid/java/module.xml index d80a2c0f56..af943e3b1f 100644 --- a/qpid/java/module.xml +++ b/qpid/java/module.xml @@ -176,7 +176,7 @@ <target name="test" depends="compile-tests" if="module.test.src.exists" description="execute unit tests"> - <junit fork="yes" haltonfailure="no" printsummary="on" timeout="90000"> + <junit fork="yes" haltonfailure="no" printsummary="on" timeout="600000"> <sysproperty key="amqj.logging.level" value="${amqj.logging.level}"/> <sysproperty key="root.logging.level" value="${root.logging.level}"/> diff --git a/qpid/python/examples/direct/verify.in b/qpid/python/examples/direct/verify.in index e0dca33039..5e691619d9 100644 --- a/qpid/python/examples/direct/verify.in +++ b/qpid/python/examples/direct/verify.in @@ -1,6 +1,6 @@ -==== ./declare_queues.py.out -==== ./direct_producer.py.out -==== ./direct_consumer.py.out +==== declare_queues.py.out +==== direct_producer.py.out +==== direct_consumer.py.out message 0 message 1 message 2 diff --git a/qpid/python/examples/fanout/verify.in b/qpid/python/examples/fanout/verify.in index c625c30773..a5f57f0b4b 100644 --- a/qpid/python/examples/fanout/verify.in +++ b/qpid/python/examples/fanout/verify.in @@ -1,6 +1,6 @@ -==== ./declare_queues.py.out -==== ./fanout_producer.py.out -==== ./fanout_consumer.py.out +==== declare_queues.py.out +==== fanout_producer.py.out +==== fanout_consumer.py.out message 0 message 1 message 2 diff --git a/qpid/python/examples/pubsub/verify.in b/qpid/python/examples/pubsub/verify.in index 19dcf88312..69de08d17c 100644 --- a/qpid/python/examples/pubsub/verify.in +++ b/qpid/python/examples/pubsub/verify.in @@ -1,4 +1,4 @@ -==== ./topic_publisher.py.out +==== topic_publisher.py.out ==== topic_subscriber.py.out | remove_uuid64 | sort message 0 message 0 diff --git a/qpid/python/examples/request-response/verify.in b/qpid/python/examples/request-response/verify.in index c02a423bcb..f681253b3c 100644 --- a/qpid/python/examples/request-response/verify.in +++ b/qpid/python/examples/request-response/verify.in @@ -1,4 +1,4 @@ -==== ./client.py.out | remove_uuid64 +==== client.py.out | remove_uuid64 Request: Twas brilling, and the slithy toves Request: Did gyre and gimble in the wabe. Request: All mimsy were the borogroves, @@ -9,6 +9,6 @@ Response: DID GYRE AND GIMBLE IN THE WABE. Response: ALL MIMSY WERE THE BOROGROVES, Response: AND THE MOME RATHS OUTGRABE. No more messages! -==== server.py.out | remove_uuid64 +==== server.py.out | remove_uuid64 Request server running - run your client now. (Times out after 100 seconds ...) diff --git a/qpid/python/tests_0-10/message.py b/qpid/python/tests_0-10/message.py index 9ec1cc270c..c251e6aca0 100644 --- a/qpid/python/tests_0-10/message.py +++ b/qpid/python/tests_0-10/message.py @@ -6,9 +6,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -47,7 +47,7 @@ class MessageTests(TestBase): msg = included.get(timeout=1) self.assertEqual("consume_no_local", msg.content.body) try: - excluded.get(timeout=1) + excluded.get(timeout=1) self.fail("Received locally published message though no_local=true") except Empty: None @@ -59,9 +59,9 @@ class MessageTests(TestBase): could be left on the queue, possibly never being consumed (this is the case for example in the qpid JMS mapping of topics). This test excercises a Qpid C++ broker hack that - deletes such messages. + deletes such messages. """ - + channel = self.channel #setup: channel.queue_declare(queue="test-queue", exclusive=True, auto_delete=True) @@ -84,7 +84,7 @@ class MessageTests(TestBase): msg = excluded.get(timeout=1) self.assertEqual("foreign", msg.content.body) try: - excluded.get(timeout=1) + excluded.get(timeout=1) self.fail("Received extra message") except Empty: None #check queue is empty @@ -107,7 +107,7 @@ class MessageTests(TestBase): except Closed, e: self.assertChannelException(403, e.args[0]) - #open new channel and cleanup last consumer: + #open new channel and cleanup last consumer: channel = self.client.channel(2) channel.session_open() @@ -173,7 +173,7 @@ class MessageTests(TestBase): msg = myqueue.get(timeout=1) self.assertEqual("One", msg.content.body) try: - msg = myqueue.get(timeout=1) + msg = myqueue.get(timeout=1) self.fail("Got message after cancellation: " + msg) except Empty: None @@ -188,7 +188,7 @@ class MessageTests(TestBase): """ channel = self.channel channel.queue_declare(queue="test-ack-queue", exclusive=True, auto_delete=True) - + self.subscribe(queue="test-ack-queue", destination="consumer_tag", confirm_mode=1) queue = self.client.queue("consumer_tag") @@ -197,13 +197,13 @@ class MessageTests(TestBase): channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="Three")) channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="Four")) channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="Five")) - + msg1 = queue.get(timeout=1) msg2 = queue.get(timeout=1) msg3 = queue.get(timeout=1) msg4 = queue.get(timeout=1) msg5 = queue.get(timeout=1) - + self.assertEqual("One", msg1.content.body) self.assertEqual("Two", msg2.content.body) self.assertEqual("Three", msg3.content.body) @@ -214,10 +214,10 @@ class MessageTests(TestBase): msg4.complete(cumulative=False) channel.message_recover(requeue=False) - + msg3b = queue.get(timeout=1) msg5b = queue.get(timeout=1) - + self.assertEqual("Three", msg3b.content.body) self.assertEqual("Five", msg5b.content.body) @@ -236,7 +236,7 @@ class MessageTests(TestBase): channel.queue_bind(exchange="amq.fanout", queue="queue-a") channel.queue_declare(queue="queue-b", exclusive=True, auto_delete=True) channel.queue_bind(exchange="amq.fanout", queue="queue-b") - + self.subscribe(queue="queue-a", destination="unconfirmed", confirm_mode=1) self.subscribe(queue="queue-b", destination="confirmed", confirm_mode=0) confirmed = self.client.queue("confirmed") @@ -246,10 +246,10 @@ class MessageTests(TestBase): for d in data: channel.message_transfer(destination="amq.fanout", content=Content(body=d)) - for q in [confirmed, unconfirmed]: + for q in [confirmed, unconfirmed]: for d in data: self.assertEqual(d, q.get(timeout=1).content.body) - self.assertEmpty(q) + self.assertEmpty(q) channel.message_recover(requeue=False) @@ -265,7 +265,7 @@ class MessageTests(TestBase): data.remove(msg.content.body) msg.complete(cumulative=False) channel.message_recover(requeue=False) - + def test_recover_requeue(self): """ @@ -273,7 +273,7 @@ class MessageTests(TestBase): """ channel = self.channel channel.queue_declare(queue="test-requeue", exclusive=True, auto_delete=True) - + self.subscribe(queue="test-requeue", destination="consumer_tag", confirm_mode=1) queue = self.client.queue("consumer_tag") @@ -282,13 +282,13 @@ class MessageTests(TestBase): channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Three")) channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Four")) channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Five")) - + msg1 = queue.get(timeout=1) msg2 = queue.get(timeout=1) msg3 = queue.get(timeout=1) msg4 = queue.get(timeout=1) msg5 = queue.get(timeout=1) - + self.assertEqual("One", msg1.content.body) self.assertEqual("Two", msg2.content.body) self.assertEqual("Three", msg3.content.body) @@ -307,10 +307,10 @@ class MessageTests(TestBase): self.subscribe(queue="test-requeue", destination="consumer_tag") queue2 = self.client.queue("consumer_tag") - + msg3b = queue2.get(timeout=1) msg5b = queue2.get(timeout=1) - + self.assertEqual("Three", msg3b.content.body) self.assertEqual("Five", msg5b.content.body) @@ -327,8 +327,8 @@ class MessageTests(TestBase): extra = queue.get(timeout=1) self.fail("Got unexpected message in original queue: " + extra.content.body) except Empty: None - - + + def test_qos_prefetch_count(self): """ Test that the prefetch count specified is honoured @@ -370,7 +370,7 @@ class MessageTests(TestBase): except Empty: None - + def test_qos_prefetch_size(self): """ Test that the prefetch size specified is honoured @@ -448,7 +448,7 @@ class MessageTests(TestBase): #send batch of messages to queue for i in range(1, 11): channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %d" % i)) - + #set message credit to finite amount (less than enough for all messages) channel.message_flow(unit = 0, value = 5, destination = "c") #set infinite byte credit @@ -458,7 +458,7 @@ class MessageTests(TestBase): for i in range(1, 6): self.assertDataEquals(channel, q.get(timeout = 1), "Message %d" % i) self.assertEmpty(q) - + #increase credit again and check more are received for i in range(6, 11): channel.message_flow(unit = 0, value = 1, destination = "c") @@ -512,7 +512,7 @@ class MessageTests(TestBase): #send batch of messages to queue for i in range(1, 11): channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %d" % i)) - + #set message credit to finite amount (less than enough for all messages) channel.message_flow(unit = 0, value = 5, destination = "c") #set infinite byte credit @@ -523,7 +523,7 @@ class MessageTests(TestBase): msg = q.get(timeout = 1) self.assertDataEquals(channel, msg, "Message %d" % i) self.assertEmpty(q) - + #acknowledge messages and check more are received msg.complete(cumulative=True) for i in range(6, 11): @@ -560,7 +560,7 @@ class MessageTests(TestBase): msgs.append(msg) self.assertDataEquals(channel, msg, "abcdefgh") self.assertEmpty(q) - + #ack each message individually and check more are received for i in range(5): msg = msgs.pop() @@ -650,7 +650,7 @@ class MessageTests(TestBase): channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a") queue = self.client.queue("a") first = queue.get(timeout = 1) - for i in range (2, 10): + for i in range (2, 10): self.assertEquals("released message %s" % (i), queue.get(timeout = 1).content.body) last = queue.get(timeout = 1) self.assertEmpty(queue) @@ -672,7 +672,7 @@ class MessageTests(TestBase): channel.message_flow(unit = 0, value = 10, destination = "a") channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a") queue = self.client.queue("a") - for i in range (1, 11): + for i in range (1, 11): self.assertEquals("message %s" % (i), queue.get(timeout = 1).content.body) self.assertEmpty(queue) @@ -683,14 +683,14 @@ class MessageTests(TestBase): self.assertEmpty(queue) def test_subscribe_not_acquired_2(self): - channel = self.channel + channel = self.channel #publish some messages self.queue_declare(queue = "q", exclusive=True, auto_delete=True) for i in range(1, 11): channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "message-%d" % (i))) - #consume some of them + #consume some of them channel.message_subscribe(queue = "q", destination = "a", confirm_mode = 1) channel.message_flow_mode(mode = 0, destination = "a") channel.message_flow(unit = 0, value = 5, destination = "a") |