diff options
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r-- | cpp/src/qpid/client/Subscription.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/client/SubscriptionImpl.cpp | 10 | ||||
-rw-r--r-- | cpp/src/qpid/client/SubscriptionImpl.h | 15 | ||||
-rw-r--r-- | cpp/src/qpid/client/SubscriptionManager.cpp | 12 |
4 files changed, 35 insertions, 4 deletions
diff --git a/cpp/src/qpid/client/Subscription.h b/cpp/src/qpid/client/Subscription.h index 6d9342bf09..47bb5d42a5 100644 --- a/cpp/src/qpid/client/Subscription.h +++ b/cpp/src/qpid/client/Subscription.h @@ -107,6 +107,8 @@ class Subscription : public Handle<SubscriptionImpl> { /** Grant the specified amount of byte credit */ void grantByteCredit(uint32_t); + + friend class SubscriptionManager; }; }} // namespace qpid::client diff --git a/cpp/src/qpid/client/SubscriptionImpl.cpp b/cpp/src/qpid/client/SubscriptionImpl.cpp index 5ea87110c2..e09a4c142e 100644 --- a/cpp/src/qpid/client/SubscriptionImpl.cpp +++ b/cpp/src/qpid/client/SubscriptionImpl.cpp @@ -145,5 +145,15 @@ void SubscriptionImpl::received(Message& m) { } } +Demux::QueuePtr SubscriptionImpl::divert() +{ + demuxRule = std::auto_ptr<ScopedDivert>(new ScopedDivert(name, manager.getSession().getExecution().getDemux())); + return demuxRule->getQueue(); +} + +void SubscriptionImpl::cancelDiversion() { + demuxRule.reset(); +} + }} // namespace qpid::client diff --git a/cpp/src/qpid/client/SubscriptionImpl.h b/cpp/src/qpid/client/SubscriptionImpl.h index c4c486daeb..74fbacb951 100644 --- a/cpp/src/qpid/client/SubscriptionImpl.h +++ b/cpp/src/qpid/client/SubscriptionImpl.h @@ -25,10 +25,12 @@ #include "qpid/client/SubscriptionSettings.h" #include "qpid/client/Session.h" #include "qpid/client/MessageListener.h" +#include "qpid/client/Demux.h" #include "qpid/framing/enum.h" #include "qpid/framing/SequenceSet.h" #include "qpid/sys/Mutex.h" #include "qpid/RefCounted.h" +#include <memory> namespace qpid { namespace client { @@ -93,7 +95,17 @@ class SubscriptionImpl : public RefCounted, public MessageListener { void grantCredit(framing::message::CreditUnit unit, uint32_t value); void received(Message&); - + + /** + * Set up demux diversion for messages sent to this subscription + */ + Demux::QueuePtr divert(); + /** + * Cancel any demux diversion that may have been setup for this + * subscription + */ + void cancelDiversion(); + private: mutable sys::Mutex lock; @@ -102,6 +114,7 @@ class SubscriptionImpl : public RefCounted, public MessageListener { SubscriptionSettings settings; framing::SequenceSet unacquired, unaccepted; MessageListener* listener; + std::auto_ptr<ScopedDivert> demuxRule; }; }} // namespace qpid::client diff --git a/cpp/src/qpid/client/SubscriptionManager.cpp b/cpp/src/qpid/client/SubscriptionManager.cpp index c91ae178ac..f9d0ecbf80 100644 --- a/cpp/src/qpid/client/SubscriptionManager.cpp +++ b/cpp/src/qpid/client/SubscriptionManager.cpp @@ -53,8 +53,8 @@ Subscription SubscriptionManager::subscribe( LocalQueue& lq, const std::string& q, const SubscriptionSettings& ss, const std::string& n) { std::string name=n.empty() ? q:n; - lq.queue=session.getExecution().getDemux().add(name, ByTransferDest(name)); boost::intrusive_ptr<SubscriptionImpl> si = new SubscriptionImpl(*this, q, ss, name, 0); + lq.queue=si->divert(); si->subscribe(); lq.subscription = Subscription(si.get()); return subscriptions[name] = lq.subscription; @@ -74,8 +74,14 @@ Subscription SubscriptionManager::subscribe( void SubscriptionManager::cancel(const std::string& dest) { - sync(session).messageCancel(dest); - dispatcher.cancel(dest); + std::map<std::string, Subscription>::iterator i = subscriptions.find(dest); + if (i != subscriptions.end()) { + sync(session).messageCancel(dest); + dispatcher.cancel(dest); + Subscription s = i->second; + if (s.isValid()) subscriptions[dest].impl->cancelDiversion(); + subscriptions.erase(dest); + } } void SubscriptionManager::setAutoStop(bool set) { autoStop=set; } |