diff options
author | Aidan Skinner <aidan@apache.org> | 2008-02-18 12:07:38 +0000 |
---|---|---|
committer | Aidan Skinner <aidan@apache.org> | 2008-02-18 12:07:38 +0000 |
commit | 7ef664e5e82c93675ecfa5cd8cdc9dcefca453ac (patch) | |
tree | 0f8b3e5148dcaaea1570a8a7d8bd39ce6d0c9b17 | |
parent | 81d9c0a9184c0fde88c63c785a4e3dc73fdec8bd (diff) | |
download | qpid-python-7ef664e5e82c93675ecfa5cd8cdc9dcefca453ac.tar.gz |
Merged revisions 627484,627531,627536,627543,627607,627629,627634,627652-627655,627718,627777,628132,628169,628171,628659 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/trunk
........
r627484 | aconway | 2008-02-13 15:39:59 +0000 (Wed, 13 Feb 2008) | 14 lines
Broker::connect - connect to URL, return ConnectionInputHandler.
M src/qpid/broker/Broker.cpp
M src/qpid/broker/Broker.h
M src/qpid/sys/Acceptor.h
M src/qpid/sys/AsynchIOAcceptor.cpp
AMQBody::match - test for matching frames.
M src/qpid/framing/AMQBody.cpp
M src/qpid/framing/AMQBody.h
Url::throwIfEmpty() - test for empty URL.
M src/qpid/Url.cpp
M src/qpid/Url.h
........
r627531 | rhs | 2008-02-13 17:13:45 +0000 (Wed, 13 Feb 2008) | 1 line
added client/example to the build
........
r627536 | rhs | 2008-02-13 17:30:32 +0000 (Wed, 13 Feb 2008) | 1 line
updated runSample.sh from jrobie
........
r627543 | rajith | 2008-02-13 17:33:20 +0000 (Wed, 13 Feb 2008) | 1 line
modified it to show the selector test bug
........
r627607 | rhs | 2008-02-13 22:45:26 +0000 (Wed, 13 Feb 2008) | 1 line
fixed logging of message Data
........
r627629 | rajith | 2008-02-14 01:08:58 +0000 (Thu, 14 Feb 2008) | 1 line
This is a fix for QPID-791
........
r627634 | rajith | 2008-02-14 02:09:59 +0000 (Thu, 14 Feb 2008) | 1 line
Fix for Qpid-793
........
r627652 | rhs | 2008-02-14 03:52:08 +0000 (Thu, 14 Feb 2008) | 1 line
set initial cause
........
r627653 | rhs | 2008-02-14 03:53:06 +0000 (Thu, 14 Feb 2008) | 1 line
rethrow exception rather than ignoring it
........
r627654 | rhs | 2008-02-14 03:54:04 +0000 (Thu, 14 Feb 2008) | 1 line
print message text directly
........
r627655 | rhs | 2008-02-14 03:55:28 +0000 (Thu, 14 Feb 2008) | 1 line
recompute internal 0-10 message for reused messages
........
r627718 | gsim | 2008-02-14 11:49:17 +0000 (Thu, 14 Feb 2008) | 3 lines
Fixed bug in browsing that failed to deal correctly with 'gaps' in message sequence.
........
r627777 | rhs | 2008-02-14 15:12:11 +0000 (Thu, 14 Feb 2008) | 1 line
bumped spec release
........
r628132 | rhs | 2008-02-15 17:49:47 +0000 (Fri, 15 Feb 2008) | 1 line
protect the _currentException variable with its own lock, this avoids deadlocks between getCurrentException and the dispatcher thread
........
r628169 | aconway | 2008-02-15 21:00:44 +0000 (Fri, 15 Feb 2008) | 2 lines
Updated c++ and python fanout examples and verify scripts.
........
r628171 | aconway | 2008-02-15 21:03:28 +0000 (Fri, 15 Feb 2008) | 2 lines
svn:ignore properties.
........
r628659 | gsim | 2008-02-18 09:23:54 +0000 (Mon, 18 Feb 2008) | 3 lines
Requests to release a message that has not been acquired should be ignored.
........
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/thegreatmerge@628695 13f79535-47bb-0310-9956-ffa450edef68
41 files changed, 482 insertions, 273 deletions
diff --git a/qpid/bin/verify b/qpid/bin/verify index 844e99b765..96fe1df9b3 100755 --- a/qpid/bin/verify +++ b/qpid/bin/verify @@ -19,7 +19,11 @@ trap cleanup EXIT ARGS="${QPID_HOST:-localhost} $QPID_PORT" -outfile() { echo $1.out; } +outfile() { + file=$1 + while [ -f $file.out ]; do file="${file}X"; done + echo $file.out + } fail() { test -n "$*" && echo $* 1>&2 ; FAIL=1; return 1; } diff --git a/qpid/cpp/examples/Makefile.am b/qpid/cpp/examples/Makefile.am index 92625550e2..9b41b130e9 100644 --- a/qpid/cpp/examples/Makefile.am +++ b/qpid/cpp/examples/Makefile.am @@ -6,7 +6,6 @@ nobase_pkgdata_DATA= \ examples/request-response/server.cpp \ examples/request-response/Makefile \ examples/fanout/Makefile \ - examples/fanout/declare_queues.cpp \ examples/fanout/listener.cpp \ examples/fanout/fanout_producer.cpp \ examples/pub-sub/Makefile \ diff --git a/qpid/cpp/examples/examples/fanout/Makefile b/qpid/cpp/examples/examples/fanout/Makefile index b58026f337..7963af7ddf 100644 --- a/qpid/cpp/examples/examples/fanout/Makefile +++ b/qpid/cpp/examples/examples/fanout/Makefile @@ -2,7 +2,7 @@ CXX=g++ CXXFLAGS= LDFLAGS=-lqpidclient -PROGRAMS=declare_queues fanout_producer listener +PROGRAMS=fanout_producer listener all: $(PROGRAMS) diff --git a/qpid/cpp/examples/examples/fanout/declare_queues.cpp b/qpid/cpp/examples/examples/fanout/declare_queues.cpp deleted file mode 100644 index c76ed54730..0000000000 --- a/qpid/cpp/examples/examples/fanout/declare_queues.cpp +++ /dev/null @@ -1,80 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -/** - * declare_queues.cpp (this program): - * - * This program is one of three programs designed to be used - * together. These programs use the "amq.fanout" exchange. - * - * fanout_producer.cpp: - * - * Publishes to a broker, specifying a routing key. - * - * listener.cpp - * - * Reads from a queue on the broker using a message listener. - * - */ - -#include <qpid/client/Connection.h> -#include <qpid/client/Session.h> - -#include <unistd.h> -#include <cstdlib> -#include <iostream> - -using namespace qpid::client; -using namespace qpid::framing; - -using std::string; - - -int main(int argc, char** argv) { - const char* host = argc>1 ? argv[1] : "127.0.0.1"; - int port = argc>2 ? atoi(argv[2]) : 5672; - Connection connection; - Message msg; - try { - connection.open(host, port); - Session session = connection.newSession(); - - - //--------- Main body of program -------------------------------------------- - - // Create and bind a queue named "message_queue". - session.queueDeclare(arg::queue="message_queue"); - session.queueBind(arg::queue="message_queue", arg::exchange="amq.fanout"); - - - //----------------------------------------------------------------------------- - - connection.close(); - return 0; - } catch(const std::exception& error) { - std::cout << error.what() << std::endl; - } - return 1; - -} - - - diff --git a/qpid/cpp/examples/examples/fanout/listener.cpp b/qpid/cpp/examples/examples/fanout/listener.cpp index 91b5123c68..5295e10f34 100644 --- a/qpid/cpp/examples/examples/fanout/listener.cpp +++ b/qpid/cpp/examples/examples/fanout/listener.cpp @@ -69,12 +69,24 @@ int main(int argc, char** argv) { //--------- Main body of program -------------------------------------------- + // Unique name for private queue: + std::string myQueue=session.getId().str(); + // Declear my queue. + session.queueDeclare(arg::queue=myQueue, arg::exclusive=true, + arg::autoDelete=true); + // Bind my queue to the fanout exchange. + // Note no routingKey required, the fanout exchange delivers + // all messages to all bound queues unconditionally. + session.queueBind(arg::exchange="amq.fanout", arg::queue=myQueue); + + // Create a listener and subscribe it to my queue. SubscriptionManager subscriptions(session); - // Create a listener and subscribe it to the queue named "message_queue" Listener listener(subscriptions); - subscriptions.subscribe(listener, "message_queue"); + subscriptions.subscribe(listener, myQueue); + // Deliver messages until the subscription is cancelled // by Listener::received() + std::cout << "Listening" << std::endl; subscriptions.run(); //--------------------------------------------------------------------------- diff --git a/qpid/cpp/examples/examples/fanout/verify b/qpid/cpp/examples/examples/fanout/verify index 1e1e8121d2..ace4a6dfee 100644 --- a/qpid/cpp/examples/examples/fanout/verify +++ b/qpid/cpp/examples/examples/fanout/verify @@ -1,3 +1,6 @@ # See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify -clients ./declare_queues ./fanout_producer ./listener -outputs ./declare_queues.out ./fanout_producer.out ./listener.out +background "Listening" ./listener +background "Listening" ./listener +background "Listening" ./listener +clients ./fanout_producer +outputs ./fanout_producer.out "./listener.out | remove_uuid" "./listenerX.out | remove_uuid" "./listenerXX.out | remove_uuid" diff --git a/qpid/cpp/examples/examples/fanout/verify.in b/qpid/cpp/examples/examples/fanout/verify.in index 23d08f38dd..8f8612ce67 100644 --- a/qpid/cpp/examples/examples/fanout/verify.in +++ b/qpid/cpp/examples/examples/fanout/verify.in @@ -1,6 +1,6 @@ -==== declare_queues.out ==== fanout_producer.out -==== listener.out +==== listener.out | remove_uuid +Listening Message: Message 0 Message: Message 1 Message: Message 2 @@ -12,4 +12,32 @@ Message: Message 7 Message: Message 8 Message: Message 9 Message: That's all, folks! -Shutting down listener for message_queue +Shutting down listener for +==== listenerX.out | remove_uuid +Listening +Message: Message 0 +Message: Message 1 +Message: Message 2 +Message: Message 3 +Message: Message 4 +Message: Message 5 +Message: Message 6 +Message: Message 7 +Message: Message 8 +Message: Message 9 +Message: That's all, folks! +Shutting down listener for +==== listenerXX.out | remove_uuid +Listening +Message: Message 0 +Message: Message 1 +Message: Message 2 +Message: Message 3 +Message: Message 4 +Message: Message 5 +Message: Message 6 +Message: Message 7 +Message: Message 8 +Message: Message 9 +Message: That's all, folks! +Shutting down listener for diff --git a/qpid/cpp/examples/examples/fanout/verify_cpp_python b/qpid/cpp/examples/examples/fanout/verify_cpp_python index f53784ef1c..e840e68f91 100644 --- a/qpid/cpp/examples/examples/fanout/verify_cpp_python +++ b/qpid/cpp/examples/examples/fanout/verify_cpp_python @@ -1,5 +1,7 @@ # See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify py=$PYTHON_EXAMPLES/fanout -clients ./declare_queues ./fanout_producer $py/fanout_consumer.py -outputs ./declare_queues.out ./fanout_producer.out $py/fanout_consumer.py.out +background "Subscribed" $py/fanout_consumer.py +background "Subscribed" $py/fanout_consumer.py +clients ./fanout_producer +outputs ./fanout_producer.out "$py/fanout_consumer.py.out | remove_uuid64" "$py/fanout_consumer.pyX.out | remove_uuid64" diff --git a/qpid/cpp/examples/examples/fanout/verify_cpp_python.in b/qpid/cpp/examples/examples/fanout/verify_cpp_python.in index cb9e52cfcc..fac2b365d3 100644 --- a/qpid/cpp/examples/examples/fanout/verify_cpp_python.in +++ b/qpid/cpp/examples/examples/fanout/verify_cpp_python.in @@ -1,14 +1,31 @@ -==== declare_queues.out ==== fanout_producer.out -==== fanout_consumer.py.out -Message 0 -Message 1 -Message 2 -Message 3 -Message 4 -Message 5 -Message 6 -Message 7 -Message 8 -Message 9 -That's all, folks! +==== fanout_consumer.py.out | remove_uuid64 +Messages queue: +Subscribed to queue +Response: Message 0 +Response: Message 1 +Response: Message 2 +Response: Message 3 +Response: Message 4 +Response: Message 5 +Response: Message 6 +Response: Message 7 +Response: Message 8 +Response: Message 9 +Response: That's all, folks! +No more messages! +==== fanout_consumer.pyX.out | remove_uuid64 +Messages queue: +Subscribed to queue +Response: Message 0 +Response: Message 1 +Response: Message 2 +Response: Message 3 +Response: Message 4 +Response: Message 5 +Response: Message 6 +Response: Message 7 +Response: Message 8 +Response: Message 9 +Response: That's all, folks! +No more messages! diff --git a/qpid/cpp/examples/examples/fanout/verify_python_cpp b/qpid/cpp/examples/examples/fanout/verify_python_cpp index 00a0727352..d9b3361523 100644 --- a/qpid/cpp/examples/examples/fanout/verify_python_cpp +++ b/qpid/cpp/examples/examples/fanout/verify_python_cpp @@ -1,5 +1,7 @@ # See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify py=$PYTHON_EXAMPLES/fanout -clients $py/declare_queues.py $py/fanout_producer.py ./listener -outputs $py/declare_queues.py.out $py/fanout_producer.py.out ./listener.out +background "Listening" ./listener +background "Listening" ./listener +clients $py/fanout_producer.py +outputs $py/fanout_producer.py.out "./listener.out | remove_uuid" "./listenerX.out | remove_uuid" diff --git a/qpid/cpp/examples/examples/fanout/verify_python_cpp.in b/qpid/cpp/examples/examples/fanout/verify_python_cpp.in index 87a1694e6c..8f9e959053 100644 --- a/qpid/cpp/examples/examples/fanout/verify_python_cpp.in +++ b/qpid/cpp/examples/examples/fanout/verify_python_cpp.in @@ -1,6 +1,6 @@ -==== declare_queues.py.out ==== fanout_producer.py.out -==== listener.out +==== listener.out | remove_uuid +Listening Message: message 0 Message: message 1 Message: message 2 @@ -12,4 +12,18 @@ Message: message 7 Message: message 8 Message: message 9 Message: That's all, folks! -Shutting down listener for message_queue +Shutting down listener for +==== listenerX.out | remove_uuid +Listening +Message: message 0 +Message: message 1 +Message: message 2 +Message: message 3 +Message: message 4 +Message: message 5 +Message: message 6 +Message: message 7 +Message: message 8 +Message: message 9 +Message: That's all, folks! +Shutting down listener for diff --git a/qpid/cpp/qpidc.spec.in b/qpid/cpp/qpidc.spec.in index 3892b1ab40..8c248a44ee 100644 --- a/qpid/cpp/qpidc.spec.in +++ b/qpid/cpp/qpidc.spec.in @@ -5,7 +5,7 @@ Name: @PACKAGE@ Version: @VERSION@ -Release: 22%{?dist} +Release: 23%{?dist} Summary: Libraries for Qpid C++ client applications Group: System Environment/Libraries License: Apache Software License @@ -156,6 +156,9 @@ fi /sbin/ldconfig %changelog +* Thu Feb 14 2008 Rafael Schloming <rafaels@redhat.com> - 0.2-23 +- Bumped to pull in fixes for Beta 3 + * Tue Feb 12 2008 Alan Conway <aconway@redhat.com> - 0.2-22 - Added -g to compile flags for debug symbols. diff --git a/qpid/cpp/src/qpid/Url.cpp b/qpid/cpp/src/qpid/Url.cpp index d056edc683..aa53d5cbe2 100644 --- a/qpid/cpp/src/qpid/Url.cpp +++ b/qpid/cpp/src/qpid/Url.cpp @@ -160,6 +160,10 @@ void Url::parseNoThrow(const char* url) { clear(); } +void Url::throwIfEmpty() const { + throw InvalidUrl("URL contains no addresses"); +} + std::istream& operator>>(std::istream& is, Url& url) { std::string s; is >> s; diff --git a/qpid/cpp/src/qpid/Url.h b/qpid/cpp/src/qpid/Url.h index 2e24ba948d..20f42db0ad 100644 --- a/qpid/cpp/src/qpid/Url.h +++ b/qpid/cpp/src/qpid/Url.h @@ -78,6 +78,9 @@ struct Url : public std::vector<Address> { template<class T> Url& operator=(T s) { parse(s); return *this; } + /** Throw InvalidUrl if the URL does not contain any addresses. */ + void throwIfEmpty() const; + /** Replace contents with parsed URL as defined in * https://wiki.108.redhat.com/jira/browse/AMQP-95 *@exception InvalidUrl if the url is invalid. diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index 117a93b571..0a0eb0a0df 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -40,6 +40,7 @@ #include "qpid/sys/ConnectionInputHandlerFactory.h" #include "qpid/sys/TimeoutHandler.h" #include "qpid/sys/SystemInfo.h" +#include "qpid/Url.h" #include <boost/bind.hpp> @@ -263,11 +264,13 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId, case management::Broker::METHOD_ECHO : status = Manageable::STATUS_OK; break; - case management::Broker::METHOD_CONNECT : - connect(dynamic_cast<management::ArgsBrokerConnect&>(args)); + case management::Broker::METHOD_CONNECT : { + management::ArgsBrokerConnect& hp= + dynamic_cast<management::ArgsBrokerConnect&>(args); + connect(hp.i_host, hp.i_port); status = Manageable::STATUS_OK; break; - + } case management::Broker::METHOD_JOINCLUSTER : case management::Broker::METHOD_LEAVECLUSTER : status = Manageable::STATUS_NOT_IMPLEMENTED; @@ -277,9 +280,19 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId, return status; } -void Broker::connect(management::ArgsBrokerConnect& args) +sys::ConnectionInputHandler* Broker::connect( + const std::string& host, uint16_t port, + sys::ConnectionInputHandlerFactory* f) +{ + return getAcceptor().connect(host, port, f ? f : &factory); +} + +sys::ConnectionInputHandler* Broker::connect( + const Url& url, sys::ConnectionInputHandlerFactory* f) { - getAcceptor().connect(args.i_host, args.i_port, &factory); + url.throwIfEmpty(); + TcpAddress addr=boost::get<TcpAddress>(url[0]); + return connect(addr.host, addr.port, f); } }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h index 68dbf570f0..55bc7644a5 100644 --- a/qpid/cpp/src/qpid/broker/Broker.h +++ b/qpid/cpp/src/qpid/broker/Broker.h @@ -47,6 +47,9 @@ #include <vector> namespace qpid { + +class Url; + namespace broker { static const uint16_t DEFAULT_PORT=5672; @@ -54,7 +57,8 @@ static const uint16_t DEFAULT_PORT=5672; /** * A broker instance. */ -class Broker : public sys::Runnable, public Plugin::Target, public management::Manageable +class Broker : public sys::Runnable, public Plugin::Target, + public management::Manageable { public: @@ -111,6 +115,14 @@ class Broker : public sys::Runnable, public Plugin::Target, public management::M management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args); + /** Create a connection to another broker. */ + sys::ConnectionInputHandler* + connect(const std::string& host, uint16_t port, + sys::ConnectionInputHandlerFactory* =0); + /** Create a connection to another broker. */ + sys::ConnectionInputHandler* + connect(const Url& url, sys::ConnectionInputHandlerFactory* =0); + private: sys::Acceptor& getAcceptor() const; @@ -129,7 +141,6 @@ class Broker : public sys::Runnable, public Plugin::Target, public management::M Vhost::shared_ptr vhostObject; void declareStandardExchange(const std::string& name, const std::string& type); - void connect(management::ArgsBrokerConnect& args); }; }} diff --git a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp index 2e7c126162..154394e5de 100644 --- a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp +++ b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp @@ -104,7 +104,7 @@ void DeliveryRecord::requeue() const void DeliveryRecord::release() { - if (!confirmed) { + if (acquired && !confirmed) { queue->requeue(msg); acquired = false; } diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index c7dd656a4e..d34ca06364 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -170,12 +170,17 @@ void Queue::requeue(const QueuedMessage& msg){ bool Queue::acquire(const QueuedMessage& msg) { Mutex::ScopedLock locker(messageLock); + QPID_LOG(debug, "attempting to acquire " << msg.position); for (Messages::iterator i = messages.begin(); i != messages.end(); i++) { if (i->position == msg.position) { messages.erase(i); + QPID_LOG(debug, "Match found, acquire succeeded: " << i->position << " == " << msg.position); return true; + } else { + QPID_LOG(debug, "No match: " << i->position << " != " << msg.position); } } + QPID_LOG(debug, "Acquire failed for " << msg.position); return false; } @@ -255,8 +260,8 @@ bool Queue::browseNextMessage(QueuedMessage& m, Consumer& c) m = msg; return true; } else { - //consumer hasn't got enough credit for the message - QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'"); + //browser hasn't got enough credit for the message + QPID_LOG(debug, "Browser can't currently accept message from '" << name << "'"); return false; } } else { @@ -304,11 +309,13 @@ bool Queue::seek(QueuedMessage& msg, Consumer& c) { msg = messages.front(); return true; } else { - uint index = (c.position - messages.front().position) + 1; - if (index < messages.size()) { - msg = messages[index]; - return true; - } + //TODO: can improve performance of this search, for now just searching linearly from end + Messages::reverse_iterator pos; + for (Messages::reverse_iterator i = messages.rbegin(); i != messages.rend() && i->position > c.position; i++) { + pos = i; + } + msg = *pos; + return true; } } addListener(c); diff --git a/qpid/cpp/src/qpid/framing/AMQBody.cpp b/qpid/cpp/src/qpid/framing/AMQBody.cpp index a64d224a86..b3eeae0615 100644 --- a/qpid/cpp/src/qpid/framing/AMQBody.cpp +++ b/qpid/cpp/src/qpid/framing/AMQBody.cpp @@ -19,15 +19,46 @@ * */ -#include "AMQBody.h" +#include "qpid/framing/AMQBody.h" +#include "qpid/framing/AMQMethodBody.h" +#include "qpid/framing/AMQHeaderBody.h" +#include "qpid/framing/AMQContentBody.h" +#include "qpid/framing/AMQHeartbeatBody.h" #include <iostream> -std::ostream& qpid::framing::operator<<(std::ostream& out, const qpid::framing::AMQBody& body) +namespace qpid { +namespace framing { + +std::ostream& operator<<(std::ostream& out, const AMQBody& body) { body.print(out); return out; } -qpid::framing::AMQBody::~AMQBody() {} +AMQBody::~AMQBody() {} + +namespace { +struct MatchBodies : public AMQBodyConstVisitor { + const AMQBody& body; + bool match; + + MatchBodies(const AMQBody& b) : body(b), match(false) {} + virtual ~MatchBodies() {} + + virtual void visit(const AMQHeaderBody&) { match=dynamic_cast<const AMQHeaderBody*>(&body); } + virtual void visit(const AMQContentBody&) { match=dynamic_cast<const AMQContentBody*>(&body); } + virtual void visit(const AMQHeartbeatBody&) { match=dynamic_cast<const AMQHeartbeatBody*>(&body); } + virtual void visit(const AMQMethodBody& x) { + const AMQMethodBody* y=dynamic_cast<const AMQMethodBody*>(&body); + match = (y && y->amqpMethodId() == x.amqpMethodId() && y->amqpClassId() == x.amqpClassId()); + } +}; +} +bool AMQBody::match(const AMQBody& a, const AMQBody& b) { + MatchBodies matcher(a); + b.accept(matcher); + return matcher.match; +} +}} // namespace diff --git a/qpid/cpp/src/qpid/framing/AMQBody.h b/qpid/cpp/src/qpid/framing/AMQBody.h index b05301bd05..f3bf65470c 100644 --- a/qpid/cpp/src/qpid/framing/AMQBody.h +++ b/qpid/cpp/src/qpid/framing/AMQBody.h @@ -59,6 +59,9 @@ class AMQBody virtual AMQMethodBody* getMethod() { return 0; } virtual const AMQMethodBody* getMethod() const { return 0; } + + /** Match if same type and same class/method ID for methods */ + static bool match(const AMQBody& , const AMQBody& ); }; std::ostream& operator<<(std::ostream& out, const AMQBody& body) ; diff --git a/qpid/cpp/src/qpid/sys/Acceptor.h b/qpid/cpp/src/qpid/sys/Acceptor.h index 7ff03e0eeb..5eb1f1a500 100644 --- a/qpid/cpp/src/qpid/sys/Acceptor.h +++ b/qpid/cpp/src/qpid/sys/Acceptor.h @@ -29,6 +29,7 @@ namespace qpid { namespace sys { class ConnectionInputHandlerFactory; +class ConnectionInputHandler; class Acceptor : public qpid::SharedObject<Acceptor> { @@ -38,7 +39,9 @@ class Acceptor : public qpid::SharedObject<Acceptor> virtual uint16_t getPort() const = 0; virtual std::string getHost() const = 0; virtual void run(ConnectionInputHandlerFactory* factory) = 0; - virtual void connect(const std::string& host, int16_t port, ConnectionInputHandlerFactory* factory) = 0; + virtual ConnectionInputHandler* connect( + const std::string& host, int16_t port, + ConnectionInputHandlerFactory* factory) = 0; /** Note: this function is async-signal safe */ virtual void shutdown() = 0; diff --git a/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp b/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp index 9fd32add72..0586eb9d36 100644 --- a/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp +++ b/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp @@ -54,7 +54,10 @@ class AsynchIOAcceptor : public Acceptor { AsynchIOAcceptor(int16_t port, int backlog, int threads); ~AsynchIOAcceptor() {} void run(ConnectionInputHandlerFactory* factory); - void connect(const std::string& host, int16_t port, ConnectionInputHandlerFactory* factory); + ConnectionInputHandler* connect( + const std::string& host, int16_t port, + ConnectionInputHandlerFactory* factory); + void shutdown(); uint16_t getPort() const; @@ -188,7 +191,7 @@ void AsynchIOAcceptor::run(ConnectionInputHandlerFactory* fact) { } } -void AsynchIOAcceptor::connect(const std::string& host, int16_t port, ConnectionInputHandlerFactory* f) +ConnectionInputHandler* AsynchIOAcceptor::connect(const std::string& host, int16_t port, ConnectionInputHandlerFactory* f) { Socket* socket = new Socket();//Should be deleted by handle when socket closes socket->connect(host, port); @@ -209,7 +212,7 @@ void AsynchIOAcceptor::connect(const std::string& host, int16_t port, Connection aio->queueReadBuffer(new Buff); } aio->start(poller); - + return handler; } diff --git a/qpid/java/build.deps b/qpid/java/build.deps index f8b1e8b4e3..21b13fe019 100644 --- a/qpid/java/build.deps +++ b/qpid/java/build.deps @@ -39,6 +39,8 @@ systests.libs=${client.libs} ${test.libs} perftests.libs=${systests.libs} integrationtests.libs=${systests.libs} +client-example.libs=${client.libs} + ibm-icu=lib/com.ibm.icu-3.4.4.jar ecl-core-jface=lib/org.eclipse.jface-3.2.0.jar ecl-core-commands=lib/org.eclipse.core.commands-3.2.0.jar diff --git a/qpid/java/build.xml b/qpid/java/build.xml index 440c361a29..5b00f630d1 100644 --- a/qpid/java/build.xml +++ b/qpid/java/build.xml @@ -23,10 +23,11 @@ <import file="common.xml"/> <property name="modules.core" value="common broker client"/> + <property name="modules.examples" value="client/example"/> <property name="modules.tests" value="systests perftests integrationtests"/> <property name="modules.management" value="management/eclipse-plugin"/> - <property name="modules" value="${modules.core} ${modules.tests} ${modules.management}"/> + <property name="modules" value="${modules.core} ${modules.examples} ${modules.tests} ${modules.management}"/> <property name="qpid.jar" location="${build.lib}/qpid-incubating.jar"/> <basename property="qpid.jar.name" file="${qpid.jar}"/> diff --git a/qpid/java/client/example/build.xml b/qpid/java/client/example/build.xml new file mode 100644 index 0000000000..8bcd59d829 --- /dev/null +++ b/qpid/java/client/example/build.xml @@ -0,0 +1,27 @@ +<!-- + - + - Licensed to the Apache Software Foundation (ASF) under one + - or more contributor license agreements. See the NOTICE file + - distributed with this work for additional information + - regarding copyright ownership. The ASF licenses this file + - to you under the Apache License, Version 2.0 (the + - "License"); you may not use this file except in compliance + - with the License. You may obtain a copy of the License at + - + - http://www.apache.org/licenses/LICENSE-2.0 + - + - Unless required by applicable law or agreed to in writing, + - software distributed under the License is distributed on an + - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + - KIND, either express or implied. See the License for the + - specific language governing permissions and limitations + - under the License. + - + --> +<project name="AMQ Client" default="build"> + + <property name="module.depends" value="common client"/> + + <import file="../../module.xml"/> + +</project> diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/Publisher.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/Publisher.java index fb22966b11..360b2c6aed 100644 --- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/Publisher.java +++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/Publisher.java @@ -125,7 +125,7 @@ public class Publisher for (int i=1; i <= 6; i++) { message.setText("message " + i); - System.out.println(CLASS + ": Sending message: " + i); + System.out.println(CLASS + ": Sending " + message.getText()); messagePublisher .send(message, DeliveryMode.PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); } diff --git a/qpid/java/client/example/src/main/java/runSample.sh b/qpid/java/client/example/src/main/java/runSample.sh index d596f0a924..23bde52c44 100755 --- a/qpid/java/client/example/src/main/java/runSample.sh +++ b/qpid/java/client/example/src/main/java/runSample.sh @@ -23,7 +23,7 @@ if test "'x$QPID_SAMPLE'" != "'x'" then QPID_SAMPLE=$QPID_SAMPLE else - QPID_SAMPLE="/usr/share/rhm-docs-0.2/java" + QPID_SAMPLE="/usr/share/doc/rhm-0.2" fi echo "Using QPID_SAMPLE: $QPID_SAMPLE" @@ -39,7 +39,7 @@ javac -cp "$CLASSPATH" -sourcepath "$QPID_SAMPLE" -d . `find $QPID_SAMPLE -name CLASSPATH="$CLASSPATH$DIVIDER$." # Set VM parameters -QPID_PARAM="$QPID_PARAM -Dlog4j.configuration=file://$QPID_SAMPLE/log4j.xml" +QPID_PARAM="$QPID_PARAM -Dlog4j.configuration=file://$QPID_SAMPLE/java/log4j.xml" # Check if the user supplied a sample classname @@ -49,4 +49,4 @@ then exit; else java -cp $CLASSPATH $QPID_PARAM $* -fi
\ No newline at end of file +fi diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java index fc6ebad1c0..f56477c0fc 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java @@ -92,13 +92,11 @@ public class AMQQueueBrowser implements QueueBrowser return new Enumeration() { - - Message _nextMessage = consumer.receive(); + Message _nextMessage = consumer.receive(1000); public boolean hasMoreElements() { _logger.info("QB:hasMoreElements:" + (_nextMessage != null)); - return (_nextMessage != null); } @@ -108,7 +106,6 @@ public class AMQQueueBrowser implements QueueBrowser try { _logger.info("QB:nextElement about to receive"); - _nextMessage = consumer.receive(1000); _logger.info("QB:nextElement received:" + _nextMessage); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index dd2b9a2389..407f1f3786 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -70,6 +70,7 @@ public class AMQSession_0_10 extends AMQSession /** * The latest qpid Exception that has been reaised. */ + private Object _currentExceptionLock = new Object(); private QpidException _currentException; /** @@ -553,14 +554,17 @@ public class AMQSession_0_10 extends AMQSession * * @throws org.apache.qpid.AMQException get the latest thrown error. */ - public synchronized void getCurrentException() throws AMQException + public void getCurrentException() throws AMQException { - if (_currentException != null) + synchronized (_currentExceptionLock) { - QpidException toBeTrhown = _currentException; - _currentException = null; - throw new AMQException(AMQConstant.getConstant(toBeTrhown.getErrorCode().getCode()), - toBeTrhown.getMessage(), toBeTrhown); + if (_currentException != null) + { + QpidException toBeThrown = _currentException; + _currentException = null; + throw new AMQException(AMQConstant.getConstant(toBeThrown.getErrorCode().getCode()), + toBeThrown.getMessage(), toBeThrown); + } } } @@ -594,11 +598,11 @@ public class AMQSession_0_10 extends AMQSession { public void onClosed(ErrorCode errorCode, String reason, Throwable t) { - synchronized (this) + synchronized (_currentExceptionLock) { - //todo check the error code for finding out if we need to notify the + // todo check the error code for finding out if we need to notify the // JMS connection exception listener - _currentException = new QpidException(reason, errorCode, null); + _currentException = new QpidException(reason, errorCode, t); } } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index c9cb1a6313..d42305debb 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -609,11 +609,13 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me { // //fixme this probably is not right // if (!isNoConsume()) - { // done in BasicCancelOK Handler but not sending one so just deregister. - deregisterConsumer(); - } + //{ // done in BasicCancelOK Handler but not sending one so just deregister. + // deregisterConsumer(); + //} } + deregisterConsumer(); + if (_messageListener != null && _receiving.get() && _receivingThread != null) { if (_logger.isInfoEnabled()) @@ -1140,7 +1142,7 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me this._queuename = queuename; } - public void addBindingKey(AMQDestination amqd, String routingKey) throws AMQException + public void addBindingKey(AMQDestination amqd, String routingKey) throws AMQException { _session.addBindingKey(this,amqd,routingKey); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java index a31bfe9df1..42c1d687cb 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java @@ -68,17 +68,11 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer if (message.get010Message() == null) { message.set010Message(new ByteBufferMessage()); - if (message.getData() == null) - { - try - { - message.get010Message().appendData(ByteBuffer.allocate(0)); - } - catch (IOException e) - { - throw new JMSException(e.getMessage()); - } - } + } + // force a rebuild of the 0-10 message if data has changed + if (message.getData() == null) + { + message.dataChanged(); } DeliveryProperties deliveryProp = message.get010Message().getDeliveryProperties(); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java index 7eb56acb27..e3ca6b5de1 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java @@ -78,7 +78,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach { if (_data != null) { - _010message.appendData(_data.buf()); + _010message.appendData(_data.buf().slice()); } else { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java index 87cc80f21d..4a43a7bba8 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java @@ -134,6 +134,7 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text // should never occur JMSException jmse = new JMSException("Unable to decode text data"); jmse.setLinkedException(e); + throw jmse; } } diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/JMSTestCase.java b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/JMSTestCase.java index 008b85e98a..feb4c1c94d 100644 --- a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/JMSTestCase.java +++ b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/JMSTestCase.java @@ -1,9 +1,13 @@ package org.apache.qpidity.nclient; +import java.util.Enumeration; + import javax.jms.ExceptionListener; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; +import javax.jms.Queue; +import javax.jms.QueueBrowser; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; @@ -24,10 +28,29 @@ public class JMSTestCase javax.jms.Session ssn = con.createSession(false, 1); javax.jms.Destination dest = new AMQQueue(new AMQShortString("direct"),"test"); - javax.jms.MessageConsumer cons = ssn.createConsumer(dest); - //javax.jms.MessageProducer prod = ssn.createProducer(dest); + javax.jms.MessageProducer prod = ssn.createProducer(dest); + QueueBrowser browser = ssn.createBrowser((Queue)dest, "Test = 'test'"); + + javax.jms.TextMessage msg = ssn.createTextMessage(); + msg.setStringProperty("TEST", "test"); + msg.setText("Should get this"); + prod.send(msg); + + javax.jms.TextMessage msg2 = ssn.createTextMessage(); + msg2.setStringProperty("TEST", "test2"); + msg2.setText("Shouldn't get this"); + prod.send(msg2); + + + Enumeration enu = browser.getEnumeration(); + for (;enu.hasMoreElements();) + { + System.out.println(enu.nextElement()); + System.out.println("\n"); + } - javax.jms.TextMessage m = null; // (javax.jms.TextMessage)cons.receive(); + javax.jms.MessageConsumer cons = ssn.createConsumer(dest, "Test = 'test'"); + javax.jms.TextMessage m = null; // (javax.jms.TextMessage)cons.receive(); cons.setMessageListener(new MessageListener() { public void onMessage(Message m) @@ -35,7 +58,9 @@ public class JMSTestCase javax.jms.TextMessage m2 = (javax.jms.TextMessage)m; try { + System.out.println("headers : " + m2.toString()); System.out.println("m : " + m2.getText()); + System.out.println("\n\n"); } catch(Exception e) { diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/njms/ExceptionHelper.java b/qpid/java/client/src/main/java/org/apache/qpidity/njms/ExceptionHelper.java index d3b2e88f23..e00f9008d3 100644 --- a/qpid/java/client/src/main/java/org/apache/qpidity/njms/ExceptionHelper.java +++ b/qpid/java/client/src/main/java/org/apache/qpidity/njms/ExceptionHelper.java @@ -41,6 +41,7 @@ public class ExceptionHelper jmsException = new JMSException(exception.getMessage()); } jmsException.setLinkedException(exception); + jmsException.initCause(exception); } else { diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/Data.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Data.java index 602a7fbca1..b9b8636d18 100644 --- a/qpid/java/common/src/main/java/org/apache/qpidity/transport/Data.java +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Data.java @@ -95,7 +95,6 @@ public class Data implements ProtocolEvent } str.append(str(buf, 20)); } - str.setLength(str.length() - 3); str.append(")"); return str.toString(); } diff --git a/qpid/python/examples/fanout/declare_queues.py b/qpid/python/examples/fanout/declare_queues.py deleted file mode 100755 index 52f23f4f9a..0000000000 --- a/qpid/python/examples/fanout/declare_queues.py +++ /dev/null @@ -1,52 +0,0 @@ -#!/usr/bin/env python -""" - declare_queues.py - - Creates and binds a queue on an AMQP direct exchange. - - All messages using the routing key "routing_key" are - sent to the queue named "message_queue". -""" - -import qpid -import sys -from qpid.client import Client -from qpid.content import Content -from qpid.queue import Empty - -#----- Initialization ----------------------------------- - -# Set parameters for login - -host=len(sys.argv) > 1 and sys.argv[1] or "127.0.0.1" -port=len(sys.argv) > 2 and int(sys.argv[2]) or 5672 -amqp_spec="/usr/share/amqp/amqp.0-10-preview.xml" -user="guest" -password="guest" - -# Create a client and log in to it. - -client = Client(host, port, qpid.spec.load(amqp_spec)) -client.start({"LOGIN": user, "PASSWORD": password}) - -session = client.session() -session.session_open() - -#----- Create a queue ------------------------------------- - -# queue_declare() creates an AMQP queue, which is held -# on the broker. Published messages are sent to the AMQP queue, -# from which messages are delivered to consumers. -# -# queue_bind() determines which messages are routed to a queue. -# Route all messages with the routing key "routing_key" to -# the AMQP queue named "message_queue". - -session.queue_declare(queue="message_queue") -session.queue_bind(exchange="amq.fanout", queue="message_queue") - -#----- Cleanup --------------------------------------------- - -# Clean up before exiting so there are no open threads. - -session.session_close() diff --git a/qpid/python/examples/fanout/fanout_consumer.py b/qpid/python/examples/fanout/fanout_consumer.py index b91ea35c0d..ef24bf35b2 100755 --- a/qpid/python/examples/fanout/fanout_consumer.py +++ b/qpid/python/examples/fanout/fanout_consumer.py @@ -5,13 +5,57 @@ This AMQP client reads messages from a message queue named "message_queue". """ - +import base64 import qpid import sys from qpid.client import Client from qpid.content import Content from qpid.queue import Empty +#----- Functions ------------------------------------------- + +def dump_queue(client, queue_name): + + print "Messages queue: " + queue_name + + consumer_tag = queue_name # Use the queue name as the consumer tag - need a unique tag + queue = client.queue(consumer_tag) + + # Call message_subscribe() to tell the broker to deliver messages + # from the AMQP queue to a local client queue. The broker will + # start delivering messages as soon as message_subscribe() is called. + + session.message_subscribe(queue=queue_name, destination=consumer_tag) + session.message_flow(consumer_tag, 0, 0xFFFFFFFF) + session.message_flow(consumer_tag, 1, 0xFFFFFFFF) + + print "Subscribed to queue " + queue_name + sys.stdout.flush() + + message = 0 + + while True: + try: + message = queue.get(timeout=10) + content = message.content.body + print "Response: " + content + except Empty: + print "No more messages!" + break + except: + print "Unexpected exception!" + break + + + # Messages are not removed from the queue until they + # are acknowledged. Using cumulative=True, all messages + # in the session up to and including the one identified + # by the delivery tag are acknowledged. This is more efficient, + # because there are fewer network round-trips. + + if message != 0: + message.complete(cumulative=True) + #----- Initialization -------------------------------------- @@ -29,44 +73,22 @@ client = Client(host, port, qpid.spec.load(amqp_spec)) client.start({"LOGIN": user, "PASSWORD": password}) session = client.session() -session.session_open() - -#----- Read from queue -------------------------------------------- - -# Now let's create a local client queue and tell it to read -# incoming messages. - -# The consumer tag identifies the client-side queue. - -consumer_tag = "consumer1" -queue = client.queue(consumer_tag) - -# Call message_subscribe() to tell the broker to deliver messages -# from the AMQP queue to this local client queue. The broker will -# start delivering messages as soon as message_subscribe() is called. - -session.message_subscribe(queue="message_queue", destination=consumer_tag) -session.message_flow(consumer_tag, 0, 0xFFFFFFFF) -session.message_flow(consumer_tag, 1, 0xFFFFFFFF) - -# Initialize 'final' and 'content', variables used to identify the last message. +session_info = session.session_open() +session_id = session_info.session_id -final = "That's all, folks!" # In a message body, signals the last message -content = "" # Content of the last message read +#----- Main Body -- ---------------------------------------- -message = None -while content != final: - message = queue.get(timeout=10) - content = message.content.body - print content +# Make a unique queue name for my queue from the session ID. +my_queue = base64.urlsafe_b64encode(session_id) +session.queue_declare(queue=my_queue) -# Messages are not removed from the queue until they are -# acknowledged. Using cumulative=True, all messages from the session -# up to and including the one identified by the delivery tag are -# acknowledged. This is more efficient, because there are fewer -# network round-trips. +# Bind my queue to the fanout exchange. No routing key is required +# the fanout exchange copies messages unconditionally to every +# bound queue +session.queue_bind(queue=my_queue, exchange="amq.fanout") -message.complete(cumulative=True) +# Dump the messages on the queue. +dump_queue(client, my_queue) #----- Cleanup ------------------------------------------------ diff --git a/qpid/python/examples/fanout/verify b/qpid/python/examples/fanout/verify index f136ccd39b..7650853e11 100644 --- a/qpid/python/examples/fanout/verify +++ b/qpid/python/examples/fanout/verify @@ -1,3 +1,5 @@ # See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify -clients ./declare_queues.py ./fanout_producer.py ./fanout_consumer.py -outputs ./declare_queues.py.out ./fanout_producer.py.out ./fanout_consumer.py.out +background "Subscribed" ./fanout_consumer.py +background "Subscribed" ./fanout_consumer.py +clients ./fanout_producer.py +outputs ./fanout_producer.py.out "./fanout_consumer.py.out | remove_uuid64" "./fanout_consumer.pyX.out | remove_uuid64" diff --git a/qpid/python/examples/fanout/verify.in b/qpid/python/examples/fanout/verify.in index a5f57f0b4b..d5067b3850 100644 --- a/qpid/python/examples/fanout/verify.in +++ b/qpid/python/examples/fanout/verify.in @@ -1,14 +1,31 @@ -==== declare_queues.py.out ==== fanout_producer.py.out -==== fanout_consumer.py.out -message 0 -message 1 -message 2 -message 3 -message 4 -message 5 -message 6 -message 7 -message 8 -message 9 -That's all, folks! +==== fanout_consumer.py.out | remove_uuid64 +Messages queue: +Subscribed to queue +Response: message 0 +Response: message 1 +Response: message 2 +Response: message 3 +Response: message 4 +Response: message 5 +Response: message 6 +Response: message 7 +Response: message 8 +Response: message 9 +Response: That's all, folks! +No more messages! +==== fanout_consumer.pyX.out | remove_uuid64 +Messages queue: +Subscribed to queue +Response: message 0 +Response: message 1 +Response: message 2 +Response: message 3 +Response: message 4 +Response: message 5 +Response: message 6 +Response: message 7 +Response: message 8 +Response: message 9 +Response: That's all, folks! +No more messages! diff --git a/qpid/python/tests_0-10/message.py b/qpid/python/tests_0-10/message.py index c251e6aca0..a3d32bdb2d 100644 --- a/qpid/python/tests_0-10/message.py +++ b/qpid/python/tests_0-10/message.py @@ -720,6 +720,91 @@ class MessageTests(TestBase): #check all 'browsed' messages are still on the queue self.assertEqual(5, channel.queue_query(queue="q").message_count) + def test_subscribe_not_acquired_3(self): + channel = self.channel + + #publish some messages + self.queue_declare(queue = "q", exclusive=True, auto_delete=True) + for i in range(1, 11): + channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "message-%d" % (i))) + + #create a not-acquired subscriber + channel.message_subscribe(queue = "q", destination = "a", confirm_mode = 1, acquire_mode=1) + channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a") + channel.message_flow(unit = 0, value = 10, destination = "a") + + #browse through messages + queue = self.client.queue("a") + for i in range(1, 11): + msg = queue.get(timeout = 1) + self.assertEquals("message-%d" % (i), msg.content.body) + if (i % 2): + #try to acquire every second message + channel.message_acquire([msg.command_id, msg.command_id]) + #check that acquire succeeds + response = channel.control_queue.get(timeout=1) + self.assertEquals(response.transfers, [msg.command_id, msg.command_id]) + msg.complete() + self.assertEmpty(queue) + + #create a second not-acquired subscriber + channel.message_subscribe(queue = "q", destination = "b", confirm_mode = 1, acquire_mode=1) + channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "b") + channel.message_flow(unit = 0, value = 1, destination = "b") + #check it gets those not consumed + queue = self.client.queue("b") + for i in [2,4,6,8,10]: + msg = queue.get(timeout = 1) + self.assertEquals("message-%d" % (i), msg.content.body) + msg.complete() + channel.message_flow(unit = 0, value = 1, destination = "b") + self.assertEmpty(queue) + + #check all 'browsed' messages are still on the queue + self.assertEqual(5, channel.queue_query(queue="q").message_count) + + def test_release_unacquired(self): + channel = self.channel + + #create queue + self.queue_declare(queue = "q", exclusive=True, auto_delete=True, durable=True) + + #send message + channel.message_transfer(content=Content(properties={'routing_key' : "q", 'delivery_mode':2}, body = "my-message")) + + #create two 'browsers' + channel.message_subscribe(queue = "q", destination = "a", confirm_mode = 1, acquire_mode=1) + channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a") + channel.message_flow(unit = 0, value = 10, destination = "a") + queueA = self.client.queue("a") + + channel.message_subscribe(queue = "q", destination = "b", confirm_mode = 1, acquire_mode=1) + channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "b") + channel.message_flow(unit = 0, value = 10, destination = "b") + queueB = self.client.queue("b") + + #have each browser release the message + msgA = queueA.get(timeout = 1) + channel.message_release([msgA.command_id, msgA.command_id]) + + msgB = queueB.get(timeout = 1) + channel.message_release([msgB.command_id, msgB.command_id]) + + #cancel browsers + channel.message_cancel(destination = "a") + channel.message_cancel(destination = "b") + + #create consumer + channel.message_subscribe(queue = "q", destination = "c", confirm_mode = 1, acquire_mode=0) + channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "c") + channel.message_flow(unit = 0, value = 10, destination = "c") + queueC = self.client.queue("c") + #consume the message then ack it + msgC = queueC.get(timeout = 1) + msgC.complete() + #ensure there are no other messages + self.assertEmpty(queueC) + def test_no_size(self): self.queue_declare(queue = "q", exclusive=True, auto_delete=True) |