diff options
Diffstat (limited to 'qpid/cpp/src/qpid/client/SubscriptionManager.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/client/SubscriptionManager.cpp | 94 |
1 files changed, 29 insertions, 65 deletions
diff --git a/qpid/cpp/src/qpid/client/SubscriptionManager.cpp b/qpid/cpp/src/qpid/client/SubscriptionManager.cpp index dde93635c8..7e2f2f8595 100644 --- a/qpid/cpp/src/qpid/client/SubscriptionManager.cpp +++ b/qpid/cpp/src/qpid/client/SubscriptionManager.cpp @@ -22,6 +22,7 @@ #define _Subscription_ #include "SubscriptionManager.h" +#include "SubscriptionImpl.h" #include <qpid/client/Dispatcher.h> #include <qpid/client/Session.h> #include <qpid/client/MessageListener.h> @@ -34,83 +35,41 @@ namespace qpid { namespace client { SubscriptionManager::SubscriptionManager(const Session& s) - : dispatcher(s), session(s), - flowControl(UNLIMITED, UNLIMITED, false), - acceptMode(0), acquireMode(0), - autoStop(true) + : dispatcher(s), session(s), autoStop(true) {} -void SubscriptionManager::subscribeInternal( - const std::string& q, const std::string& dest, const FlowControl& fc) +Subscription SubscriptionManager::subscribe( + MessageListener& listener, const std::string& q, const SubscriptionSettings& ss, const std::string& n) { - session.messageSubscribe( - arg::queue=q, arg::destination=dest, - arg::acceptMode=acceptMode, arg::acquireMode=acquireMode); - if (fc.messages || fc.bytes) // No need to set if all 0. - setFlowControl(dest, fc); + std::string name=n.empty() ? q:n; + boost::intrusive_ptr<SubscriptionImpl> si = new SubscriptionImpl(*this, q, ss, name, &listener); + dispatcher.listen(si); + return subscriptions[name] = Subscription(si.get()); } -void SubscriptionManager::subscribe( - MessageListener& listener, const std::string& q, const std::string& d) +Subscription SubscriptionManager::subscribe( + LocalQueue& lq, const std::string& q, const SubscriptionSettings& ss, const std::string& n) { - subscribe(listener, q, getFlowControl(), d); + std::string name=n.empty() ? q:n; + lq.queue=session.getExecution().getDemux().add(name, ByTransferDest(name)); + boost::intrusive_ptr<SubscriptionImpl> si = new SubscriptionImpl(*this, q, ss, name, 0); + lq.subscription = Subscription(si.get()); + return subscriptions[name] = lq.subscription; } -void SubscriptionManager::subscribe( - MessageListener& listener, const std::string& q, const FlowControl& fc, const std::string& d) +Subscription SubscriptionManager::subscribe( + MessageListener& listener, const std::string& q, const std::string& n) { - std::string dest=d.empty() ? q:d; - dispatcher.listen(dest, &listener, autoAck); - return subscribeInternal(q, dest, fc); + return subscribe(listener, q, defaultSettings, n); } -void SubscriptionManager::subscribe( - LocalQueue& lq, const std::string& q, const std::string& d) +Subscription SubscriptionManager::subscribe( + LocalQueue& lq, const std::string& q, const std::string& n) { - subscribe(lq, q, getFlowControl(), d); + return subscribe(lq, q, defaultSettings, n); } -void SubscriptionManager::subscribe( - LocalQueue& lq, const std::string& q, const FlowControl& fc, const std::string& d) -{ - std::string dest=d.empty() ? q:d; - lq.session=session; - lq.queue=session.getExecution().getDemux().add(dest, ByTransferDest(dest)); - return subscribeInternal(q, dest, fc); -} - -void SubscriptionManager::setFlowControl( - const std::string& dest, uint32_t messages, uint32_t bytes, bool window) -{ - session.messageSetFlowMode(dest, window); - session.messageFlow(dest, 0, messages); - session.messageFlow(dest, 1, bytes); - session.sync(); -} - -void SubscriptionManager::setFlowControl(const std::string& dest, const FlowControl& fc) { - setFlowControl(dest, fc.messages, fc.bytes, fc.window); -} - -void SubscriptionManager::setFlowControl(const FlowControl& fc) { flowControl=fc; } - -void SubscriptionManager::setFlowControl( - uint32_t messages_, uint32_t bytes_, bool window_) -{ - setFlowControl(FlowControl(messages_, bytes_, window_)); -} - -const FlowControl& SubscriptionManager::getFlowControl() const { return flowControl; } - -void SubscriptionManager::setAcceptMode(bool c) { acceptMode=c; } - -void SubscriptionManager::setAcquireMode(bool a) { acquireMode=a; } - -void SubscriptionManager::setAckPolicy(const AckPolicy& a) { autoAck=a; } - -AckPolicy& SubscriptionManager::getAckPolicy() { return autoAck; } - -void SubscriptionManager::cancel(const std::string dest) +void SubscriptionManager::cancel(const std::string& dest) { sync(session).messageCancel(dest); dispatcher.cancel(dest); @@ -138,10 +97,11 @@ void SubscriptionManager::stop() bool SubscriptionManager::get(Message& result, const std::string& queue, sys::Duration timeout) { LocalQueue lq; std::string unique = framing::Uuid(true).str(); - subscribe(lq, queue, FlowControl::messageCredit(1), unique); + subscribe(lq, queue, SubscriptionSettings(FlowControl::messageCredit(1)), unique); AutoCancel ac(*this, unique); //first wait for message to be delivered if a timeout has been specified - if (timeout && lq.get(result, timeout)) return true; + if (timeout && lq.get(result, timeout)) + return true; //make sure message is not on queue before final check sync(session).messageFlush(unique); return lq.get(result, 0); @@ -149,6 +109,10 @@ bool SubscriptionManager::get(Message& result, const std::string& queue, sys::Du Session SubscriptionManager::getSession() const { return session; } +Subscription SubscriptionManager::getSubscription(const std::string& name) const { + return subscriptions.at(name); +} + void SubscriptionManager::registerFailoverHandler (boost::function<void ()> fh) { dispatcher.registerFailoverHandler(fh); } |