summaryrefslogtreecommitdiff
path: root/qpid
diff options
context:
space:
mode:
authorAidan Skinner <aidan@apache.org>2008-02-13 11:44:34 +0000
committerAidan Skinner <aidan@apache.org>2008-02-13 11:44:34 +0000
commitf3f64b6b056c866558eef7d9dd1944b92eb03fc1 (patch)
treea062d8acd67dafb97c4aa25e1ca5ffc9aaf2936b /qpid
parenteff51b9febb3dc299e206ac339a84f28d5e96531 (diff)
downloadqpid-python-f3f64b6b056c866558eef7d9dd1944b92eb03fc1.tar.gz
Merged revisions 619974,620011,620014,620016-620017,620479,620481,620566,620584,620619,620622,620831,620854,620861-620862,620889,627128,627133,627141,627153-627155,627157,627171,627187 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/trunk ........ r619974 | rhs | 2008-02-08 18:30:04 +0000 (Fri, 08 Feb 2008) | 1 line made xa tests run, and made QpidTestCase more robust ........ r620011 | rhs | 2008-02-08 22:05:50 +0000 (Fri, 08 Feb 2008) | 1 line whitespace cleanup ........ r620014 | aconway | 2008-02-08 22:23:23 +0000 (Fri, 08 Feb 2008) | 7 lines cpp/examples/direct, fanout: Converted listener.cpp to SubscriptionManager. All python/cpp combos run as part of cpp/examples make check. Fixed problems with verify scripts and VPATH builds. ........ r620016 | cctrieloff | 2008-02-08 22:24:33 +0000 (Fri, 08 Feb 2008) | 2 lines yum correction ........ r620017 | aconway | 2008-02-08 22:27:38 +0000 (Fri, 08 Feb 2008) | 10 lines From Ted Ross, https://issues.apache.org/jira/browse/QPID-782 The attached patch makes the following changes: The --load-dir option has been renamed to --module-dir The --no-modules option and been replaced by the --no-module-dir option. This new option suppresses ONLY the loading of modules from the directory. The --no-data-dir option has been added to suppress the use of a data directory. Logging has been added for data directory lock and unlock. ........ r620479 | gsim | 2008-02-11 13:00:45 +0000 (Mon, 11 Feb 2008) | 3 lines Check valid listener (or handler) exist and log error if not. See QPID-783. ........ r620481 | gsim | 2008-02-11 13:10:38 +0000 (Mon, 11 Feb 2008) | 3 lines Added a test (currently disabled) that highlights a deadlock in the client when commands are sent to the broker concurrently with acks (e.g. when the dispatcher thread is running with auto-acking and messages are sent on another thread). ........ r620566 | rhs | 2008-02-11 18:23:12 +0000 (Mon, 11 Feb 2008) | 1 line bumped release for Beta 3 ........ r620584 | rhs | 2008-02-11 19:21:01 +0000 (Mon, 11 Feb 2008) | 1 line fixed computation of ranged acks, fix needed for failing RecoverTest ........ r620619 | aconway | 2008-02-11 21:43:32 +0000 (Mon, 11 Feb 2008) | 1 line Fix errors in verify scripts. ........ r620622 | aconway | 2008-02-11 21:50:17 +0000 (Mon, 11 Feb 2008) | 1 line Remove dependency on sys::Socket for management ID of connections. ........ r620831 | rhs | 2008-02-12 15:44:14 +0000 (Tue, 12 Feb 2008) | 1 line added help text for the clean-results target ........ r620854 | gsim | 2008-02-12 16:35:45 +0000 (Tue, 12 Feb 2008) | 3 lines Explicitly reset shared pointer; brokers destructor not called if started through -d otherwise it seems... ........ r620861 | gsim | 2008-02-12 16:54:05 +0000 (Tue, 12 Feb 2008) | 3 lines Fixed typo ........ r620862 | aconway | 2008-02-12 16:54:42 +0000 (Tue, 12 Feb 2008) | 2 lines Create /var/lib/qpidd correctly. ........ r620889 | aconway | 2008-02-12 18:04:11 +0000 (Tue, 12 Feb 2008) | 1 line Quote all non-printable ASCII characters (not just control characters) ........ r627128 | aconway | 2008-02-12 21:39:55 +0000 (Tue, 12 Feb 2008) | 1 line Create a tar file of verify scripts suitable for untarring into and installed examples directory. ........ r627133 | aconway | 2008-02-12 21:48:52 +0000 (Tue, 12 Feb 2008) | 2 lines Add -g to build flags to get debug info. ........ r627141 | aconway | 2008-02-12 21:57:36 +0000 (Tue, 12 Feb 2008) | 2 lines Fix verify error in Makefile.am ........ r627153 | rhs | 2008-02-12 22:21:20 +0000 (Tue, 12 Feb 2008) | 1 line increased the test timeout ........ r627154 | rhs | 2008-02-12 22:22:20 +0000 (Tue, 12 Feb 2008) | 1 line synchronize access to lastWrite future ........ r627155 | rhs | 2008-02-12 22:23:02 +0000 (Tue, 12 Feb 2008) | 1 line added default getConnection() ........ r627157 | rhs | 2008-02-12 22:26:26 +0000 (Tue, 12 Feb 2008) | 1 line added a test for message send followed by immediate connection close; fixed connection close handshaking ........ r627171 | aconway | 2008-02-12 23:09:06 +0000 (Tue, 12 Feb 2008) | 6 lines Patches from Ted Ross: Fix for bignumber problem in the management console. Fix for broker crash when sessions are closed via management. ........ r627187 | rhs | 2008-02-12 23:52:01 +0000 (Tue, 12 Feb 2008) | 1 line applied patch from rajith to fix reply-to ........ git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/thegreatmerge@627359 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid')
-rwxr-xr-xqpid/bin/verify10
-rwxr-xr-xqpid/bin/verify_all12
-rw-r--r--qpid/cpp/README2
-rw-r--r--qpid/cpp/examples/Makefile.am69
-rw-r--r--qpid/cpp/examples/examples/direct/listener.cpp86
-rw-r--r--qpid/cpp/examples/examples/direct/verify.in9
-rw-r--r--qpid/cpp/examples/examples/direct/verify_cpp_python2
-rw-r--r--qpid/cpp/examples/examples/direct/verify_cpp_python.in6
-rw-r--r--qpid/cpp/examples/examples/direct/verify_python_cpp2
-rw-r--r--qpid/cpp/examples/examples/direct/verify_python_cpp.in9
-rw-r--r--qpid/cpp/examples/examples/fanout/declare_queues.cpp17
-rw-r--r--qpid/cpp/examples/examples/fanout/fanout_producer.cpp8
-rw-r--r--qpid/cpp/examples/examples/fanout/listener.cpp86
-rw-r--r--qpid/cpp/examples/examples/fanout/verify.in9
-rw-r--r--qpid/cpp/examples/examples/fanout/verify_cpp_python2
-rw-r--r--qpid/cpp/examples/examples/fanout/verify_cpp_python.in6
-rw-r--r--qpid/cpp/examples/examples/fanout/verify_python_cpp4
-rw-r--r--qpid/cpp/examples/examples/fanout/verify_python_cpp.in15
-rw-r--r--qpid/cpp/examples/examples/pub-sub/verify.in2
-rw-r--r--qpid/cpp/examples/examples/pub-sub/verify_cpp_python2
-rw-r--r--qpid/cpp/examples/examples/pub-sub/verify_cpp_python.in4
-rw-r--r--qpid/cpp/examples/examples/pub-sub/verify_python_cpp2
-rw-r--r--qpid/cpp/examples/examples/pub-sub/verify_python_cpp.in2
-rw-r--r--qpid/cpp/examples/examples/request-response/verify2
-rw-r--r--qpid/cpp/examples/examples/request-response/verify.in4
-rw-r--r--qpid/cpp/examples/examples/request-response/verify_cpp_python5
-rw-r--r--qpid/cpp/examples/examples/request-response/verify_cpp_python.in4
-rw-r--r--qpid/cpp/examples/examples/request-response/verify_python_cpp7
-rw-r--r--qpid/cpp/examples/examples/request-response/verify_python_cpp.in18
-rwxr-xr-xqpid/cpp/examples/verify_all24
-rw-r--r--qpid/cpp/qpidc.spec.in20
-rw-r--r--qpid/cpp/src/qpid/DataDir.cpp10
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp5
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.h1
-rw-r--r--qpid/cpp/src/qpid/broker/ConnectionFactory.cpp4
-rw-r--r--qpid/cpp/src/qpid/broker/ConnectionFactory.h4
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.cpp2
-rw-r--r--qpid/cpp/src/qpid/client/Dispatcher.cpp15
-rw-r--r--qpid/cpp/src/qpid/log/Statement.cpp23
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.cpp10
-rw-r--r--qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp4
-rw-r--r--qpid/cpp/src/qpid/sys/ConnectionInputHandlerFactory.h10
-rw-r--r--qpid/cpp/src/qpidd.cpp20
-rw-r--r--qpid/cpp/src/tests/ClientSessionTest.cpp48
-rw-r--r--qpid/cpp/src/tests/logging.cpp6
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java8
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java38
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionCloseTest.java72
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/AbstractXATestCase.java (renamed from qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/AbstractXATest.java)2
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/QueueTest.java (renamed from qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/QueueTests.java)6
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/TopicTest.java (renamed from qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/TopicTests.java)12
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidTestCase.java35
-rw-r--r--qpid/java/common.xml7
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java11
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaSender.java15
-rw-r--r--qpid/java/module.xml2
-rw-r--r--qpid/python/examples/direct/verify.in6
-rw-r--r--qpid/python/examples/fanout/verify.in6
-rw-r--r--qpid/python/examples/pubsub/verify.in2
-rw-r--r--qpid/python/examples/request-response/verify.in4
-rw-r--r--qpid/python/tests_0-10/message.py68
61 files changed, 550 insertions, 356 deletions
diff --git a/qpid/bin/verify b/qpid/bin/verify
index f35db898cf..844e99b765 100755
--- a/qpid/bin/verify
+++ b/qpid/bin/verify
@@ -36,11 +36,16 @@ background() {
waitfor $out "$pattern"
}
+name() {
+ for x in $*; do name="$name `basename $x`"; done
+ echo $name;
+}
+
outputs() {
wait 2> /dev/null # Wait for all backgroud processes to complete
rm -f $script.out
for f in "$@"; do
- { echo "==== $f"; eval "cat $f"; } >> $script.out || fail
+ { echo "==== `name $f`"; eval "cat $f"; } >> $script.out || fail
done
}
@@ -49,6 +54,7 @@ verify() {
if [ -d $1 ]; then dir=$1; script=verify;
else dir=`dirname $1`; script=`basename $1`; fi
cd $dir || return 1
+ rm -f *.out
{ source ./$script && diff -ac $script.out $script.in ; } || fail
test -z "$FAIL" && rm -f *.out
return $FAIL
@@ -59,7 +65,7 @@ remove_uuid() {
sed "s/$HEX\{8\}-$HEX\{4\}-$HEX\{4\}-$HEX\{4\}-$HEX\{12\}//g" $*
}
remove_uuid64() {
- sed 's/[-A-Za-z0-9_]\{22\}==$//' $*
+ sed 's/[-A-Za-z0-9_]\{22\}==//g' $*
}
# Start private broker if QPIDD is set.
diff --git a/qpid/bin/verify_all b/qpid/bin/verify_all
deleted file mode 100755
index 6be460c701..0000000000
--- a/qpid/bin/verify_all
+++ /dev/null
@@ -1,12 +0,0 @@
-#!/bin/sh
-# Find example verify scripts and run them.
-# Usage: verify_all search-dir
-
-if [ `basename $1` = examples ]; then exdirs=$1
-else exdirs=`find $1 -name examples -a -type d`; fi
-scripts=`find $exdirs -name verify -o -name verify_cpp_python -o -name verify_cpp_java`
-verify=`dirname $0`/verify
-for s in $scripts; do $verify $s; done
-
-
-
diff --git a/qpid/cpp/README b/qpid/cpp/README
index 7c34293007..7de7fd3f98 100644
--- a/qpid/cpp/README
+++ b/qpid/cpp/README
@@ -60,7 +60,7 @@ all of the above plus:
On linux most packages can be installed using your distribution's package
management tool. For example on Fedora:
- # yum install pkgconfig e2fsprogs boost-devel cppunit-devel openais ruby
+ # yum install pkgconfig e2fsprogs boost-devel cppunit-devel openais-devel ruby
# yum install make gcc-c++ autoconf automake libtool doxygen help2man graphviz
# yum install e2fsprogs-devel
diff --git a/qpid/cpp/examples/Makefile.am b/qpid/cpp/examples/Makefile.am
index fdbea3b878..92625550e2 100644
--- a/qpid/cpp/examples/Makefile.am
+++ b/qpid/cpp/examples/Makefile.am
@@ -17,7 +17,36 @@ nobase_pkgdata_DATA= \
examples/direct/listener.cpp \
examples/direct/declare_queues.cpp
-EXTRA_DIST=$(nobase_pkgdata_DATA)
+VERIFY_FILES= verify verify_all \
+ examples/request-response/verify \
+ examples/request-response/verify.in \
+ examples/request-response/verify_cpp_python \
+ examples/request-response/verify_cpp_python.in \
+ examples/request-response/verify_python_cpp \
+ examples/request-response/verify_python_cpp.in \
+ examples/fanout/verify \
+ examples/fanout/verify.in \
+ examples/fanout/verify_cpp_python \
+ examples/fanout/verify_cpp_python.in \
+ examples/fanout/verify_python_cpp \
+ examples/fanout/verify_python_cpp.in \
+ examples/pub-sub/verify \
+ examples/pub-sub/verify.in \
+ examples/pub-sub/verify_cpp_python \
+ examples/pub-sub/verify_cpp_python.in \
+ examples/pub-sub/verify_python_cpp \
+ examples/pub-sub/verify_python_cpp.in \
+ examples/direct/verify \
+ examples/direct/verify.in \
+ examples/direct/verify_cpp_python \
+ examples/direct/verify_cpp_python.in \
+ examples/direct/verify_python_cpp \
+ examples/direct/verify_python_cpp.in
+
+EXTRA_DIST=$(nobase_pkgdata_DATA) $(VERIFY_FILES)
+
+verify:
+ cp $(top_srcdir)/../bin/verify $@
# Note: we don't use normal automake SUBDIRS because the example
# makefiles don't understand all the recursive automake targets.
@@ -29,31 +58,23 @@ clean-local:
abs_top_builddir=@abs_top_builddir@
abs_top_srcdir=@abs_top_srcdir@
-VERIFY=$(top_srcdir)/../bin/verify
-PYTHON_EXAMPLES=$(top_srcdir)/../python/examples
-EXAMPLES= \
- examples/pub-sub \
- examples/fanout \
- examples/direct \
- examples/request-response \
- $(PYTHON_EXAMPLES)/pubsub \
- $(PYTHON_EXAMPLES)/fanout \
- $(PYTHON_EXAMPLES)/direct \
- $(PYTHON_EXAMPLES)/request-response
-
-# Build the examples in the source tree.
+# Build the examples - copy sources to the build tree in VPATH build.
all-local:
+ test -d examples || cp -R $(srcdir)/examples .
cd examples && $(MAKE) CXX="$(CXX)" CXXFLAGS="$(CXXFLAGS) -I../../$(top_srcdir)/src -I../../$(top_srcdir)/src/gen -I../../$(top_builddir)/src/gen -L../../$(top_builddir)/src/.libs -Wl,-rpath,$(abs_top_builddir)/src/.libs" all
# Verify the examples in the buid tree.
-check-local: all-local
- QPID_DATA_DIR= QPIDD=$(top_builddir)/src/qpidd $(VERIFY) $(EXAMPLES)
-
-# Build and verify the installed examples, then clean up to avoid rpmbuild warnings.
-EXAMPLE_FLAGS=-I$(DESTDIR)$(includedir) -L$(DESTDIR)$(libdir) -Wl,-rpath,$(DESTDIR)$(libdir)
-EXAMPLE_DIR=$(DESTDIR)$(pkgdatadir)/examples
-installcheck-local:
- cd $(EXAMPLE_DIR) && $(MAKE) CXX="$(CXX)" CXXFLAGS="$(EXAMPLE_FLAGS)" all
- cd $(EXAMPLE_DIR) && QPIDD=$(sbindir)/qpidd $(VERIFY)
- cd $(EXAMPLE_DIR) && $(MAKE) clean
+check-local: all-local verify
+ $(srcdir)/verify_all $(abs_top_srcdir) $(abs_top_builddir)
+
+# TODO:
+# create a tarball for testing installed examples.
+# installcheck-local to use the tarball on installed example and clean up after.
+# Build and verify installed C++ examples, clean up to avoid rpmbuild warnings.
+# EXAMPLE_FLAGS=-I$(DESTDIR)$(includedir) -L$(DESTDIR)$(libdir) -Wl,-rpath,$(DESTDIR)$(libdir)
+# EXAMPLE_DIR=$(DESTDIR)$(pkgdatadir)/examples/cpp
+# installcheck-local:
+# cd $(EXAMPLE_DIR) && $(MAKE) CXX="$(CXX)" CXXFLAGS="$(EXAMPLE_FLAGS)" all
+# cd $(EXAMPLE_DIR) && QPIDD=$(sbindir)/qpidd $(srcdir)/verify *
+# cd $(EXAMPLE_DIR) && $(MAKE) clean
diff --git a/qpid/cpp/examples/examples/direct/listener.cpp b/qpid/cpp/examples/examples/direct/listener.cpp
index 3f92d189de..91b5123c68 100644
--- a/qpid/cpp/examples/examples/direct/listener.cpp
+++ b/qpid/cpp/examples/examples/direct/listener.cpp
@@ -20,32 +20,15 @@
*/
/**
- * listener.cpp:
- *
- * This program is one of three programs designed to be used
- * together. These programs do not specify the exchange type - the
- * default exchange type is the direct exchange.
- *
- * declare_queues.cpp:
- *
- * Creates a queue on a broker, binding a routing key to route
- * messages to that queue.
- *
- * direct_producer.cpp:
- *
- * Publishes to a broker, specifying a routing key.
- *
- * listener.cpp (this program):
- *
- * Reads from a queue on the broker using a message listener.
- *
+ * listener.cpp: This program reads messages fro a queue on
+ * the broker using a message listener.
*/
#include <qpid/client/Dispatcher.h>
#include <qpid/client/Connection.h>
#include <qpid/client/Session.h>
#include <qpid/client/Message.h>
-#include <qpid/client/MessageListener.h>
+#include <qpid/client/SubscriptionManager.h>
#include <unistd.h>
#include <cstdlib>
@@ -56,42 +39,25 @@ using namespace qpid::framing;
class Listener : public MessageListener{
-private:
- std::string destination_name;
- Dispatcher dispatcher;
-public:
- Listener(Session& session, string destination_name):
- destination_name(destination_name),
- dispatcher(session)
- {};
-
- virtual void listen();
- virtual void received(Message& message);
- ~Listener() { };
+ private:
+ SubscriptionManager& subscriptions;
+ public:
+ Listener(SubscriptionManager& subscriptions);
+ virtual void received(Message& message);
};
-
-void Listener::listen() {
- std::cout << "Activating listener for: " <<destination_name << std::endl;
- dispatcher.listen(destination_name, this);
-
- // The following line gives up control
-
- dispatcher.run();
-}
-
+Listener::Listener(SubscriptionManager& subs) : subscriptions(subs)
+{}
void Listener::received(Message& message) {
std::cout << "Message: " << message.getData() << std::endl;
-
if (message.getData() == "That's all, folks!") {
- std::cout << "Shutting down listener for " <<destination_name << std::endl;
- dispatcher.stop();
+ std::cout << "Shutting down listener for " << message.getDestination()
+ << std::endl;
+ subscriptions.cancel(message.getDestination());
}
}
-
-
int main(int argc, char** argv) {
const char* host = argc>1 ? argv[1] : "127.0.0.1";
int port = argc>2 ? atoi(argv[2]) : 5672;
@@ -103,25 +69,15 @@ int main(int argc, char** argv) {
//--------- Main body of program --------------------------------------------
+ SubscriptionManager subscriptions(session);
+ // Create a listener and subscribe it to the queue named "message_queue"
+ Listener listener(subscriptions);
+ subscriptions.subscribe(listener, "message_queue");
+ // Deliver messages until the subscription is cancelled
+ // by Listener::received()
+ subscriptions.run();
- // Subscribe to the queue, route it to a client destination for
- // the listener. (The destination name merely identifies the
- // destination in the listener, you can use any name as long as
- // you use the same name for the listener).
-
- session.messageSubscribe(arg::queue="message_queue", arg::destination="listener_destination");
-
- //##############
- session.messageFlow(arg::destination="listener_destination", arg::unit=0, arg::value=1);//messages ### Define a constant?
- session.messageFlow(arg::destination="listener_destination", arg::unit=1, arg::value=0xFFFFFFFF);//bytes ###### Define a constant?
-
- // Tell the listener to listen to the destination we just
- // created above.
-
- Listener listener(session, "listener_destination");
- listener.listen();
-
- //-----------------------------------------------------------------------------
+ //---------------------------------------------------------------------------
connection.close();
return 0;
diff --git a/qpid/cpp/examples/examples/direct/verify.in b/qpid/cpp/examples/examples/direct/verify.in
index afe36335c7..d1e95f1151 100644
--- a/qpid/cpp/examples/examples/direct/verify.in
+++ b/qpid/cpp/examples/examples/direct/verify.in
@@ -1,7 +1,6 @@
-==== ./declare_queues.out
-==== ./direct_producer.out
-==== ./listener.out
-Activating listener for: listener_destination
+==== declare_queues.out
+==== direct_producer.out
+==== listener.out
Message: Message 0
Message: Message 1
Message: Message 2
@@ -13,4 +12,4 @@ Message: Message 7
Message: Message 8
Message: Message 9
Message: That's all, folks!
-Shutting down listener for listener_destination
+Shutting down listener for message_queue
diff --git a/qpid/cpp/examples/examples/direct/verify_cpp_python b/qpid/cpp/examples/examples/direct/verify_cpp_python
index 5ce3681d90..4dc445ba27 100644
--- a/qpid/cpp/examples/examples/direct/verify_cpp_python
+++ b/qpid/cpp/examples/examples/direct/verify_cpp_python
@@ -1,4 +1,4 @@
# See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify
-py=../../../../python/examples/direct
+py=$PYTHON_EXAMPLES/direct
clients ./declare_queues ./direct_producer $py/direct_consumer.py
outputs ./declare_queues.out ./direct_producer.out $py/direct_consumer.py.out
diff --git a/qpid/cpp/examples/examples/direct/verify_cpp_python.in b/qpid/cpp/examples/examples/direct/verify_cpp_python.in
index 0952dbe405..1a329be59a 100644
--- a/qpid/cpp/examples/examples/direct/verify_cpp_python.in
+++ b/qpid/cpp/examples/examples/direct/verify_cpp_python.in
@@ -1,6 +1,6 @@
-==== ./declare_queues.out
-==== ./direct_producer.out
-==== ../../../../python/examples/direct/direct_consumer.py.out
+==== declare_queues.out
+==== direct_producer.out
+==== direct_consumer.py.out
Message 0
Message 1
Message 2
diff --git a/qpid/cpp/examples/examples/direct/verify_python_cpp b/qpid/cpp/examples/examples/direct/verify_python_cpp
index 43c5339150..fe4893e120 100644
--- a/qpid/cpp/examples/examples/direct/verify_python_cpp
+++ b/qpid/cpp/examples/examples/direct/verify_python_cpp
@@ -1,5 +1,5 @@
# See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify
-py=../../../../python/examples/direct
+py=$PYTHON_EXAMPLES/direct
clients $py/declare_queues.py $py/direct_producer.py ./listener
outputs $py/declare_queues.py.out $py/direct_producer.py.out ./listener.out
diff --git a/qpid/cpp/examples/examples/direct/verify_python_cpp.in b/qpid/cpp/examples/examples/direct/verify_python_cpp.in
index b1907e6c3b..6f35255b18 100644
--- a/qpid/cpp/examples/examples/direct/verify_python_cpp.in
+++ b/qpid/cpp/examples/examples/direct/verify_python_cpp.in
@@ -1,7 +1,6 @@
-==== ../../../../python/examples/direct/declare_queues.py.out
-==== ../../../../python/examples/direct/direct_producer.py.out
-==== ./listener.out
-Activating listener for: listener_destination
+==== declare_queues.py.out
+==== direct_producer.py.out
+==== listener.out
Message: message 0
Message: message 1
Message: message 2
@@ -13,4 +12,4 @@ Message: message 7
Message: message 8
Message: message 9
Message: That's all, folks!
-Shutting down listener for listener_destination
+Shutting down listener for message_queue
diff --git a/qpid/cpp/examples/examples/fanout/declare_queues.cpp b/qpid/cpp/examples/examples/fanout/declare_queues.cpp
index 6b1eabfeec..c76ed54730 100644
--- a/qpid/cpp/examples/examples/fanout/declare_queues.cpp
+++ b/qpid/cpp/examples/examples/fanout/declare_queues.cpp
@@ -20,21 +20,16 @@
*/
/**
- * direct_config_queues.cpp
+ * declare_queues.cpp (this program):
*
* This program is one of three programs designed to be used
- * together. These programs use the "amq.direct" exchange.
+ * together. These programs use the "amq.fanout" exchange.
*
- * direct_config_queues.cpp (this program):
- *
- * Creates a queue on a broker, binding a routing key to route
- * messages to that queue.
- *
- * direct_publisher.cpp:
+ * fanout_producer.cpp:
*
* Publishes to a broker, specifying a routing key.
*
- * direct_listener.cpp
+ * listener.cpp
*
* Reads from a queue on the broker using a message listener.
*
@@ -65,9 +60,7 @@ int main(int argc, char** argv) {
//--------- Main body of program --------------------------------------------
- // Create a queue named "message_queue", and route all messages whose
- // routing key is "routing_key to this newly created queue.
-
+ // Create and bind a queue named "message_queue".
session.queueDeclare(arg::queue="message_queue");
session.queueBind(arg::queue="message_queue", arg::exchange="amq.fanout");
diff --git a/qpid/cpp/examples/examples/fanout/fanout_producer.cpp b/qpid/cpp/examples/examples/fanout/fanout_producer.cpp
index a5904e6731..7521724920 100644
--- a/qpid/cpp/examples/examples/fanout/fanout_producer.cpp
+++ b/qpid/cpp/examples/examples/fanout/fanout_producer.cpp
@@ -21,22 +21,22 @@
/**
- * direct_publisher.cpp:
+ * fanout_producer.cpp:
*
* This program is one of three programs designed to be used
* together. These programs do not specify the exchange type - the
* default exchange type is the direct exchange.
*
- * direct_config_queues.cpp:
+ * declare_queues.cpp:
*
* Creates a queue on a broker, binding a routing key to route
* messages to that queue.
*
- * direct_publisher.cpp (this program):
+ * fanout_producer.cpp (this program):
*
* Publishes to a broker, specifying a routing key.
*
- * direct_listener.cpp
+ * listener.cpp
*
* Reads from a queue on the broker using a message listener.
*
diff --git a/qpid/cpp/examples/examples/fanout/listener.cpp b/qpid/cpp/examples/examples/fanout/listener.cpp
index 3f94f73f48..91b5123c68 100644
--- a/qpid/cpp/examples/examples/fanout/listener.cpp
+++ b/qpid/cpp/examples/examples/fanout/listener.cpp
@@ -20,32 +20,15 @@
*/
/**
- * direct_listener.cpp:
- *
- * This program is one of three programs designed to be used
- * together. These programs do not specify the exchange type - the
- * default exchange type is the direct exchange.
- *
- * direct_config_queues.cpp:
- *
- * Creates a queue on a broker, binding a routing key to route
- * messages to that queue.
- *
- * direct_publisher.cpp:
- *
- * Publishes to a broker, specifying a routing key.
- *
- * direct_listener.cpp (this program):
- *
- * Reads from a queue on the broker using a message listener.
- *
+ * listener.cpp: This program reads messages fro a queue on
+ * the broker using a message listener.
*/
#include <qpid/client/Dispatcher.h>
#include <qpid/client/Connection.h>
#include <qpid/client/Session.h>
#include <qpid/client/Message.h>
-#include <qpid/client/MessageListener.h>
+#include <qpid/client/SubscriptionManager.h>
#include <unistd.h>
#include <cstdlib>
@@ -56,43 +39,25 @@ using namespace qpid::framing;
class Listener : public MessageListener{
-private:
- std::string destination_name;
- Dispatcher dispatcher;
-public:
- Listener(Session& session, string destination_name):
- destination_name(destination_name),
- dispatcher(session)
- {};
-
- virtual void listen();
- virtual void received(Message& message);
- ~Listener() { };
+ private:
+ SubscriptionManager& subscriptions;
+ public:
+ Listener(SubscriptionManager& subscriptions);
+ virtual void received(Message& message);
};
-
-void Listener::listen() {
- std::cout << "Activating listener for: " <<destination_name << std::endl;
- dispatcher.listen(destination_name, this);
-
- // ### The following line gives up control - it should be possible
- // ### to listen without giving up control!
-
- dispatcher.run();
-}
-
+Listener::Listener(SubscriptionManager& subs) : subscriptions(subs)
+{}
void Listener::received(Message& message) {
std::cout << "Message: " << message.getData() << std::endl;
-
if (message.getData() == "That's all, folks!") {
- std::cout << "Shutting down listener for " <<destination_name << std::endl;
- dispatcher.stop();
+ std::cout << "Shutting down listener for " << message.getDestination()
+ << std::endl;
+ subscriptions.cancel(message.getDestination());
}
}
-
-
int main(int argc, char** argv) {
const char* host = argc>1 ? argv[1] : "127.0.0.1";
int port = argc>2 ? atoi(argv[2]) : 5672;
@@ -104,24 +69,15 @@ int main(int argc, char** argv) {
//--------- Main body of program --------------------------------------------
+ SubscriptionManager subscriptions(session);
+ // Create a listener and subscribe it to the queue named "message_queue"
+ Listener listener(subscriptions);
+ subscriptions.subscribe(listener, "message_queue");
+ // Deliver messages until the subscription is cancelled
+ // by Listener::received()
+ subscriptions.run();
- // Subscribe to the queue, route it to a client destination for
- // the listener. (The destination name merely identifies the
- // destination in the listener, you can use any name as long as
- // you use the same name for the listener).
-
- session.messageSubscribe(arg::queue="message_queue", arg::destination="listener_destination");
-
- session.messageFlow(arg::destination="listener_destination", arg::unit=0, arg::value=1);//messages ### Define a constant?
- session.messageFlow(arg::destination="listener_destination", arg::unit=1, arg::value=0xFFFFFFFF);//bytes ###### Define a constant?
-
- // Tell the listener to listen to the destination we just
- // created above.
-
- Listener listener(session, "listener_destination");
- listener.listen();
-
- //-----------------------------------------------------------------------------
+ //---------------------------------------------------------------------------
connection.close();
return 0;
diff --git a/qpid/cpp/examples/examples/fanout/verify.in b/qpid/cpp/examples/examples/fanout/verify.in
index 9d1c1445e1..23d08f38dd 100644
--- a/qpid/cpp/examples/examples/fanout/verify.in
+++ b/qpid/cpp/examples/examples/fanout/verify.in
@@ -1,7 +1,6 @@
-==== ./declare_queues.out
-==== ./fanout_producer.out
-==== ./listener.out
-Activating listener for: listener_destination
+==== declare_queues.out
+==== fanout_producer.out
+==== listener.out
Message: Message 0
Message: Message 1
Message: Message 2
@@ -13,4 +12,4 @@ Message: Message 7
Message: Message 8
Message: Message 9
Message: That's all, folks!
-Shutting down listener for listener_destination
+Shutting down listener for message_queue
diff --git a/qpid/cpp/examples/examples/fanout/verify_cpp_python b/qpid/cpp/examples/examples/fanout/verify_cpp_python
index 59fa63478c..f53784ef1c 100644
--- a/qpid/cpp/examples/examples/fanout/verify_cpp_python
+++ b/qpid/cpp/examples/examples/fanout/verify_cpp_python
@@ -1,5 +1,5 @@
# See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify
-py=../../../../python/examples/fanout
+py=$PYTHON_EXAMPLES/fanout
clients ./declare_queues ./fanout_producer $py/fanout_consumer.py
outputs ./declare_queues.out ./fanout_producer.out $py/fanout_consumer.py.out
diff --git a/qpid/cpp/examples/examples/fanout/verify_cpp_python.in b/qpid/cpp/examples/examples/fanout/verify_cpp_python.in
index 141e7c6a84..cb9e52cfcc 100644
--- a/qpid/cpp/examples/examples/fanout/verify_cpp_python.in
+++ b/qpid/cpp/examples/examples/fanout/verify_cpp_python.in
@@ -1,6 +1,6 @@
-==== ./declare_queues.out
-==== ./fanout_producer.out
-==== ../../../../python/examples/fanout/fanout_consumer.py.out
+==== declare_queues.out
+==== fanout_producer.out
+==== fanout_consumer.py.out
Message 0
Message 1
Message 2
diff --git a/qpid/cpp/examples/examples/fanout/verify_python_cpp b/qpid/cpp/examples/examples/fanout/verify_python_cpp
index cef36bd287..00a0727352 100644
--- a/qpid/cpp/examples/examples/fanout/verify_python_cpp
+++ b/qpid/cpp/examples/examples/fanout/verify_python_cpp
@@ -1,5 +1,5 @@
# See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify
-py=../../../../python/examples/fanout
-clients $py/declare_queues.py $py/fanout_producer .py ./listener
+py=$PYTHON_EXAMPLES/fanout
+clients $py/declare_queues.py $py/fanout_producer.py ./listener
outputs $py/declare_queues.py.out $py/fanout_producer.py.out ./listener.out
diff --git a/qpid/cpp/examples/examples/fanout/verify_python_cpp.in b/qpid/cpp/examples/examples/fanout/verify_python_cpp.in
new file mode 100644
index 0000000000..87a1694e6c
--- /dev/null
+++ b/qpid/cpp/examples/examples/fanout/verify_python_cpp.in
@@ -0,0 +1,15 @@
+==== declare_queues.py.out
+==== fanout_producer.py.out
+==== listener.out
+Message: message 0
+Message: message 1
+Message: message 2
+Message: message 3
+Message: message 4
+Message: message 5
+Message: message 6
+Message: message 7
+Message: message 8
+Message: message 9
+Message: That's all, folks!
+Shutting down listener for message_queue
diff --git a/qpid/cpp/examples/examples/pub-sub/verify.in b/qpid/cpp/examples/examples/pub-sub/verify.in
index ac51038a1b..6413c5c788 100644
--- a/qpid/cpp/examples/examples/pub-sub/verify.in
+++ b/qpid/cpp/examples/examples/pub-sub/verify.in
@@ -1,4 +1,4 @@
-==== ./topic_publisher.out
+==== topic_publisher.out
==== topic_listener.out | remove_uuid | sort
Declaring queue: europe
Declaring queue: news
diff --git a/qpid/cpp/examples/examples/pub-sub/verify_cpp_python b/qpid/cpp/examples/examples/pub-sub/verify_cpp_python
index ff711a310f..ecc573eed3 100644
--- a/qpid/cpp/examples/examples/pub-sub/verify_cpp_python
+++ b/qpid/cpp/examples/examples/pub-sub/verify_cpp_python
@@ -1,5 +1,5 @@
# See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify
-py=../../../../python/examples/pubsub
+py=$PYTHON_EXAMPLES/pubsub
background "Queues created" $py/topic_subscriber.py
clients ./topic_publisher
outputs ./topic_publisher.out "$py/topic_subscriber.py.out | remove_uuid64 | sort"
diff --git a/qpid/cpp/examples/examples/pub-sub/verify_cpp_python.in b/qpid/cpp/examples/examples/pub-sub/verify_cpp_python.in
index 9e033dc25c..b3c9e750f5 100644
--- a/qpid/cpp/examples/examples/pub-sub/verify_cpp_python.in
+++ b/qpid/cpp/examples/examples/pub-sub/verify_cpp_python.in
@@ -1,5 +1,5 @@
-==== ./topic_publisher.out
-==== ../../../../python/examples/pubsub/topic_subscriber.py.out | remove_uuid64 | sort
+==== topic_publisher.out
+==== topic_subscriber.py.out | remove_uuid64 | sort
Message 0
Message 0
Message 0
diff --git a/qpid/cpp/examples/examples/pub-sub/verify_python_cpp b/qpid/cpp/examples/examples/pub-sub/verify_python_cpp
index e9c72c94d7..2ddaad58c2 100644
--- a/qpid/cpp/examples/examples/pub-sub/verify_python_cpp
+++ b/qpid/cpp/examples/examples/pub-sub/verify_python_cpp
@@ -1,5 +1,5 @@
# See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify
-py=../../../../python/examples/pubsub
+py=$PYTHON_EXAMPLES/pubsub
background "Listening" ./topic_listener
clients $py/topic_publisher.py
outputs $py/topic_publisher.py.out "topic_listener.out | remove_uuid | sort"
diff --git a/qpid/cpp/examples/examples/pub-sub/verify_python_cpp.in b/qpid/cpp/examples/examples/pub-sub/verify_python_cpp.in
index 6f92458792..97fccf0a32 100644
--- a/qpid/cpp/examples/examples/pub-sub/verify_python_cpp.in
+++ b/qpid/cpp/examples/examples/pub-sub/verify_python_cpp.in
@@ -1,4 +1,4 @@
-==== ../../../../python/examples/pubsub/topic_publisher.py.out
+==== topic_publisher.py.out
==== topic_listener.out | remove_uuid | sort
Declaring queue: europe
Declaring queue: news
diff --git a/qpid/cpp/examples/examples/request-response/verify b/qpid/cpp/examples/examples/request-response/verify
index a8c942b5e6..76007ff8d2 100644
--- a/qpid/cpp/examples/examples/request-response/verify
+++ b/qpid/cpp/examples/examples/request-response/verify
@@ -2,4 +2,4 @@
background "Waiting" ./server
clients ./client
kill %% # Must kill the server.
-outputs "./client.out | remove_uuid" " server.out | remove_uuid"
+outputs "./client.out | remove_uuid" "server.out | remove_uuid"
diff --git a/qpid/cpp/examples/examples/request-response/verify.in b/qpid/cpp/examples/examples/request-response/verify.in
index 3767fb3f43..7925dc5671 100644
--- a/qpid/cpp/examples/examples/request-response/verify.in
+++ b/qpid/cpp/examples/examples/request-response/verify.in
@@ -1,4 +1,4 @@
-==== ./client.out | remove_uuid
+==== client.out | remove_uuid
Activating response queue listener for: client
Request: Twas brillig, and the slithy toves
Request: Did gire and gymble in the wabe.
@@ -10,7 +10,7 @@ Response: DID GIRE AND GYMBLE IN THE WABE.
Response: ALL MIMSY WERE THE BOROGROVES,
Response: AND THE MOME RATHS OUTGRABE.
Shutting down listener for client
-==== server.out | remove_uuid
+==== server.out | remove_uuid
Activating request queue listener for: request
Waiting for requests
Request: Twas brillig, and the slithy toves (client)
diff --git a/qpid/cpp/examples/examples/request-response/verify_cpp_python b/qpid/cpp/examples/examples/request-response/verify_cpp_python
index 9470110bc2..9d71d51c37 100644
--- a/qpid/cpp/examples/examples/request-response/verify_cpp_python
+++ b/qpid/cpp/examples/examples/request-response/verify_cpp_python
@@ -1,6 +1,5 @@
# See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify
-py=../../../../python/examples/request-response
-background "Request server running" $py/server.py
+background "Request server running" $PYTHON_EXAMPLES/request-response/server.py
clients ./client
kill %% # Must kill the server.
-outputs "./client.out | remove_uuid" "$py/server.py.out | remove_uuid64"
+outputs "./client.out | remove_uuid" "$PYTHON_EXAMPLES/request-response/server.py.out | remove_uuid64"
diff --git a/qpid/cpp/examples/examples/request-response/verify_cpp_python.in b/qpid/cpp/examples/examples/request-response/verify_cpp_python.in
index efffc32332..280484bd2a 100644
--- a/qpid/cpp/examples/examples/request-response/verify_cpp_python.in
+++ b/qpid/cpp/examples/examples/request-response/verify_cpp_python.in
@@ -1,4 +1,4 @@
-==== ./client.out | remove_uuid
+==== client.out | remove_uuid
Activating response queue listener for: client
Request: Twas brillig, and the slithy toves
Request: Did gire and gymble in the wabe.
@@ -10,6 +10,6 @@ Response: DID GIRE AND GYMBLE IN THE WABE.
Response: ALL MIMSY WERE THE BOROGROVES,
Response: AND THE MOME RATHS OUTGRABE.
Shutting down listener for client
-==== ../../../../python/examples/request-response/server.py.out | remove_uuid64
+==== server.py.out | remove_uuid64
Request server running - run your client now.
(Times out after 100 seconds ...)
diff --git a/qpid/cpp/examples/examples/request-response/verify_python_cpp b/qpid/cpp/examples/examples/request-response/verify_python_cpp
index 11f5c4cbc4..9f3f1caaf4 100644
--- a/qpid/cpp/examples/examples/request-response/verify_python_cpp
+++ b/qpid/cpp/examples/examples/request-response/verify_python_cpp
@@ -1,6 +1,5 @@
# See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify
-py=../../../../python/examples/request-response
-background "Request server running" ./server
-clients $py/client.py
+background "Waiting" ./server
+clients $PYTHON_EXAMPLES/request-response/client.py
kill %% # Must kill the server.
-outputs "./client.py.out | remove_uuid64" " server.out | remove_uuid"
+outputs "$PYTHON_EXAMPLES/request-response/client.py.out | remove_uuid64" "server.out | remove_uuid64"
diff --git a/qpid/cpp/examples/examples/request-response/verify_python_cpp.in b/qpid/cpp/examples/examples/request-response/verify_python_cpp.in
new file mode 100644
index 0000000000..7718d54973
--- /dev/null
+++ b/qpid/cpp/examples/examples/request-response/verify_python_cpp.in
@@ -0,0 +1,18 @@
+==== client.py.out | remove_uuid64
+Request: Twas brilling, and the slithy toves
+Request: Did gyre and gimble in the wabe.
+Request: All mimsy were the borogroves,
+Request: And the mome raths outgrabe.
+Messages queue: ReplyTo:
+Response: TWAS BRILLING, AND THE SLITHY TOVES
+Response: DID GYRE AND GIMBLE IN THE WABE.
+Response: ALL MIMSY WERE THE BOROGROVES,
+Response: AND THE MOME RATHS OUTGRABE.
+No more messages!
+==== server.out | remove_uuid64
+Activating request queue listener for: request
+Waiting for requests
+Request: Twas brilling, and the slithy toves (ReplyTo:)
+Request: Did gyre and gimble in the wabe. (ReplyTo:)
+Request: All mimsy were the borogroves, (ReplyTo:)
+Request: And the mome raths outgrabe. (ReplyTo:)
diff --git a/qpid/cpp/examples/verify_all b/qpid/cpp/examples/verify_all
new file mode 100755
index 0000000000..e6d75929b2
--- /dev/null
+++ b/qpid/cpp/examples/verify_all
@@ -0,0 +1,24 @@
+#!/bin/sh
+# Verify all C++/python example combinations.
+#
+
+src=$1 ; build=$2
+
+verify=./verify
+qpidd=$build/src/qpidd
+cpp=$build/examples/examples
+python=$src/../python
+
+trap "$qpidd -q" exit
+export QPID_PORT=`$qpidd -dp0 --data-dir ""`
+export PYTHON_EXAMPLES=$python/examples
+export PYTHONPATH=$python:$PYTHONPATH
+export AMQP_SPEC=$src/../specs/amqp.0-10-preview.xml
+
+find="find $cpp"
+test -d $PYTHON_EXAMPLES && find="$find $PYTHON_EXAMPLES"
+find="$find -name verify"
+test -d $PYTHON_EXAMPLES && \
+ find="$find -o -name verify_cpp_python -o -name verify_python_cpp"
+$verify `$find`
+
diff --git a/qpid/cpp/qpidc.spec.in b/qpid/cpp/qpidc.spec.in
index 45d67dff6e..3892b1ab40 100644
--- a/qpid/cpp/qpidc.spec.in
+++ b/qpid/cpp/qpidc.spec.in
@@ -5,7 +5,7 @@
Name: @PACKAGE@
Version: @VERSION@
-Release: 18%{?dist}
+Release: 22%{?dist}
Summary: Libraries for Qpid C++ client applications
Group: System Environment/Libraries
License: Apache Software License
@@ -73,16 +73,16 @@ Qpid broker daemon.
%setup -q
%build
-%configure --disable-static --without-cpg CXXFLAGS="-O3 -DNDEBUG"
+%configure --disable-static --without-cpg CXXFLAGS="-g -O3 -DNDEBUG"
make %{?_smp_mflags}
# Remove this generated perl file, we don't need it and it upsets rpmlint.
rm docs/api/html/installdox
%install
rm -rf %{buildroot}
-make install DESTDIR=%{buildroot}
-install -Dp -m0755 etc/qpidd %{buildroot}%{_initrddir}/qpidd
-install -d -m0755 %{buildroot}%{_localstatedir}/qpidd
+make install-strip DESTDIR=%{buildroot}
+install -Dp -m0755 etc/qpidd %{buildroot}%{_initrddir}/qpidd
+install -d -m0755 %{buildroot}%{_localstatedir}/lib/qpidd
rm -f %{buildroot}%_libdir/*.a
rm -f %{buildroot}%_libdir/*.la
@@ -99,7 +99,6 @@ make check
%_libdir/libqpidcommon.so.0.1.0
%_libdir/libqpidclient.so.0
%_libdir/libqpidclient.so.0.1.0
-%_localstatedir/qpidd
%config(noreplace) %_sysconfdir/qpidd.conf
%files devel
@@ -123,6 +122,7 @@ make check
%_libdir/libqpidcluster.so.0.1.0
%_sbindir/%{qpidd}
%{_initrddir}/%{qpidd}
+%_localstatedir/lib/qpidd
%doc %_mandir/man1/%{qpidd}.*
%files -n %{qpidd}-devel
@@ -156,6 +156,14 @@ fi
/sbin/ldconfig
%changelog
+* Tue Feb 12 2008 Alan Conway <aconway@redhat.com> - 0.2-22
+- Added -g to compile flags for debug symbols.
+
+* Tue Feb 12 2008 Alan Conway <aconway@redhat.com> - 0.2-21
+- Create /var/lib/qpidd correctly.
+
+* Mon Feb 11 2008 Rafael Schloming <rafaels@redhat.com> - 0.2-20
+- bumped for Beta 3
* Mon Jan 21 2008 Gordon Sim <gsim@redhat.com> - 0.2-18
- bump up rev for recent changes to plugin modules & mgmt
diff --git a/qpid/cpp/src/qpid/DataDir.cpp b/qpid/cpp/src/qpid/DataDir.cpp
index baf536c109..abf9b061e4 100644
--- a/qpid/cpp/src/qpid/DataDir.cpp
+++ b/qpid/cpp/src/qpid/DataDir.cpp
@@ -20,6 +20,7 @@
#include "Exception.h"
#include "DataDir.h"
+#include "qpid/log/Statement.h"
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
@@ -32,7 +33,10 @@ DataDir::DataDir (std::string path) :
dirPath (path)
{
if (!enabled)
+ {
+ QPID_LOG (info, "No data directory - Disabling persistent configuration");
return;
+ }
const char *cpath = dirPath.c_str ();
struct stat s;
@@ -55,14 +59,20 @@ DataDir::DataDir (std::string path) :
oss << "Error locking data directory: errno=" << errno;
throw Exception (oss.str ());
}
+
+ QPID_LOG (info, "Locked data directory: " << dirPath);
}
DataDir::~DataDir ()
{
+ if (dirPath.empty ())
+ return;
+
std::string lockFile (dirPath);
lockFile = lockFile + "/lock";
::unlink (lockFile.c_str ());
+ QPID_LOG (info, "Unlocked data directory: " << dirPath);
}
} // namespace qpid
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index 1d55db0c0f..117a93b571 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -60,6 +60,7 @@ namespace broker {
Broker::Options::Options(const std::string& name) :
qpid::Options(name),
+ noDataDir(0),
dataDir("/var/lib/qpidd"),
port(DEFAULT_PORT),
workerThreads(5),
@@ -75,6 +76,8 @@ Broker::Options::Options(const std::string& name) :
addOptions()
("data-dir", optValue(dataDir,"DIR"),
"Directory to contain persistent data generated by the broker")
+ ("no-data-dir", optValue(noDataDir),
+ "Don't use a data directory. No persistent configuration will be loaded or stored")
("port,p", optValue(port,"PORT"),
"Tells the broker to listen on PORT")
("worker-threads", optValue(workerThreads, "N"),
@@ -103,7 +106,7 @@ const std::string qpid_management("qpid.management");
Broker::Broker(const Broker::Options& conf) :
config(conf),
store(0),
- dataDir(conf.dataDir),
+ dataDir(conf.noDataDir ? std::string () : conf.dataDir),
factory(*this),
sessionManager(conf.ack)
{
diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h
index 852cb2da42..68dbf570f0 100644
--- a/qpid/cpp/src/qpid/broker/Broker.h
+++ b/qpid/cpp/src/qpid/broker/Broker.h
@@ -61,6 +61,7 @@ class Broker : public sys::Runnable, public Plugin::Target, public management::M
struct Options : public qpid::Options {
Options(const std::string& name="Broker Options");
+ bool noDataDir;
std::string dataDir;
uint16_t port;
int workerThreads;
diff --git a/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp b/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp
index 72df9c44e9..9577853de4 100644
--- a/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp
+++ b/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp
@@ -36,9 +36,9 @@ ConnectionFactory::~ConnectionFactory()
qpid::sys::ConnectionInputHandler*
ConnectionFactory::create(qpid::sys::ConnectionOutputHandler* out,
- const qpid::sys::Socket& s)
+ const std::string& id)
{
- return new Connection(out, broker, s.getPeerAddress());
+ return new Connection(out, broker, id);
}
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/ConnectionFactory.h b/qpid/cpp/src/qpid/broker/ConnectionFactory.h
index 23fba5c1ab..53fb160279 100644
--- a/qpid/cpp/src/qpid/broker/ConnectionFactory.h
+++ b/qpid/cpp/src/qpid/broker/ConnectionFactory.h
@@ -32,8 +32,8 @@ class ConnectionFactory : public qpid::sys::ConnectionInputHandlerFactory
public:
ConnectionFactory(Broker& b);
- virtual qpid::sys::ConnectionInputHandler* create
- (qpid::sys::ConnectionOutputHandler* ctxt, const sys::Socket& s);
+ virtual qpid::sys::ConnectionInputHandler*
+ create(qpid::sys::ConnectionOutputHandler* out, const std::string& id);
virtual ~ConnectionFactory();
diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp
index 1021cca1b1..80fafe0386 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionState.cpp
@@ -147,12 +147,14 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId,
break;
case management::Session::METHOD_CLOSE :
+ /*
if (handler != 0)
{
handler->getConnection().closeChannel(handler->getChannel());
}
status = Manageable::STATUS_OK;
break;
+ */
case management::Session::METHOD_SOLICITACK :
case management::Session::METHOD_RESETLIFESPAN :
diff --git a/qpid/cpp/src/qpid/client/Dispatcher.cpp b/qpid/cpp/src/qpid/client/Dispatcher.cpp
index 0783d5bc55..8df4637c88 100644
--- a/qpid/cpp/src/qpid/client/Dispatcher.cpp
+++ b/qpid/cpp/src/qpid/client/Dispatcher.cpp
@@ -75,11 +75,18 @@ void Dispatcher::run()
if (content->isA<MessageTransferBody>()) {
Message msg(*content, session);
Subscriber::shared_ptr listener = find(msg.getDestination());
- assert(listener);
- listener->received(msg);
+ if (!listener) {
+ QPID_LOG(error, "No listener found for destination " << msg.getDestination());
+ } else {
+ assert(listener);
+ listener->received(msg);
+ }
} else {
- assert (handler.get());
- handler->handle(*content);
+ if (handler.get()) {
+ handler->handle(*content);
+ } else {
+ QPID_LOG(error, "No handler found for " << *(content->getMethod()));
+ }
}
}
} catch (const ClosedException&) {
diff --git a/qpid/cpp/src/qpid/log/Statement.cpp b/qpid/cpp/src/qpid/log/Statement.cpp
index 2935de9071..9b6fb7feaf 100644
--- a/qpid/cpp/src/qpid/log/Statement.cpp
+++ b/qpid/cpp/src/qpid/log/Statement.cpp
@@ -22,6 +22,7 @@
#include <stdexcept>
#include <algorithm>
#include <syslog.h>
+#include <ctype.h>
namespace qpid {
namespace log {
@@ -29,21 +30,21 @@ namespace log {
namespace {
using namespace std;
-struct IsControl { bool operator()(unsigned char c) { return c < 32; } };
+struct NonPrint { bool operator()(unsigned char c) { return !isprint(c); } };
-bool isClean(const std::string& str) {
- return std::find_if(str.begin(), str.end(), IsControl()) == str.end();
-}
+char hex[] = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F' };
std::string quote(const std::string& str) {
- IsControl isControl;
- size_t n = std::count_if(str.begin(), str.end(), isControl);
+ NonPrint nonPrint;
+ size_t n = std::count_if(str.begin(), str.end(), nonPrint);
+ if (n==0) return str;
std::string ret;
- ret.reserve(str.size()+n); // Avoid extra allocations.
+ ret.reserve(str.size()+2*n); // Avoid extra allocations.
for (string::const_iterator i = str.begin(); i != str.end(); ++i) {
- if (isControl(*i)) {
- ret.push_back('^');
- ret.push_back((*i)+64);
+ if (nonPrint(*i)) {
+ ret.push_back('\\');
+ ret.push_back(hex[((*i) >> 4)&0xf]);
+ ret.push_back(hex[(*i) & 0xf]);
}
else ret.push_back(*i);
}
@@ -53,7 +54,7 @@ std::string quote(const std::string& str) {
}
void Statement::log(const std::string& message) {
- Logger::instance().log(*this, isClean(message) ? message : quote(message));
+ Logger::instance().log(*this, quote(message));
}
Statement::Initializer::Initializer(Statement& s) : statement(s) {
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
index 39fab270af..709f2a0ecd 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
@@ -74,16 +74,16 @@ void ManagementAgent::setExchange (broker::Exchange::shared_ptr _mexchange,
}
void ManagementAgent::addObject (ManagementObject::shared_ptr object,
- uint64_t persistenceId,
- uint64_t idOffset)
+ uint64_t /*persistenceId*/,
+ uint64_t /*idOffset*/)
{
RWlock::ScopedWlock writeLock (userLock);
uint64_t objectId;
- if (persistenceId == 0)
+// if (persistenceId == 0)
objectId = nextObjectId++;
- else
- objectId = 0x8000000000000000ULL | (persistenceId + idOffset);
+// else
+// objectId = 0x8000000000000000ULL | (persistenceId + idOffset);
object->setObjectId (objectId);
managementObjects[objectId] = object;
diff --git a/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp b/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
index 65d805e492..9fd32add72 100644
--- a/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
+++ b/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
@@ -140,7 +140,7 @@ class AsynchIOHandler : public qpid::sys::ConnectionOutputHandler {
void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, ConnectionInputHandlerFactory* f) {
AsynchIOHandler* async = new AsynchIOHandler;
- ConnectionInputHandler* handler = f->create(async, s);
+ ConnectionInputHandler* handler = f->create(async, s.getPeerAddress());
AsynchIO* aio = new AsynchIO(s,
boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
boost::bind(&AsynchIOHandler::eof, async, _1),
@@ -194,7 +194,7 @@ void AsynchIOAcceptor::connect(const std::string& host, int16_t port, Connection
socket->connect(host, port);
AsynchIOHandler* async = new AsynchIOHandler;
async->setClient();
- ConnectionInputHandler* handler = f->create(async, *socket);
+ ConnectionInputHandler* handler = f->create(async, socket->getPeerAddress());
AsynchIO* aio = new AsynchIO(*socket,
boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
boost::bind(&AsynchIOHandler::eof, async, _1),
diff --git a/qpid/cpp/src/qpid/sys/ConnectionInputHandlerFactory.h b/qpid/cpp/src/qpid/sys/ConnectionInputHandlerFactory.h
index 4a90f8b736..2b309b5758 100644
--- a/qpid/cpp/src/qpid/sys/ConnectionInputHandlerFactory.h
+++ b/qpid/cpp/src/qpid/sys/ConnectionInputHandlerFactory.h
@@ -22,7 +22,7 @@
#define _ConnectionInputHandlerFactory_
#include <boost/noncopyable.hpp>
-#include "qpid/sys/Socket.h"
+#include <string>
namespace qpid {
namespace sys {
@@ -37,7 +37,13 @@ class ConnectionInputHandler;
class ConnectionInputHandlerFactory : private boost::noncopyable
{
public:
- virtual ConnectionInputHandler* create(ConnectionOutputHandler* ctxt, const Socket& s) = 0;
+ /**
+ *@param out handler for connection output.
+ *@param id identify the connection for management purposes.
+ */
+ virtual ConnectionInputHandler* create(ConnectionOutputHandler* out,
+ const std::string& id) = 0;
+
virtual ~ConnectionInputHandlerFactory(){}
};
diff --git a/qpid/cpp/src/qpidd.cpp b/qpid/cpp/src/qpidd.cpp
index 6c20ebc58f..08b907cbe2 100644
--- a/qpid/cpp/src/qpidd.cpp
+++ b/qpid/cpp/src/qpidd.cpp
@@ -48,9 +48,9 @@ struct ModuleOptions : public qpid::Options {
ModuleOptions() : qpid::Options("Module options"), loadDir("/usr/lib/qpidd"), noLoad(false)
{
addOptions()
- ("load-dir", optValue(loadDir, "DIR"), "Load all modules from this directory")
- ("load-module", optValue(load, "FILE"), "Specifies additional module(s) to be loaded")
- ("no-modules", optValue(noLoad), "Don't load any modules");
+ ("module-dir", optValue(loadDir, "DIR"), "Load all .so modules in this directory")
+ ("load-module", optValue(load, "FILE"), "Specifies additional module(s) to be loaded")
+ ("no-module-dir", optValue(noLoad), "Don't load modules from module directory");
}
};
@@ -122,7 +122,7 @@ auto_ptr<QpiddOptions> options;
void shutdownHandler(int /*signal*/){
// Note: do not call any async-signal unsafe functions here.
- // Do any extra shtudown actions in main() after broker->run()
+ // Do any extra shutdown actions in main() after broker->run()
brokerPtr->shutdown();
}
@@ -140,6 +140,7 @@ struct QpiddDaemon : public Daemon {
uint16_t port=brokerPtr->getPort();
ready(port); // Notify parent.
brokerPtr->run();
+ brokerPtr.reset();
}
};
@@ -188,12 +189,13 @@ int main(int argc, char* argv[])
// be re-parsed with all of the module-supplied options.
bootOptions.parse (argc, argv, bootOptions.common.config, true);
qpid::log::Logger::instance().configure(bootOptions.log, argv[0]);
- if (!bootOptions.module.noLoad) {
- for (vector<string>::iterator iter = bootOptions.module.load.begin();
- iter != bootOptions.module.load.end();
- iter++)
- tryShlib (iter->data(), false);
+ for (vector<string>::iterator iter = bootOptions.module.load.begin();
+ iter != bootOptions.module.load.end();
+ iter++)
+ tryShlib (iter->data(), false);
+
+ if (!bootOptions.module.noLoad) {
bool isDefault = defaultPath == bootOptions.module.loadDir;
loadModuleDir (bootOptions.module.loadDir, isDefault);
}
diff --git a/qpid/cpp/src/tests/ClientSessionTest.cpp b/qpid/cpp/src/tests/ClientSessionTest.cpp
index 60cfe04510..87a4f59999 100644
--- a/qpid/cpp/src/tests/ClientSessionTest.cpp
+++ b/qpid/cpp/src/tests/ClientSessionTest.cpp
@@ -21,6 +21,7 @@
#include "unit_test.h"
#include "BrokerFixture.h"
#include "qpid/client/Dispatcher.h"
+#include "qpid/sys/Monitor.h"
#include "qpid/sys/Thread.h"
#include "qpid/sys/Runnable.h"
#include "qpid/client/Session_0_10.h"
@@ -38,6 +39,7 @@ using namespace qpid::client;
using namespace qpid::client::arg;
using namespace qpid::framing;
using namespace qpid;
+using qpid::sys::Monitor;
using std::string;
using std::cout;
using std::endl;
@@ -67,6 +69,27 @@ struct DummyListener : public sys::Runnable, public MessageListener {
}
};
+struct SimpleListener : public MessageListener
+{
+ Monitor lock;
+ std::vector<Message> messages;
+
+ void received(Message& msg)
+ {
+ Monitor::ScopedLock l(lock);
+ messages.push_back(msg);
+ lock.notifyAll();
+ }
+
+ void waitFor(const uint n)
+ {
+ Monitor::ScopedLock l(lock);
+ while (messages.size() < n) {
+ lock.wait();
+ }
+ }
+};
+
struct ClientSessionFixture : public ProxySessionFixture
{
void declareSubscribe(const string& q="my-queue",
@@ -167,22 +190,31 @@ BOOST_FIXTURE_TEST_CASE(testSuspendResume, ClientSessionFixture)
BOOST_CHECK_EQUAL(string("my-message"), msg->getContent());
}
+/**
+ * Currently broken due to a deadlock in SessionCore
+ *
BOOST_FIXTURE_TEST_CASE(testSendToSelf, SessionFixture) {
- // https://bugzilla.redhat.com/show_bug.cgi?id=410551
// Deadlock if SubscriptionManager run() concurrent with session ack.
- LocalQueue myq;
+ SimpleListener mylistener;
session.queueDeclare(queue="myq", exclusive=true, autoDelete=true);
- subs.subscribe(myq, "myq");
+ subs.subscribe(mylistener, "myq", "myq");
+ sys::Thread runner(subs);//start dispatcher thread
string data("msg");
Message msg(data, "myq");
- const int count=100; // Verified with count=100000 in a loop.
- for (int i = 0; i < count; ++i)
+ const uint count=10000;
+ for (uint i = 0; i < count; ++i) {
session.messageTransfer(content=msg);
- for (int j = 0; j < count; ++j) {
- Message m=myq.pop();
- BOOST_CHECK_EQUAL(m.getData(), data);
+ }
+ mylistener.waitFor(count);
+ subs.cancel("myq");
+ subs.stop();
+ session.close();
+ BOOST_CHECK_EQUAL(mylistener.messages.size(), count);
+ for (uint j = 0; j < count; ++j) {
+ BOOST_CHECK_EQUAL(mylistener.messages[j].getData(), data);
}
}
+*/
QPID_AUTO_TEST_SUITE_END()
diff --git a/qpid/cpp/src/tests/logging.cpp b/qpid/cpp/src/tests/logging.cpp
index 4969c3d6a9..2c0ed08105 100644
--- a/qpid/cpp/src/tests/logging.cpp
+++ b/qpid/cpp/src/tests/logging.cpp
@@ -367,7 +367,7 @@ BOOST_AUTO_TEST_CASE(testLoggerConfigure) {
unlink("logging.tmp");
}
-BOOST_AUTO_TEST_CASE(testQuoteControlChars) {
+BOOST_AUTO_TEST_CASE(testQuoteNonPrintable) {
Logger& l=Logger::instance();
l.clear();
Options opts;
@@ -375,13 +375,13 @@ BOOST_AUTO_TEST_CASE(testQuoteControlChars) {
opts.outputs.push_back("logging.tmp");
opts.time=false;
l.configure(opts, "test");
- char s[] = "null\0tab\tspace newline\nret\r";
+ char s[] = "null\0tab\tspace newline\nret\r\x80\x99\xff";
string str(s, sizeof(s));
QPID_LOG(critical, str);
ifstream log("logging.tmp");
string line;
getline(log, line);
- string expect="critical null^@tab^Ispace newline^Jret^M^@";
+ string expect="critical null\\00tab\\09space newline\\0Aret\\0D\\80\\99\\FF\\00";
BOOST_CHECK_EQUAL(expect, line);
log.close();
unlink("logging.tmp");
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
index 6c7575429a..0e36a09bbd 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
@@ -231,6 +231,14 @@ public abstract class AMQDestination implements Destination, Referenceable
sb.append('?');
+ if (_routingKey != null)
+ {
+ sb.append(BindingURL.OPTION_ROUTING_KEY);
+ sb.append("='");
+ sb.append(_routingKey).append("'");
+ sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
+ }
+
if (_isDurable)
{
sb.append(BindingURL.OPTION_DURABLE);
diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
index 51a052bed5..ad47e14fde 100644
--- a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
+++ b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
@@ -16,6 +16,7 @@ import org.apache.qpidity.nclient.impl.ClientSessionDelegate;
import org.apache.qpidity.transport.Channel;
import org.apache.qpidity.transport.Connection;
import org.apache.qpidity.transport.ConnectionClose;
+import org.apache.qpidity.transport.ConnectionCloseOk;
import org.apache.qpidity.transport.ConnectionDelegate;
import org.apache.qpidity.transport.ConnectionEvent;
import org.apache.qpidity.transport.ProtocolHeader;
@@ -33,6 +34,9 @@ public class Client implements org.apache.qpidity.nclient.Connection
private ClosedListener _closedListner;
private final Lock _lock = new ReentrantLock();
private static Logger _logger = LoggerFactory.getLogger(Client.class);
+ private Condition closeOk;
+ private boolean closed = false;
+
/**
*
* @return returns a new connection to the broker.
@@ -45,6 +49,7 @@ public class Client implements org.apache.qpidity.nclient.Connection
public void connect(String host, int port,String virtualHost,String username, String password) throws QpidException
{
Condition negotiationComplete = _lock.newCondition();
+ closeOk = _lock.newCondition();
_lock.lock();
ConnectionDelegate connectionDelegate = new ConnectionDelegate()
@@ -76,6 +81,21 @@ public class Client implements org.apache.qpidity.nclient.Connection
}
}
+ @Override public void connectionCloseOk(Channel context, ConnectionCloseOk struct)
+ {
+ _lock.lock();
+ try
+ {
+ closed = true;
+ this.receivedClose = true;
+ closeOk.signalAll();
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
@Override public void connectionClose(Channel context, ConnectionClose connectionClose)
{
ErrorCode errorCode = ErrorCode.get(connectionClose.getReplyCode());
@@ -179,6 +199,24 @@ public class Client implements org.apache.qpidity.nclient.Connection
{
Channel ch = _conn.getChannel(0);
ch.connectionClose(0, "client is closing", 0, 0);
+ _lock.lock();
+ try
+ {
+ try {
+ while (!closed)
+ {
+ closeOk.await();
+ }
+ }
+ catch (InterruptedException e)
+ {
+ // do nothing
+ }
+ }
+ finally
+ {
+ _lock.unlock();
+ }
_conn.close();
}
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionCloseTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionCloseTest.java
new file mode 100644
index 0000000000..20443944d2
--- /dev/null
+++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionCloseTest.java
@@ -0,0 +1,72 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.client.connection;
+
+import org.apache.qpid.testutil.QpidTestCase;
+import org.apache.qpidity.transport.util.Logger;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+/**
+ * ConnectionCloseTest
+ *
+ */
+
+public class ConnectionCloseTest extends QpidTestCase
+{
+
+ private static final Logger log = Logger.get(ConnectionCloseTest.class);
+
+ public void testSendReceiveClose() throws Exception
+ {
+ for (int i = 0; i < 500; i++)
+ {
+ if ((i % 10) == 0)
+ {
+ log.warn("%d messages sent and received", i);
+ }
+
+ Connection receiver = getConnection();
+ receiver.start();
+ Session rssn = receiver.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = rssn.createQueue("connection-close-test-queue");
+ MessageConsumer cons = rssn.createConsumer(queue);
+
+ Connection sender = getConnection();
+ sender.start();
+ Session sssn = sender.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer prod = sssn.createProducer(queue);
+ prod.send(sssn.createTextMessage("test"));
+ sender.close();
+
+ TextMessage m = (TextMessage) cons.receive(2000);
+ assertNotNull("message was lost", m);
+ assertEquals(m.getText(), "test");
+ receiver.close();
+ }
+ }
+
+}
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/AbstractXATest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/AbstractXATestCase.java
index ba4ebae258..7c03e16258 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/AbstractXATest.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/AbstractXATestCase.java
@@ -28,7 +28,7 @@ import javax.jms.*;
*
*
*/
-public abstract class AbstractXATest extends QpidTestCase
+public abstract class AbstractXATestCase extends QpidTestCase
{
protected static final String _sequenceNumberPropertyName = "seqNumber";
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/QueueTests.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/QueueTest.java
index cd5b228f76..a703432efb 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/QueueTests.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/QueueTest.java
@@ -26,10 +26,10 @@ import junit.framework.TestSuite;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class QueueTests extends AbstractXATest
+public class QueueTest extends AbstractXATestCase
{
/* this clas logger */
- private static final Logger _logger = LoggerFactory.getLogger(QueueTests.class);
+ private static final Logger _logger = LoggerFactory.getLogger(QueueTest.class);
/**
* the queue use by all the tests
@@ -66,7 +66,7 @@ public class QueueTests extends AbstractXATest
*/
public static TestSuite getSuite()
{
- return new TestSuite(QueueTests.class);
+ return new TestSuite(QueueTest.class);
}
/**
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/TopicTests.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/TopicTest.java
index 30b3b09449..5ea059b166 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/TopicTests.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/TopicTest.java
@@ -33,10 +33,10 @@ import org.slf4j.LoggerFactory;
*
*
*/
-public class TopicTests extends AbstractXATest
+public class TopicTest extends AbstractXATestCase
{
/* this clas logger */
- private static final Logger _logger = LoggerFactory.getLogger(TopicTests.class);
+ private static final Logger _logger = LoggerFactory.getLogger(TopicTest.class);
/**
* the topic use by all the tests
@@ -81,7 +81,7 @@ public class TopicTests extends AbstractXATest
*/
public static TestSuite getSuite()
{
- return new TestSuite(TopicTests.class);
+ return new TestSuite(TopicTest.class);
}
/**
@@ -1671,12 +1671,12 @@ public class TopicTests extends AbstractXATest
long seq = 0;
try
{
- seq = message.getLongProperty(TopicTests._sequenceNumberPropertyName);
+ seq = message.getLongProperty(TopicTest._sequenceNumberPropertyName);
}
catch (JMSException e)
{
e.printStackTrace();
- TopicTests.failure();
+ TopicTest.failure();
_lock.set(false);
synchronized (_lock)
{
@@ -1686,7 +1686,7 @@ public class TopicTests extends AbstractXATest
if (seq != _counter)
{
System.out.println("received message " + seq + " expected " + _counter);
- TopicTests.failure();
+ TopicTest.failure();
_lock.set(false);
synchronized (_lock)
{
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidTestCase.java b/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidTestCase.java
index e7c09fca65..084137b38e 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidTestCase.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidTestCase.java
@@ -59,16 +59,27 @@ public class QpidTestCase extends TestCase
private InitialContext _initialContext;
private AMQConnectionFactory _connectionFactory;
- protected void setUp() throws Exception
+ public void runBare() throws Throwable
{
- super.setUp();
+ String name = getClass().getSimpleName() + "." + getName();
+ _logger.info("========== start " + name + " ==========");
startBroker();
- }
-
- protected void tearDown() throws Exception
- {
- stopBroker();
- super.tearDown();
+ try
+ {
+ super.runBare();
+ }
+ finally
+ {
+ try
+ {
+ stopBroker();
+ }
+ catch (Exception e)
+ {
+ _logger.error("exception stopping broker", e);
+ }
+ _logger.info("========== stop " + name + " ==========");
+ }
}
public void startBroker() throws Exception
@@ -102,7 +113,8 @@ public class QpidTestCase extends TestCase
}
catch (IOException e)
{
- _logger.info("redirector", e);
+ // this seems to happen regularly even when
+ // exits are normal
}
}
}.start();
@@ -190,6 +202,11 @@ public class QpidTestCase extends TestCase
return _connectionFactory;
}
+ public Connection getConnection() throws Exception
+ {
+ return getConnection("guest", "guest");
+ }
+
/**
* Get a connection (remote or in-VM)
*
diff --git a/qpid/java/common.xml b/qpid/java/common.xml
index 8e44693c63..cfaab2cc84 100644
--- a/qpid/java/common.xml
+++ b/qpid/java/common.xml
@@ -198,6 +198,13 @@
that modules build root from underneath the project build root:
${build}/&lt;module&gt;
+
+ ant clean-results
+
+ The clean-results target removes all test output from the test
+ results directory:
+
+ ${build.results}
</echo>
</target>
diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
index 08adb99c47..9f0af7cfa1 100644
--- a/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
+++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
@@ -63,6 +63,7 @@ public class Session extends Invoker
private long commandsIn = 0;
// completed incoming commands
private final RangeSet processed = new RangeSet();
+ private long processedMark = -1;
private Range syncPoint = null;
// outgoing command count
@@ -132,25 +133,25 @@ public class Session extends Invoker
public void flushProcessed()
{
- long mark = -1;
boolean first = true;
RangeSet rest = new RangeSet();
synchronized (processed)
{
for (Range r: processed)
{
- if (first)
+ if (first && r.includes(processedMark))
{
- first = false;
- mark = r.getUpper();
+ processedMark = r.getUpper();
}
else
{
rest.add(r);
}
+
+ first = false;
}
}
- executionComplete(mark, rest);
+ executionComplete(processedMark, rest);
}
void syncPoint()
diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaSender.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaSender.java
index 17ae7ea0f7..f0f5731037 100644
--- a/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaSender.java
+++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaSender.java
@@ -51,16 +51,23 @@ public class MinaSender implements Sender<java.nio.ByteBuffer>
{
throw new TransportException("attempted to write to a closed socket");
}
- lastWrite = session.write(ByteBuffer.wrap(buf));
+
+ synchronized (this)
+ {
+ lastWrite = session.write(ByteBuffer.wrap(buf));
+ }
}
- public void close()
+ public synchronized void close()
{
// MINA will sometimes throw away in-progress writes when you
// ask it to close
- if (lastWrite != null)
+ synchronized (this)
{
- lastWrite.join();
+ if (lastWrite != null)
+ {
+ lastWrite.join();
+ }
}
CloseFuture closed = session.close();
closed.join();
diff --git a/qpid/java/module.xml b/qpid/java/module.xml
index d80a2c0f56..af943e3b1f 100644
--- a/qpid/java/module.xml
+++ b/qpid/java/module.xml
@@ -176,7 +176,7 @@
<target name="test" depends="compile-tests" if="module.test.src.exists"
description="execute unit tests">
- <junit fork="yes" haltonfailure="no" printsummary="on" timeout="90000">
+ <junit fork="yes" haltonfailure="no" printsummary="on" timeout="600000">
<sysproperty key="amqj.logging.level" value="${amqj.logging.level}"/>
<sysproperty key="root.logging.level" value="${root.logging.level}"/>
diff --git a/qpid/python/examples/direct/verify.in b/qpid/python/examples/direct/verify.in
index e0dca33039..5e691619d9 100644
--- a/qpid/python/examples/direct/verify.in
+++ b/qpid/python/examples/direct/verify.in
@@ -1,6 +1,6 @@
-==== ./declare_queues.py.out
-==== ./direct_producer.py.out
-==== ./direct_consumer.py.out
+==== declare_queues.py.out
+==== direct_producer.py.out
+==== direct_consumer.py.out
message 0
message 1
message 2
diff --git a/qpid/python/examples/fanout/verify.in b/qpid/python/examples/fanout/verify.in
index c625c30773..a5f57f0b4b 100644
--- a/qpid/python/examples/fanout/verify.in
+++ b/qpid/python/examples/fanout/verify.in
@@ -1,6 +1,6 @@
-==== ./declare_queues.py.out
-==== ./fanout_producer.py.out
-==== ./fanout_consumer.py.out
+==== declare_queues.py.out
+==== fanout_producer.py.out
+==== fanout_consumer.py.out
message 0
message 1
message 2
diff --git a/qpid/python/examples/pubsub/verify.in b/qpid/python/examples/pubsub/verify.in
index 19dcf88312..69de08d17c 100644
--- a/qpid/python/examples/pubsub/verify.in
+++ b/qpid/python/examples/pubsub/verify.in
@@ -1,4 +1,4 @@
-==== ./topic_publisher.py.out
+==== topic_publisher.py.out
==== topic_subscriber.py.out | remove_uuid64 | sort
message 0
message 0
diff --git a/qpid/python/examples/request-response/verify.in b/qpid/python/examples/request-response/verify.in
index c02a423bcb..f681253b3c 100644
--- a/qpid/python/examples/request-response/verify.in
+++ b/qpid/python/examples/request-response/verify.in
@@ -1,4 +1,4 @@
-==== ./client.py.out | remove_uuid64
+==== client.py.out | remove_uuid64
Request: Twas brilling, and the slithy toves
Request: Did gyre and gimble in the wabe.
Request: All mimsy were the borogroves,
@@ -9,6 +9,6 @@ Response: DID GYRE AND GIMBLE IN THE WABE.
Response: ALL MIMSY WERE THE BOROGROVES,
Response: AND THE MOME RATHS OUTGRABE.
No more messages!
-==== server.py.out | remove_uuid64
+==== server.py.out | remove_uuid64
Request server running - run your client now.
(Times out after 100 seconds ...)
diff --git a/qpid/python/tests_0-10/message.py b/qpid/python/tests_0-10/message.py
index 9ec1cc270c..c251e6aca0 100644
--- a/qpid/python/tests_0-10/message.py
+++ b/qpid/python/tests_0-10/message.py
@@ -6,9 +6,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -47,7 +47,7 @@ class MessageTests(TestBase):
msg = included.get(timeout=1)
self.assertEqual("consume_no_local", msg.content.body)
try:
- excluded.get(timeout=1)
+ excluded.get(timeout=1)
self.fail("Received locally published message though no_local=true")
except Empty: None
@@ -59,9 +59,9 @@ class MessageTests(TestBase):
could be left on the queue, possibly never being consumed
(this is the case for example in the qpid JMS mapping of
topics). This test excercises a Qpid C++ broker hack that
- deletes such messages.
+ deletes such messages.
"""
-
+
channel = self.channel
#setup:
channel.queue_declare(queue="test-queue", exclusive=True, auto_delete=True)
@@ -84,7 +84,7 @@ class MessageTests(TestBase):
msg = excluded.get(timeout=1)
self.assertEqual("foreign", msg.content.body)
try:
- excluded.get(timeout=1)
+ excluded.get(timeout=1)
self.fail("Received extra message")
except Empty: None
#check queue is empty
@@ -107,7 +107,7 @@ class MessageTests(TestBase):
except Closed, e:
self.assertChannelException(403, e.args[0])
- #open new channel and cleanup last consumer:
+ #open new channel and cleanup last consumer:
channel = self.client.channel(2)
channel.session_open()
@@ -173,7 +173,7 @@ class MessageTests(TestBase):
msg = myqueue.get(timeout=1)
self.assertEqual("One", msg.content.body)
try:
- msg = myqueue.get(timeout=1)
+ msg = myqueue.get(timeout=1)
self.fail("Got message after cancellation: " + msg)
except Empty: None
@@ -188,7 +188,7 @@ class MessageTests(TestBase):
"""
channel = self.channel
channel.queue_declare(queue="test-ack-queue", exclusive=True, auto_delete=True)
-
+
self.subscribe(queue="test-ack-queue", destination="consumer_tag", confirm_mode=1)
queue = self.client.queue("consumer_tag")
@@ -197,13 +197,13 @@ class MessageTests(TestBase):
channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="Three"))
channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="Four"))
channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="Five"))
-
+
msg1 = queue.get(timeout=1)
msg2 = queue.get(timeout=1)
msg3 = queue.get(timeout=1)
msg4 = queue.get(timeout=1)
msg5 = queue.get(timeout=1)
-
+
self.assertEqual("One", msg1.content.body)
self.assertEqual("Two", msg2.content.body)
self.assertEqual("Three", msg3.content.body)
@@ -214,10 +214,10 @@ class MessageTests(TestBase):
msg4.complete(cumulative=False)
channel.message_recover(requeue=False)
-
+
msg3b = queue.get(timeout=1)
msg5b = queue.get(timeout=1)
-
+
self.assertEqual("Three", msg3b.content.body)
self.assertEqual("Five", msg5b.content.body)
@@ -236,7 +236,7 @@ class MessageTests(TestBase):
channel.queue_bind(exchange="amq.fanout", queue="queue-a")
channel.queue_declare(queue="queue-b", exclusive=True, auto_delete=True)
channel.queue_bind(exchange="amq.fanout", queue="queue-b")
-
+
self.subscribe(queue="queue-a", destination="unconfirmed", confirm_mode=1)
self.subscribe(queue="queue-b", destination="confirmed", confirm_mode=0)
confirmed = self.client.queue("confirmed")
@@ -246,10 +246,10 @@ class MessageTests(TestBase):
for d in data:
channel.message_transfer(destination="amq.fanout", content=Content(body=d))
- for q in [confirmed, unconfirmed]:
+ for q in [confirmed, unconfirmed]:
for d in data:
self.assertEqual(d, q.get(timeout=1).content.body)
- self.assertEmpty(q)
+ self.assertEmpty(q)
channel.message_recover(requeue=False)
@@ -265,7 +265,7 @@ class MessageTests(TestBase):
data.remove(msg.content.body)
msg.complete(cumulative=False)
channel.message_recover(requeue=False)
-
+
def test_recover_requeue(self):
"""
@@ -273,7 +273,7 @@ class MessageTests(TestBase):
"""
channel = self.channel
channel.queue_declare(queue="test-requeue", exclusive=True, auto_delete=True)
-
+
self.subscribe(queue="test-requeue", destination="consumer_tag", confirm_mode=1)
queue = self.client.queue("consumer_tag")
@@ -282,13 +282,13 @@ class MessageTests(TestBase):
channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Three"))
channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Four"))
channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Five"))
-
+
msg1 = queue.get(timeout=1)
msg2 = queue.get(timeout=1)
msg3 = queue.get(timeout=1)
msg4 = queue.get(timeout=1)
msg5 = queue.get(timeout=1)
-
+
self.assertEqual("One", msg1.content.body)
self.assertEqual("Two", msg2.content.body)
self.assertEqual("Three", msg3.content.body)
@@ -307,10 +307,10 @@ class MessageTests(TestBase):
self.subscribe(queue="test-requeue", destination="consumer_tag")
queue2 = self.client.queue("consumer_tag")
-
+
msg3b = queue2.get(timeout=1)
msg5b = queue2.get(timeout=1)
-
+
self.assertEqual("Three", msg3b.content.body)
self.assertEqual("Five", msg5b.content.body)
@@ -327,8 +327,8 @@ class MessageTests(TestBase):
extra = queue.get(timeout=1)
self.fail("Got unexpected message in original queue: " + extra.content.body)
except Empty: None
-
-
+
+
def test_qos_prefetch_count(self):
"""
Test that the prefetch count specified is honoured
@@ -370,7 +370,7 @@ class MessageTests(TestBase):
except Empty: None
-
+
def test_qos_prefetch_size(self):
"""
Test that the prefetch size specified is honoured
@@ -448,7 +448,7 @@ class MessageTests(TestBase):
#send batch of messages to queue
for i in range(1, 11):
channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %d" % i))
-
+
#set message credit to finite amount (less than enough for all messages)
channel.message_flow(unit = 0, value = 5, destination = "c")
#set infinite byte credit
@@ -458,7 +458,7 @@ class MessageTests(TestBase):
for i in range(1, 6):
self.assertDataEquals(channel, q.get(timeout = 1), "Message %d" % i)
self.assertEmpty(q)
-
+
#increase credit again and check more are received
for i in range(6, 11):
channel.message_flow(unit = 0, value = 1, destination = "c")
@@ -512,7 +512,7 @@ class MessageTests(TestBase):
#send batch of messages to queue
for i in range(1, 11):
channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %d" % i))
-
+
#set message credit to finite amount (less than enough for all messages)
channel.message_flow(unit = 0, value = 5, destination = "c")
#set infinite byte credit
@@ -523,7 +523,7 @@ class MessageTests(TestBase):
msg = q.get(timeout = 1)
self.assertDataEquals(channel, msg, "Message %d" % i)
self.assertEmpty(q)
-
+
#acknowledge messages and check more are received
msg.complete(cumulative=True)
for i in range(6, 11):
@@ -560,7 +560,7 @@ class MessageTests(TestBase):
msgs.append(msg)
self.assertDataEquals(channel, msg, "abcdefgh")
self.assertEmpty(q)
-
+
#ack each message individually and check more are received
for i in range(5):
msg = msgs.pop()
@@ -650,7 +650,7 @@ class MessageTests(TestBase):
channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
queue = self.client.queue("a")
first = queue.get(timeout = 1)
- for i in range (2, 10):
+ for i in range (2, 10):
self.assertEquals("released message %s" % (i), queue.get(timeout = 1).content.body)
last = queue.get(timeout = 1)
self.assertEmpty(queue)
@@ -672,7 +672,7 @@ class MessageTests(TestBase):
channel.message_flow(unit = 0, value = 10, destination = "a")
channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
queue = self.client.queue("a")
- for i in range (1, 11):
+ for i in range (1, 11):
self.assertEquals("message %s" % (i), queue.get(timeout = 1).content.body)
self.assertEmpty(queue)
@@ -683,14 +683,14 @@ class MessageTests(TestBase):
self.assertEmpty(queue)
def test_subscribe_not_acquired_2(self):
- channel = self.channel
+ channel = self.channel
#publish some messages
self.queue_declare(queue = "q", exclusive=True, auto_delete=True)
for i in range(1, 11):
channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "message-%d" % (i)))
- #consume some of them
+ #consume some of them
channel.message_subscribe(queue = "q", destination = "a", confirm_mode = 1)
channel.message_flow_mode(mode = 0, destination = "a")
channel.message_flow(unit = 0, value = 5, destination = "a")