summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAidan Skinner <aidan@apache.org>2008-02-18 12:07:38 +0000
committerAidan Skinner <aidan@apache.org>2008-02-18 12:07:38 +0000
commit7ef664e5e82c93675ecfa5cd8cdc9dcefca453ac (patch)
tree0f8b3e5148dcaaea1570a8a7d8bd39ce6d0c9b17
parent81d9c0a9184c0fde88c63c785a4e3dc73fdec8bd (diff)
downloadqpid-python-7ef664e5e82c93675ecfa5cd8cdc9dcefca453ac.tar.gz
Merged revisions 627484,627531,627536,627543,627607,627629,627634,627652-627655,627718,627777,628132,628169,628171,628659 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/trunk ........ r627484 | aconway | 2008-02-13 15:39:59 +0000 (Wed, 13 Feb 2008) | 14 lines Broker::connect - connect to URL, return ConnectionInputHandler. M src/qpid/broker/Broker.cpp M src/qpid/broker/Broker.h M src/qpid/sys/Acceptor.h M src/qpid/sys/AsynchIOAcceptor.cpp AMQBody::match - test for matching frames. M src/qpid/framing/AMQBody.cpp M src/qpid/framing/AMQBody.h Url::throwIfEmpty() - test for empty URL. M src/qpid/Url.cpp M src/qpid/Url.h ........ r627531 | rhs | 2008-02-13 17:13:45 +0000 (Wed, 13 Feb 2008) | 1 line added client/example to the build ........ r627536 | rhs | 2008-02-13 17:30:32 +0000 (Wed, 13 Feb 2008) | 1 line updated runSample.sh from jrobie ........ r627543 | rajith | 2008-02-13 17:33:20 +0000 (Wed, 13 Feb 2008) | 1 line modified it to show the selector test bug ........ r627607 | rhs | 2008-02-13 22:45:26 +0000 (Wed, 13 Feb 2008) | 1 line fixed logging of message Data ........ r627629 | rajith | 2008-02-14 01:08:58 +0000 (Thu, 14 Feb 2008) | 1 line This is a fix for QPID-791 ........ r627634 | rajith | 2008-02-14 02:09:59 +0000 (Thu, 14 Feb 2008) | 1 line Fix for Qpid-793 ........ r627652 | rhs | 2008-02-14 03:52:08 +0000 (Thu, 14 Feb 2008) | 1 line set initial cause ........ r627653 | rhs | 2008-02-14 03:53:06 +0000 (Thu, 14 Feb 2008) | 1 line rethrow exception rather than ignoring it ........ r627654 | rhs | 2008-02-14 03:54:04 +0000 (Thu, 14 Feb 2008) | 1 line print message text directly ........ r627655 | rhs | 2008-02-14 03:55:28 +0000 (Thu, 14 Feb 2008) | 1 line recompute internal 0-10 message for reused messages ........ r627718 | gsim | 2008-02-14 11:49:17 +0000 (Thu, 14 Feb 2008) | 3 lines Fixed bug in browsing that failed to deal correctly with 'gaps' in message sequence. ........ r627777 | rhs | 2008-02-14 15:12:11 +0000 (Thu, 14 Feb 2008) | 1 line bumped spec release ........ r628132 | rhs | 2008-02-15 17:49:47 +0000 (Fri, 15 Feb 2008) | 1 line protect the _currentException variable with its own lock, this avoids deadlocks between getCurrentException and the dispatcher thread ........ r628169 | aconway | 2008-02-15 21:00:44 +0000 (Fri, 15 Feb 2008) | 2 lines Updated c++ and python fanout examples and verify scripts. ........ r628171 | aconway | 2008-02-15 21:03:28 +0000 (Fri, 15 Feb 2008) | 2 lines svn:ignore properties. ........ r628659 | gsim | 2008-02-18 09:23:54 +0000 (Mon, 18 Feb 2008) | 3 lines Requests to release a message that has not been acquired should be ignored. ........ git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/thegreatmerge@628695 13f79535-47bb-0310-9956-ffa450edef68
-rwxr-xr-xqpid/bin/verify6
-rw-r--r--qpid/cpp/examples/Makefile.am1
-rw-r--r--qpid/cpp/examples/examples/fanout/Makefile2
-rw-r--r--qpid/cpp/examples/examples/fanout/declare_queues.cpp80
-rw-r--r--qpid/cpp/examples/examples/fanout/listener.cpp16
-rw-r--r--qpid/cpp/examples/examples/fanout/verify7
-rw-r--r--qpid/cpp/examples/examples/fanout/verify.in34
-rw-r--r--qpid/cpp/examples/examples/fanout/verify_cpp_python6
-rw-r--r--qpid/cpp/examples/examples/fanout/verify_cpp_python.in43
-rw-r--r--qpid/cpp/examples/examples/fanout/verify_python_cpp6
-rw-r--r--qpid/cpp/examples/examples/fanout/verify_python_cpp.in20
-rw-r--r--qpid/cpp/qpidc.spec.in5
-rw-r--r--qpid/cpp/src/qpid/Url.cpp4
-rw-r--r--qpid/cpp/src/qpid/Url.h3
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp23
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.h15
-rw-r--r--qpid/cpp/src/qpid/broker/DeliveryRecord.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp21
-rw-r--r--qpid/cpp/src/qpid/framing/AMQBody.cpp37
-rw-r--r--qpid/cpp/src/qpid/framing/AMQBody.h3
-rw-r--r--qpid/cpp/src/qpid/sys/Acceptor.h5
-rw-r--r--qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp9
-rw-r--r--qpid/java/build.deps2
-rw-r--r--qpid/java/build.xml3
-rw-r--r--qpid/java/client/example/build.xml27
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/Publisher.java2
-rwxr-xr-xqpid/java/client/example/src/main/java/runSample.sh6
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java5
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java22
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java10
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java16
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java1
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpidity/nclient/JMSTestCase.java31
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpidity/njms/ExceptionHelper.java1
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpidity/transport/Data.java1
-rwxr-xr-xqpid/python/examples/fanout/declare_queues.py52
-rwxr-xr-xqpid/python/examples/fanout/fanout_consumer.py92
-rw-r--r--qpid/python/examples/fanout/verify6
-rw-r--r--qpid/python/examples/fanout/verify.in43
-rw-r--r--qpid/python/tests_0-10/message.py85
41 files changed, 482 insertions, 273 deletions
diff --git a/qpid/bin/verify b/qpid/bin/verify
index 844e99b765..96fe1df9b3 100755
--- a/qpid/bin/verify
+++ b/qpid/bin/verify
@@ -19,7 +19,11 @@ trap cleanup EXIT
ARGS="${QPID_HOST:-localhost} $QPID_PORT"
-outfile() { echo $1.out; }
+outfile() {
+ file=$1
+ while [ -f $file.out ]; do file="${file}X"; done
+ echo $file.out
+ }
fail() { test -n "$*" && echo $* 1>&2 ; FAIL=1; return 1; }
diff --git a/qpid/cpp/examples/Makefile.am b/qpid/cpp/examples/Makefile.am
index 92625550e2..9b41b130e9 100644
--- a/qpid/cpp/examples/Makefile.am
+++ b/qpid/cpp/examples/Makefile.am
@@ -6,7 +6,6 @@ nobase_pkgdata_DATA= \
examples/request-response/server.cpp \
examples/request-response/Makefile \
examples/fanout/Makefile \
- examples/fanout/declare_queues.cpp \
examples/fanout/listener.cpp \
examples/fanout/fanout_producer.cpp \
examples/pub-sub/Makefile \
diff --git a/qpid/cpp/examples/examples/fanout/Makefile b/qpid/cpp/examples/examples/fanout/Makefile
index b58026f337..7963af7ddf 100644
--- a/qpid/cpp/examples/examples/fanout/Makefile
+++ b/qpid/cpp/examples/examples/fanout/Makefile
@@ -2,7 +2,7 @@ CXX=g++
CXXFLAGS=
LDFLAGS=-lqpidclient
-PROGRAMS=declare_queues fanout_producer listener
+PROGRAMS=fanout_producer listener
all: $(PROGRAMS)
diff --git a/qpid/cpp/examples/examples/fanout/declare_queues.cpp b/qpid/cpp/examples/examples/fanout/declare_queues.cpp
deleted file mode 100644
index c76ed54730..0000000000
--- a/qpid/cpp/examples/examples/fanout/declare_queues.cpp
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-/**
- * declare_queues.cpp (this program):
- *
- * This program is one of three programs designed to be used
- * together. These programs use the "amq.fanout" exchange.
- *
- * fanout_producer.cpp:
- *
- * Publishes to a broker, specifying a routing key.
- *
- * listener.cpp
- *
- * Reads from a queue on the broker using a message listener.
- *
- */
-
-#include <qpid/client/Connection.h>
-#include <qpid/client/Session.h>
-
-#include <unistd.h>
-#include <cstdlib>
-#include <iostream>
-
-using namespace qpid::client;
-using namespace qpid::framing;
-
-using std::string;
-
-
-int main(int argc, char** argv) {
- const char* host = argc>1 ? argv[1] : "127.0.0.1";
- int port = argc>2 ? atoi(argv[2]) : 5672;
- Connection connection;
- Message msg;
- try {
- connection.open(host, port);
- Session session = connection.newSession();
-
-
- //--------- Main body of program --------------------------------------------
-
- // Create and bind a queue named "message_queue".
- session.queueDeclare(arg::queue="message_queue");
- session.queueBind(arg::queue="message_queue", arg::exchange="amq.fanout");
-
-
- //-----------------------------------------------------------------------------
-
- connection.close();
- return 0;
- } catch(const std::exception& error) {
- std::cout << error.what() << std::endl;
- }
- return 1;
-
-}
-
-
-
diff --git a/qpid/cpp/examples/examples/fanout/listener.cpp b/qpid/cpp/examples/examples/fanout/listener.cpp
index 91b5123c68..5295e10f34 100644
--- a/qpid/cpp/examples/examples/fanout/listener.cpp
+++ b/qpid/cpp/examples/examples/fanout/listener.cpp
@@ -69,12 +69,24 @@ int main(int argc, char** argv) {
//--------- Main body of program --------------------------------------------
+ // Unique name for private queue:
+ std::string myQueue=session.getId().str();
+ // Declear my queue.
+ session.queueDeclare(arg::queue=myQueue, arg::exclusive=true,
+ arg::autoDelete=true);
+ // Bind my queue to the fanout exchange.
+ // Note no routingKey required, the fanout exchange delivers
+ // all messages to all bound queues unconditionally.
+ session.queueBind(arg::exchange="amq.fanout", arg::queue=myQueue);
+
+ // Create a listener and subscribe it to my queue.
SubscriptionManager subscriptions(session);
- // Create a listener and subscribe it to the queue named "message_queue"
Listener listener(subscriptions);
- subscriptions.subscribe(listener, "message_queue");
+ subscriptions.subscribe(listener, myQueue);
+
// Deliver messages until the subscription is cancelled
// by Listener::received()
+ std::cout << "Listening" << std::endl;
subscriptions.run();
//---------------------------------------------------------------------------
diff --git a/qpid/cpp/examples/examples/fanout/verify b/qpid/cpp/examples/examples/fanout/verify
index 1e1e8121d2..ace4a6dfee 100644
--- a/qpid/cpp/examples/examples/fanout/verify
+++ b/qpid/cpp/examples/examples/fanout/verify
@@ -1,3 +1,6 @@
# See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify
-clients ./declare_queues ./fanout_producer ./listener
-outputs ./declare_queues.out ./fanout_producer.out ./listener.out
+background "Listening" ./listener
+background "Listening" ./listener
+background "Listening" ./listener
+clients ./fanout_producer
+outputs ./fanout_producer.out "./listener.out | remove_uuid" "./listenerX.out | remove_uuid" "./listenerXX.out | remove_uuid"
diff --git a/qpid/cpp/examples/examples/fanout/verify.in b/qpid/cpp/examples/examples/fanout/verify.in
index 23d08f38dd..8f8612ce67 100644
--- a/qpid/cpp/examples/examples/fanout/verify.in
+++ b/qpid/cpp/examples/examples/fanout/verify.in
@@ -1,6 +1,6 @@
-==== declare_queues.out
==== fanout_producer.out
-==== listener.out
+==== listener.out | remove_uuid
+Listening
Message: Message 0
Message: Message 1
Message: Message 2
@@ -12,4 +12,32 @@ Message: Message 7
Message: Message 8
Message: Message 9
Message: That's all, folks!
-Shutting down listener for message_queue
+Shutting down listener for
+==== listenerX.out | remove_uuid
+Listening
+Message: Message 0
+Message: Message 1
+Message: Message 2
+Message: Message 3
+Message: Message 4
+Message: Message 5
+Message: Message 6
+Message: Message 7
+Message: Message 8
+Message: Message 9
+Message: That's all, folks!
+Shutting down listener for
+==== listenerXX.out | remove_uuid
+Listening
+Message: Message 0
+Message: Message 1
+Message: Message 2
+Message: Message 3
+Message: Message 4
+Message: Message 5
+Message: Message 6
+Message: Message 7
+Message: Message 8
+Message: Message 9
+Message: That's all, folks!
+Shutting down listener for
diff --git a/qpid/cpp/examples/examples/fanout/verify_cpp_python b/qpid/cpp/examples/examples/fanout/verify_cpp_python
index f53784ef1c..e840e68f91 100644
--- a/qpid/cpp/examples/examples/fanout/verify_cpp_python
+++ b/qpid/cpp/examples/examples/fanout/verify_cpp_python
@@ -1,5 +1,7 @@
# See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify
py=$PYTHON_EXAMPLES/fanout
-clients ./declare_queues ./fanout_producer $py/fanout_consumer.py
-outputs ./declare_queues.out ./fanout_producer.out $py/fanout_consumer.py.out
+background "Subscribed" $py/fanout_consumer.py
+background "Subscribed" $py/fanout_consumer.py
+clients ./fanout_producer
+outputs ./fanout_producer.out "$py/fanout_consumer.py.out | remove_uuid64" "$py/fanout_consumer.pyX.out | remove_uuid64"
diff --git a/qpid/cpp/examples/examples/fanout/verify_cpp_python.in b/qpid/cpp/examples/examples/fanout/verify_cpp_python.in
index cb9e52cfcc..fac2b365d3 100644
--- a/qpid/cpp/examples/examples/fanout/verify_cpp_python.in
+++ b/qpid/cpp/examples/examples/fanout/verify_cpp_python.in
@@ -1,14 +1,31 @@
-==== declare_queues.out
==== fanout_producer.out
-==== fanout_consumer.py.out
-Message 0
-Message 1
-Message 2
-Message 3
-Message 4
-Message 5
-Message 6
-Message 7
-Message 8
-Message 9
-That's all, folks!
+==== fanout_consumer.py.out | remove_uuid64
+Messages queue:
+Subscribed to queue
+Response: Message 0
+Response: Message 1
+Response: Message 2
+Response: Message 3
+Response: Message 4
+Response: Message 5
+Response: Message 6
+Response: Message 7
+Response: Message 8
+Response: Message 9
+Response: That's all, folks!
+No more messages!
+==== fanout_consumer.pyX.out | remove_uuid64
+Messages queue:
+Subscribed to queue
+Response: Message 0
+Response: Message 1
+Response: Message 2
+Response: Message 3
+Response: Message 4
+Response: Message 5
+Response: Message 6
+Response: Message 7
+Response: Message 8
+Response: Message 9
+Response: That's all, folks!
+No more messages!
diff --git a/qpid/cpp/examples/examples/fanout/verify_python_cpp b/qpid/cpp/examples/examples/fanout/verify_python_cpp
index 00a0727352..d9b3361523 100644
--- a/qpid/cpp/examples/examples/fanout/verify_python_cpp
+++ b/qpid/cpp/examples/examples/fanout/verify_python_cpp
@@ -1,5 +1,7 @@
# See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify
py=$PYTHON_EXAMPLES/fanout
-clients $py/declare_queues.py $py/fanout_producer.py ./listener
-outputs $py/declare_queues.py.out $py/fanout_producer.py.out ./listener.out
+background "Listening" ./listener
+background "Listening" ./listener
+clients $py/fanout_producer.py
+outputs $py/fanout_producer.py.out "./listener.out | remove_uuid" "./listenerX.out | remove_uuid"
diff --git a/qpid/cpp/examples/examples/fanout/verify_python_cpp.in b/qpid/cpp/examples/examples/fanout/verify_python_cpp.in
index 87a1694e6c..8f9e959053 100644
--- a/qpid/cpp/examples/examples/fanout/verify_python_cpp.in
+++ b/qpid/cpp/examples/examples/fanout/verify_python_cpp.in
@@ -1,6 +1,6 @@
-==== declare_queues.py.out
==== fanout_producer.py.out
-==== listener.out
+==== listener.out | remove_uuid
+Listening
Message: message 0
Message: message 1
Message: message 2
@@ -12,4 +12,18 @@ Message: message 7
Message: message 8
Message: message 9
Message: That's all, folks!
-Shutting down listener for message_queue
+Shutting down listener for
+==== listenerX.out | remove_uuid
+Listening
+Message: message 0
+Message: message 1
+Message: message 2
+Message: message 3
+Message: message 4
+Message: message 5
+Message: message 6
+Message: message 7
+Message: message 8
+Message: message 9
+Message: That's all, folks!
+Shutting down listener for
diff --git a/qpid/cpp/qpidc.spec.in b/qpid/cpp/qpidc.spec.in
index 3892b1ab40..8c248a44ee 100644
--- a/qpid/cpp/qpidc.spec.in
+++ b/qpid/cpp/qpidc.spec.in
@@ -5,7 +5,7 @@
Name: @PACKAGE@
Version: @VERSION@
-Release: 22%{?dist}
+Release: 23%{?dist}
Summary: Libraries for Qpid C++ client applications
Group: System Environment/Libraries
License: Apache Software License
@@ -156,6 +156,9 @@ fi
/sbin/ldconfig
%changelog
+* Thu Feb 14 2008 Rafael Schloming <rafaels@redhat.com> - 0.2-23
+- Bumped to pull in fixes for Beta 3
+
* Tue Feb 12 2008 Alan Conway <aconway@redhat.com> - 0.2-22
- Added -g to compile flags for debug symbols.
diff --git a/qpid/cpp/src/qpid/Url.cpp b/qpid/cpp/src/qpid/Url.cpp
index d056edc683..aa53d5cbe2 100644
--- a/qpid/cpp/src/qpid/Url.cpp
+++ b/qpid/cpp/src/qpid/Url.cpp
@@ -160,6 +160,10 @@ void Url::parseNoThrow(const char* url) {
clear();
}
+void Url::throwIfEmpty() const {
+ throw InvalidUrl("URL contains no addresses");
+}
+
std::istream& operator>>(std::istream& is, Url& url) {
std::string s;
is >> s;
diff --git a/qpid/cpp/src/qpid/Url.h b/qpid/cpp/src/qpid/Url.h
index 2e24ba948d..20f42db0ad 100644
--- a/qpid/cpp/src/qpid/Url.h
+++ b/qpid/cpp/src/qpid/Url.h
@@ -78,6 +78,9 @@ struct Url : public std::vector<Address> {
template<class T> Url& operator=(T s) { parse(s); return *this; }
+ /** Throw InvalidUrl if the URL does not contain any addresses. */
+ void throwIfEmpty() const;
+
/** Replace contents with parsed URL as defined in
* https://wiki.108.redhat.com/jira/browse/AMQP-95
*@exception InvalidUrl if the url is invalid.
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index 117a93b571..0a0eb0a0df 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -40,6 +40,7 @@
#include "qpid/sys/ConnectionInputHandlerFactory.h"
#include "qpid/sys/TimeoutHandler.h"
#include "qpid/sys/SystemInfo.h"
+#include "qpid/Url.h"
#include <boost/bind.hpp>
@@ -263,11 +264,13 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId,
case management::Broker::METHOD_ECHO :
status = Manageable::STATUS_OK;
break;
- case management::Broker::METHOD_CONNECT :
- connect(dynamic_cast<management::ArgsBrokerConnect&>(args));
+ case management::Broker::METHOD_CONNECT : {
+ management::ArgsBrokerConnect& hp=
+ dynamic_cast<management::ArgsBrokerConnect&>(args);
+ connect(hp.i_host, hp.i_port);
status = Manageable::STATUS_OK;
break;
-
+ }
case management::Broker::METHOD_JOINCLUSTER :
case management::Broker::METHOD_LEAVECLUSTER :
status = Manageable::STATUS_NOT_IMPLEMENTED;
@@ -277,9 +280,19 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId,
return status;
}
-void Broker::connect(management::ArgsBrokerConnect& args)
+sys::ConnectionInputHandler* Broker::connect(
+ const std::string& host, uint16_t port,
+ sys::ConnectionInputHandlerFactory* f)
+{
+ return getAcceptor().connect(host, port, f ? f : &factory);
+}
+
+sys::ConnectionInputHandler* Broker::connect(
+ const Url& url, sys::ConnectionInputHandlerFactory* f)
{
- getAcceptor().connect(args.i_host, args.i_port, &factory);
+ url.throwIfEmpty();
+ TcpAddress addr=boost::get<TcpAddress>(url[0]);
+ return connect(addr.host, addr.port, f);
}
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h
index 68dbf570f0..55bc7644a5 100644
--- a/qpid/cpp/src/qpid/broker/Broker.h
+++ b/qpid/cpp/src/qpid/broker/Broker.h
@@ -47,6 +47,9 @@
#include <vector>
namespace qpid {
+
+class Url;
+
namespace broker {
static const uint16_t DEFAULT_PORT=5672;
@@ -54,7 +57,8 @@ static const uint16_t DEFAULT_PORT=5672;
/**
* A broker instance.
*/
-class Broker : public sys::Runnable, public Plugin::Target, public management::Manageable
+class Broker : public sys::Runnable, public Plugin::Target,
+ public management::Manageable
{
public:
@@ -111,6 +115,14 @@ class Broker : public sys::Runnable, public Plugin::Target, public management::M
management::Manageable::status_t
ManagementMethod (uint32_t methodId, management::Args& args);
+ /** Create a connection to another broker. */
+ sys::ConnectionInputHandler*
+ connect(const std::string& host, uint16_t port,
+ sys::ConnectionInputHandlerFactory* =0);
+ /** Create a connection to another broker. */
+ sys::ConnectionInputHandler*
+ connect(const Url& url, sys::ConnectionInputHandlerFactory* =0);
+
private:
sys::Acceptor& getAcceptor() const;
@@ -129,7 +141,6 @@ class Broker : public sys::Runnable, public Plugin::Target, public management::M
Vhost::shared_ptr vhostObject;
void declareStandardExchange(const std::string& name, const std::string& type);
- void connect(management::ArgsBrokerConnect& args);
};
}}
diff --git a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
index 2e7c126162..154394e5de 100644
--- a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
+++ b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
@@ -104,7 +104,7 @@ void DeliveryRecord::requeue() const
void DeliveryRecord::release()
{
- if (!confirmed) {
+ if (acquired && !confirmed) {
queue->requeue(msg);
acquired = false;
}
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index c7dd656a4e..d34ca06364 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -170,12 +170,17 @@ void Queue::requeue(const QueuedMessage& msg){
bool Queue::acquire(const QueuedMessage& msg) {
Mutex::ScopedLock locker(messageLock);
+ QPID_LOG(debug, "attempting to acquire " << msg.position);
for (Messages::iterator i = messages.begin(); i != messages.end(); i++) {
if (i->position == msg.position) {
messages.erase(i);
+ QPID_LOG(debug, "Match found, acquire succeeded: " << i->position << " == " << msg.position);
return true;
+ } else {
+ QPID_LOG(debug, "No match: " << i->position << " != " << msg.position);
}
}
+ QPID_LOG(debug, "Acquire failed for " << msg.position);
return false;
}
@@ -255,8 +260,8 @@ bool Queue::browseNextMessage(QueuedMessage& m, Consumer& c)
m = msg;
return true;
} else {
- //consumer hasn't got enough credit for the message
- QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'");
+ //browser hasn't got enough credit for the message
+ QPID_LOG(debug, "Browser can't currently accept message from '" << name << "'");
return false;
}
} else {
@@ -304,11 +309,13 @@ bool Queue::seek(QueuedMessage& msg, Consumer& c) {
msg = messages.front();
return true;
} else {
- uint index = (c.position - messages.front().position) + 1;
- if (index < messages.size()) {
- msg = messages[index];
- return true;
- }
+ //TODO: can improve performance of this search, for now just searching linearly from end
+ Messages::reverse_iterator pos;
+ for (Messages::reverse_iterator i = messages.rbegin(); i != messages.rend() && i->position > c.position; i++) {
+ pos = i;
+ }
+ msg = *pos;
+ return true;
}
}
addListener(c);
diff --git a/qpid/cpp/src/qpid/framing/AMQBody.cpp b/qpid/cpp/src/qpid/framing/AMQBody.cpp
index a64d224a86..b3eeae0615 100644
--- a/qpid/cpp/src/qpid/framing/AMQBody.cpp
+++ b/qpid/cpp/src/qpid/framing/AMQBody.cpp
@@ -19,15 +19,46 @@
*
*/
-#include "AMQBody.h"
+#include "qpid/framing/AMQBody.h"
+#include "qpid/framing/AMQMethodBody.h"
+#include "qpid/framing/AMQHeaderBody.h"
+#include "qpid/framing/AMQContentBody.h"
+#include "qpid/framing/AMQHeartbeatBody.h"
#include <iostream>
-std::ostream& qpid::framing::operator<<(std::ostream& out, const qpid::framing::AMQBody& body)
+namespace qpid {
+namespace framing {
+
+std::ostream& operator<<(std::ostream& out, const AMQBody& body)
{
body.print(out);
return out;
}
-qpid::framing::AMQBody::~AMQBody() {}
+AMQBody::~AMQBody() {}
+
+namespace {
+struct MatchBodies : public AMQBodyConstVisitor {
+ const AMQBody& body;
+ bool match;
+
+ MatchBodies(const AMQBody& b) : body(b), match(false) {}
+ virtual ~MatchBodies() {}
+
+ virtual void visit(const AMQHeaderBody&) { match=dynamic_cast<const AMQHeaderBody*>(&body); }
+ virtual void visit(const AMQContentBody&) { match=dynamic_cast<const AMQContentBody*>(&body); }
+ virtual void visit(const AMQHeartbeatBody&) { match=dynamic_cast<const AMQHeartbeatBody*>(&body); }
+ virtual void visit(const AMQMethodBody& x) {
+ const AMQMethodBody* y=dynamic_cast<const AMQMethodBody*>(&body);
+ match = (y && y->amqpMethodId() == x.amqpMethodId() && y->amqpClassId() == x.amqpClassId());
+ }
+};
+}
+bool AMQBody::match(const AMQBody& a, const AMQBody& b) {
+ MatchBodies matcher(a);
+ b.accept(matcher);
+ return matcher.match;
+}
+}} // namespace
diff --git a/qpid/cpp/src/qpid/framing/AMQBody.h b/qpid/cpp/src/qpid/framing/AMQBody.h
index b05301bd05..f3bf65470c 100644
--- a/qpid/cpp/src/qpid/framing/AMQBody.h
+++ b/qpid/cpp/src/qpid/framing/AMQBody.h
@@ -59,6 +59,9 @@ class AMQBody
virtual AMQMethodBody* getMethod() { return 0; }
virtual const AMQMethodBody* getMethod() const { return 0; }
+
+ /** Match if same type and same class/method ID for methods */
+ static bool match(const AMQBody& , const AMQBody& );
};
std::ostream& operator<<(std::ostream& out, const AMQBody& body) ;
diff --git a/qpid/cpp/src/qpid/sys/Acceptor.h b/qpid/cpp/src/qpid/sys/Acceptor.h
index 7ff03e0eeb..5eb1f1a500 100644
--- a/qpid/cpp/src/qpid/sys/Acceptor.h
+++ b/qpid/cpp/src/qpid/sys/Acceptor.h
@@ -29,6 +29,7 @@ namespace qpid {
namespace sys {
class ConnectionInputHandlerFactory;
+class ConnectionInputHandler;
class Acceptor : public qpid::SharedObject<Acceptor>
{
@@ -38,7 +39,9 @@ class Acceptor : public qpid::SharedObject<Acceptor>
virtual uint16_t getPort() const = 0;
virtual std::string getHost() const = 0;
virtual void run(ConnectionInputHandlerFactory* factory) = 0;
- virtual void connect(const std::string& host, int16_t port, ConnectionInputHandlerFactory* factory) = 0;
+ virtual ConnectionInputHandler* connect(
+ const std::string& host, int16_t port,
+ ConnectionInputHandlerFactory* factory) = 0;
/** Note: this function is async-signal safe */
virtual void shutdown() = 0;
diff --git a/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp b/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
index 9fd32add72..0586eb9d36 100644
--- a/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
+++ b/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
@@ -54,7 +54,10 @@ class AsynchIOAcceptor : public Acceptor {
AsynchIOAcceptor(int16_t port, int backlog, int threads);
~AsynchIOAcceptor() {}
void run(ConnectionInputHandlerFactory* factory);
- void connect(const std::string& host, int16_t port, ConnectionInputHandlerFactory* factory);
+ ConnectionInputHandler* connect(
+ const std::string& host, int16_t port,
+ ConnectionInputHandlerFactory* factory);
+
void shutdown();
uint16_t getPort() const;
@@ -188,7 +191,7 @@ void AsynchIOAcceptor::run(ConnectionInputHandlerFactory* fact) {
}
}
-void AsynchIOAcceptor::connect(const std::string& host, int16_t port, ConnectionInputHandlerFactory* f)
+ConnectionInputHandler* AsynchIOAcceptor::connect(const std::string& host, int16_t port, ConnectionInputHandlerFactory* f)
{
Socket* socket = new Socket();//Should be deleted by handle when socket closes
socket->connect(host, port);
@@ -209,7 +212,7 @@ void AsynchIOAcceptor::connect(const std::string& host, int16_t port, Connection
aio->queueReadBuffer(new Buff);
}
aio->start(poller);
-
+ return handler;
}
diff --git a/qpid/java/build.deps b/qpid/java/build.deps
index f8b1e8b4e3..21b13fe019 100644
--- a/qpid/java/build.deps
+++ b/qpid/java/build.deps
@@ -39,6 +39,8 @@ systests.libs=${client.libs} ${test.libs}
perftests.libs=${systests.libs}
integrationtests.libs=${systests.libs}
+client-example.libs=${client.libs}
+
ibm-icu=lib/com.ibm.icu-3.4.4.jar
ecl-core-jface=lib/org.eclipse.jface-3.2.0.jar
ecl-core-commands=lib/org.eclipse.core.commands-3.2.0.jar
diff --git a/qpid/java/build.xml b/qpid/java/build.xml
index 440c361a29..5b00f630d1 100644
--- a/qpid/java/build.xml
+++ b/qpid/java/build.xml
@@ -23,10 +23,11 @@
<import file="common.xml"/>
<property name="modules.core" value="common broker client"/>
+ <property name="modules.examples" value="client/example"/>
<property name="modules.tests" value="systests perftests integrationtests"/>
<property name="modules.management" value="management/eclipse-plugin"/>
- <property name="modules" value="${modules.core} ${modules.tests} ${modules.management}"/>
+ <property name="modules" value="${modules.core} ${modules.examples} ${modules.tests} ${modules.management}"/>
<property name="qpid.jar" location="${build.lib}/qpid-incubating.jar"/>
<basename property="qpid.jar.name" file="${qpid.jar}"/>
diff --git a/qpid/java/client/example/build.xml b/qpid/java/client/example/build.xml
new file mode 100644
index 0000000000..8bcd59d829
--- /dev/null
+++ b/qpid/java/client/example/build.xml
@@ -0,0 +1,27 @@
+<!--
+ -
+ - Licensed to the Apache Software Foundation (ASF) under one
+ - or more contributor license agreements. See the NOTICE file
+ - distributed with this work for additional information
+ - regarding copyright ownership. The ASF licenses this file
+ - to you under the Apache License, Version 2.0 (the
+ - "License"); you may not use this file except in compliance
+ - with the License. You may obtain a copy of the License at
+ -
+ - http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing,
+ - software distributed under the License is distributed on an
+ - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ - KIND, either express or implied. See the License for the
+ - specific language governing permissions and limitations
+ - under the License.
+ -
+ -->
+<project name="AMQ Client" default="build">
+
+ <property name="module.depends" value="common client"/>
+
+ <import file="../../module.xml"/>
+
+</project>
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/Publisher.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/Publisher.java
index fb22966b11..360b2c6aed 100644
--- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/Publisher.java
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/Publisher.java
@@ -125,7 +125,7 @@ public class Publisher
for (int i=1; i <= 6; i++)
{
message.setText("message " + i);
- System.out.println(CLASS + ": Sending message: " + i);
+ System.out.println(CLASS + ": Sending " + message.getText());
messagePublisher
.send(message, DeliveryMode.PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
}
diff --git a/qpid/java/client/example/src/main/java/runSample.sh b/qpid/java/client/example/src/main/java/runSample.sh
index d596f0a924..23bde52c44 100755
--- a/qpid/java/client/example/src/main/java/runSample.sh
+++ b/qpid/java/client/example/src/main/java/runSample.sh
@@ -23,7 +23,7 @@ if test "'x$QPID_SAMPLE'" != "'x'"
then
QPID_SAMPLE=$QPID_SAMPLE
else
- QPID_SAMPLE="/usr/share/rhm-docs-0.2/java"
+ QPID_SAMPLE="/usr/share/doc/rhm-0.2"
fi
echo "Using QPID_SAMPLE: $QPID_SAMPLE"
@@ -39,7 +39,7 @@ javac -cp "$CLASSPATH" -sourcepath "$QPID_SAMPLE" -d . `find $QPID_SAMPLE -name
CLASSPATH="$CLASSPATH$DIVIDER$."
# Set VM parameters
-QPID_PARAM="$QPID_PARAM -Dlog4j.configuration=file://$QPID_SAMPLE/log4j.xml"
+QPID_PARAM="$QPID_PARAM -Dlog4j.configuration=file://$QPID_SAMPLE/java/log4j.xml"
# Check if the user supplied a sample classname
@@ -49,4 +49,4 @@ then
exit;
else
java -cp $CLASSPATH $QPID_PARAM $*
-fi \ No newline at end of file
+fi
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java
index fc6ebad1c0..f56477c0fc 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java
@@ -92,13 +92,11 @@ public class AMQQueueBrowser implements QueueBrowser
return new Enumeration()
{
-
- Message _nextMessage = consumer.receive();
+ Message _nextMessage = consumer.receive(1000);
public boolean hasMoreElements()
{
_logger.info("QB:hasMoreElements:" + (_nextMessage != null));
-
return (_nextMessage != null);
}
@@ -108,7 +106,6 @@ public class AMQQueueBrowser implements QueueBrowser
try
{
_logger.info("QB:nextElement about to receive");
-
_nextMessage = consumer.receive(1000);
_logger.info("QB:nextElement received:" + _nextMessage);
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index dd2b9a2389..407f1f3786 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -70,6 +70,7 @@ public class AMQSession_0_10 extends AMQSession
/**
* The latest qpid Exception that has been reaised.
*/
+ private Object _currentExceptionLock = new Object();
private QpidException _currentException;
/**
@@ -553,14 +554,17 @@ public class AMQSession_0_10 extends AMQSession
*
* @throws org.apache.qpid.AMQException get the latest thrown error.
*/
- public synchronized void getCurrentException() throws AMQException
+ public void getCurrentException() throws AMQException
{
- if (_currentException != null)
+ synchronized (_currentExceptionLock)
{
- QpidException toBeTrhown = _currentException;
- _currentException = null;
- throw new AMQException(AMQConstant.getConstant(toBeTrhown.getErrorCode().getCode()),
- toBeTrhown.getMessage(), toBeTrhown);
+ if (_currentException != null)
+ {
+ QpidException toBeThrown = _currentException;
+ _currentException = null;
+ throw new AMQException(AMQConstant.getConstant(toBeThrown.getErrorCode().getCode()),
+ toBeThrown.getMessage(), toBeThrown);
+ }
}
}
@@ -594,11 +598,11 @@ public class AMQSession_0_10 extends AMQSession
{
public void onClosed(ErrorCode errorCode, String reason, Throwable t)
{
- synchronized (this)
+ synchronized (_currentExceptionLock)
{
- //todo check the error code for finding out if we need to notify the
+ // todo check the error code for finding out if we need to notify the
// JMS connection exception listener
- _currentException = new QpidException(reason, errorCode, null);
+ _currentException = new QpidException(reason, errorCode, t);
}
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index c9cb1a6313..d42305debb 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -609,11 +609,13 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me
{
// //fixme this probably is not right
// if (!isNoConsume())
- { // done in BasicCancelOK Handler but not sending one so just deregister.
- deregisterConsumer();
- }
+ //{ // done in BasicCancelOK Handler but not sending one so just deregister.
+ // deregisterConsumer();
+ //}
}
+ deregisterConsumer();
+
if (_messageListener != null && _receiving.get() && _receivingThread != null)
{
if (_logger.isInfoEnabled())
@@ -1140,7 +1142,7 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me
this._queuename = queuename;
}
- public void addBindingKey(AMQDestination amqd, String routingKey) throws AMQException
+ public void addBindingKey(AMQDestination amqd, String routingKey) throws AMQException
{
_session.addBindingKey(this,amqd,routingKey);
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
index a31bfe9df1..42c1d687cb 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
@@ -68,17 +68,11 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
if (message.get010Message() == null)
{
message.set010Message(new ByteBufferMessage());
- if (message.getData() == null)
- {
- try
- {
- message.get010Message().appendData(ByteBuffer.allocate(0));
- }
- catch (IOException e)
- {
- throw new JMSException(e.getMessage());
- }
- }
+ }
+ // force a rebuild of the 0-10 message if data has changed
+ if (message.getData() == null)
+ {
+ message.dataChanged();
}
DeliveryProperties deliveryProp = message.get010Message().getDeliveryProperties();
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
index 7eb56acb27..e3ca6b5de1 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
@@ -78,7 +78,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
{
if (_data != null)
{
- _010message.appendData(_data.buf());
+ _010message.appendData(_data.buf().slice());
}
else
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
index 87cc80f21d..4a43a7bba8 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
@@ -134,6 +134,7 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text
// should never occur
JMSException jmse = new JMSException("Unable to decode text data");
jmse.setLinkedException(e);
+ throw jmse;
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/JMSTestCase.java b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/JMSTestCase.java
index 008b85e98a..feb4c1c94d 100644
--- a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/JMSTestCase.java
+++ b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/JMSTestCase.java
@@ -1,9 +1,13 @@
package org.apache.qpidity.nclient;
+import java.util.Enumeration;
+
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
@@ -24,10 +28,29 @@ public class JMSTestCase
javax.jms.Session ssn = con.createSession(false, 1);
javax.jms.Destination dest = new AMQQueue(new AMQShortString("direct"),"test");
- javax.jms.MessageConsumer cons = ssn.createConsumer(dest);
- //javax.jms.MessageProducer prod = ssn.createProducer(dest);
+ javax.jms.MessageProducer prod = ssn.createProducer(dest);
+ QueueBrowser browser = ssn.createBrowser((Queue)dest, "Test = 'test'");
+
+ javax.jms.TextMessage msg = ssn.createTextMessage();
+ msg.setStringProperty("TEST", "test");
+ msg.setText("Should get this");
+ prod.send(msg);
+
+ javax.jms.TextMessage msg2 = ssn.createTextMessage();
+ msg2.setStringProperty("TEST", "test2");
+ msg2.setText("Shouldn't get this");
+ prod.send(msg2);
+
+
+ Enumeration enu = browser.getEnumeration();
+ for (;enu.hasMoreElements();)
+ {
+ System.out.println(enu.nextElement());
+ System.out.println("\n");
+ }
- javax.jms.TextMessage m = null; // (javax.jms.TextMessage)cons.receive();
+ javax.jms.MessageConsumer cons = ssn.createConsumer(dest, "Test = 'test'");
+ javax.jms.TextMessage m = null; // (javax.jms.TextMessage)cons.receive();
cons.setMessageListener(new MessageListener()
{
public void onMessage(Message m)
@@ -35,7 +58,9 @@ public class JMSTestCase
javax.jms.TextMessage m2 = (javax.jms.TextMessage)m;
try
{
+ System.out.println("headers : " + m2.toString());
System.out.println("m : " + m2.getText());
+ System.out.println("\n\n");
}
catch(Exception e)
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/njms/ExceptionHelper.java b/qpid/java/client/src/main/java/org/apache/qpidity/njms/ExceptionHelper.java
index d3b2e88f23..e00f9008d3 100644
--- a/qpid/java/client/src/main/java/org/apache/qpidity/njms/ExceptionHelper.java
+++ b/qpid/java/client/src/main/java/org/apache/qpidity/njms/ExceptionHelper.java
@@ -41,6 +41,7 @@ public class ExceptionHelper
jmsException = new JMSException(exception.getMessage());
}
jmsException.setLinkedException(exception);
+ jmsException.initCause(exception);
}
else
{
diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/Data.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Data.java
index 602a7fbca1..b9b8636d18 100644
--- a/qpid/java/common/src/main/java/org/apache/qpidity/transport/Data.java
+++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Data.java
@@ -95,7 +95,6 @@ public class Data implements ProtocolEvent
}
str.append(str(buf, 20));
}
- str.setLength(str.length() - 3);
str.append(")");
return str.toString();
}
diff --git a/qpid/python/examples/fanout/declare_queues.py b/qpid/python/examples/fanout/declare_queues.py
deleted file mode 100755
index 52f23f4f9a..0000000000
--- a/qpid/python/examples/fanout/declare_queues.py
+++ /dev/null
@@ -1,52 +0,0 @@
-#!/usr/bin/env python
-"""
- declare_queues.py
-
- Creates and binds a queue on an AMQP direct exchange.
-
- All messages using the routing key "routing_key" are
- sent to the queue named "message_queue".
-"""
-
-import qpid
-import sys
-from qpid.client import Client
-from qpid.content import Content
-from qpid.queue import Empty
-
-#----- Initialization -----------------------------------
-
-# Set parameters for login
-
-host=len(sys.argv) > 1 and sys.argv[1] or "127.0.0.1"
-port=len(sys.argv) > 2 and int(sys.argv[2]) or 5672
-amqp_spec="/usr/share/amqp/amqp.0-10-preview.xml"
-user="guest"
-password="guest"
-
-# Create a client and log in to it.
-
-client = Client(host, port, qpid.spec.load(amqp_spec))
-client.start({"LOGIN": user, "PASSWORD": password})
-
-session = client.session()
-session.session_open()
-
-#----- Create a queue -------------------------------------
-
-# queue_declare() creates an AMQP queue, which is held
-# on the broker. Published messages are sent to the AMQP queue,
-# from which messages are delivered to consumers.
-#
-# queue_bind() determines which messages are routed to a queue.
-# Route all messages with the routing key "routing_key" to
-# the AMQP queue named "message_queue".
-
-session.queue_declare(queue="message_queue")
-session.queue_bind(exchange="amq.fanout", queue="message_queue")
-
-#----- Cleanup ---------------------------------------------
-
-# Clean up before exiting so there are no open threads.
-
-session.session_close()
diff --git a/qpid/python/examples/fanout/fanout_consumer.py b/qpid/python/examples/fanout/fanout_consumer.py
index b91ea35c0d..ef24bf35b2 100755
--- a/qpid/python/examples/fanout/fanout_consumer.py
+++ b/qpid/python/examples/fanout/fanout_consumer.py
@@ -5,13 +5,57 @@
This AMQP client reads messages from a message
queue named "message_queue".
"""
-
+import base64
import qpid
import sys
from qpid.client import Client
from qpid.content import Content
from qpid.queue import Empty
+#----- Functions -------------------------------------------
+
+def dump_queue(client, queue_name):
+
+ print "Messages queue: " + queue_name
+
+ consumer_tag = queue_name # Use the queue name as the consumer tag - need a unique tag
+ queue = client.queue(consumer_tag)
+
+ # Call message_subscribe() to tell the broker to deliver messages
+ # from the AMQP queue to a local client queue. The broker will
+ # start delivering messages as soon as message_subscribe() is called.
+
+ session.message_subscribe(queue=queue_name, destination=consumer_tag)
+ session.message_flow(consumer_tag, 0, 0xFFFFFFFF)
+ session.message_flow(consumer_tag, 1, 0xFFFFFFFF)
+
+ print "Subscribed to queue " + queue_name
+ sys.stdout.flush()
+
+ message = 0
+
+ while True:
+ try:
+ message = queue.get(timeout=10)
+ content = message.content.body
+ print "Response: " + content
+ except Empty:
+ print "No more messages!"
+ break
+ except:
+ print "Unexpected exception!"
+ break
+
+
+ # Messages are not removed from the queue until they
+ # are acknowledged. Using cumulative=True, all messages
+ # in the session up to and including the one identified
+ # by the delivery tag are acknowledged. This is more efficient,
+ # because there are fewer network round-trips.
+
+ if message != 0:
+ message.complete(cumulative=True)
+
#----- Initialization --------------------------------------
@@ -29,44 +73,22 @@ client = Client(host, port, qpid.spec.load(amqp_spec))
client.start({"LOGIN": user, "PASSWORD": password})
session = client.session()
-session.session_open()
-
-#----- Read from queue --------------------------------------------
-
-# Now let's create a local client queue and tell it to read
-# incoming messages.
-
-# The consumer tag identifies the client-side queue.
-
-consumer_tag = "consumer1"
-queue = client.queue(consumer_tag)
-
-# Call message_subscribe() to tell the broker to deliver messages
-# from the AMQP queue to this local client queue. The broker will
-# start delivering messages as soon as message_subscribe() is called.
-
-session.message_subscribe(queue="message_queue", destination=consumer_tag)
-session.message_flow(consumer_tag, 0, 0xFFFFFFFF)
-session.message_flow(consumer_tag, 1, 0xFFFFFFFF)
-
-# Initialize 'final' and 'content', variables used to identify the last message.
+session_info = session.session_open()
+session_id = session_info.session_id
-final = "That's all, folks!" # In a message body, signals the last message
-content = "" # Content of the last message read
+#----- Main Body -- ----------------------------------------
-message = None
-while content != final:
- message = queue.get(timeout=10)
- content = message.content.body
- print content
+# Make a unique queue name for my queue from the session ID.
+my_queue = base64.urlsafe_b64encode(session_id)
+session.queue_declare(queue=my_queue)
-# Messages are not removed from the queue until they are
-# acknowledged. Using cumulative=True, all messages from the session
-# up to and including the one identified by the delivery tag are
-# acknowledged. This is more efficient, because there are fewer
-# network round-trips.
+# Bind my queue to the fanout exchange. No routing key is required
+# the fanout exchange copies messages unconditionally to every
+# bound queue
+session.queue_bind(queue=my_queue, exchange="amq.fanout")
-message.complete(cumulative=True)
+# Dump the messages on the queue.
+dump_queue(client, my_queue)
#----- Cleanup ------------------------------------------------
diff --git a/qpid/python/examples/fanout/verify b/qpid/python/examples/fanout/verify
index f136ccd39b..7650853e11 100644
--- a/qpid/python/examples/fanout/verify
+++ b/qpid/python/examples/fanout/verify
@@ -1,3 +1,5 @@
# See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify
-clients ./declare_queues.py ./fanout_producer.py ./fanout_consumer.py
-outputs ./declare_queues.py.out ./fanout_producer.py.out ./fanout_consumer.py.out
+background "Subscribed" ./fanout_consumer.py
+background "Subscribed" ./fanout_consumer.py
+clients ./fanout_producer.py
+outputs ./fanout_producer.py.out "./fanout_consumer.py.out | remove_uuid64" "./fanout_consumer.pyX.out | remove_uuid64"
diff --git a/qpid/python/examples/fanout/verify.in b/qpid/python/examples/fanout/verify.in
index a5f57f0b4b..d5067b3850 100644
--- a/qpid/python/examples/fanout/verify.in
+++ b/qpid/python/examples/fanout/verify.in
@@ -1,14 +1,31 @@
-==== declare_queues.py.out
==== fanout_producer.py.out
-==== fanout_consumer.py.out
-message 0
-message 1
-message 2
-message 3
-message 4
-message 5
-message 6
-message 7
-message 8
-message 9
-That's all, folks!
+==== fanout_consumer.py.out | remove_uuid64
+Messages queue:
+Subscribed to queue
+Response: message 0
+Response: message 1
+Response: message 2
+Response: message 3
+Response: message 4
+Response: message 5
+Response: message 6
+Response: message 7
+Response: message 8
+Response: message 9
+Response: That's all, folks!
+No more messages!
+==== fanout_consumer.pyX.out | remove_uuid64
+Messages queue:
+Subscribed to queue
+Response: message 0
+Response: message 1
+Response: message 2
+Response: message 3
+Response: message 4
+Response: message 5
+Response: message 6
+Response: message 7
+Response: message 8
+Response: message 9
+Response: That's all, folks!
+No more messages!
diff --git a/qpid/python/tests_0-10/message.py b/qpid/python/tests_0-10/message.py
index c251e6aca0..a3d32bdb2d 100644
--- a/qpid/python/tests_0-10/message.py
+++ b/qpid/python/tests_0-10/message.py
@@ -720,6 +720,91 @@ class MessageTests(TestBase):
#check all 'browsed' messages are still on the queue
self.assertEqual(5, channel.queue_query(queue="q").message_count)
+ def test_subscribe_not_acquired_3(self):
+ channel = self.channel
+
+ #publish some messages
+ self.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+ for i in range(1, 11):
+ channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "message-%d" % (i)))
+
+ #create a not-acquired subscriber
+ channel.message_subscribe(queue = "q", destination = "a", confirm_mode = 1, acquire_mode=1)
+ channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
+ channel.message_flow(unit = 0, value = 10, destination = "a")
+
+ #browse through messages
+ queue = self.client.queue("a")
+ for i in range(1, 11):
+ msg = queue.get(timeout = 1)
+ self.assertEquals("message-%d" % (i), msg.content.body)
+ if (i % 2):
+ #try to acquire every second message
+ channel.message_acquire([msg.command_id, msg.command_id])
+ #check that acquire succeeds
+ response = channel.control_queue.get(timeout=1)
+ self.assertEquals(response.transfers, [msg.command_id, msg.command_id])
+ msg.complete()
+ self.assertEmpty(queue)
+
+ #create a second not-acquired subscriber
+ channel.message_subscribe(queue = "q", destination = "b", confirm_mode = 1, acquire_mode=1)
+ channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "b")
+ channel.message_flow(unit = 0, value = 1, destination = "b")
+ #check it gets those not consumed
+ queue = self.client.queue("b")
+ for i in [2,4,6,8,10]:
+ msg = queue.get(timeout = 1)
+ self.assertEquals("message-%d" % (i), msg.content.body)
+ msg.complete()
+ channel.message_flow(unit = 0, value = 1, destination = "b")
+ self.assertEmpty(queue)
+
+ #check all 'browsed' messages are still on the queue
+ self.assertEqual(5, channel.queue_query(queue="q").message_count)
+
+ def test_release_unacquired(self):
+ channel = self.channel
+
+ #create queue
+ self.queue_declare(queue = "q", exclusive=True, auto_delete=True, durable=True)
+
+ #send message
+ channel.message_transfer(content=Content(properties={'routing_key' : "q", 'delivery_mode':2}, body = "my-message"))
+
+ #create two 'browsers'
+ channel.message_subscribe(queue = "q", destination = "a", confirm_mode = 1, acquire_mode=1)
+ channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
+ channel.message_flow(unit = 0, value = 10, destination = "a")
+ queueA = self.client.queue("a")
+
+ channel.message_subscribe(queue = "q", destination = "b", confirm_mode = 1, acquire_mode=1)
+ channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "b")
+ channel.message_flow(unit = 0, value = 10, destination = "b")
+ queueB = self.client.queue("b")
+
+ #have each browser release the message
+ msgA = queueA.get(timeout = 1)
+ channel.message_release([msgA.command_id, msgA.command_id])
+
+ msgB = queueB.get(timeout = 1)
+ channel.message_release([msgB.command_id, msgB.command_id])
+
+ #cancel browsers
+ channel.message_cancel(destination = "a")
+ channel.message_cancel(destination = "b")
+
+ #create consumer
+ channel.message_subscribe(queue = "q", destination = "c", confirm_mode = 1, acquire_mode=0)
+ channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "c")
+ channel.message_flow(unit = 0, value = 10, destination = "c")
+ queueC = self.client.queue("c")
+ #consume the message then ack it
+ msgC = queueC.get(timeout = 1)
+ msgC.complete()
+ #ensure there are no other messages
+ self.assertEmpty(queueC)
+
def test_no_size(self):
self.queue_declare(queue = "q", exclusive=True, auto_delete=True)