From 10d07002af4b211dfbbc3341a4edb6ec4c2e5cb5 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Sat, 25 Oct 2008 01:55:06 +0000 Subject: Client API change: Centralize access to subscription status, better control of acquire/accept. client/AckPolicy: removed, functionality moved to Subscription and SubscriptionSettings client/SubscriptionSettings: struct aggregates flow control & accept-acquire parameters for subscribe. client/Subscription: represents active subscription. Query settings, unacked messages, manual accept/acquire client/SubscriptionManager: use AcceptMode, AcquireMode enums rather than confusing bools. Issues addressed by the change: - old use of bool for acceptMode was inverted wrt AMQP enum values, bools are confusing. - old AckPolicy was broken - not possible to access the instance associated with an active subscription - old AckPolicy did not provide a way to do manual acquire, only accept. - setting values on SubscriptionManager to apply to subsequent subscriptions is awkward & error-prone, now can use SubscriptionSettings to control on each subscribe individually. - a subscription is a central concept in AMQP, it deserves to be a class. Subscription and SubscriptionSettings provides a single point for future expansion of interactions with a a Subscription. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@707808 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/Makefile.am | 15 ++- qpid/cpp/src/qpid/RangeSet.h | 8 +- qpid/cpp/src/qpid/client/AckPolicy.cpp | 50 -------- qpid/cpp/src/qpid/client/AckPolicy.h | 58 ---------- qpid/cpp/src/qpid/client/Dispatcher.cpp | 35 +----- qpid/cpp/src/qpid/client/Dispatcher.h | 25 +--- qpid/cpp/src/qpid/client/FailoverListener.cpp | 2 +- .../qpid/client/FailoverSubscriptionManager.cpp | 117 +------------------ .../src/qpid/client/FailoverSubscriptionManager.h | 32 +----- qpid/cpp/src/qpid/client/FlowControl.h | 7 +- qpid/cpp/src/qpid/client/Handle.h | 61 ++++++++++ qpid/cpp/src/qpid/client/HandleAccess.h | 41 +++++++ qpid/cpp/src/qpid/client/HandlePrivate.h | 61 ++++++++++ qpid/cpp/src/qpid/client/LocalQueue.cpp | 11 +- qpid/cpp/src/qpid/client/LocalQueue.h | 18 +-- qpid/cpp/src/qpid/client/Subscription.cpp | 47 ++++++++ qpid/cpp/src/qpid/client/Subscription.h | 99 ++++++++++++++++ qpid/cpp/src/qpid/client/SubscriptionImpl.cpp | 116 +++++++++++++++++++ qpid/cpp/src/qpid/client/SubscriptionImpl.h | 99 ++++++++++++++++ qpid/cpp/src/qpid/client/SubscriptionManager.cpp | 94 +++++---------- qpid/cpp/src/qpid/client/SubscriptionManager.h | 128 +++++++++++---------- qpid/cpp/src/qpid/client/SubscriptionSettings.h | 62 ++++++++++ qpid/cpp/src/tests/ClientSessionTest.cpp | 56 +++------ qpid/cpp/src/tests/QueuePolicyTest.cpp | 6 +- qpid/cpp/src/tests/XmlClientSessionTest.cpp | 26 +---- qpid/cpp/src/tests/consume.cpp | 12 +- qpid/cpp/src/tests/echotest.cpp | 4 +- qpid/cpp/src/tests/latencytest.cpp | 11 +- qpid/cpp/src/tests/perftest.cpp | 13 ++- qpid/cpp/src/tests/topic_listener.cpp | 14 ++- qpid/cpp/src/tests/txtest.cpp | 18 +-- 31 files changed, 795 insertions(+), 551 deletions(-) delete mode 100644 qpid/cpp/src/qpid/client/AckPolicy.cpp delete mode 100644 qpid/cpp/src/qpid/client/AckPolicy.h create mode 100644 qpid/cpp/src/qpid/client/Handle.h create mode 100644 qpid/cpp/src/qpid/client/HandleAccess.h create mode 100644 qpid/cpp/src/qpid/client/HandlePrivate.h create mode 100644 qpid/cpp/src/qpid/client/Subscription.cpp create mode 100644 qpid/cpp/src/qpid/client/Subscription.h create mode 100644 qpid/cpp/src/qpid/client/SubscriptionImpl.cpp create mode 100644 qpid/cpp/src/qpid/client/SubscriptionImpl.h create mode 100644 qpid/cpp/src/qpid/client/SubscriptionSettings.h diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index f8c4fc42a8..07b7957a8e 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -368,14 +368,15 @@ libqpidclient_la_LIBADD = libqpidcommon.la -luuid libqpidclient_la_SOURCES = \ $(rgen_client_srcs) \ - qpid/client/AckPolicy.cpp \ qpid/client/Bounds.cpp \ qpid/client/Connection.cpp \ qpid/client/ConnectionHandler.cpp \ + qpid/client/ConnectionImpl.h \ qpid/client/ConnectionImpl.cpp \ qpid/client/ConnectionSettings.cpp \ qpid/client/Connector.cpp \ qpid/client/Demux.cpp \ + qpid/client/Dispatcher.h \ qpid/client/Dispatcher.cpp \ qpid/client/FailoverConnection.cpp \ qpid/client/FailoverSession.cpp \ @@ -385,6 +386,7 @@ libqpidclient_la_SOURCES = \ qpid/client/Future.cpp \ qpid/client/FutureCompletion.cpp \ qpid/client/FutureResult.cpp \ + qpid/client/HandlePrivate.h \ qpid/client/LoadPlugins.cpp \ qpid/client/LocalQueue.cpp \ qpid/client/Message.cpp \ @@ -395,8 +397,12 @@ libqpidclient_la_SOURCES = \ qpid/client/SessionBase_0_10.h \ qpid/client/SessionBase_0_10Access.h \ qpid/client/ConnectionAccess.h \ + qpid/client/SessionImpl.h \ qpid/client/SessionImpl.cpp \ qpid/client/StateManager.cpp \ + qpid/client/Subscription.cpp \ + qpid/client/SubscriptionImpl.h \ + qpid/client/SubscriptionImpl.cpp \ qpid/client/SubscriptionManager.cpp nobase_include_HEADERS = \ @@ -500,25 +506,25 @@ nobase_include_HEADERS = \ qpid/broker/TxPublish.h \ qpid/broker/Vhost.h \ qpid/client/AckMode.h \ - qpid/client/AckPolicy.h \ qpid/client/Bounds.h \ qpid/client/ChainableFrameHandler.h \ qpid/client/Completion.h \ qpid/client/Connection.h \ qpid/client/ConnectionHandler.h \ - qpid/client/ConnectionImpl.h \ qpid/client/ConnectionSettings.h \ qpid/client/Connector.h \ qpid/client/Demux.h \ - qpid/client/Dispatcher.h \ qpid/client/Execution.h \ qpid/client/FailoverConnection.h \ qpid/client/FailoverSession.h \ + qpid/client/Subscription.h \ + qpid/client/SubscriptionSettings.h \ qpid/client/FailoverSubscriptionManager.h \ qpid/client/FlowControl.h \ qpid/client/Future.h \ qpid/client/FutureCompletion.h \ qpid/client/FutureResult.h \ + qpid/client/Handle.h \ qpid/client/LocalQueue.h \ qpid/client/QueueOptions.h \ qpid/client/Message.h \ @@ -527,7 +533,6 @@ nobase_include_HEADERS = \ qpid/client/SessionBase_0_10.h \ qpid/client/Session.h \ qpid/client/AsyncSession.h \ - qpid/client/SessionImpl.h \ qpid/client/StateManager.h \ qpid/client/SubscriptionManager.h \ qpid/client/TypedResult.h \ diff --git a/qpid/cpp/src/qpid/RangeSet.h b/qpid/cpp/src/qpid/RangeSet.h index 2a88426f17..1ba4fbbcef 100644 --- a/qpid/cpp/src/qpid/RangeSet.h +++ b/qpid/cpp/src/qpid/RangeSet.h @@ -27,6 +27,7 @@ #include #include #include +#include namespace qpid { @@ -53,7 +54,7 @@ class Range { void begin(const T& t) { begin_ = t; } void end(const T& t) { end_ = t; } - + size_t size() const { return end_ - begin_; } bool empty() const { return begin_ == end_; } bool contains(const T& x) const { return begin_ <= x && x < end_; } @@ -172,6 +173,7 @@ class RangeSet // The difference between the start and end of this range set uint32_t span() const; + size_t size() const; bool empty() const { return ranges.empty(); } void clear() { ranges.clear(); } @@ -185,6 +187,7 @@ class RangeSet template void decode(S& s) { uint16_t sz; s(sz); ranges.resize(sz/sizeof(Range)); } private: + static size_t accumulateSize(size_t s, const Range& r) { return s+r.size(); } Ranges ranges; template friend std::ostream& operator<<(std::ostream& o, const RangeSet& r); @@ -317,6 +320,9 @@ template uint32_t RangeSet::span() const { return ranges.back().last() - ranges.front().first(); } +template size_t RangeSet::size() const { + return std::accumulate(rangesBegin(), rangesEnd(), 0, &RangeSet::accumulateSize); +} } // namespace qpid diff --git a/qpid/cpp/src/qpid/client/AckPolicy.cpp b/qpid/cpp/src/qpid/client/AckPolicy.cpp deleted file mode 100644 index 7956ebad0f..0000000000 --- a/qpid/cpp/src/qpid/client/AckPolicy.cpp +++ /dev/null @@ -1,50 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "AckPolicy.h" - -namespace qpid { -namespace client { - -AckPolicy::AckPolicy(size_t n) : interval(n), count(n) {} - -void AckPolicy::ack(const Message& msg, AsyncSession session) -{ - accepted.add(msg.getId()); - if (interval && --count==0) { - session.markCompleted(msg.getId(), false, true); - session.messageAccept(accepted); - accepted.clear(); - count = interval; - } else { - session.markCompleted(msg.getId(), false, false); - } -} - -void AckPolicy::ackOutstanding(AsyncSession session) -{ - if (!accepted.empty()) { - session.messageAccept(accepted); - accepted.clear(); - session.sendCompletion(); - } -} - -}} // namespace qpid::client diff --git a/qpid/cpp/src/qpid/client/AckPolicy.h b/qpid/cpp/src/qpid/client/AckPolicy.h deleted file mode 100644 index 84bfb6a46a..0000000000 --- a/qpid/cpp/src/qpid/client/AckPolicy.h +++ /dev/null @@ -1,58 +0,0 @@ -#ifndef QPID_CLIENT_ACKPOLICY_H -#define QPID_CLIENT_ACKPOLICY_H - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/framing/SequenceSet.h" -#include "qpid/client/AsyncSession.h" -#include "qpid/client/Message.h" - -namespace qpid { -namespace client { - -/** - * Policy for automatic acknowledgement of messages. - * - * - * \ingroup clientapi - */ -class AckPolicy -{ - framing::SequenceSet accepted; - size_t interval; - size_t count; - - public: - /** - * Sends accepts and marks completion of received transfers. - * - *@param n: send an accept for every n messages received. - *n==0 means no automatic acknowledgement. - */ - AckPolicy(size_t n=1); - void ack(const Message& msg, AsyncSession session); - void ackOutstanding(AsyncSession session);}; - -}} // namespace qpid::client - - - -#endif /*!QPID_CLIENT_ACKPOLICY_H*/ diff --git a/qpid/cpp/src/qpid/client/Dispatcher.cpp b/qpid/cpp/src/qpid/client/Dispatcher.cpp index c26dba188d..0b7618eb4c 100644 --- a/qpid/cpp/src/qpid/client/Dispatcher.cpp +++ b/qpid/cpp/src/qpid/client/Dispatcher.cpp @@ -19,6 +19,7 @@ * */ #include "Dispatcher.h" +#include "SubscriptionImpl.h" #include "qpid/framing/FrameSet.h" #include "qpid/framing/MessageTransferBody.h" @@ -37,18 +38,6 @@ using qpid::sys::Thread; namespace qpid { namespace client { -Subscriber::Subscriber(const Session& s, MessageListener* l, AckPolicy a) - : session(s), listener(l), autoAck(a) {} - -void Subscriber::received(Message& msg) -{ - - if (listener) { - listener->received(msg); - autoAck.ack(msg, session); - } -} - Dispatcher::Dispatcher(const Session& s, const std::string& q) : session(s), running(false), @@ -78,7 +67,7 @@ void Dispatcher::run() FrameSet::shared_ptr content = queue->pop(); if (content->isA()) { Message msg(*content); - Subscriber::shared_ptr listener = find(msg.getDestination()); + boost::intrusive_ptr listener = find(msg.getDestination()); if (!listener) { QPID_LOG(error, "No listener found for destination " << msg.getDestination()); } else { @@ -121,7 +110,7 @@ void Dispatcher::setAutoStop(bool b) autoStop = b; } -Subscriber::shared_ptr Dispatcher::find(const std::string& name) +boost::intrusive_ptr Dispatcher::find(const std::string& name) { ScopedLock l(lock); Listeners::iterator i = listeners.find(name); @@ -131,24 +120,12 @@ Subscriber::shared_ptr Dispatcher::find(const std::string& name) return i->second; } -void Dispatcher::listen( - MessageListener* listener, AckPolicy autoAck -) -{ - ScopedLock l(lock); - defaultListener = Subscriber::shared_ptr( - new Subscriber(session, listener, autoAck)); -} - -void Dispatcher::listen(const std::string& destination, MessageListener* listener, AckPolicy autoAck) -{ +void Dispatcher::listen(const boost::intrusive_ptr& subscription) { ScopedLock l(lock); - listeners[destination] = Subscriber::shared_ptr( - new Subscriber(session, listener, autoAck)); + listeners[subscription->getName()] = subscription; } -void Dispatcher::cancel(const std::string& destination) -{ +void Dispatcher::cancel(const std::string& destination) { ScopedLock l(lock); listeners.erase(destination); if (autoStop && listeners.empty()) diff --git a/qpid/cpp/src/qpid/client/Dispatcher.h b/qpid/cpp/src/qpid/client/Dispatcher.h index d85785ed2c..921c6449a3 100644 --- a/qpid/cpp/src/qpid/client/Dispatcher.h +++ b/qpid/cpp/src/qpid/client/Dispatcher.h @@ -30,24 +30,12 @@ #include "qpid/sys/Runnable.h" #include "qpid/sys/Thread.h" #include "MessageListener.h" -#include "AckPolicy.h" +#include "SubscriptionImpl.h" namespace qpid { namespace client { -///@internal -class Subscriber : public MessageListener -{ - AsyncSession session; - MessageListener* const listener; - AckPolicy autoAck; - -public: - typedef boost::shared_ptr shared_ptr; - Subscriber(const Session& session, MessageListener* listener, AckPolicy); - void received(Message& msg); - -}; +class SubscriptionImpl; ///@internal typedef framing::Handler FrameSetHandler; @@ -55,7 +43,7 @@ typedef framing::Handler FrameSetHandler; ///@internal class Dispatcher : public sys::Runnable { - typedef std::map Listeners; + typedef std::map >Listeners; sys::Mutex lock; sys::Thread worker; Session session; @@ -63,10 +51,10 @@ class Dispatcher : public sys::Runnable bool running; bool autoStop; Listeners listeners; - Subscriber::shared_ptr defaultListener; + boost::intrusive_ptr defaultListener; std::auto_ptr handler; - Subscriber::shared_ptr find(const std::string& name); + boost::intrusive_ptr find(const std::string& name); bool isStopped(); boost::function failoverHandler; @@ -84,8 +72,7 @@ public: failoverHandler = fh; } - void listen(MessageListener* listener, AckPolicy autoAck=AckPolicy()); - void listen(const std::string& destination, MessageListener* listener, AckPolicy autoAck=AckPolicy()); + void listen(const boost::intrusive_ptr& subscription); void cancel(const std::string& destination); }; diff --git a/qpid/cpp/src/qpid/client/FailoverListener.cpp b/qpid/cpp/src/qpid/client/FailoverListener.cpp index 98df12fc57..8311e713a4 100644 --- a/qpid/cpp/src/qpid/client/FailoverListener.cpp +++ b/qpid/cpp/src/qpid/client/FailoverListener.cpp @@ -60,7 +60,7 @@ FailoverListener::FailoverListener(const boost::shared_ptr& c, c std::string qname=session.getId().getName(); session.queueDeclare(arg::queue=qname, arg::exclusive=true, arg::autoDelete=true); session.exchangeBind(arg::queue=qname, arg::exchange=AMQ_FAILOVER); - subscriptions->subscribe(*this, qname, FlowControl::unlimited()); + subscriptions->subscribe(*this, qname, SubscriptionSettings(FlowControl::unlimited(), ACCEPT_MODE_NONE)); thread = sys::Thread(*subscriptions); } diff --git a/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp b/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp index 0331cbeb9e..5fa4cb2800 100644 --- a/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp +++ b/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp @@ -68,7 +68,7 @@ FailoverSubscriptionManager::failover ( ) void FailoverSubscriptionManager::subscribe ( MessageListener & listener, const std::string & queue, - const FlowControl & flow, + const SubscriptionSettings & settings, const std::string & tag, bool record_this ) @@ -77,11 +77,11 @@ FailoverSubscriptionManager::subscribe ( MessageListener & listener, subscriptionManager->subscribe ( listener, queue, - flow, + settings, tag ); if ( record_this ) - subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(MessageListener&, const std::string&, const FlowControl&, const std::string&, bool) ) &FailoverSubscriptionManager::subscribe, this, boost::ref(listener), queue, flow, tag, false ) ); + subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(MessageListener&, const std::string&, const SubscriptionSettings&, const std::string&, bool) ) &FailoverSubscriptionManager::subscribe, this, boost::ref(listener), queue, settings, tag, false ) ); } @@ -89,7 +89,7 @@ FailoverSubscriptionManager::subscribe ( MessageListener & listener, void FailoverSubscriptionManager::subscribe ( LocalQueue & localQueue, const std::string & queue, - const FlowControl & flow, + const SubscriptionSettings & settings, const std::string & tag, bool record_this ) @@ -98,12 +98,12 @@ FailoverSubscriptionManager::subscribe ( LocalQueue & localQueue, subscriptionManager->subscribe ( localQueue, queue, - flow, + settings, tag ); if ( record_this ) - subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(LocalQueue&, const std::string&, const FlowControl&, const std::string&, bool) ) &FailoverSubscriptionManager::subscribe, this, localQueue, queue, flow, tag, false ) ); + subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(LocalQueue&, const std::string&, const SubscriptionSettings&, const std::string&, bool) ) &FailoverSubscriptionManager::subscribe, this, localQueue, queue, settings, tag, false ) ); } @@ -245,109 +245,4 @@ FailoverSubscriptionManager::stop ( ) lock.notifyAll(); } - - -void -FailoverSubscriptionManager::setFlowControl ( const std::string & destination, - const FlowControl & flow -) -{ - - subscriptionManager->setFlowControl ( destination, flow ); -} - - - -void -FailoverSubscriptionManager::setFlowControl ( const FlowControl & flow ) -{ - - subscriptionManager->setFlowControl ( flow ); -} - - - -const FlowControl & -FailoverSubscriptionManager::getFlowControl ( ) const -{ - - return subscriptionManager->getFlowControl ( ); -} - - - - -void -FailoverSubscriptionManager::setFlowControl ( const std::string & tag, - uint32_t messages, - uint32_t bytes, - bool window -) -{ - - subscriptionManager->setFlowControl ( tag, - messages, - bytes, - window - ); -} - - - -void -FailoverSubscriptionManager::setFlowControl ( uint32_t messages, - uint32_t bytes, - bool window -) -{ - - subscriptionManager->setFlowControl ( messages, - bytes, - window - ); -} - - - -void -FailoverSubscriptionManager::setAcceptMode ( bool required ) -{ - - subscriptionManager->setAcceptMode ( required ); -} - - - -void -FailoverSubscriptionManager::setAcquireMode ( bool acquire ) -{ - - subscriptionManager->setAcquireMode ( acquire ); -} - - - -void -FailoverSubscriptionManager::setAckPolicy ( const AckPolicy & autoAck ) -{ - - subscriptionManager->setAckPolicy ( autoAck ); -} - - - -AckPolicy & -FailoverSubscriptionManager::getAckPolicy() -{ - - return subscriptionManager->getAckPolicy ( ); -} - - - - - - - - }} // namespace qpid::client diff --git a/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h b/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h index b7631d3a98..0556ba15ec 100644 --- a/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h +++ b/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h @@ -31,7 +31,7 @@ #include #include #include -#include +#include #include #include @@ -50,13 +50,13 @@ class FailoverSubscriptionManager void subscribe ( MessageListener & listener, const std::string & queue, - const FlowControl & flow, + const SubscriptionSettings & , const std::string & tag = std::string(), bool record_this = true ); void subscribe ( LocalQueue & localQueue, const std::string & queue, - const FlowControl & flow, + const SubscriptionSettings & , const std::string & tag=std::string(), bool record_this = true ); @@ -84,32 +84,6 @@ class FailoverSubscriptionManager void stop ( ); - void setFlowControl ( const std::string & destintion, - const FlowControl & flow ); - - void setFlowControl ( const FlowControl & flow ); - - const FlowControl & getFlowControl ( ) const; - - void setFlowControl ( const std::string & tag, - uint32_t messages, - uint32_t bytes, - bool window=true ); - - void setFlowControl ( uint32_t messages, - uint32_t bytes, - bool window = true - ); - - void setAcceptMode ( bool required ); - - void setAcquireMode ( bool acquire ); - - void setAckPolicy ( const AckPolicy & autoAck ); - - AckPolicy & getAckPolicy(); - - // Get ready for a failover. void prepareForFailover ( Session newSession ); void failover ( ); diff --git a/qpid/cpp/src/qpid/client/FlowControl.h b/qpid/cpp/src/qpid/client/FlowControl.h index 081061ac02..0f5f8596ec 100644 --- a/qpid/cpp/src/qpid/client/FlowControl.h +++ b/qpid/cpp/src/qpid/client/FlowControl.h @@ -22,6 +22,8 @@ * */ +#include + namespace qpid { namespace client { @@ -40,9 +42,8 @@ namespace client { * is renewed. * * In "window mode" credit is automatically renewed when a message is - * acknowledged (@see AckPolicy) In non-window mode credit is not - * automatically renewed, it must be explicitly re-set (@see - * SubscriptionManager) + * accepted. In non-window mode credit is not automatically renewed, + * it must be explicitly re-set (@see Subscription) */ struct FlowControl { static const uint32_t UNLIMITED=0xFFFFFFFF; diff --git a/qpid/cpp/src/qpid/client/Handle.h b/qpid/cpp/src/qpid/client/Handle.h new file mode 100644 index 0000000000..4fd82b7646 --- /dev/null +++ b/qpid/cpp/src/qpid/client/Handle.h @@ -0,0 +1,61 @@ +#ifndef QPID_CLIENT_HANDLE_H +#define QPID_CLIENT_HANDLE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +namespace qpid { +namespace client { + +template class HandlePrivate; + +/** + * A handle is like a pointer: it points to some underlying object. + * Handles can be null, like a 0 pointer. Use isValid(), isNull() or the + * implicit conversion to bool to test for a null handle. + */ +template class Handle { + public: + ~Handle(); + Handle(const Handle&); + Handle& operator=(const Handle&); + + /**@return true if handle is valid, i.e. not null. */ + bool isValid() const { return impl; } + + /**@return true if handle is null. It is an error to call any function on a null handle. */ + bool isNull() const { return !impl; } + + operator bool() const { return impl; } + bool operator !() const { return impl; } + + void swap(Handle&); + + protected: + Handle(T* =0); + T* impl; + + friend class HandlePrivate; +}; + +}} // namespace qpid::client + +#endif /*!QPID_CLIENT_HANDLE_H*/ diff --git a/qpid/cpp/src/qpid/client/HandleAccess.h b/qpid/cpp/src/qpid/client/HandleAccess.h new file mode 100644 index 0000000000..f1747db638 --- /dev/null +++ b/qpid/cpp/src/qpid/client/HandleAccess.h @@ -0,0 +1,41 @@ +#ifndef QPID_CLIENT_HANDLEACCESS_H +#define QPID_CLIENT_HANDLEACCESS_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include + +namespace qpid { +namespace client { + +/** + * Provide access to the private impl member of a Handle. + */ +template +class HandleAccess +{ + public: + static boost::shared_ptr getImpl(Handle& h) { return h.impl; } +}; +}} // namespace qpid::client + +#endif /*!QPID_CLIENT_HANDLEACCESS_H*/ diff --git a/qpid/cpp/src/qpid/client/HandlePrivate.h b/qpid/cpp/src/qpid/client/HandlePrivate.h new file mode 100644 index 0000000000..488ce48075 --- /dev/null +++ b/qpid/cpp/src/qpid/client/HandlePrivate.h @@ -0,0 +1,61 @@ +#ifndef QPID_CLIENT_HANDLEPRIVATE_H +#define QPID_CLIENT_HANDLEPRIVATE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include + +namespace qpid { +namespace client { + +/** @file + * Private implementation of handle, include in .cpp file of handle + * subclasses _after_ including the declaration of class T. + * T can be any class that can be used with boost::intrusive_ptr. + */ + +template +Handle::Handle(T* p) : impl(p) { if (impl) boost::intrusive_ptr_add_ref(impl); } + +template +Handle::~Handle() { if(impl) boost::intrusive_ptr_release(impl); } + +template +Handle::Handle(const Handle& h) : impl(h.impl) { if(impl) boost::intrusive_ptr_add_ref(impl); } + +template +Handle& Handle::operator=(const Handle& h) { Handle(h).swap(*this); return *this; } + +template +void Handle::swap(Handle& h) { std::swap(impl, h.impl); } + + +/** Access to private impl of a Handle */ +template +class HandlePrivate { + public: + static boost::intrusive_ptr get(Handle& h) { return boost::intrusive_ptr(h.impl); } +}; + + +}} // namespace qpid::client + +#endif /*!QPID_CLIENT_HANDLEPRIVATE_H*/ diff --git a/qpid/cpp/src/qpid/client/LocalQueue.cpp b/qpid/cpp/src/qpid/client/LocalQueue.cpp index 99ab6f0133..229d3766ef 100644 --- a/qpid/cpp/src/qpid/client/LocalQueue.cpp +++ b/qpid/cpp/src/qpid/client/LocalQueue.cpp @@ -22,13 +22,15 @@ #include "qpid/Exception.h" #include "qpid/framing/FrameSet.h" #include "qpid/framing/reply_exceptions.h" +#include "HandlePrivate.h" +#include "SubscriptionImpl.h" namespace qpid { namespace client { using namespace framing; -LocalQueue::LocalQueue(AckPolicy a) : autoAck(a) {} +LocalQueue::LocalQueue() {} LocalQueue::~LocalQueue() {} Message LocalQueue::pop() { return get(); } @@ -48,7 +50,9 @@ bool LocalQueue::get(Message& result, sys::Duration timeout) { if (!ok) return false; if (content->isA()) { result = Message(*content); - autoAck.ack(result, session); + boost::intrusive_ptr si = HandlePrivate::get(subscription); + assert(si); + if (si) si->received(result); return true; } else @@ -56,9 +60,6 @@ bool LocalQueue::get(Message& result, sys::Duration timeout) { QPID_MSG("Unexpected method: " << content->getMethod())); } -void LocalQueue::setAckPolicy(AckPolicy a) { autoAck=a; } -AckPolicy& LocalQueue::getAckPolicy() { return autoAck; } - bool LocalQueue::empty() const { if (!queue) diff --git a/qpid/cpp/src/qpid/client/LocalQueue.h b/qpid/cpp/src/qpid/client/LocalQueue.h index f81065ef3c..9fe72762c3 100644 --- a/qpid/cpp/src/qpid/client/LocalQueue.h +++ b/qpid/cpp/src/qpid/client/LocalQueue.h @@ -23,8 +23,8 @@ */ #include "qpid/client/Message.h" +#include "qpid/client/Subscription.h" #include "qpid/client/Demux.h" -#include "qpid/client/AckPolicy.h" #include "qpid/sys/Time.h" namespace qpid { @@ -38,17 +38,14 @@ namespace client { * * \ingroup clientapi */ -class LocalQueue -{ +class LocalQueue { public: /** Create a local queue. Subscribe the local queue to a remote broker * queue with a SubscriptionManager. * * LocalQueue is an alternative to implementing a MessageListener. - * - *@param ackPolicy Policy for acknowledging messages. @see AckPolicy. */ - LocalQueue(AckPolicy ackPolicy=AckPolicy()); + LocalQueue(); ~LocalQueue(); @@ -74,16 +71,9 @@ class LocalQueue /** Number of messages on the local queue */ size_t size() const; - /** Set the message acknowledgement policy. @see AckPolicy. */ - void setAckPolicy(AckPolicy); - - /** Get the message acknowledgement policy. @see AckPolicy. */ - AckPolicy& getAckPolicy(); - private: - Session session; Demux::QueuePtr queue; - AckPolicy autoAck; + Subscription subscription; friend class SubscriptionManager; }; diff --git a/qpid/cpp/src/qpid/client/Subscription.cpp b/qpid/cpp/src/qpid/client/Subscription.cpp new file mode 100644 index 0000000000..449c7a736c --- /dev/null +++ b/qpid/cpp/src/qpid/client/Subscription.cpp @@ -0,0 +1,47 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Subscription.h" +#include "SubscriptionImpl.h" +#include "HandlePrivate.h" + +namespace qpid { +namespace client { + +template class Handle; + + +std::string Subscription::getName() const { return impl->getName(); } +std::string Subscription::getQueue() const { return impl->getQueue(); } +const SubscriptionSettings& Subscription::getSettings() const { return impl->getSettings(); } +void Subscription::setFlowControl(const FlowControl& f) { impl->setFlowControl(f); } +void Subscription::setAutoAck(size_t n) { impl->setAutoAck(n); } +SequenceSet Subscription::getUnacquired() const { return impl->getUnacquired(); } +SequenceSet Subscription::getUnaccepted() const { return impl->getUnaccepted(); } +void Subscription::acquire(const SequenceSet& messageIds) { impl->acquire(messageIds); } +void Subscription::accept(const SequenceSet& messageIds) { impl->accept(messageIds); } +Session Subscription::getSession() const { return impl->getSession(); } +SubscriptionManager&Subscription:: getSubscriptionManager() const { return impl->getSubscriptionManager(); } +void Subscription::cancel() { impl->cancel(); } + +}} // namespace qpid::client + + diff --git a/qpid/cpp/src/qpid/client/Subscription.h b/qpid/cpp/src/qpid/client/Subscription.h new file mode 100644 index 0000000000..2ed56d4e8a --- /dev/null +++ b/qpid/cpp/src/qpid/client/Subscription.h @@ -0,0 +1,99 @@ +#ifndef QPID_CLIENT_SUBSCRIPTION_H +#define QPID_CLIENT_SUBSCRIPTION_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/client/Session.h" +#include "qpid/client/SubscriptionSettings.h" +#include "qpid/client/Handle.h" +#include "qpid/client/Message.h" + +namespace qpid { +namespace client { + +class SubscriptionImpl; +class SubscriptionManager; + +/** + * A handle to an active subscription. Provides methods to query the subscription status + * and control acknowledgement (acquire and accept) of messages. + */ +class Subscription : public Handle { + public: + Subscription(SubscriptionImpl* si=0) : Handle(si) {} + + /** The name of the subsctription, used as the "destination" for messages from the broker. + * Usually the same as the queue name but can be set differently. + */ + std::string getName() const; + + /** Name of the queue this subscription subscribes to */ + std::string getQueue() const; + + /** Get the flow control and acknowledgement settings for this subscription */ + const SubscriptionSettings& getSettings() const; + + /** Set the flow control parameters */ + void setFlowControl(const FlowControl&); + + /** Automatically acknowledge (acquire and accept) batches of n messages. + * You can disable auto-acknowledgement by setting n=0, and use acquire() and accept() + * to manually acquire and accept messages. + */ + void setAutoAck(unsigned int n); + + /** Get the set of ID's for messages received by this subscription but not yet acquired. + * This will always be empty if getSettings().acquireMode=ACQUIRE_MODE_PRE_ACQUIRED + */ + SequenceSet getUnacquired() const; + + /** Get the set of ID's for messages received by this subscription but not yet accepted. */ + SequenceSet getUnaccepted() const; + + /** Acquire messageIds and remove them from the unacquired set. + * oAdd them to the unaccepted set if getSettings().acceptMode == ACCEPT_MODE_EXPLICIT. + */ + void acquire(const SequenceSet& messageIds); + + /** Accept messageIds and remove them from the unaccepted set. + *@pre messageIds is a subset of getUnaccepted() + */ + void accept(const SequenceSet& messageIds); + + /* Acquire a single message */ + void acquire(const Message& m) { acquire(SequenceSet(m.getId())); } + + /* Accept a single message */ + void accept(const Message& m) { accept(SequenceSet(m.getId())); } + + /** Get the session associated with this subscription */ + Session getSession() const; + + /** Get the subscription manager associated with this subscription */ + SubscriptionManager& getSubscriptionManager() const; + + /** Cancel the subscription. */ + void cancel(); +}; +}} // namespace qpid::client + +#endif /*!QPID_CLIENT_SUBSCRIPTION_H*/ diff --git a/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp b/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp new file mode 100644 index 0000000000..3363dda11f --- /dev/null +++ b/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp @@ -0,0 +1,116 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "SubscriptionImpl.h" +#include "SubscriptionManager.h" +#include "SubscriptionSettings.h" + +namespace qpid { +namespace client { + +using sys::Mutex; + +SubscriptionImpl::SubscriptionImpl(SubscriptionManager& m, const std::string& q, const SubscriptionSettings& s, const std::string& n, MessageListener* l) + : manager(m), name(n), queue(q), settings(s), listener(l) +{ + async(manager.getSession()).messageSubscribe( + arg::queue=queue, + arg::destination=name, + arg::acceptMode=settings.acceptMode, + arg::acquireMode=settings.acquireMode); + setFlowControl(settings.flowControl); +} + +std::string SubscriptionImpl::getName() const { return name; } + +std::string SubscriptionImpl::getQueue() const { return queue; } + +const SubscriptionSettings& SubscriptionImpl::getSettings() const { + Mutex::ScopedLock l(lock); + return settings; +} + +void SubscriptionImpl::setFlowControl(const FlowControl& f) { + Mutex::ScopedLock l(lock); + AsyncSession s=manager.getSession(); + if (&settings.flowControl != &f) settings.flowControl = f; + s.messageSetFlowMode(name, f.window); + s.messageFlow(name, CREDIT_UNIT_MESSAGE, f.messages); + s.messageFlow(name, CREDIT_UNIT_BYTE, f.bytes); + s.sync(); +} + +void SubscriptionImpl::setAutoAck(size_t n) { + Mutex::ScopedLock l(lock); + settings.autoAck = n; +} + +SequenceSet SubscriptionImpl::getUnacquired() const { Mutex::ScopedLock l(lock); return unacquired; } +SequenceSet SubscriptionImpl::getUnaccepted() const { Mutex::ScopedLock l(lock); return unaccepted; } + +void SubscriptionImpl::acquire(const SequenceSet& messageIds) { + Mutex::ScopedLock l(lock); + manager.getSession().messageAcquire(messageIds); + unacquired.remove(messageIds); + if (settings.acceptMode == ACCEPT_MODE_EXPLICIT) + unaccepted.add(messageIds); +} + +void SubscriptionImpl::accept(const SequenceSet& messageIds) { + Mutex::ScopedLock l(lock); + manager.getSession().messageAccept(messageIds); + unaccepted.remove(messageIds); +} + +Session SubscriptionImpl::getSession() const { return manager.getSession(); } + +SubscriptionManager&SubscriptionImpl:: getSubscriptionManager() const { return manager; } + +void SubscriptionImpl::cancel() { manager.cancel(name); } + +void SubscriptionImpl::received(Message& m) { + Mutex::ScopedLock l(lock); + manager.getSession().markCompleted(m.getId(), false, false); + if (m.getMethod().getAcquireMode() == ACQUIRE_MODE_NOT_ACQUIRED) + unacquired.add(m.getId()); + else if (m.getMethod().getAcceptMode() == ACCEPT_MODE_EXPLICIT) + unaccepted.add(m.getId()); + + if (listener) { + Mutex::ScopedUnlock u(lock); + listener->received(m); + } + + if (settings.autoAck) { + if (unacquired.size() + unaccepted.size() >= settings.autoAck) { + if (unacquired.size()) { + async(manager.getSession()).messageAcquire(unacquired); + unaccepted.add(unacquired); + unaccepted.clear(); + } + async(manager.getSession()).messageAccept(unaccepted); + unaccepted.clear(); + } + } +} + +}} // namespace qpid::client + diff --git a/qpid/cpp/src/qpid/client/SubscriptionImpl.h b/qpid/cpp/src/qpid/client/SubscriptionImpl.h new file mode 100644 index 0000000000..44fd1a7d6c --- /dev/null +++ b/qpid/cpp/src/qpid/client/SubscriptionImpl.h @@ -0,0 +1,99 @@ +#ifndef QPID_CLIENT_SUBSCRIPTIONIMPL_H +#define QPID_CLIENT_SUBSCRIPTIONIMPL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/client/SubscriptionSettings.h" +#include "qpid/client/Session.h" +#include "qpid/client/MessageListener.h" +#include "qpid/framing/SequenceSet.h" +#include "qpid/sys/Mutex.h" +#include "qpid/RefCounted.h" + +namespace qpid { +namespace client { + +class SubscriptionManager; + +class SubscriptionImpl : public RefCounted, public MessageListener { + public: + SubscriptionImpl(SubscriptionManager&, const std::string& queue, + const SubscriptionSettings&, const std::string& name, MessageListener* =0); + + /** The name of the subsctription, used as the "destination" for messages from the broker. + * Usually the same as the queue name but can be set differently. + */ + std::string getName() const; + + /** Name of the queue this subscription subscribes to */ + std::string getQueue() const; + + /** Get the flow control and acknowledgement settings for this subscription */ + const SubscriptionSettings& getSettings() const; + + /** Set the flow control parameters */ + void setFlowControl(const FlowControl&); + + /** Automatically acknowledge (acquire and accept) batches of n messages. + * You can disable auto-acknowledgement by setting n=0, and use acquire() and accept() + * to manually acquire and accept messages. + */ + void setAutoAck(size_t n); + + /** Get the set of ID's for messages received by this subscription but not yet acquired. + * This will always be empty if acquireMode=ACQUIRE_MODE_PRE_ACQUIRED + */ + SequenceSet getUnacquired() const; + + /** Get the set of ID's for messages acquired by this subscription but not yet accepted. */ + SequenceSet getUnaccepted() const; + + /** Acquire messageIds and remove them from the un-acquired set for the session. */ + void acquire(const SequenceSet& messageIds); + + /** Accept messageIds and remove them from the un-acceptd set for the session. */ + void accept(const SequenceSet& messageIds); + + /** Get the session associated with this subscription */ + Session getSession() const; + + /** Get the subscription manager associated with this subscription */ + SubscriptionManager& getSubscriptionManager() const; + + /** Cancel the subscription. */ + void cancel(); + + void received(Message&); + + private: + + mutable sys::Mutex lock; + SubscriptionManager& manager; + std::string name, queue; + SubscriptionSettings settings; + framing::SequenceSet unacquired, unaccepted; + MessageListener* listener; +}; + +}} // namespace qpid::client + +#endif /*!QPID_CLIENT_SUBSCRIPTIONIMPL_H*/ diff --git a/qpid/cpp/src/qpid/client/SubscriptionManager.cpp b/qpid/cpp/src/qpid/client/SubscriptionManager.cpp index dde93635c8..7e2f2f8595 100644 --- a/qpid/cpp/src/qpid/client/SubscriptionManager.cpp +++ b/qpid/cpp/src/qpid/client/SubscriptionManager.cpp @@ -22,6 +22,7 @@ #define _Subscription_ #include "SubscriptionManager.h" +#include "SubscriptionImpl.h" #include #include #include @@ -34,83 +35,41 @@ namespace qpid { namespace client { SubscriptionManager::SubscriptionManager(const Session& s) - : dispatcher(s), session(s), - flowControl(UNLIMITED, UNLIMITED, false), - acceptMode(0), acquireMode(0), - autoStop(true) + : dispatcher(s), session(s), autoStop(true) {} -void SubscriptionManager::subscribeInternal( - const std::string& q, const std::string& dest, const FlowControl& fc) +Subscription SubscriptionManager::subscribe( + MessageListener& listener, const std::string& q, const SubscriptionSettings& ss, const std::string& n) { - session.messageSubscribe( - arg::queue=q, arg::destination=dest, - arg::acceptMode=acceptMode, arg::acquireMode=acquireMode); - if (fc.messages || fc.bytes) // No need to set if all 0. - setFlowControl(dest, fc); + std::string name=n.empty() ? q:n; + boost::intrusive_ptr si = new SubscriptionImpl(*this, q, ss, name, &listener); + dispatcher.listen(si); + return subscriptions[name] = Subscription(si.get()); } -void SubscriptionManager::subscribe( - MessageListener& listener, const std::string& q, const std::string& d) +Subscription SubscriptionManager::subscribe( + LocalQueue& lq, const std::string& q, const SubscriptionSettings& ss, const std::string& n) { - subscribe(listener, q, getFlowControl(), d); + std::string name=n.empty() ? q:n; + lq.queue=session.getExecution().getDemux().add(name, ByTransferDest(name)); + boost::intrusive_ptr si = new SubscriptionImpl(*this, q, ss, name, 0); + lq.subscription = Subscription(si.get()); + return subscriptions[name] = lq.subscription; } -void SubscriptionManager::subscribe( - MessageListener& listener, const std::string& q, const FlowControl& fc, const std::string& d) +Subscription SubscriptionManager::subscribe( + MessageListener& listener, const std::string& q, const std::string& n) { - std::string dest=d.empty() ? q:d; - dispatcher.listen(dest, &listener, autoAck); - return subscribeInternal(q, dest, fc); + return subscribe(listener, q, defaultSettings, n); } -void SubscriptionManager::subscribe( - LocalQueue& lq, const std::string& q, const std::string& d) +Subscription SubscriptionManager::subscribe( + LocalQueue& lq, const std::string& q, const std::string& n) { - subscribe(lq, q, getFlowControl(), d); + return subscribe(lq, q, defaultSettings, n); } -void SubscriptionManager::subscribe( - LocalQueue& lq, const std::string& q, const FlowControl& fc, const std::string& d) -{ - std::string dest=d.empty() ? q:d; - lq.session=session; - lq.queue=session.getExecution().getDemux().add(dest, ByTransferDest(dest)); - return subscribeInternal(q, dest, fc); -} - -void SubscriptionManager::setFlowControl( - const std::string& dest, uint32_t messages, uint32_t bytes, bool window) -{ - session.messageSetFlowMode(dest, window); - session.messageFlow(dest, 0, messages); - session.messageFlow(dest, 1, bytes); - session.sync(); -} - -void SubscriptionManager::setFlowControl(const std::string& dest, const FlowControl& fc) { - setFlowControl(dest, fc.messages, fc.bytes, fc.window); -} - -void SubscriptionManager::setFlowControl(const FlowControl& fc) { flowControl=fc; } - -void SubscriptionManager::setFlowControl( - uint32_t messages_, uint32_t bytes_, bool window_) -{ - setFlowControl(FlowControl(messages_, bytes_, window_)); -} - -const FlowControl& SubscriptionManager::getFlowControl() const { return flowControl; } - -void SubscriptionManager::setAcceptMode(bool c) { acceptMode=c; } - -void SubscriptionManager::setAcquireMode(bool a) { acquireMode=a; } - -void SubscriptionManager::setAckPolicy(const AckPolicy& a) { autoAck=a; } - -AckPolicy& SubscriptionManager::getAckPolicy() { return autoAck; } - -void SubscriptionManager::cancel(const std::string dest) +void SubscriptionManager::cancel(const std::string& dest) { sync(session).messageCancel(dest); dispatcher.cancel(dest); @@ -138,10 +97,11 @@ void SubscriptionManager::stop() bool SubscriptionManager::get(Message& result, const std::string& queue, sys::Duration timeout) { LocalQueue lq; std::string unique = framing::Uuid(true).str(); - subscribe(lq, queue, FlowControl::messageCredit(1), unique); + subscribe(lq, queue, SubscriptionSettings(FlowControl::messageCredit(1)), unique); AutoCancel ac(*this, unique); //first wait for message to be delivered if a timeout has been specified - if (timeout && lq.get(result, timeout)) return true; + if (timeout && lq.get(result, timeout)) + return true; //make sure message is not on queue before final check sync(session).messageFlush(unique); return lq.get(result, 0); @@ -149,6 +109,10 @@ bool SubscriptionManager::get(Message& result, const std::string& queue, sys::Du Session SubscriptionManager::getSession() const { return session; } +Subscription SubscriptionManager::getSubscription(const std::string& name) const { + return subscriptions.at(name); +} + void SubscriptionManager::registerFailoverHandler (boost::function fh) { dispatcher.registerFailoverHandler(fh); } diff --git a/qpid/cpp/src/qpid/client/SubscriptionManager.h b/qpid/cpp/src/qpid/client/SubscriptionManager.h index 07faa48fee..8b27a2c9b9 100644 --- a/qpid/cpp/src/qpid/client/SubscriptionManager.h +++ b/qpid/cpp/src/qpid/client/SubscriptionManager.h @@ -25,9 +25,10 @@ #include #include #include +#include #include #include -#include +#include #include #include #include @@ -48,15 +49,10 @@ class SubscriptionManager : public sys::Runnable typedef sys::Mutex::ScopedLock Lock; typedef sys::Mutex::ScopedUnlock Unlock; - void subscribeInternal(const std::string& q, const std::string& dest, const FlowControl&); - qpid::client::Dispatcher dispatcher; qpid::client::AsyncSession session; - FlowControl flowControl; - AckPolicy autoAck; - bool acceptMode; - bool acquireMode; bool autoStop; + SubscriptionSettings defaultSettings; public: /** Create a new SubscriptionManager associated with a session */ @@ -70,14 +66,13 @@ class SubscriptionManager : public sys::Runnable * *@param listener Listener object to receive messages. *@param queue Name of the queue to subscribe to. - *@param flow initial FlowControl for the subscription. - *@param tag Unique destination tag for the listener. - * If not specified, the queue name is used. + *@param settings settings for the subscription. + *@param name unique destination name for the subscription, defaults to queue name. */ - void subscribe(MessageListener& listener, - const std::string& queue, - const FlowControl& flow, - const std::string& tag=std::string()); + Subscription subscribe(MessageListener& listener, + const std::string& queue, + const SubscriptionSettings& settings, + const std::string& name=std::string()); /** * Subscribe a LocalQueue to receive messages from queue. @@ -86,13 +81,13 @@ class SubscriptionManager : public sys::Runnable * *@param queue Name of the queue to subscribe to. *@param flow initial FlowControl for the subscription. - *@param tag Unique destination tag for the listener. + *@param name unique destination name for the subscription, defaults to queue name. * If not specified, the queue name is used. */ - void subscribe(LocalQueue& localQueue, - const std::string& queue, - const FlowControl& flow, - const std::string& tag=std::string()); + Subscription subscribe(LocalQueue& localQueue, + const std::string& queue, + const SubscriptionSettings& settings, + const std::string& name=std::string()); /** * Subscribe a MessagesListener to receive messages from queue. @@ -102,12 +97,12 @@ class SubscriptionManager : public sys::Runnable * *@param listener Listener object to receive messages. *@param queue Name of the queue to subscribe to. - *@param tag Unique destination tag for the listener. + *@param name unique destination name for the subscription, defaults to queue name. * If not specified, the queue name is used. */ - void subscribe(MessageListener& listener, - const std::string& queue, - const std::string& tag=std::string()); + Subscription subscribe(MessageListener& listener, + const std::string& queue, + const std::string& name=std::string()); /** * Subscribe a LocalQueue to receive messages from queue. @@ -115,12 +110,12 @@ class SubscriptionManager : public sys::Runnable * Incoming messages are stored in the queue for you to retrieve. * *@param queue Name of the queue to subscribe to. - *@param tag Unique destination tag for the listener. + *@param name unique destination name for the subscription, defaults to queue name. * If not specified, the queue name is used. */ - void subscribe(LocalQueue& localQueue, - const std::string& queue, - const std::string& tag=std::string()); + Subscription subscribe(LocalQueue& localQueue, + const std::string& queue, + const std::string& name=std::string()); /** Get a single message from a queue. @@ -131,8 +126,13 @@ class SubscriptionManager : public sys::Runnable */ bool get(Message& result, const std::string& queue, sys::Duration timeout=0); - /** Cancel a subscription. */ - void cancel(const std::string tag); + /** Get a subscription by name, returns a null Subscription handle + * if not found. + */ + Subscription getSubscription(const std::string& name) const; + + /** Cancel a subscription. See also: Subscription.cancel() */ + void cancel(const std::string& name); /** Deliver messages in the current thread until stop() is called. * Only one thread may be running in a SubscriptionManager at a time. @@ -157,53 +157,65 @@ class SubscriptionManager : public sys::Runnable static const uint32_t UNLIMITED=0xFFFFFFFF; - /** Set the flow control for destination. */ - void setFlowControl(const std::string& destintion, const FlowControl& flow); - - /** Set the default initial flow control for subscriptions that do not specify it. */ - void setFlowControl(const FlowControl& flow); + /** Set the flow control for a subscription. */ + void setFlowControl(const std::string& name, const FlowControl& flow) { + getSubscription(name).setFlowControl(flow); + } - /** Get the default flow control for new subscriptions that do not specify it. */ - const FlowControl& getFlowControl() const; - - /** Set the flow control for destination tag. - *@param tag: name of the destination. + /** Set the flow control for a subscription. + *@param name: name of the subscription. *@param messages: message credit. *@param bytes: byte credit. *@param window: if true use window-based flow control. */ - void setFlowControl(const std::string& tag, uint32_t messages, uint32_t bytes, bool window=true); + void setFlowControl(const std::string& name, uint32_t messages, uint32_t bytes, bool window=true) { + setFlowControl(name, messages, bytes, window); + } - /** Set the initial flow control settings to be applied to each new subscribtion. - *@param messages: message credit. - *@param bytes: byte credit. - *@param window: if true use window-based flow control. + /** Set the default settings for subscribe() calls that don't + * include a SubscriptionSettings parameter. + */ + void setDefaultSettings(const SubscriptionSettings& s) { defaultSettings = s; } + + /** Get the default settings for subscribe() calls that don't + * include a SubscriptionSettings parameter. */ - void setFlowControl(uint32_t messages, uint32_t bytes, bool window=true); + const SubscriptionSettings& getDefaultSettings() const { return defaultSettings; } - /** Set the accept-mode for new subscriptions. Defaults to true. - *@param required: if true messages must be confirmed by calling - *Message::acknowledge() or automatically via an AckPolicy, see setAckPolicy() + /** Get the default settings for subscribe() calls that don't + * include a SubscriptionSettings parameter. */ - void setAcceptMode(bool required); + SubscriptionSettings& getDefaultSettings() { return defaultSettings; } - /** Set the acquire-mode for new subscriptions. Defaults to false. - *@param acquire: if false messages pre-acquired, if true - * messages are dequed on acknowledgement or on transfer - * depending on acceptMode. + /** + * Set the default flow control settings for subscribe() calls + * that don't include a SubscriptionSettings parameter. + * + *@param messages: message credit. + *@param bytes: byte credit. + *@param window: if true use window-based flow control. */ - void setAcquireMode(bool acquire); + void setFlowControl(uint32_t messages, uint32_t bytes, bool window=true) { + defaultSettings.flowControl = FlowControl(messages, bytes, window); + } - /** Set the acknowledgement policy for new subscriptions. - * Default is to acknowledge every message automatically. + /** + *Set the default accept-mode for subscribe() calls that don't + *include a SubscriptionSettings parameter. */ - void setAckPolicy(const AckPolicy& autoAck); + void setAcceptMode(AcceptMode mode) { defaultSettings.acceptMode = mode; } - AckPolicy& getAckPolicy(); + /** + * Set the default acquire-mode subscribe()s that don't specify SubscriptionSettings. + */ + void setAcquireMode(AcquireMode mode) { defaultSettings.acquireMode = mode; } void registerFailoverHandler ( boost::function fh ); Session getSession() const; + + private: + std::map subscriptions; }; /** AutoCancel cancels a subscription in its destructor */ diff --git a/qpid/cpp/src/qpid/client/SubscriptionSettings.h b/qpid/cpp/src/qpid/client/SubscriptionSettings.h new file mode 100644 index 0000000000..924814c809 --- /dev/null +++ b/qpid/cpp/src/qpid/client/SubscriptionSettings.h @@ -0,0 +1,62 @@ +#ifndef QPID_CLIENT_SUBSCRIPTIONSETTINGS_H +#define QPID_CLIENT_SUBSCRIPTIONSETTINGS_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/client/FlowControl.h" +#include "qpid/framing/enum.h" + +namespace qpid { +namespace client { + +/** Bring AMQP enum definitions for message class into this namespace. */ +using namespace qpid::framing::message; + +/** + * Settings for a subscription. + */ +struct SubscriptionSettings +{ + SubscriptionSettings( + FlowControl flow=FlowControl::unlimited(), + AcceptMode accept=ACCEPT_MODE_EXPLICIT, + AcquireMode acquire=ACQUIRE_MODE_PRE_ACQUIRED, + unsigned int autoAck_=1 + ) : flowControl(flow), acceptMode(accept), acquireMode(acquire), autoAck(autoAck_) {} + + FlowControl flowControl; ///@< Flow control settings. @see FlowControl + AcceptMode acceptMode; ///@< ACCEPT_MODE_EXPLICIT or ACCEPT_MODE_NONE + AcquireMode acquireMode; ///@< ACQUIRE_MODE_PRE_ACQUIRED or ACQUIRE_MODE_NOT_ACQUIRED + + /** Automatically acknowledge (acquire and accept) batches of autoAck messages. + * 0 means no automatic acknowledgement. What it means to "acknowledge" a message depends on + * acceptMode and acquireMode: + * - ACCEPT_MODE_NONE and ACQUIRE_MODE_PRE_ACQUIRED: do nothing + * - ACCEPT_MODE_NONE and ACQUIRE_MODE_NOT_ACQUIRED: send an "acquire" command + * - ACCEPT_MODE_EXPLICIT and ACQUIRE_MODE_PRE_ACQUIRED: send "accept" command + * - ACCEPT_MODE_EXPLICIT and ACQUIRE_MODE_NOT_ACQUIRED: send "acquire" and "accept" commands + */ + unsigned int autoAck; +}; + +}} // namespace qpid::client + +#endif /*!QPID_CLIENT_SUBSCRIPTIONSETTINGS_H*/ diff --git a/qpid/cpp/src/tests/ClientSessionTest.cpp b/qpid/cpp/src/tests/ClientSessionTest.cpp index 440605a2e4..abe317aad8 100644 --- a/qpid/cpp/src/tests/ClientSessionTest.cpp +++ b/qpid/cpp/src/tests/ClientSessionTest.cpp @@ -20,8 +20,7 @@ */ #include "unit_test.h" #include "BrokerFixture.h" -#include "qpid/client/AckPolicy.h" -#include "qpid/client/Dispatcher.h" +#include "qpid/client/SubscriptionManager.h" #include "qpid/sys/Monitor.h" #include "qpid/sys/Thread.h" #include "qpid/sys/Runnable.h" @@ -52,22 +51,22 @@ struct DummyListener : public sys::Runnable, public MessageListener { std::vector messages; string name; uint expected; - Dispatcher dispatcher; + SubscriptionManager submgr; DummyListener(Session& session, const string& n, uint ex) : - name(n), expected(ex), dispatcher(session) {} + name(n), expected(ex), submgr(session) {} void run() { - dispatcher.listen(name, this); - dispatcher.run(); + submgr.subscribe(*this, name); + submgr.run(); } void received(Message& msg) { messages.push_back(msg); if (--expected == 0) { - dispatcher.stop(); + submgr.stop(); } } }; @@ -95,53 +94,30 @@ struct SimpleListener : public MessageListener struct ClientSessionFixture : public ProxySessionFixture { - ClientSessionFixture(Broker::Options opts = Broker::Options()) : ProxySessionFixture(opts) {} - - void declareSubscribe(const string& q="my-queue", - const string& dest="my-dest") - { - session.queueDeclare(arg::queue=q); - session.messageSubscribe(arg::queue=q, arg::destination=dest, arg::acquireMode=1); - session.messageFlow(arg::destination=dest, arg::unit=0, arg::value=0xFFFFFFFF);//messages - session.messageFlow(arg::destination=dest, arg::unit=1, arg::value=0xFFFFFFFF);//bytes + ClientSessionFixture(Broker::Options opts = Broker::Options()) : ProxySessionFixture(opts) { + session.queueDeclare(arg::queue="my-queue"); } }; QPID_AUTO_TEST_CASE(testQueueQuery) { ClientSessionFixture fix; fix.session = fix.connection.newSession(); - fix.session.queueDeclare(arg::queue="my-queue", arg::alternateExchange="amq.fanout", arg::exclusive=true, arg::autoDelete=true); - QueueQueryResult result = fix.session.queueQuery(string("my-queue")); + fix.session.queueDeclare(arg::queue="q", arg::alternateExchange="amq.fanout", + arg::exclusive=true, arg::autoDelete=true); + QueueQueryResult result = fix.session.queueQuery("q"); BOOST_CHECK_EQUAL(false, result.getDurable()); BOOST_CHECK_EQUAL(true, result.getExclusive()); - BOOST_CHECK_EQUAL(string("amq.fanout"), - result.getAlternateExchange()); -} - -QPID_AUTO_TEST_CASE(testTransfer) -{ - ClientSessionFixture fix; - fix.session=fix.connection.newSession(); - fix.declareSubscribe(); - fix.session.messageTransfer(arg::acceptMode=1, arg::content=TransferContent("my-message", "my-queue")); - //get & test the message: - FrameSet::shared_ptr msg = fix.session.get(); - BOOST_CHECK(msg->isA()); - BOOST_CHECK_EQUAL(string("my-message"), msg->getContent()); - //confirm receipt: - AckPolicy autoAck; - autoAck.ack(Message(*msg), fix.session); + BOOST_CHECK_EQUAL("amq.fanout", result.getAlternateExchange()); } QPID_AUTO_TEST_CASE(testDispatcher) { ClientSessionFixture fix; fix.session =fix.connection.newSession(); - fix.declareSubscribe(); size_t count = 100; for (size_t i = 0; i < count; ++i) fix.session.messageTransfer(arg::content=TransferContent(boost::lexical_cast(i), "my-queue")); - DummyListener listener(fix.session, "my-dest", count); + DummyListener listener(fix.session, "my-queue", count); listener.run(); BOOST_CHECK_EQUAL(count, listener.messages.size()); for (size_t i = 0; i < count; ++i) @@ -152,9 +128,8 @@ QPID_AUTO_TEST_CASE(testDispatcherThread) { ClientSessionFixture fix; fix.session =fix.connection.newSession(); - fix.declareSubscribe(); size_t count = 10; - DummyListener listener(fix.session, "my-dest", count); + DummyListener listener(fix.session, "my-queue", count); sys::Thread t(listener); for (size_t i = 0; i < count; ++i) { fix.session.messageTransfer(arg::content=TransferContent(boost::lexical_cast(i), "my-queue")); @@ -190,7 +165,6 @@ QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testSuspendResume, 1) { ClientSessionFixture fix; fix.session.timeout(60); - fix.declareSubscribe(); fix.session.suspend(); // Make sure we are still subscribed after resume. fix.connection.resume(fix.session); @@ -234,7 +208,7 @@ QPID_AUTO_TEST_CASE(testLocalQueue) { BOOST_CHECK_EQUAL("foo0", lq.pop().getData()); BOOST_CHECK_EQUAL("foo1", lq.pop().getData()); BOOST_CHECK(lq.empty()); // Credit exhausted. - fix.subs.setFlowControl("lq", FlowControl::unlimited()); + fix.subs.getSubscription("lq").setFlowControl(FlowControl::unlimited()); BOOST_CHECK_EQUAL("foo2", lq.pop().getData()); } diff --git a/qpid/cpp/src/tests/QueuePolicyTest.cpp b/qpid/cpp/src/tests/QueuePolicyTest.cpp index f7fe81a709..28f555cf6a 100644 --- a/qpid/cpp/src/tests/QueuePolicyTest.cpp +++ b/qpid/cpp/src/tests/QueuePolicyTest.cpp @@ -168,8 +168,10 @@ QPID_AUTO_TEST_CASE(testStrictRingPolicy) ProxySessionFixture f; std::string q("my-ring-queue"); f.session.queueDeclare(arg::queue=q, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args); - LocalQueue incoming(AckPolicy(0));//no automatic acknowledgements - f.subs.subscribe(incoming, q); + LocalQueue incoming; + SubscriptionSettings settings(FlowControl::unlimited()); + settings.autoAck = 0; // no auto ack. + Subscription sub = f.subs.subscribe(incoming, q, settings); for (int i = 0; i < 5; i++) { f.session.messageTransfer(arg::content=client::Message((boost::format("%1%_%2%") % "Message" % (i+1)).str(), q)); } diff --git a/qpid/cpp/src/tests/XmlClientSessionTest.cpp b/qpid/cpp/src/tests/XmlClientSessionTest.cpp index 534ecf70f2..98558f0a76 100644 --- a/qpid/cpp/src/tests/XmlClientSessionTest.cpp +++ b/qpid/cpp/src/tests/XmlClientSessionTest.cpp @@ -29,7 +29,7 @@ #include "qpid/framing/TransferContent.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/client/Connection.h" -#include "qpid/client/Dispatcher.h" +#include "qpid/client/SubscriptionManager.h" #include "qpid/client/LocalQueue.h" #include "qpid/client/Session.h" #include "qpid/client/SubscriptionManager.h" @@ -54,30 +54,6 @@ using std::endl; Shlib shlib("../.libs/xml.so"); -struct DummyListener : public sys::Runnable, public MessageListener { - std::vector messages; - string name; - uint expected; - Dispatcher dispatcher; - - DummyListener(Session& session, const string& n, uint ex) : - name(n), expected(ex), dispatcher(session) {} - - void run() - { - dispatcher.listen(name, this); - dispatcher.run(); - } - - void received(Message& msg) - { - messages.push_back(msg); - if (--expected == 0) - dispatcher.stop(); - } -}; - - class SubscribedLocalQueue : public LocalQueue { private: SubscriptionManager& subscriptions; diff --git a/qpid/cpp/src/tests/consume.cpp b/qpid/cpp/src/tests/consume.cpp index 29c61ada1b..4d74b8ae57 100644 --- a/qpid/cpp/src/tests/consume.cpp +++ b/qpid/cpp/src/tests/consume.cpp @@ -75,11 +75,11 @@ struct Client if (opts.declare) session.queueDeclare(opts.queue); SubscriptionManager subs(session); - LocalQueue lq(AckPolicy(opts.ack)); - subs.setAcceptMode(opts.ack > 0 ? 0 : 1); - subs.setFlowControl(opts.count, SubscriptionManager::UNLIMITED, - false); - subs.subscribe(lq, opts.queue); + LocalQueue lq; + SubscriptionSettings settings; + settings.acceptMode = opts.ack > 0 ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE; + settings.flowControl = FlowControl(opts.count, SubscriptionManager::UNLIMITED,false); + Subscription sub = subs.subscribe(lq, opts.queue, settings); Message msg; AbsTime begin=now(); for (size_t i = 0; i < opts.count; ++i) { @@ -87,7 +87,7 @@ struct Client QPID_LOG(info, "Received: " << msg.getMessageProperties().getCorrelationId()); } if (opts.ack != 0) - subs.getAckPolicy().ackOutstanding(session); // Cumulative ack for final batch. + sub.accept(sub.getUnaccepted()); // Cumulative ack for final batch. AbsTime end=now(); double secs(double(Duration(begin,end))/TIME_SEC); if (opts.summary) cout << opts.count/secs << endl; diff --git a/qpid/cpp/src/tests/echotest.cpp b/qpid/cpp/src/tests/echotest.cpp index a57e2de5ad..7cbf3e7df4 100644 --- a/qpid/cpp/src/tests/echotest.cpp +++ b/qpid/cpp/src/tests/echotest.cpp @@ -92,9 +92,7 @@ void Listener::start(uint size) { session.queueDeclare(arg::queue=queue, arg::exclusive=true, arg::autoDelete=true); request.getDeliveryProperties().setRoutingKey(queue); - subscriptions.setAcceptMode(1/*not-required*/); - subscriptions.setFlowControl(SubscriptionManager::UNLIMITED, SubscriptionManager::UNLIMITED, false); - subscriptions.subscribe(*this, queue); + subscriptions.subscribe(*this, queue, SubscriptionSettings(FlowControl::unlimited(), ACCEPT_MODE_NONE)); request.getDeliveryProperties().setTimestamp(current_time()); if (size) request.setData(std::string(size, 'X')); diff --git a/qpid/cpp/src/tests/latencytest.cpp b/qpid/cpp/src/tests/latencytest.cpp index 524870a0e8..a980a43322 100644 --- a/qpid/cpp/src/tests/latencytest.cpp +++ b/qpid/cpp/src/tests/latencytest.cpp @@ -204,14 +204,15 @@ Receiver::Receiver(const string& q, Stats& s) : Client(q), mgr(session), count(0 std::cout << "Warning: found " << msgCount << " msgs on " << queue << ". Purging..." << std::endl; session.queuePurge(arg::queue=queue); } + SubscriptionSettings settings; if (opts.prefetch) { - mgr.setAckPolicy(AckPolicy(opts.ack ? opts.ack : (opts.prefetch / 2))); - mgr.setFlowControl(opts.prefetch, SubscriptionManager::UNLIMITED, true); + settings.autoAck = (opts.ack ? opts.ack : (opts.prefetch / 2)); + settings.flowControl = FlowControl::messageWindow(opts.prefetch); } else { - mgr.setAcceptMode(1/*not-required*/); - mgr.setFlowControl(SubscriptionManager::UNLIMITED, SubscriptionManager::UNLIMITED, false); + settings.acceptMode = ACCEPT_MODE_NONE; + settings.flowControl = FlowControl::unlimited(); } - mgr.subscribe(*this, queue); + mgr.subscribe(*this, queue, settings); } void Receiver::test() diff --git a/qpid/cpp/src/tests/perftest.cpp b/qpid/cpp/src/tests/perftest.cpp index 2e15489525..eb7235a09b 100644 --- a/qpid/cpp/src/tests/perftest.cpp +++ b/qpid/cpp/src/tests/perftest.cpp @@ -560,11 +560,12 @@ struct SubscribeThread : public Client { try { if (opts.txSub) sync(session).txSelect(); SubscriptionManager subs(session); - LocalQueue lq(AckPolicy(opts.txSub ? opts.txSub : opts.ack)); - subs.setAcceptMode(opts.txSub || opts.ack ? 0 : 1); - subs.setFlowControl(opts.subQuota, SubscriptionManager::UNLIMITED, - false); - subs.subscribe(lq, queue); + SubscriptionSettings settings; + settings.autoAck = opts.txSub ? opts.txSub : opts.ack; + settings.acceptMode = (opts.txSub || opts.ack ? ACCEPT_MODE_NONE : ACCEPT_MODE_EXPLICIT); + settings.flowControl = FlowControl::messageCredit(opts.subQuota); + LocalQueue lq; + Subscription subscription = subs.subscribe(lq, queue, settings); // Notify controller we are ready. session.messageTransfer(arg::content=Message("ready", "sub_ready"), arg::acceptMode=1); if (opts.txSub) { @@ -603,7 +604,7 @@ struct SubscribeThread : public Client { } } if (opts.txSub || opts.ack) - lq.getAckPolicy().ackOutstanding(session); // Cumulative ack for final batch. + subscription.accept(subscription.getUnaccepted()); if (opts.txSub) { if (opts.commitAsync) session.txCommit(); else sync(session).txCommit(); diff --git a/qpid/cpp/src/tests/topic_listener.cpp b/qpid/cpp/src/tests/topic_listener.cpp index 0da26fa2c4..7bdc2c32de 100644 --- a/qpid/cpp/src/tests/topic_listener.cpp +++ b/qpid/cpp/src/tests/topic_listener.cpp @@ -66,6 +66,7 @@ class Listener : public MessageListener{ public: Listener(const Session& session, SubscriptionManager& mgr, const string& reponseQueue, bool tx); virtual void received(Message& msg); + Subscription subscription; }; /** @@ -118,14 +119,15 @@ int main(int argc, char** argv){ //set up listener SubscriptionManager mgr(session); Listener listener(session, mgr, "response", args.transactional); + SubscriptionSettings settings; if (args.prefetch) { - mgr.setAckPolicy(AckPolicy(args.ack ? args.ack : (args.prefetch / 2))); - mgr.setFlowControl(args.prefetch, SubscriptionManager::UNLIMITED, true); + settings.autoAck = (args.ack ? args.ack : (args.prefetch / 2)); + settings.flowControl = FlowControl::messageCredit(args.prefetch); } else { - mgr.setAcceptMode(1/*-not-required*/); - mgr.setFlowControl(SubscriptionManager::UNLIMITED, SubscriptionManager::UNLIMITED, false); + settings.acceptMode = ACCEPT_MODE_NONE; + settings.flowControl = FlowControl::unlimited(); } - mgr.subscribe(listener, control); + listener.subscription = mgr.subscribe(listener, control, settings); session.sync(); if( args.statusqueue.length() > 0 ) { @@ -170,7 +172,7 @@ void Listener::received(Message& message){ if(string("TERMINATION_REQUEST") == type){ shutdown(); }else if(string("REPORT_REQUEST") == type){ - mgr.getAckPolicy().ackOutstanding(session);//acknowledge everything upto this point + subscription.accept(subscription.getUnaccepted()); // Accept everything upto this point cout <<"Batch ended, sending report." << endl; //send a report: report(); diff --git a/qpid/cpp/src/tests/txtest.cpp b/qpid/cpp/src/tests/txtest.cpp index c285ff9fcc..9d253ddb7f 100644 --- a/qpid/cpp/src/tests/txtest.cpp +++ b/qpid/cpp/src/tests/txtest.cpp @@ -139,9 +139,10 @@ struct Transfer : public Client, public Runnable else session.txSelect(); SubscriptionManager subs(session); - LocalQueue lq(AckPolicy(0));//manual acking - subs.setFlowControl(opts.msgsPerTx, SubscriptionManager::UNLIMITED, true); - subs.subscribe(lq, src); + LocalQueue lq; + SubscriptionSettings settings(FlowControl::messageWindow(opts.msgsPerTx)); + settings.autoAck = 0; // Disabled + Subscription sub = subs.subscribe(lq, src, settings); for (uint t = 0; t < opts.txCount; t++) { Message in; @@ -157,7 +158,7 @@ struct Transfer : public Client, public Runnable out.getDeliveryProperties().setDeliveryMode(in.getDeliveryProperties().getDeliveryMode()); session.messageTransfer(arg::content=out, arg::acceptMode=1); } - lq.getAckPolicy().ackOutstanding(session); + sub.accept(sub.getUnaccepted()); if (opts.dtx) { session.dtxEnd(arg::xid=xid); session.dtxPrepare(arg::xid=xid); @@ -230,8 +231,6 @@ struct Controller : public Client int check() { SubscriptionManager subs(session); - subs.setFlowControl(SubscriptionManager::UNLIMITED, SubscriptionManager::UNLIMITED, false); - subs.setAcceptMode(1/*not-required*/); // Recover DTX transactions (if any) if (opts.dtx) { @@ -262,9 +261,10 @@ struct Controller : public Client StringSet drained; //drain each queue and verify the correct set of messages are available for (StringSet::iterator i = queues.begin(); i != queues.end(); i++) { - //subscribe, allocate credit and flush - LocalQueue lq(AckPolicy(0));//manual acking - subs.subscribe(lq, *i, *i); + //subscribe, allocate credit and flushn + LocalQueue lq; + SubscriptionSettings settings(FlowControl::unlimited(), ACCEPT_MODE_NONE); + subs.subscribe(lq, *i, settings); session.messageFlush(arg::destination=*i); session.sync(); -- cgit v1.2.1