summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-10-25 01:55:06 +0000
committerAlan Conway <aconway@apache.org>2008-10-25 01:55:06 +0000
commit10d07002af4b211dfbbc3341a4edb6ec4c2e5cb5 (patch)
tree18573267ea026fd920684ad77dc139638334fc6d
parent10beffb0c6d4233422f668d9d346770ad2f50295 (diff)
downloadqpid-python-10d07002af4b211dfbbc3341a4edb6ec4c2e5cb5.tar.gz
Client API change: Centralize access to subscription status, better control of acquire/accept.
client/AckPolicy: removed, functionality moved to Subscription and SubscriptionSettings client/SubscriptionSettings: struct aggregates flow control & accept-acquire parameters for subscribe. client/Subscription: represents active subscription. Query settings, unacked messages, manual accept/acquire client/SubscriptionManager: use AcceptMode, AcquireMode enums rather than confusing bools. Issues addressed by the change: - old use of bool for acceptMode was inverted wrt AMQP enum values, bools are confusing. - old AckPolicy was broken - not possible to access the instance associated with an active subscription - old AckPolicy did not provide a way to do manual acquire, only accept. - setting values on SubscriptionManager to apply to subsequent subscriptions is awkward & error-prone, now can use SubscriptionSettings to control on each subscribe individually. - a subscription is a central concept in AMQP, it deserves to be a class. Subscription and SubscriptionSettings provides a single point for future expansion of interactions with a a Subscription. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@707808 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/Makefile.am15
-rw-r--r--qpid/cpp/src/qpid/RangeSet.h8
-rw-r--r--qpid/cpp/src/qpid/client/Dispatcher.cpp35
-rw-r--r--qpid/cpp/src/qpid/client/Dispatcher.h25
-rw-r--r--qpid/cpp/src/qpid/client/FailoverListener.cpp2
-rw-r--r--qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp117
-rw-r--r--qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h32
-rw-r--r--qpid/cpp/src/qpid/client/FlowControl.h7
-rw-r--r--qpid/cpp/src/qpid/client/Handle.h (renamed from qpid/cpp/src/qpid/client/AckPolicy.h)61
-rw-r--r--qpid/cpp/src/qpid/client/HandleAccess.h (renamed from qpid/cpp/src/qpid/client/AckPolicy.cpp)39
-rw-r--r--qpid/cpp/src/qpid/client/HandlePrivate.h61
-rw-r--r--qpid/cpp/src/qpid/client/LocalQueue.cpp11
-rw-r--r--qpid/cpp/src/qpid/client/LocalQueue.h18
-rw-r--r--qpid/cpp/src/qpid/client/Subscription.cpp47
-rw-r--r--qpid/cpp/src/qpid/client/Subscription.h99
-rw-r--r--qpid/cpp/src/qpid/client/SubscriptionImpl.cpp116
-rw-r--r--qpid/cpp/src/qpid/client/SubscriptionImpl.h99
-rw-r--r--qpid/cpp/src/qpid/client/SubscriptionManager.cpp94
-rw-r--r--qpid/cpp/src/qpid/client/SubscriptionManager.h128
-rw-r--r--qpid/cpp/src/qpid/client/SubscriptionSettings.h62
-rw-r--r--qpid/cpp/src/tests/ClientSessionTest.cpp56
-rw-r--r--qpid/cpp/src/tests/QueuePolicyTest.cpp6
-rw-r--r--qpid/cpp/src/tests/XmlClientSessionTest.cpp26
-rw-r--r--qpid/cpp/src/tests/consume.cpp12
-rw-r--r--qpid/cpp/src/tests/echotest.cpp4
-rw-r--r--qpid/cpp/src/tests/latencytest.cpp11
-rw-r--r--qpid/cpp/src/tests/perftest.cpp13
-rw-r--r--qpid/cpp/src/tests/topic_listener.cpp14
-rw-r--r--qpid/cpp/src/tests/txtest.cpp18
29 files changed, 740 insertions, 496 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index f8c4fc42a8..07b7957a8e 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -368,14 +368,15 @@ libqpidclient_la_LIBADD = libqpidcommon.la -luuid
libqpidclient_la_SOURCES = \
$(rgen_client_srcs) \
- qpid/client/AckPolicy.cpp \
qpid/client/Bounds.cpp \
qpid/client/Connection.cpp \
qpid/client/ConnectionHandler.cpp \
+ qpid/client/ConnectionImpl.h \
qpid/client/ConnectionImpl.cpp \
qpid/client/ConnectionSettings.cpp \
qpid/client/Connector.cpp \
qpid/client/Demux.cpp \
+ qpid/client/Dispatcher.h \
qpid/client/Dispatcher.cpp \
qpid/client/FailoverConnection.cpp \
qpid/client/FailoverSession.cpp \
@@ -385,6 +386,7 @@ libqpidclient_la_SOURCES = \
qpid/client/Future.cpp \
qpid/client/FutureCompletion.cpp \
qpid/client/FutureResult.cpp \
+ qpid/client/HandlePrivate.h \
qpid/client/LoadPlugins.cpp \
qpid/client/LocalQueue.cpp \
qpid/client/Message.cpp \
@@ -395,8 +397,12 @@ libqpidclient_la_SOURCES = \
qpid/client/SessionBase_0_10.h \
qpid/client/SessionBase_0_10Access.h \
qpid/client/ConnectionAccess.h \
+ qpid/client/SessionImpl.h \
qpid/client/SessionImpl.cpp \
qpid/client/StateManager.cpp \
+ qpid/client/Subscription.cpp \
+ qpid/client/SubscriptionImpl.h \
+ qpid/client/SubscriptionImpl.cpp \
qpid/client/SubscriptionManager.cpp
nobase_include_HEADERS = \
@@ -500,25 +506,25 @@ nobase_include_HEADERS = \
qpid/broker/TxPublish.h \
qpid/broker/Vhost.h \
qpid/client/AckMode.h \
- qpid/client/AckPolicy.h \
qpid/client/Bounds.h \
qpid/client/ChainableFrameHandler.h \
qpid/client/Completion.h \
qpid/client/Connection.h \
qpid/client/ConnectionHandler.h \
- qpid/client/ConnectionImpl.h \
qpid/client/ConnectionSettings.h \
qpid/client/Connector.h \
qpid/client/Demux.h \
- qpid/client/Dispatcher.h \
qpid/client/Execution.h \
qpid/client/FailoverConnection.h \
qpid/client/FailoverSession.h \
+ qpid/client/Subscription.h \
+ qpid/client/SubscriptionSettings.h \
qpid/client/FailoverSubscriptionManager.h \
qpid/client/FlowControl.h \
qpid/client/Future.h \
qpid/client/FutureCompletion.h \
qpid/client/FutureResult.h \
+ qpid/client/Handle.h \
qpid/client/LocalQueue.h \
qpid/client/QueueOptions.h \
qpid/client/Message.h \
@@ -527,7 +533,6 @@ nobase_include_HEADERS = \
qpid/client/SessionBase_0_10.h \
qpid/client/Session.h \
qpid/client/AsyncSession.h \
- qpid/client/SessionImpl.h \
qpid/client/StateManager.h \
qpid/client/SubscriptionManager.h \
qpid/client/TypedResult.h \
diff --git a/qpid/cpp/src/qpid/RangeSet.h b/qpid/cpp/src/qpid/RangeSet.h
index 2a88426f17..1ba4fbbcef 100644
--- a/qpid/cpp/src/qpid/RangeSet.h
+++ b/qpid/cpp/src/qpid/RangeSet.h
@@ -27,6 +27,7 @@
#include <boost/operators.hpp>
#include <boost/bind.hpp>
#include <algorithm>
+#include <numeric>
namespace qpid {
@@ -53,7 +54,7 @@ class Range {
void begin(const T& t) { begin_ = t; }
void end(const T& t) { end_ = t; }
-
+ size_t size() const { return end_ - begin_; }
bool empty() const { return begin_ == end_; }
bool contains(const T& x) const { return begin_ <= x && x < end_; }
@@ -172,6 +173,7 @@ class RangeSet
// The difference between the start and end of this range set
uint32_t span() const;
+ size_t size() const;
bool empty() const { return ranges.empty(); }
void clear() { ranges.clear(); }
@@ -185,6 +187,7 @@ class RangeSet
template <class S> void decode(S& s) { uint16_t sz; s(sz); ranges.resize(sz/sizeof(Range<T>)); }
private:
+ static size_t accumulateSize(size_t s, const Range<T>& r) { return s+r.size(); }
Ranges ranges;
template <class U> friend std::ostream& operator<<(std::ostream& o, const RangeSet<U>& r);
@@ -317,6 +320,9 @@ template <class T> uint32_t RangeSet<T>::span() const {
return ranges.back().last() - ranges.front().first();
}
+template <class T> size_t RangeSet<T>::size() const {
+ return std::accumulate(rangesBegin(), rangesEnd(), 0, &RangeSet<T>::accumulateSize);
+}
} // namespace qpid
diff --git a/qpid/cpp/src/qpid/client/Dispatcher.cpp b/qpid/cpp/src/qpid/client/Dispatcher.cpp
index c26dba188d..0b7618eb4c 100644
--- a/qpid/cpp/src/qpid/client/Dispatcher.cpp
+++ b/qpid/cpp/src/qpid/client/Dispatcher.cpp
@@ -19,6 +19,7 @@
*
*/
#include "Dispatcher.h"
+#include "SubscriptionImpl.h"
#include "qpid/framing/FrameSet.h"
#include "qpid/framing/MessageTransferBody.h"
@@ -37,18 +38,6 @@ using qpid::sys::Thread;
namespace qpid {
namespace client {
-Subscriber::Subscriber(const Session& s, MessageListener* l, AckPolicy a)
- : session(s), listener(l), autoAck(a) {}
-
-void Subscriber::received(Message& msg)
-{
-
- if (listener) {
- listener->received(msg);
- autoAck.ack(msg, session);
- }
-}
-
Dispatcher::Dispatcher(const Session& s, const std::string& q)
: session(s),
running(false),
@@ -78,7 +67,7 @@ void Dispatcher::run()
FrameSet::shared_ptr content = queue->pop();
if (content->isA<MessageTransferBody>()) {
Message msg(*content);
- Subscriber::shared_ptr listener = find(msg.getDestination());
+ boost::intrusive_ptr<SubscriptionImpl> listener = find(msg.getDestination());
if (!listener) {
QPID_LOG(error, "No listener found for destination " << msg.getDestination());
} else {
@@ -121,7 +110,7 @@ void Dispatcher::setAutoStop(bool b)
autoStop = b;
}
-Subscriber::shared_ptr Dispatcher::find(const std::string& name)
+boost::intrusive_ptr<SubscriptionImpl> Dispatcher::find(const std::string& name)
{
ScopedLock<Mutex> l(lock);
Listeners::iterator i = listeners.find(name);
@@ -131,24 +120,12 @@ Subscriber::shared_ptr Dispatcher::find(const std::string& name)
return i->second;
}
-void Dispatcher::listen(
- MessageListener* listener, AckPolicy autoAck
-)
-{
- ScopedLock<Mutex> l(lock);
- defaultListener = Subscriber::shared_ptr(
- new Subscriber(session, listener, autoAck));
-}
-
-void Dispatcher::listen(const std::string& destination, MessageListener* listener, AckPolicy autoAck)
-{
+void Dispatcher::listen(const boost::intrusive_ptr<SubscriptionImpl>& subscription) {
ScopedLock<Mutex> l(lock);
- listeners[destination] = Subscriber::shared_ptr(
- new Subscriber(session, listener, autoAck));
+ listeners[subscription->getName()] = subscription;
}
-void Dispatcher::cancel(const std::string& destination)
-{
+void Dispatcher::cancel(const std::string& destination) {
ScopedLock<Mutex> l(lock);
listeners.erase(destination);
if (autoStop && listeners.empty())
diff --git a/qpid/cpp/src/qpid/client/Dispatcher.h b/qpid/cpp/src/qpid/client/Dispatcher.h
index d85785ed2c..921c6449a3 100644
--- a/qpid/cpp/src/qpid/client/Dispatcher.h
+++ b/qpid/cpp/src/qpid/client/Dispatcher.h
@@ -30,24 +30,12 @@
#include "qpid/sys/Runnable.h"
#include "qpid/sys/Thread.h"
#include "MessageListener.h"
-#include "AckPolicy.h"
+#include "SubscriptionImpl.h"
namespace qpid {
namespace client {
-///@internal
-class Subscriber : public MessageListener
-{
- AsyncSession session;
- MessageListener* const listener;
- AckPolicy autoAck;
-
-public:
- typedef boost::shared_ptr<Subscriber> shared_ptr;
- Subscriber(const Session& session, MessageListener* listener, AckPolicy);
- void received(Message& msg);
-
-};
+class SubscriptionImpl;
///@internal
typedef framing::Handler<framing::FrameSet> FrameSetHandler;
@@ -55,7 +43,7 @@ typedef framing::Handler<framing::FrameSet> FrameSetHandler;
///@internal
class Dispatcher : public sys::Runnable
{
- typedef std::map<std::string, Subscriber::shared_ptr> Listeners;
+ typedef std::map<std::string, boost::intrusive_ptr<SubscriptionImpl> >Listeners;
sys::Mutex lock;
sys::Thread worker;
Session session;
@@ -63,10 +51,10 @@ class Dispatcher : public sys::Runnable
bool running;
bool autoStop;
Listeners listeners;
- Subscriber::shared_ptr defaultListener;
+ boost::intrusive_ptr<SubscriptionImpl> defaultListener;
std::auto_ptr<FrameSetHandler> handler;
- Subscriber::shared_ptr find(const std::string& name);
+ boost::intrusive_ptr<SubscriptionImpl> find(const std::string& name);
bool isStopped();
boost::function<void ()> failoverHandler;
@@ -84,8 +72,7 @@ public:
failoverHandler = fh;
}
- void listen(MessageListener* listener, AckPolicy autoAck=AckPolicy());
- void listen(const std::string& destination, MessageListener* listener, AckPolicy autoAck=AckPolicy());
+ void listen(const boost::intrusive_ptr<SubscriptionImpl>& subscription);
void cancel(const std::string& destination);
};
diff --git a/qpid/cpp/src/qpid/client/FailoverListener.cpp b/qpid/cpp/src/qpid/client/FailoverListener.cpp
index 98df12fc57..8311e713a4 100644
--- a/qpid/cpp/src/qpid/client/FailoverListener.cpp
+++ b/qpid/cpp/src/qpid/client/FailoverListener.cpp
@@ -60,7 +60,7 @@ FailoverListener::FailoverListener(const boost::shared_ptr<ConnectionImpl>& c, c
std::string qname=session.getId().getName();
session.queueDeclare(arg::queue=qname, arg::exclusive=true, arg::autoDelete=true);
session.exchangeBind(arg::queue=qname, arg::exchange=AMQ_FAILOVER);
- subscriptions->subscribe(*this, qname, FlowControl::unlimited());
+ subscriptions->subscribe(*this, qname, SubscriptionSettings(FlowControl::unlimited(), ACCEPT_MODE_NONE));
thread = sys::Thread(*subscriptions);
}
diff --git a/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp b/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp
index 0331cbeb9e..5fa4cb2800 100644
--- a/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp
+++ b/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp
@@ -68,7 +68,7 @@ FailoverSubscriptionManager::failover ( )
void
FailoverSubscriptionManager::subscribe ( MessageListener & listener,
const std::string & queue,
- const FlowControl & flow,
+ const SubscriptionSettings & settings,
const std::string & tag,
bool record_this
)
@@ -77,11 +77,11 @@ FailoverSubscriptionManager::subscribe ( MessageListener & listener,
subscriptionManager->subscribe ( listener,
queue,
- flow,
+ settings,
tag
);
if ( record_this )
- subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(MessageListener&, const std::string&, const FlowControl&, const std::string&, bool) ) &FailoverSubscriptionManager::subscribe, this, boost::ref(listener), queue, flow, tag, false ) );
+ subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(MessageListener&, const std::string&, const SubscriptionSettings&, const std::string&, bool) ) &FailoverSubscriptionManager::subscribe, this, boost::ref(listener), queue, settings, tag, false ) );
}
@@ -89,7 +89,7 @@ FailoverSubscriptionManager::subscribe ( MessageListener & listener,
void
FailoverSubscriptionManager::subscribe ( LocalQueue & localQueue,
const std::string & queue,
- const FlowControl & flow,
+ const SubscriptionSettings & settings,
const std::string & tag,
bool record_this
)
@@ -98,12 +98,12 @@ FailoverSubscriptionManager::subscribe ( LocalQueue & localQueue,
subscriptionManager->subscribe ( localQueue,
queue,
- flow,
+ settings,
tag
);
if ( record_this )
- subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(LocalQueue&, const std::string&, const FlowControl&, const std::string&, bool) ) &FailoverSubscriptionManager::subscribe, this, localQueue, queue, flow, tag, false ) );
+ subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(LocalQueue&, const std::string&, const SubscriptionSettings&, const std::string&, bool) ) &FailoverSubscriptionManager::subscribe, this, localQueue, queue, settings, tag, false ) );
}
@@ -245,109 +245,4 @@ FailoverSubscriptionManager::stop ( )
lock.notifyAll();
}
-
-
-void
-FailoverSubscriptionManager::setFlowControl ( const std::string & destination,
- const FlowControl & flow
-)
-{
-
- subscriptionManager->setFlowControl ( destination, flow );
-}
-
-
-
-void
-FailoverSubscriptionManager::setFlowControl ( const FlowControl & flow )
-{
-
- subscriptionManager->setFlowControl ( flow );
-}
-
-
-
-const FlowControl &
-FailoverSubscriptionManager::getFlowControl ( ) const
-{
-
- return subscriptionManager->getFlowControl ( );
-}
-
-
-
-
-void
-FailoverSubscriptionManager::setFlowControl ( const std::string & tag,
- uint32_t messages,
- uint32_t bytes,
- bool window
-)
-{
-
- subscriptionManager->setFlowControl ( tag,
- messages,
- bytes,
- window
- );
-}
-
-
-
-void
-FailoverSubscriptionManager::setFlowControl ( uint32_t messages,
- uint32_t bytes,
- bool window
-)
-{
-
- subscriptionManager->setFlowControl ( messages,
- bytes,
- window
- );
-}
-
-
-
-void
-FailoverSubscriptionManager::setAcceptMode ( bool required )
-{
-
- subscriptionManager->setAcceptMode ( required );
-}
-
-
-
-void
-FailoverSubscriptionManager::setAcquireMode ( bool acquire )
-{
-
- subscriptionManager->setAcquireMode ( acquire );
-}
-
-
-
-void
-FailoverSubscriptionManager::setAckPolicy ( const AckPolicy & autoAck )
-{
-
- subscriptionManager->setAckPolicy ( autoAck );
-}
-
-
-
-AckPolicy &
-FailoverSubscriptionManager::getAckPolicy()
-{
-
- return subscriptionManager->getAckPolicy ( );
-}
-
-
-
-
-
-
-
-
}} // namespace qpid::client
diff --git a/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h b/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h
index b7631d3a98..0556ba15ec 100644
--- a/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h
+++ b/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h
@@ -31,7 +31,7 @@
#include <qpid/client/MessageListener.h>
#include <qpid/client/SubscriptionManager.h>
#include <qpid/client/LocalQueue.h>
-#include <qpid/client/FlowControl.h>
+#include <qpid/client/SubscriptionSettings.h>
#include <qpid/sys/Runnable.h>
#include <qpid/sys/Monitor.h>
@@ -50,13 +50,13 @@ class FailoverSubscriptionManager
void subscribe ( MessageListener & listener,
const std::string & queue,
- const FlowControl & flow,
+ const SubscriptionSettings & ,
const std::string & tag = std::string(),
bool record_this = true );
void subscribe ( LocalQueue & localQueue,
const std::string & queue,
- const FlowControl & flow,
+ const SubscriptionSettings & ,
const std::string & tag=std::string(),
bool record_this = true );
@@ -84,32 +84,6 @@ class FailoverSubscriptionManager
void stop ( );
- void setFlowControl ( const std::string & destintion,
- const FlowControl & flow );
-
- void setFlowControl ( const FlowControl & flow );
-
- const FlowControl & getFlowControl ( ) const;
-
- void setFlowControl ( const std::string & tag,
- uint32_t messages,
- uint32_t bytes,
- bool window=true );
-
- void setFlowControl ( uint32_t messages,
- uint32_t bytes,
- bool window = true
- );
-
- void setAcceptMode ( bool required );
-
- void setAcquireMode ( bool acquire );
-
- void setAckPolicy ( const AckPolicy & autoAck );
-
- AckPolicy & getAckPolicy();
-
-
// Get ready for a failover.
void prepareForFailover ( Session newSession );
void failover ( );
diff --git a/qpid/cpp/src/qpid/client/FlowControl.h b/qpid/cpp/src/qpid/client/FlowControl.h
index 081061ac02..0f5f8596ec 100644
--- a/qpid/cpp/src/qpid/client/FlowControl.h
+++ b/qpid/cpp/src/qpid/client/FlowControl.h
@@ -22,6 +22,8 @@
*
*/
+#include <qpid/sys/IntegerTypes.h>
+
namespace qpid {
namespace client {
@@ -40,9 +42,8 @@ namespace client {
* is renewed.
*
* In "window mode" credit is automatically renewed when a message is
- * acknowledged (@see AckPolicy) In non-window mode credit is not
- * automatically renewed, it must be explicitly re-set (@see
- * SubscriptionManager)
+ * accepted. In non-window mode credit is not automatically renewed,
+ * it must be explicitly re-set (@see Subscription)
*/
struct FlowControl {
static const uint32_t UNLIMITED=0xFFFFFFFF;
diff --git a/qpid/cpp/src/qpid/client/AckPolicy.h b/qpid/cpp/src/qpid/client/Handle.h
index 84bfb6a46a..4fd82b7646 100644
--- a/qpid/cpp/src/qpid/client/AckPolicy.h
+++ b/qpid/cpp/src/qpid/client/Handle.h
@@ -1,7 +1,8 @@
-#ifndef QPID_CLIENT_ACKPOLICY_H
-#define QPID_CLIENT_ACKPOLICY_H
+#ifndef QPID_CLIENT_HANDLE_H
+#define QPID_CLIENT_HANDLE_H
/*
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -9,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,38 +22,40 @@
*
*/
-#include "qpid/framing/SequenceSet.h"
-#include "qpid/client/AsyncSession.h"
-#include "qpid/client/Message.h"
-
namespace qpid {
namespace client {
+template <class T> class HandlePrivate;
+
/**
- * Policy for automatic acknowledgement of messages.
- *
- *
- * \ingroup clientapi
+ * A handle is like a pointer: it points to some underlying object.
+ * Handles can be null, like a 0 pointer. Use isValid(), isNull() or the
+ * implicit conversion to bool to test for a null handle.
*/
-class AckPolicy
-{
- framing::SequenceSet accepted;
- size_t interval;
- size_t count;
-
+template <class T> class Handle {
public:
- /**
- * Sends accepts and marks completion of received transfers.
- *
- *@param n: send an accept for every n messages received.
- *n==0 means no automatic acknowledgement.
- */
- AckPolicy(size_t n=1);
- void ack(const Message& msg, AsyncSession session);
- void ackOutstanding(AsyncSession session);};
+ ~Handle();
+ Handle(const Handle&);
+ Handle& operator=(const Handle&);
-}} // namespace qpid::client
+ /**@return true if handle is valid, i.e. not null. */
+ bool isValid() const { return impl; }
+
+ /**@return true if handle is null. It is an error to call any function on a null handle. */
+ bool isNull() const { return !impl; }
+ operator bool() const { return impl; }
+ bool operator !() const { return impl; }
+ void swap(Handle<T>&);
+
+ protected:
+ Handle(T* =0);
+ T* impl;
+
+ friend class HandlePrivate<T>;
+};
+
+}} // namespace qpid::client
-#endif /*!QPID_CLIENT_ACKPOLICY_H*/
+#endif /*!QPID_CLIENT_HANDLE_H*/
diff --git a/qpid/cpp/src/qpid/client/AckPolicy.cpp b/qpid/cpp/src/qpid/client/HandleAccess.h
index 7956ebad0f..f1747db638 100644
--- a/qpid/cpp/src/qpid/client/AckPolicy.cpp
+++ b/qpid/cpp/src/qpid/client/HandleAccess.h
@@ -1,3 +1,6 @@
+#ifndef QPID_CLIENT_HANDLEACCESS_H
+#define QPID_CLIENT_HANDLEACCESS_H
+
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -18,33 +21,21 @@
* under the License.
*
*/
-#include "AckPolicy.h"
+
+#include <Handle.h>
namespace qpid {
namespace client {
-AckPolicy::AckPolicy(size_t n) : interval(n), count(n) {}
-
-void AckPolicy::ack(const Message& msg, AsyncSession session)
-{
- accepted.add(msg.getId());
- if (interval && --count==0) {
- session.markCompleted(msg.getId(), false, true);
- session.messageAccept(accepted);
- accepted.clear();
- count = interval;
- } else {
- session.markCompleted(msg.getId(), false, false);
- }
-}
-
-void AckPolicy::ackOutstanding(AsyncSession session)
+/**
+ * Provide access to the private impl member of a Handle.
+ */
+template <class T>
+class HandleAccess
{
- if (!accepted.empty()) {
- session.messageAccept(accepted);
- accepted.clear();
- session.sendCompletion();
- }
-}
-
+ public:
+ static boost::shared_ptr<T> getImpl(Handle<T>& h) { return h.impl; }
+};
}} // namespace qpid::client
+
+#endif /*!QPID_CLIENT_HANDLEACCESS_H*/
diff --git a/qpid/cpp/src/qpid/client/HandlePrivate.h b/qpid/cpp/src/qpid/client/HandlePrivate.h
new file mode 100644
index 0000000000..488ce48075
--- /dev/null
+++ b/qpid/cpp/src/qpid/client/HandlePrivate.h
@@ -0,0 +1,61 @@
+#ifndef QPID_CLIENT_HANDLEPRIVATE_H
+#define QPID_CLIENT_HANDLEPRIVATE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <algorithm>
+
+namespace qpid {
+namespace client {
+
+/** @file
+ * Private implementation of handle, include in .cpp file of handle
+ * subclasses _after_ including the declaration of class T.
+ * T can be any class that can be used with boost::intrusive_ptr.
+ */
+
+template <class T>
+Handle<T>::Handle(T* p) : impl(p) { if (impl) boost::intrusive_ptr_add_ref(impl); }
+
+template <class T>
+Handle<T>::~Handle() { if(impl) boost::intrusive_ptr_release(impl); }
+
+template <class T>
+Handle<T>::Handle(const Handle& h) : impl(h.impl) { if(impl) boost::intrusive_ptr_add_ref(impl); }
+
+template <class T>
+Handle<T>& Handle<T>::operator=(const Handle<T>& h) { Handle<T>(h).swap(*this); return *this; }
+
+template <class T>
+void Handle<T>::swap(Handle<T>& h) { std::swap(impl, h.impl); }
+
+
+/** Access to private impl of a Handle */
+template <class T>
+class HandlePrivate {
+ public:
+ static boost::intrusive_ptr<T> get(Handle<T>& h) { return boost::intrusive_ptr<T>(h.impl); }
+};
+
+
+}} // namespace qpid::client
+
+#endif /*!QPID_CLIENT_HANDLEPRIVATE_H*/
diff --git a/qpid/cpp/src/qpid/client/LocalQueue.cpp b/qpid/cpp/src/qpid/client/LocalQueue.cpp
index 99ab6f0133..229d3766ef 100644
--- a/qpid/cpp/src/qpid/client/LocalQueue.cpp
+++ b/qpid/cpp/src/qpid/client/LocalQueue.cpp
@@ -22,13 +22,15 @@
#include "qpid/Exception.h"
#include "qpid/framing/FrameSet.h"
#include "qpid/framing/reply_exceptions.h"
+#include "HandlePrivate.h"
+#include "SubscriptionImpl.h"
namespace qpid {
namespace client {
using namespace framing;
-LocalQueue::LocalQueue(AckPolicy a) : autoAck(a) {}
+LocalQueue::LocalQueue() {}
LocalQueue::~LocalQueue() {}
Message LocalQueue::pop() { return get(); }
@@ -48,7 +50,9 @@ bool LocalQueue::get(Message& result, sys::Duration timeout) {
if (!ok) return false;
if (content->isA<MessageTransferBody>()) {
result = Message(*content);
- autoAck.ack(result, session);
+ boost::intrusive_ptr<SubscriptionImpl> si = HandlePrivate<SubscriptionImpl>::get(subscription);
+ assert(si);
+ if (si) si->received(result);
return true;
}
else
@@ -56,9 +60,6 @@ bool LocalQueue::get(Message& result, sys::Duration timeout) {
QPID_MSG("Unexpected method: " << content->getMethod()));
}
-void LocalQueue::setAckPolicy(AckPolicy a) { autoAck=a; }
-AckPolicy& LocalQueue::getAckPolicy() { return autoAck; }
-
bool LocalQueue::empty() const
{
if (!queue)
diff --git a/qpid/cpp/src/qpid/client/LocalQueue.h b/qpid/cpp/src/qpid/client/LocalQueue.h
index f81065ef3c..9fe72762c3 100644
--- a/qpid/cpp/src/qpid/client/LocalQueue.h
+++ b/qpid/cpp/src/qpid/client/LocalQueue.h
@@ -23,8 +23,8 @@
*/
#include "qpid/client/Message.h"
+#include "qpid/client/Subscription.h"
#include "qpid/client/Demux.h"
-#include "qpid/client/AckPolicy.h"
#include "qpid/sys/Time.h"
namespace qpid {
@@ -38,17 +38,14 @@ namespace client {
*
* \ingroup clientapi
*/
-class LocalQueue
-{
+class LocalQueue {
public:
/** Create a local queue. Subscribe the local queue to a remote broker
* queue with a SubscriptionManager.
*
* LocalQueue is an alternative to implementing a MessageListener.
- *
- *@param ackPolicy Policy for acknowledging messages. @see AckPolicy.
*/
- LocalQueue(AckPolicy ackPolicy=AckPolicy());
+ LocalQueue();
~LocalQueue();
@@ -74,16 +71,9 @@ class LocalQueue
/** Number of messages on the local queue */
size_t size() const;
- /** Set the message acknowledgement policy. @see AckPolicy. */
- void setAckPolicy(AckPolicy);
-
- /** Get the message acknowledgement policy. @see AckPolicy. */
- AckPolicy& getAckPolicy();
-
private:
- Session session;
Demux::QueuePtr queue;
- AckPolicy autoAck;
+ Subscription subscription;
friend class SubscriptionManager;
};
diff --git a/qpid/cpp/src/qpid/client/Subscription.cpp b/qpid/cpp/src/qpid/client/Subscription.cpp
new file mode 100644
index 0000000000..449c7a736c
--- /dev/null
+++ b/qpid/cpp/src/qpid/client/Subscription.cpp
@@ -0,0 +1,47 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Subscription.h"
+#include "SubscriptionImpl.h"
+#include "HandlePrivate.h"
+
+namespace qpid {
+namespace client {
+
+template class Handle<SubscriptionImpl>;
+
+
+std::string Subscription::getName() const { return impl->getName(); }
+std::string Subscription::getQueue() const { return impl->getQueue(); }
+const SubscriptionSettings& Subscription::getSettings() const { return impl->getSettings(); }
+void Subscription::setFlowControl(const FlowControl& f) { impl->setFlowControl(f); }
+void Subscription::setAutoAck(size_t n) { impl->setAutoAck(n); }
+SequenceSet Subscription::getUnacquired() const { return impl->getUnacquired(); }
+SequenceSet Subscription::getUnaccepted() const { return impl->getUnaccepted(); }
+void Subscription::acquire(const SequenceSet& messageIds) { impl->acquire(messageIds); }
+void Subscription::accept(const SequenceSet& messageIds) { impl->accept(messageIds); }
+Session Subscription::getSession() const { return impl->getSession(); }
+SubscriptionManager&Subscription:: getSubscriptionManager() const { return impl->getSubscriptionManager(); }
+void Subscription::cancel() { impl->cancel(); }
+
+}} // namespace qpid::client
+
+
diff --git a/qpid/cpp/src/qpid/client/Subscription.h b/qpid/cpp/src/qpid/client/Subscription.h
new file mode 100644
index 0000000000..2ed56d4e8a
--- /dev/null
+++ b/qpid/cpp/src/qpid/client/Subscription.h
@@ -0,0 +1,99 @@
+#ifndef QPID_CLIENT_SUBSCRIPTION_H
+#define QPID_CLIENT_SUBSCRIPTION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/client/Session.h"
+#include "qpid/client/SubscriptionSettings.h"
+#include "qpid/client/Handle.h"
+#include "qpid/client/Message.h"
+
+namespace qpid {
+namespace client {
+
+class SubscriptionImpl;
+class SubscriptionManager;
+
+/**
+ * A handle to an active subscription. Provides methods to query the subscription status
+ * and control acknowledgement (acquire and accept) of messages.
+ */
+class Subscription : public Handle<SubscriptionImpl> {
+ public:
+ Subscription(SubscriptionImpl* si=0) : Handle<SubscriptionImpl>(si) {}
+
+ /** The name of the subsctription, used as the "destination" for messages from the broker.
+ * Usually the same as the queue name but can be set differently.
+ */
+ std::string getName() const;
+
+ /** Name of the queue this subscription subscribes to */
+ std::string getQueue() const;
+
+ /** Get the flow control and acknowledgement settings for this subscription */
+ const SubscriptionSettings& getSettings() const;
+
+ /** Set the flow control parameters */
+ void setFlowControl(const FlowControl&);
+
+ /** Automatically acknowledge (acquire and accept) batches of n messages.
+ * You can disable auto-acknowledgement by setting n=0, and use acquire() and accept()
+ * to manually acquire and accept messages.
+ */
+ void setAutoAck(unsigned int n);
+
+ /** Get the set of ID's for messages received by this subscription but not yet acquired.
+ * This will always be empty if getSettings().acquireMode=ACQUIRE_MODE_PRE_ACQUIRED
+ */
+ SequenceSet getUnacquired() const;
+
+ /** Get the set of ID's for messages received by this subscription but not yet accepted. */
+ SequenceSet getUnaccepted() const;
+
+ /** Acquire messageIds and remove them from the unacquired set.
+ * oAdd them to the unaccepted set if getSettings().acceptMode == ACCEPT_MODE_EXPLICIT.
+ */
+ void acquire(const SequenceSet& messageIds);
+
+ /** Accept messageIds and remove them from the unaccepted set.
+ *@pre messageIds is a subset of getUnaccepted()
+ */
+ void accept(const SequenceSet& messageIds);
+
+ /* Acquire a single message */
+ void acquire(const Message& m) { acquire(SequenceSet(m.getId())); }
+
+ /* Accept a single message */
+ void accept(const Message& m) { accept(SequenceSet(m.getId())); }
+
+ /** Get the session associated with this subscription */
+ Session getSession() const;
+
+ /** Get the subscription manager associated with this subscription */
+ SubscriptionManager& getSubscriptionManager() const;
+
+ /** Cancel the subscription. */
+ void cancel();
+};
+}} // namespace qpid::client
+
+#endif /*!QPID_CLIENT_SUBSCRIPTION_H*/
diff --git a/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp b/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp
new file mode 100644
index 0000000000..3363dda11f
--- /dev/null
+++ b/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp
@@ -0,0 +1,116 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "SubscriptionImpl.h"
+#include "SubscriptionManager.h"
+#include "SubscriptionSettings.h"
+
+namespace qpid {
+namespace client {
+
+using sys::Mutex;
+
+SubscriptionImpl::SubscriptionImpl(SubscriptionManager& m, const std::string& q, const SubscriptionSettings& s, const std::string& n, MessageListener* l)
+ : manager(m), name(n), queue(q), settings(s), listener(l)
+{
+ async(manager.getSession()).messageSubscribe(
+ arg::queue=queue,
+ arg::destination=name,
+ arg::acceptMode=settings.acceptMode,
+ arg::acquireMode=settings.acquireMode);
+ setFlowControl(settings.flowControl);
+}
+
+std::string SubscriptionImpl::getName() const { return name; }
+
+std::string SubscriptionImpl::getQueue() const { return queue; }
+
+const SubscriptionSettings& SubscriptionImpl::getSettings() const {
+ Mutex::ScopedLock l(lock);
+ return settings;
+}
+
+void SubscriptionImpl::setFlowControl(const FlowControl& f) {
+ Mutex::ScopedLock l(lock);
+ AsyncSession s=manager.getSession();
+ if (&settings.flowControl != &f) settings.flowControl = f;
+ s.messageSetFlowMode(name, f.window);
+ s.messageFlow(name, CREDIT_UNIT_MESSAGE, f.messages);
+ s.messageFlow(name, CREDIT_UNIT_BYTE, f.bytes);
+ s.sync();
+}
+
+void SubscriptionImpl::setAutoAck(size_t n) {
+ Mutex::ScopedLock l(lock);
+ settings.autoAck = n;
+}
+
+SequenceSet SubscriptionImpl::getUnacquired() const { Mutex::ScopedLock l(lock); return unacquired; }
+SequenceSet SubscriptionImpl::getUnaccepted() const { Mutex::ScopedLock l(lock); return unaccepted; }
+
+void SubscriptionImpl::acquire(const SequenceSet& messageIds) {
+ Mutex::ScopedLock l(lock);
+ manager.getSession().messageAcquire(messageIds);
+ unacquired.remove(messageIds);
+ if (settings.acceptMode == ACCEPT_MODE_EXPLICIT)
+ unaccepted.add(messageIds);
+}
+
+void SubscriptionImpl::accept(const SequenceSet& messageIds) {
+ Mutex::ScopedLock l(lock);
+ manager.getSession().messageAccept(messageIds);
+ unaccepted.remove(messageIds);
+}
+
+Session SubscriptionImpl::getSession() const { return manager.getSession(); }
+
+SubscriptionManager&SubscriptionImpl:: getSubscriptionManager() const { return manager; }
+
+void SubscriptionImpl::cancel() { manager.cancel(name); }
+
+void SubscriptionImpl::received(Message& m) {
+ Mutex::ScopedLock l(lock);
+ manager.getSession().markCompleted(m.getId(), false, false);
+ if (m.getMethod().getAcquireMode() == ACQUIRE_MODE_NOT_ACQUIRED)
+ unacquired.add(m.getId());
+ else if (m.getMethod().getAcceptMode() == ACCEPT_MODE_EXPLICIT)
+ unaccepted.add(m.getId());
+
+ if (listener) {
+ Mutex::ScopedUnlock u(lock);
+ listener->received(m);
+ }
+
+ if (settings.autoAck) {
+ if (unacquired.size() + unaccepted.size() >= settings.autoAck) {
+ if (unacquired.size()) {
+ async(manager.getSession()).messageAcquire(unacquired);
+ unaccepted.add(unacquired);
+ unaccepted.clear();
+ }
+ async(manager.getSession()).messageAccept(unaccepted);
+ unaccepted.clear();
+ }
+ }
+}
+
+}} // namespace qpid::client
+
diff --git a/qpid/cpp/src/qpid/client/SubscriptionImpl.h b/qpid/cpp/src/qpid/client/SubscriptionImpl.h
new file mode 100644
index 0000000000..44fd1a7d6c
--- /dev/null
+++ b/qpid/cpp/src/qpid/client/SubscriptionImpl.h
@@ -0,0 +1,99 @@
+#ifndef QPID_CLIENT_SUBSCRIPTIONIMPL_H
+#define QPID_CLIENT_SUBSCRIPTIONIMPL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/client/SubscriptionSettings.h"
+#include "qpid/client/Session.h"
+#include "qpid/client/MessageListener.h"
+#include "qpid/framing/SequenceSet.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/RefCounted.h"
+
+namespace qpid {
+namespace client {
+
+class SubscriptionManager;
+
+class SubscriptionImpl : public RefCounted, public MessageListener {
+ public:
+ SubscriptionImpl(SubscriptionManager&, const std::string& queue,
+ const SubscriptionSettings&, const std::string& name, MessageListener* =0);
+
+ /** The name of the subsctription, used as the "destination" for messages from the broker.
+ * Usually the same as the queue name but can be set differently.
+ */
+ std::string getName() const;
+
+ /** Name of the queue this subscription subscribes to */
+ std::string getQueue() const;
+
+ /** Get the flow control and acknowledgement settings for this subscription */
+ const SubscriptionSettings& getSettings() const;
+
+ /** Set the flow control parameters */
+ void setFlowControl(const FlowControl&);
+
+ /** Automatically acknowledge (acquire and accept) batches of n messages.
+ * You can disable auto-acknowledgement by setting n=0, and use acquire() and accept()
+ * to manually acquire and accept messages.
+ */
+ void setAutoAck(size_t n);
+
+ /** Get the set of ID's for messages received by this subscription but not yet acquired.
+ * This will always be empty if acquireMode=ACQUIRE_MODE_PRE_ACQUIRED
+ */
+ SequenceSet getUnacquired() const;
+
+ /** Get the set of ID's for messages acquired by this subscription but not yet accepted. */
+ SequenceSet getUnaccepted() const;
+
+ /** Acquire messageIds and remove them from the un-acquired set for the session. */
+ void acquire(const SequenceSet& messageIds);
+
+ /** Accept messageIds and remove them from the un-acceptd set for the session. */
+ void accept(const SequenceSet& messageIds);
+
+ /** Get the session associated with this subscription */
+ Session getSession() const;
+
+ /** Get the subscription manager associated with this subscription */
+ SubscriptionManager& getSubscriptionManager() const;
+
+ /** Cancel the subscription. */
+ void cancel();
+
+ void received(Message&);
+
+ private:
+
+ mutable sys::Mutex lock;
+ SubscriptionManager& manager;
+ std::string name, queue;
+ SubscriptionSettings settings;
+ framing::SequenceSet unacquired, unaccepted;
+ MessageListener* listener;
+};
+
+}} // namespace qpid::client
+
+#endif /*!QPID_CLIENT_SUBSCRIPTIONIMPL_H*/
diff --git a/qpid/cpp/src/qpid/client/SubscriptionManager.cpp b/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
index dde93635c8..7e2f2f8595 100644
--- a/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
+++ b/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
@@ -22,6 +22,7 @@
#define _Subscription_
#include "SubscriptionManager.h"
+#include "SubscriptionImpl.h"
#include <qpid/client/Dispatcher.h>
#include <qpid/client/Session.h>
#include <qpid/client/MessageListener.h>
@@ -34,83 +35,41 @@ namespace qpid {
namespace client {
SubscriptionManager::SubscriptionManager(const Session& s)
- : dispatcher(s), session(s),
- flowControl(UNLIMITED, UNLIMITED, false),
- acceptMode(0), acquireMode(0),
- autoStop(true)
+ : dispatcher(s), session(s), autoStop(true)
{}
-void SubscriptionManager::subscribeInternal(
- const std::string& q, const std::string& dest, const FlowControl& fc)
+Subscription SubscriptionManager::subscribe(
+ MessageListener& listener, const std::string& q, const SubscriptionSettings& ss, const std::string& n)
{
- session.messageSubscribe(
- arg::queue=q, arg::destination=dest,
- arg::acceptMode=acceptMode, arg::acquireMode=acquireMode);
- if (fc.messages || fc.bytes) // No need to set if all 0.
- setFlowControl(dest, fc);
+ std::string name=n.empty() ? q:n;
+ boost::intrusive_ptr<SubscriptionImpl> si = new SubscriptionImpl(*this, q, ss, name, &listener);
+ dispatcher.listen(si);
+ return subscriptions[name] = Subscription(si.get());
}
-void SubscriptionManager::subscribe(
- MessageListener& listener, const std::string& q, const std::string& d)
+Subscription SubscriptionManager::subscribe(
+ LocalQueue& lq, const std::string& q, const SubscriptionSettings& ss, const std::string& n)
{
- subscribe(listener, q, getFlowControl(), d);
+ std::string name=n.empty() ? q:n;
+ lq.queue=session.getExecution().getDemux().add(name, ByTransferDest(name));
+ boost::intrusive_ptr<SubscriptionImpl> si = new SubscriptionImpl(*this, q, ss, name, 0);
+ lq.subscription = Subscription(si.get());
+ return subscriptions[name] = lq.subscription;
}
-void SubscriptionManager::subscribe(
- MessageListener& listener, const std::string& q, const FlowControl& fc, const std::string& d)
+Subscription SubscriptionManager::subscribe(
+ MessageListener& listener, const std::string& q, const std::string& n)
{
- std::string dest=d.empty() ? q:d;
- dispatcher.listen(dest, &listener, autoAck);
- return subscribeInternal(q, dest, fc);
+ return subscribe(listener, q, defaultSettings, n);
}
-void SubscriptionManager::subscribe(
- LocalQueue& lq, const std::string& q, const std::string& d)
+Subscription SubscriptionManager::subscribe(
+ LocalQueue& lq, const std::string& q, const std::string& n)
{
- subscribe(lq, q, getFlowControl(), d);
+ return subscribe(lq, q, defaultSettings, n);
}
-void SubscriptionManager::subscribe(
- LocalQueue& lq, const std::string& q, const FlowControl& fc, const std::string& d)
-{
- std::string dest=d.empty() ? q:d;
- lq.session=session;
- lq.queue=session.getExecution().getDemux().add(dest, ByTransferDest(dest));
- return subscribeInternal(q, dest, fc);
-}
-
-void SubscriptionManager::setFlowControl(
- const std::string& dest, uint32_t messages, uint32_t bytes, bool window)
-{
- session.messageSetFlowMode(dest, window);
- session.messageFlow(dest, 0, messages);
- session.messageFlow(dest, 1, bytes);
- session.sync();
-}
-
-void SubscriptionManager::setFlowControl(const std::string& dest, const FlowControl& fc) {
- setFlowControl(dest, fc.messages, fc.bytes, fc.window);
-}
-
-void SubscriptionManager::setFlowControl(const FlowControl& fc) { flowControl=fc; }
-
-void SubscriptionManager::setFlowControl(
- uint32_t messages_, uint32_t bytes_, bool window_)
-{
- setFlowControl(FlowControl(messages_, bytes_, window_));
-}
-
-const FlowControl& SubscriptionManager::getFlowControl() const { return flowControl; }
-
-void SubscriptionManager::setAcceptMode(bool c) { acceptMode=c; }
-
-void SubscriptionManager::setAcquireMode(bool a) { acquireMode=a; }
-
-void SubscriptionManager::setAckPolicy(const AckPolicy& a) { autoAck=a; }
-
-AckPolicy& SubscriptionManager::getAckPolicy() { return autoAck; }
-
-void SubscriptionManager::cancel(const std::string dest)
+void SubscriptionManager::cancel(const std::string& dest)
{
sync(session).messageCancel(dest);
dispatcher.cancel(dest);
@@ -138,10 +97,11 @@ void SubscriptionManager::stop()
bool SubscriptionManager::get(Message& result, const std::string& queue, sys::Duration timeout) {
LocalQueue lq;
std::string unique = framing::Uuid(true).str();
- subscribe(lq, queue, FlowControl::messageCredit(1), unique);
+ subscribe(lq, queue, SubscriptionSettings(FlowControl::messageCredit(1)), unique);
AutoCancel ac(*this, unique);
//first wait for message to be delivered if a timeout has been specified
- if (timeout && lq.get(result, timeout)) return true;
+ if (timeout && lq.get(result, timeout))
+ return true;
//make sure message is not on queue before final check
sync(session).messageFlush(unique);
return lq.get(result, 0);
@@ -149,6 +109,10 @@ bool SubscriptionManager::get(Message& result, const std::string& queue, sys::Du
Session SubscriptionManager::getSession() const { return session; }
+Subscription SubscriptionManager::getSubscription(const std::string& name) const {
+ return subscriptions.at(name);
+}
+
void SubscriptionManager::registerFailoverHandler (boost::function<void ()> fh) {
dispatcher.registerFailoverHandler(fh);
}
diff --git a/qpid/cpp/src/qpid/client/SubscriptionManager.h b/qpid/cpp/src/qpid/client/SubscriptionManager.h
index 07faa48fee..8b27a2c9b9 100644
--- a/qpid/cpp/src/qpid/client/SubscriptionManager.h
+++ b/qpid/cpp/src/qpid/client/SubscriptionManager.h
@@ -25,9 +25,10 @@
#include <qpid/client/Dispatcher.h>
#include <qpid/client/Completion.h>
#include <qpid/client/Session.h>
+#include <qpid/client/AsyncSession.h>
#include <qpid/client/MessageListener.h>
#include <qpid/client/LocalQueue.h>
-#include <qpid/client/FlowControl.h>
+#include <qpid/client/Subscription.h>
#include <qpid/sys/Runnable.h>
#include <set>
#include <sstream>
@@ -48,15 +49,10 @@ class SubscriptionManager : public sys::Runnable
typedef sys::Mutex::ScopedLock Lock;
typedef sys::Mutex::ScopedUnlock Unlock;
- void subscribeInternal(const std::string& q, const std::string& dest, const FlowControl&);
-
qpid::client::Dispatcher dispatcher;
qpid::client::AsyncSession session;
- FlowControl flowControl;
- AckPolicy autoAck;
- bool acceptMode;
- bool acquireMode;
bool autoStop;
+ SubscriptionSettings defaultSettings;
public:
/** Create a new SubscriptionManager associated with a session */
@@ -70,14 +66,13 @@ class SubscriptionManager : public sys::Runnable
*
*@param listener Listener object to receive messages.
*@param queue Name of the queue to subscribe to.
- *@param flow initial FlowControl for the subscription.
- *@param tag Unique destination tag for the listener.
- * If not specified, the queue name is used.
+ *@param settings settings for the subscription.
+ *@param name unique destination name for the subscription, defaults to queue name.
*/
- void subscribe(MessageListener& listener,
- const std::string& queue,
- const FlowControl& flow,
- const std::string& tag=std::string());
+ Subscription subscribe(MessageListener& listener,
+ const std::string& queue,
+ const SubscriptionSettings& settings,
+ const std::string& name=std::string());
/**
* Subscribe a LocalQueue to receive messages from queue.
@@ -86,13 +81,13 @@ class SubscriptionManager : public sys::Runnable
*
*@param queue Name of the queue to subscribe to.
*@param flow initial FlowControl for the subscription.
- *@param tag Unique destination tag for the listener.
+ *@param name unique destination name for the subscription, defaults to queue name.
* If not specified, the queue name is used.
*/
- void subscribe(LocalQueue& localQueue,
- const std::string& queue,
- const FlowControl& flow,
- const std::string& tag=std::string());
+ Subscription subscribe(LocalQueue& localQueue,
+ const std::string& queue,
+ const SubscriptionSettings& settings,
+ const std::string& name=std::string());
/**
* Subscribe a MessagesListener to receive messages from queue.
@@ -102,12 +97,12 @@ class SubscriptionManager : public sys::Runnable
*
*@param listener Listener object to receive messages.
*@param queue Name of the queue to subscribe to.
- *@param tag Unique destination tag for the listener.
+ *@param name unique destination name for the subscription, defaults to queue name.
* If not specified, the queue name is used.
*/
- void subscribe(MessageListener& listener,
- const std::string& queue,
- const std::string& tag=std::string());
+ Subscription subscribe(MessageListener& listener,
+ const std::string& queue,
+ const std::string& name=std::string());
/**
* Subscribe a LocalQueue to receive messages from queue.
@@ -115,12 +110,12 @@ class SubscriptionManager : public sys::Runnable
* Incoming messages are stored in the queue for you to retrieve.
*
*@param queue Name of the queue to subscribe to.
- *@param tag Unique destination tag for the listener.
+ *@param name unique destination name for the subscription, defaults to queue name.
* If not specified, the queue name is used.
*/
- void subscribe(LocalQueue& localQueue,
- const std::string& queue,
- const std::string& tag=std::string());
+ Subscription subscribe(LocalQueue& localQueue,
+ const std::string& queue,
+ const std::string& name=std::string());
/** Get a single message from a queue.
@@ -131,8 +126,13 @@ class SubscriptionManager : public sys::Runnable
*/
bool get(Message& result, const std::string& queue, sys::Duration timeout=0);
- /** Cancel a subscription. */
- void cancel(const std::string tag);
+ /** Get a subscription by name, returns a null Subscription handle
+ * if not found.
+ */
+ Subscription getSubscription(const std::string& name) const;
+
+ /** Cancel a subscription. See also: Subscription.cancel() */
+ void cancel(const std::string& name);
/** Deliver messages in the current thread until stop() is called.
* Only one thread may be running in a SubscriptionManager at a time.
@@ -157,53 +157,65 @@ class SubscriptionManager : public sys::Runnable
static const uint32_t UNLIMITED=0xFFFFFFFF;
- /** Set the flow control for destination. */
- void setFlowControl(const std::string& destintion, const FlowControl& flow);
-
- /** Set the default initial flow control for subscriptions that do not specify it. */
- void setFlowControl(const FlowControl& flow);
+ /** Set the flow control for a subscription. */
+ void setFlowControl(const std::string& name, const FlowControl& flow) {
+ getSubscription(name).setFlowControl(flow);
+ }
- /** Get the default flow control for new subscriptions that do not specify it. */
- const FlowControl& getFlowControl() const;
-
- /** Set the flow control for destination tag.
- *@param tag: name of the destination.
+ /** Set the flow control for a subscription.
+ *@param name: name of the subscription.
*@param messages: message credit.
*@param bytes: byte credit.
*@param window: if true use window-based flow control.
*/
- void setFlowControl(const std::string& tag, uint32_t messages, uint32_t bytes, bool window=true);
+ void setFlowControl(const std::string& name, uint32_t messages, uint32_t bytes, bool window=true) {
+ setFlowControl(name, messages, bytes, window);
+ }
- /** Set the initial flow control settings to be applied to each new subscribtion.
- *@param messages: message credit.
- *@param bytes: byte credit.
- *@param window: if true use window-based flow control.
+ /** Set the default settings for subscribe() calls that don't
+ * include a SubscriptionSettings parameter.
+ */
+ void setDefaultSettings(const SubscriptionSettings& s) { defaultSettings = s; }
+
+ /** Get the default settings for subscribe() calls that don't
+ * include a SubscriptionSettings parameter.
*/
- void setFlowControl(uint32_t messages, uint32_t bytes, bool window=true);
+ const SubscriptionSettings& getDefaultSettings() const { return defaultSettings; }
- /** Set the accept-mode for new subscriptions. Defaults to true.
- *@param required: if true messages must be confirmed by calling
- *Message::acknowledge() or automatically via an AckPolicy, see setAckPolicy()
+ /** Get the default settings for subscribe() calls that don't
+ * include a SubscriptionSettings parameter.
*/
- void setAcceptMode(bool required);
+ SubscriptionSettings& getDefaultSettings() { return defaultSettings; }
- /** Set the acquire-mode for new subscriptions. Defaults to false.
- *@param acquire: if false messages pre-acquired, if true
- * messages are dequed on acknowledgement or on transfer
- * depending on acceptMode.
+ /**
+ * Set the default flow control settings for subscribe() calls
+ * that don't include a SubscriptionSettings parameter.
+ *
+ *@param messages: message credit.
+ *@param bytes: byte credit.
+ *@param window: if true use window-based flow control.
*/
- void setAcquireMode(bool acquire);
+ void setFlowControl(uint32_t messages, uint32_t bytes, bool window=true) {
+ defaultSettings.flowControl = FlowControl(messages, bytes, window);
+ }
- /** Set the acknowledgement policy for new subscriptions.
- * Default is to acknowledge every message automatically.
+ /**
+ *Set the default accept-mode for subscribe() calls that don't
+ *include a SubscriptionSettings parameter.
*/
- void setAckPolicy(const AckPolicy& autoAck);
+ void setAcceptMode(AcceptMode mode) { defaultSettings.acceptMode = mode; }
- AckPolicy& getAckPolicy();
+ /**
+ * Set the default acquire-mode subscribe()s that don't specify SubscriptionSettings.
+ */
+ void setAcquireMode(AcquireMode mode) { defaultSettings.acquireMode = mode; }
void registerFailoverHandler ( boost::function<void ()> fh );
Session getSession() const;
+
+ private:
+ std::map<std::string, Subscription> subscriptions;
};
/** AutoCancel cancels a subscription in its destructor */
diff --git a/qpid/cpp/src/qpid/client/SubscriptionSettings.h b/qpid/cpp/src/qpid/client/SubscriptionSettings.h
new file mode 100644
index 0000000000..924814c809
--- /dev/null
+++ b/qpid/cpp/src/qpid/client/SubscriptionSettings.h
@@ -0,0 +1,62 @@
+#ifndef QPID_CLIENT_SUBSCRIPTIONSETTINGS_H
+#define QPID_CLIENT_SUBSCRIPTIONSETTINGS_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/client/FlowControl.h"
+#include "qpid/framing/enum.h"
+
+namespace qpid {
+namespace client {
+
+/** Bring AMQP enum definitions for message class into this namespace. */
+using namespace qpid::framing::message;
+
+/**
+ * Settings for a subscription.
+ */
+struct SubscriptionSettings
+{
+ SubscriptionSettings(
+ FlowControl flow=FlowControl::unlimited(),
+ AcceptMode accept=ACCEPT_MODE_EXPLICIT,
+ AcquireMode acquire=ACQUIRE_MODE_PRE_ACQUIRED,
+ unsigned int autoAck_=1
+ ) : flowControl(flow), acceptMode(accept), acquireMode(acquire), autoAck(autoAck_) {}
+
+ FlowControl flowControl; ///@< Flow control settings. @see FlowControl
+ AcceptMode acceptMode; ///@< ACCEPT_MODE_EXPLICIT or ACCEPT_MODE_NONE
+ AcquireMode acquireMode; ///@< ACQUIRE_MODE_PRE_ACQUIRED or ACQUIRE_MODE_NOT_ACQUIRED
+
+ /** Automatically acknowledge (acquire and accept) batches of autoAck messages.
+ * 0 means no automatic acknowledgement. What it means to "acknowledge" a message depends on
+ * acceptMode and acquireMode:
+ * - ACCEPT_MODE_NONE and ACQUIRE_MODE_PRE_ACQUIRED: do nothing
+ * - ACCEPT_MODE_NONE and ACQUIRE_MODE_NOT_ACQUIRED: send an "acquire" command
+ * - ACCEPT_MODE_EXPLICIT and ACQUIRE_MODE_PRE_ACQUIRED: send "accept" command
+ * - ACCEPT_MODE_EXPLICIT and ACQUIRE_MODE_NOT_ACQUIRED: send "acquire" and "accept" commands
+ */
+ unsigned int autoAck;
+};
+
+}} // namespace qpid::client
+
+#endif /*!QPID_CLIENT_SUBSCRIPTIONSETTINGS_H*/
diff --git a/qpid/cpp/src/tests/ClientSessionTest.cpp b/qpid/cpp/src/tests/ClientSessionTest.cpp
index 440605a2e4..abe317aad8 100644
--- a/qpid/cpp/src/tests/ClientSessionTest.cpp
+++ b/qpid/cpp/src/tests/ClientSessionTest.cpp
@@ -20,8 +20,7 @@
*/
#include "unit_test.h"
#include "BrokerFixture.h"
-#include "qpid/client/AckPolicy.h"
-#include "qpid/client/Dispatcher.h"
+#include "qpid/client/SubscriptionManager.h"
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Thread.h"
#include "qpid/sys/Runnable.h"
@@ -52,22 +51,22 @@ struct DummyListener : public sys::Runnable, public MessageListener {
std::vector<Message> messages;
string name;
uint expected;
- Dispatcher dispatcher;
+ SubscriptionManager submgr;
DummyListener(Session& session, const string& n, uint ex) :
- name(n), expected(ex), dispatcher(session) {}
+ name(n), expected(ex), submgr(session) {}
void run()
{
- dispatcher.listen(name, this);
- dispatcher.run();
+ submgr.subscribe(*this, name);
+ submgr.run();
}
void received(Message& msg)
{
messages.push_back(msg);
if (--expected == 0) {
- dispatcher.stop();
+ submgr.stop();
}
}
};
@@ -95,53 +94,30 @@ struct SimpleListener : public MessageListener
struct ClientSessionFixture : public ProxySessionFixture
{
- ClientSessionFixture(Broker::Options opts = Broker::Options()) : ProxySessionFixture(opts) {}
-
- void declareSubscribe(const string& q="my-queue",
- const string& dest="my-dest")
- {
- session.queueDeclare(arg::queue=q);
- session.messageSubscribe(arg::queue=q, arg::destination=dest, arg::acquireMode=1);
- session.messageFlow(arg::destination=dest, arg::unit=0, arg::value=0xFFFFFFFF);//messages
- session.messageFlow(arg::destination=dest, arg::unit=1, arg::value=0xFFFFFFFF);//bytes
+ ClientSessionFixture(Broker::Options opts = Broker::Options()) : ProxySessionFixture(opts) {
+ session.queueDeclare(arg::queue="my-queue");
}
};
QPID_AUTO_TEST_CASE(testQueueQuery) {
ClientSessionFixture fix;
fix.session = fix.connection.newSession();
- fix.session.queueDeclare(arg::queue="my-queue", arg::alternateExchange="amq.fanout", arg::exclusive=true, arg::autoDelete=true);
- QueueQueryResult result = fix.session.queueQuery(string("my-queue"));
+ fix.session.queueDeclare(arg::queue="q", arg::alternateExchange="amq.fanout",
+ arg::exclusive=true, arg::autoDelete=true);
+ QueueQueryResult result = fix.session.queueQuery("q");
BOOST_CHECK_EQUAL(false, result.getDurable());
BOOST_CHECK_EQUAL(true, result.getExclusive());
- BOOST_CHECK_EQUAL(string("amq.fanout"),
- result.getAlternateExchange());
-}
-
-QPID_AUTO_TEST_CASE(testTransfer)
-{
- ClientSessionFixture fix;
- fix.session=fix.connection.newSession();
- fix.declareSubscribe();
- fix.session.messageTransfer(arg::acceptMode=1, arg::content=TransferContent("my-message", "my-queue"));
- //get & test the message:
- FrameSet::shared_ptr msg = fix.session.get();
- BOOST_CHECK(msg->isA<MessageTransferBody>());
- BOOST_CHECK_EQUAL(string("my-message"), msg->getContent());
- //confirm receipt:
- AckPolicy autoAck;
- autoAck.ack(Message(*msg), fix.session);
+ BOOST_CHECK_EQUAL("amq.fanout", result.getAlternateExchange());
}
QPID_AUTO_TEST_CASE(testDispatcher)
{
ClientSessionFixture fix;
fix.session =fix.connection.newSession();
- fix.declareSubscribe();
size_t count = 100;
for (size_t i = 0; i < count; ++i)
fix.session.messageTransfer(arg::content=TransferContent(boost::lexical_cast<string>(i), "my-queue"));
- DummyListener listener(fix.session, "my-dest", count);
+ DummyListener listener(fix.session, "my-queue", count);
listener.run();
BOOST_CHECK_EQUAL(count, listener.messages.size());
for (size_t i = 0; i < count; ++i)
@@ -152,9 +128,8 @@ QPID_AUTO_TEST_CASE(testDispatcherThread)
{
ClientSessionFixture fix;
fix.session =fix.connection.newSession();
- fix.declareSubscribe();
size_t count = 10;
- DummyListener listener(fix.session, "my-dest", count);
+ DummyListener listener(fix.session, "my-queue", count);
sys::Thread t(listener);
for (size_t i = 0; i < count; ++i) {
fix.session.messageTransfer(arg::content=TransferContent(boost::lexical_cast<string>(i), "my-queue"));
@@ -190,7 +165,6 @@ QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testSuspendResume, 1)
{
ClientSessionFixture fix;
fix.session.timeout(60);
- fix.declareSubscribe();
fix.session.suspend();
// Make sure we are still subscribed after resume.
fix.connection.resume(fix.session);
@@ -234,7 +208,7 @@ QPID_AUTO_TEST_CASE(testLocalQueue) {
BOOST_CHECK_EQUAL("foo0", lq.pop().getData());
BOOST_CHECK_EQUAL("foo1", lq.pop().getData());
BOOST_CHECK(lq.empty()); // Credit exhausted.
- fix.subs.setFlowControl("lq", FlowControl::unlimited());
+ fix.subs.getSubscription("lq").setFlowControl(FlowControl::unlimited());
BOOST_CHECK_EQUAL("foo2", lq.pop().getData());
}
diff --git a/qpid/cpp/src/tests/QueuePolicyTest.cpp b/qpid/cpp/src/tests/QueuePolicyTest.cpp
index f7fe81a709..28f555cf6a 100644
--- a/qpid/cpp/src/tests/QueuePolicyTest.cpp
+++ b/qpid/cpp/src/tests/QueuePolicyTest.cpp
@@ -168,8 +168,10 @@ QPID_AUTO_TEST_CASE(testStrictRingPolicy)
ProxySessionFixture f;
std::string q("my-ring-queue");
f.session.queueDeclare(arg::queue=q, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args);
- LocalQueue incoming(AckPolicy(0));//no automatic acknowledgements
- f.subs.subscribe(incoming, q);
+ LocalQueue incoming;
+ SubscriptionSettings settings(FlowControl::unlimited());
+ settings.autoAck = 0; // no auto ack.
+ Subscription sub = f.subs.subscribe(incoming, q, settings);
for (int i = 0; i < 5; i++) {
f.session.messageTransfer(arg::content=client::Message((boost::format("%1%_%2%") % "Message" % (i+1)).str(), q));
}
diff --git a/qpid/cpp/src/tests/XmlClientSessionTest.cpp b/qpid/cpp/src/tests/XmlClientSessionTest.cpp
index 534ecf70f2..98558f0a76 100644
--- a/qpid/cpp/src/tests/XmlClientSessionTest.cpp
+++ b/qpid/cpp/src/tests/XmlClientSessionTest.cpp
@@ -29,7 +29,7 @@
#include "qpid/framing/TransferContent.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/client/Connection.h"
-#include "qpid/client/Dispatcher.h"
+#include "qpid/client/SubscriptionManager.h"
#include "qpid/client/LocalQueue.h"
#include "qpid/client/Session.h"
#include "qpid/client/SubscriptionManager.h"
@@ -54,30 +54,6 @@ using std::endl;
Shlib shlib("../.libs/xml.so");
-struct DummyListener : public sys::Runnable, public MessageListener {
- std::vector<Message> messages;
- string name;
- uint expected;
- Dispatcher dispatcher;
-
- DummyListener(Session& session, const string& n, uint ex) :
- name(n), expected(ex), dispatcher(session) {}
-
- void run()
- {
- dispatcher.listen(name, this);
- dispatcher.run();
- }
-
- void received(Message& msg)
- {
- messages.push_back(msg);
- if (--expected == 0)
- dispatcher.stop();
- }
-};
-
-
class SubscribedLocalQueue : public LocalQueue {
private:
SubscriptionManager& subscriptions;
diff --git a/qpid/cpp/src/tests/consume.cpp b/qpid/cpp/src/tests/consume.cpp
index 29c61ada1b..4d74b8ae57 100644
--- a/qpid/cpp/src/tests/consume.cpp
+++ b/qpid/cpp/src/tests/consume.cpp
@@ -75,11 +75,11 @@ struct Client
if (opts.declare)
session.queueDeclare(opts.queue);
SubscriptionManager subs(session);
- LocalQueue lq(AckPolicy(opts.ack));
- subs.setAcceptMode(opts.ack > 0 ? 0 : 1);
- subs.setFlowControl(opts.count, SubscriptionManager::UNLIMITED,
- false);
- subs.subscribe(lq, opts.queue);
+ LocalQueue lq;
+ SubscriptionSettings settings;
+ settings.acceptMode = opts.ack > 0 ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE;
+ settings.flowControl = FlowControl(opts.count, SubscriptionManager::UNLIMITED,false);
+ Subscription sub = subs.subscribe(lq, opts.queue, settings);
Message msg;
AbsTime begin=now();
for (size_t i = 0; i < opts.count; ++i) {
@@ -87,7 +87,7 @@ struct Client
QPID_LOG(info, "Received: " << msg.getMessageProperties().getCorrelationId());
}
if (opts.ack != 0)
- subs.getAckPolicy().ackOutstanding(session); // Cumulative ack for final batch.
+ sub.accept(sub.getUnaccepted()); // Cumulative ack for final batch.
AbsTime end=now();
double secs(double(Duration(begin,end))/TIME_SEC);
if (opts.summary) cout << opts.count/secs << endl;
diff --git a/qpid/cpp/src/tests/echotest.cpp b/qpid/cpp/src/tests/echotest.cpp
index a57e2de5ad..7cbf3e7df4 100644
--- a/qpid/cpp/src/tests/echotest.cpp
+++ b/qpid/cpp/src/tests/echotest.cpp
@@ -92,9 +92,7 @@ void Listener::start(uint size)
{
session.queueDeclare(arg::queue=queue, arg::exclusive=true, arg::autoDelete=true);
request.getDeliveryProperties().setRoutingKey(queue);
- subscriptions.setAcceptMode(1/*not-required*/);
- subscriptions.setFlowControl(SubscriptionManager::UNLIMITED, SubscriptionManager::UNLIMITED, false);
- subscriptions.subscribe(*this, queue);
+ subscriptions.subscribe(*this, queue, SubscriptionSettings(FlowControl::unlimited(), ACCEPT_MODE_NONE));
request.getDeliveryProperties().setTimestamp(current_time());
if (size) request.setData(std::string(size, 'X'));
diff --git a/qpid/cpp/src/tests/latencytest.cpp b/qpid/cpp/src/tests/latencytest.cpp
index 524870a0e8..a980a43322 100644
--- a/qpid/cpp/src/tests/latencytest.cpp
+++ b/qpid/cpp/src/tests/latencytest.cpp
@@ -204,14 +204,15 @@ Receiver::Receiver(const string& q, Stats& s) : Client(q), mgr(session), count(0
std::cout << "Warning: found " << msgCount << " msgs on " << queue << ". Purging..." << std::endl;
session.queuePurge(arg::queue=queue);
}
+ SubscriptionSettings settings;
if (opts.prefetch) {
- mgr.setAckPolicy(AckPolicy(opts.ack ? opts.ack : (opts.prefetch / 2)));
- mgr.setFlowControl(opts.prefetch, SubscriptionManager::UNLIMITED, true);
+ settings.autoAck = (opts.ack ? opts.ack : (opts.prefetch / 2));
+ settings.flowControl = FlowControl::messageWindow(opts.prefetch);
} else {
- mgr.setAcceptMode(1/*not-required*/);
- mgr.setFlowControl(SubscriptionManager::UNLIMITED, SubscriptionManager::UNLIMITED, false);
+ settings.acceptMode = ACCEPT_MODE_NONE;
+ settings.flowControl = FlowControl::unlimited();
}
- mgr.subscribe(*this, queue);
+ mgr.subscribe(*this, queue, settings);
}
void Receiver::test()
diff --git a/qpid/cpp/src/tests/perftest.cpp b/qpid/cpp/src/tests/perftest.cpp
index 2e15489525..eb7235a09b 100644
--- a/qpid/cpp/src/tests/perftest.cpp
+++ b/qpid/cpp/src/tests/perftest.cpp
@@ -560,11 +560,12 @@ struct SubscribeThread : public Client {
try {
if (opts.txSub) sync(session).txSelect();
SubscriptionManager subs(session);
- LocalQueue lq(AckPolicy(opts.txSub ? opts.txSub : opts.ack));
- subs.setAcceptMode(opts.txSub || opts.ack ? 0 : 1);
- subs.setFlowControl(opts.subQuota, SubscriptionManager::UNLIMITED,
- false);
- subs.subscribe(lq, queue);
+ SubscriptionSettings settings;
+ settings.autoAck = opts.txSub ? opts.txSub : opts.ack;
+ settings.acceptMode = (opts.txSub || opts.ack ? ACCEPT_MODE_NONE : ACCEPT_MODE_EXPLICIT);
+ settings.flowControl = FlowControl::messageCredit(opts.subQuota);
+ LocalQueue lq;
+ Subscription subscription = subs.subscribe(lq, queue, settings);
// Notify controller we are ready.
session.messageTransfer(arg::content=Message("ready", "sub_ready"), arg::acceptMode=1);
if (opts.txSub) {
@@ -603,7 +604,7 @@ struct SubscribeThread : public Client {
}
}
if (opts.txSub || opts.ack)
- lq.getAckPolicy().ackOutstanding(session); // Cumulative ack for final batch.
+ subscription.accept(subscription.getUnaccepted());
if (opts.txSub) {
if (opts.commitAsync) session.txCommit();
else sync(session).txCommit();
diff --git a/qpid/cpp/src/tests/topic_listener.cpp b/qpid/cpp/src/tests/topic_listener.cpp
index 0da26fa2c4..7bdc2c32de 100644
--- a/qpid/cpp/src/tests/topic_listener.cpp
+++ b/qpid/cpp/src/tests/topic_listener.cpp
@@ -66,6 +66,7 @@ class Listener : public MessageListener{
public:
Listener(const Session& session, SubscriptionManager& mgr, const string& reponseQueue, bool tx);
virtual void received(Message& msg);
+ Subscription subscription;
};
/**
@@ -118,14 +119,15 @@ int main(int argc, char** argv){
//set up listener
SubscriptionManager mgr(session);
Listener listener(session, mgr, "response", args.transactional);
+ SubscriptionSettings settings;
if (args.prefetch) {
- mgr.setAckPolicy(AckPolicy(args.ack ? args.ack : (args.prefetch / 2)));
- mgr.setFlowControl(args.prefetch, SubscriptionManager::UNLIMITED, true);
+ settings.autoAck = (args.ack ? args.ack : (args.prefetch / 2));
+ settings.flowControl = FlowControl::messageCredit(args.prefetch);
} else {
- mgr.setAcceptMode(1/*-not-required*/);
- mgr.setFlowControl(SubscriptionManager::UNLIMITED, SubscriptionManager::UNLIMITED, false);
+ settings.acceptMode = ACCEPT_MODE_NONE;
+ settings.flowControl = FlowControl::unlimited();
}
- mgr.subscribe(listener, control);
+ listener.subscription = mgr.subscribe(listener, control, settings);
session.sync();
if( args.statusqueue.length() > 0 ) {
@@ -170,7 +172,7 @@ void Listener::received(Message& message){
if(string("TERMINATION_REQUEST") == type){
shutdown();
}else if(string("REPORT_REQUEST") == type){
- mgr.getAckPolicy().ackOutstanding(session);//acknowledge everything upto this point
+ subscription.accept(subscription.getUnaccepted()); // Accept everything upto this point
cout <<"Batch ended, sending report." << endl;
//send a report:
report();
diff --git a/qpid/cpp/src/tests/txtest.cpp b/qpid/cpp/src/tests/txtest.cpp
index c285ff9fcc..9d253ddb7f 100644
--- a/qpid/cpp/src/tests/txtest.cpp
+++ b/qpid/cpp/src/tests/txtest.cpp
@@ -139,9 +139,10 @@ struct Transfer : public Client, public Runnable
else session.txSelect();
SubscriptionManager subs(session);
- LocalQueue lq(AckPolicy(0));//manual acking
- subs.setFlowControl(opts.msgsPerTx, SubscriptionManager::UNLIMITED, true);
- subs.subscribe(lq, src);
+ LocalQueue lq;
+ SubscriptionSettings settings(FlowControl::messageWindow(opts.msgsPerTx));
+ settings.autoAck = 0; // Disabled
+ Subscription sub = subs.subscribe(lq, src, settings);
for (uint t = 0; t < opts.txCount; t++) {
Message in;
@@ -157,7 +158,7 @@ struct Transfer : public Client, public Runnable
out.getDeliveryProperties().setDeliveryMode(in.getDeliveryProperties().getDeliveryMode());
session.messageTransfer(arg::content=out, arg::acceptMode=1);
}
- lq.getAckPolicy().ackOutstanding(session);
+ sub.accept(sub.getUnaccepted());
if (opts.dtx) {
session.dtxEnd(arg::xid=xid);
session.dtxPrepare(arg::xid=xid);
@@ -230,8 +231,6 @@ struct Controller : public Client
int check()
{
SubscriptionManager subs(session);
- subs.setFlowControl(SubscriptionManager::UNLIMITED, SubscriptionManager::UNLIMITED, false);
- subs.setAcceptMode(1/*not-required*/);
// Recover DTX transactions (if any)
if (opts.dtx) {
@@ -262,9 +261,10 @@ struct Controller : public Client
StringSet drained;
//drain each queue and verify the correct set of messages are available
for (StringSet::iterator i = queues.begin(); i != queues.end(); i++) {
- //subscribe, allocate credit and flush
- LocalQueue lq(AckPolicy(0));//manual acking
- subs.subscribe(lq, *i, *i);
+ //subscribe, allocate credit and flushn
+ LocalQueue lq;
+ SubscriptionSettings settings(FlowControl::unlimited(), ACCEPT_MODE_NONE);
+ subs.subscribe(lq, *i, settings);
session.messageFlush(arg::destination=*i);
session.sync();