summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/client/SubscriptionManager.cpp')
-rw-r--r--qpid/cpp/src/qpid/client/SubscriptionManager.cpp94
1 files changed, 29 insertions, 65 deletions
diff --git a/qpid/cpp/src/qpid/client/SubscriptionManager.cpp b/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
index dde93635c8..7e2f2f8595 100644
--- a/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
+++ b/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
@@ -22,6 +22,7 @@
#define _Subscription_
#include "SubscriptionManager.h"
+#include "SubscriptionImpl.h"
#include <qpid/client/Dispatcher.h>
#include <qpid/client/Session.h>
#include <qpid/client/MessageListener.h>
@@ -34,83 +35,41 @@ namespace qpid {
namespace client {
SubscriptionManager::SubscriptionManager(const Session& s)
- : dispatcher(s), session(s),
- flowControl(UNLIMITED, UNLIMITED, false),
- acceptMode(0), acquireMode(0),
- autoStop(true)
+ : dispatcher(s), session(s), autoStop(true)
{}
-void SubscriptionManager::subscribeInternal(
- const std::string& q, const std::string& dest, const FlowControl& fc)
+Subscription SubscriptionManager::subscribe(
+ MessageListener& listener, const std::string& q, const SubscriptionSettings& ss, const std::string& n)
{
- session.messageSubscribe(
- arg::queue=q, arg::destination=dest,
- arg::acceptMode=acceptMode, arg::acquireMode=acquireMode);
- if (fc.messages || fc.bytes) // No need to set if all 0.
- setFlowControl(dest, fc);
+ std::string name=n.empty() ? q:n;
+ boost::intrusive_ptr<SubscriptionImpl> si = new SubscriptionImpl(*this, q, ss, name, &listener);
+ dispatcher.listen(si);
+ return subscriptions[name] = Subscription(si.get());
}
-void SubscriptionManager::subscribe(
- MessageListener& listener, const std::string& q, const std::string& d)
+Subscription SubscriptionManager::subscribe(
+ LocalQueue& lq, const std::string& q, const SubscriptionSettings& ss, const std::string& n)
{
- subscribe(listener, q, getFlowControl(), d);
+ std::string name=n.empty() ? q:n;
+ lq.queue=session.getExecution().getDemux().add(name, ByTransferDest(name));
+ boost::intrusive_ptr<SubscriptionImpl> si = new SubscriptionImpl(*this, q, ss, name, 0);
+ lq.subscription = Subscription(si.get());
+ return subscriptions[name] = lq.subscription;
}
-void SubscriptionManager::subscribe(
- MessageListener& listener, const std::string& q, const FlowControl& fc, const std::string& d)
+Subscription SubscriptionManager::subscribe(
+ MessageListener& listener, const std::string& q, const std::string& n)
{
- std::string dest=d.empty() ? q:d;
- dispatcher.listen(dest, &listener, autoAck);
- return subscribeInternal(q, dest, fc);
+ return subscribe(listener, q, defaultSettings, n);
}
-void SubscriptionManager::subscribe(
- LocalQueue& lq, const std::string& q, const std::string& d)
+Subscription SubscriptionManager::subscribe(
+ LocalQueue& lq, const std::string& q, const std::string& n)
{
- subscribe(lq, q, getFlowControl(), d);
+ return subscribe(lq, q, defaultSettings, n);
}
-void SubscriptionManager::subscribe(
- LocalQueue& lq, const std::string& q, const FlowControl& fc, const std::string& d)
-{
- std::string dest=d.empty() ? q:d;
- lq.session=session;
- lq.queue=session.getExecution().getDemux().add(dest, ByTransferDest(dest));
- return subscribeInternal(q, dest, fc);
-}
-
-void SubscriptionManager::setFlowControl(
- const std::string& dest, uint32_t messages, uint32_t bytes, bool window)
-{
- session.messageSetFlowMode(dest, window);
- session.messageFlow(dest, 0, messages);
- session.messageFlow(dest, 1, bytes);
- session.sync();
-}
-
-void SubscriptionManager::setFlowControl(const std::string& dest, const FlowControl& fc) {
- setFlowControl(dest, fc.messages, fc.bytes, fc.window);
-}
-
-void SubscriptionManager::setFlowControl(const FlowControl& fc) { flowControl=fc; }
-
-void SubscriptionManager::setFlowControl(
- uint32_t messages_, uint32_t bytes_, bool window_)
-{
- setFlowControl(FlowControl(messages_, bytes_, window_));
-}
-
-const FlowControl& SubscriptionManager::getFlowControl() const { return flowControl; }
-
-void SubscriptionManager::setAcceptMode(bool c) { acceptMode=c; }
-
-void SubscriptionManager::setAcquireMode(bool a) { acquireMode=a; }
-
-void SubscriptionManager::setAckPolicy(const AckPolicy& a) { autoAck=a; }
-
-AckPolicy& SubscriptionManager::getAckPolicy() { return autoAck; }
-
-void SubscriptionManager::cancel(const std::string dest)
+void SubscriptionManager::cancel(const std::string& dest)
{
sync(session).messageCancel(dest);
dispatcher.cancel(dest);
@@ -138,10 +97,11 @@ void SubscriptionManager::stop()
bool SubscriptionManager::get(Message& result, const std::string& queue, sys::Duration timeout) {
LocalQueue lq;
std::string unique = framing::Uuid(true).str();
- subscribe(lq, queue, FlowControl::messageCredit(1), unique);
+ subscribe(lq, queue, SubscriptionSettings(FlowControl::messageCredit(1)), unique);
AutoCancel ac(*this, unique);
//first wait for message to be delivered if a timeout has been specified
- if (timeout && lq.get(result, timeout)) return true;
+ if (timeout && lq.get(result, timeout))
+ return true;
//make sure message is not on queue before final check
sync(session).messageFlush(unique);
return lq.get(result, 0);
@@ -149,6 +109,10 @@ bool SubscriptionManager::get(Message& result, const std::string& queue, sys::Du
Session SubscriptionManager::getSession() const { return session; }
+Subscription SubscriptionManager::getSubscription(const std::string& name) const {
+ return subscriptions.at(name);
+}
+
void SubscriptionManager::registerFailoverHandler (boost::function<void ()> fh) {
dispatcher.registerFailoverHandler(fh);
}