summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/client/Subscription.h2
-rw-r--r--cpp/src/qpid/client/SubscriptionImpl.cpp10
-rw-r--r--cpp/src/qpid/client/SubscriptionImpl.h15
-rw-r--r--cpp/src/qpid/client/SubscriptionManager.cpp12
-rw-r--r--cpp/src/tests/ClientSessionTest.cpp19
5 files changed, 54 insertions, 4 deletions
diff --git a/cpp/src/qpid/client/Subscription.h b/cpp/src/qpid/client/Subscription.h
index 6d9342bf09..47bb5d42a5 100644
--- a/cpp/src/qpid/client/Subscription.h
+++ b/cpp/src/qpid/client/Subscription.h
@@ -107,6 +107,8 @@ class Subscription : public Handle<SubscriptionImpl> {
/** Grant the specified amount of byte credit */
void grantByteCredit(uint32_t);
+
+ friend class SubscriptionManager;
};
}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/SubscriptionImpl.cpp b/cpp/src/qpid/client/SubscriptionImpl.cpp
index 5ea87110c2..e09a4c142e 100644
--- a/cpp/src/qpid/client/SubscriptionImpl.cpp
+++ b/cpp/src/qpid/client/SubscriptionImpl.cpp
@@ -145,5 +145,15 @@ void SubscriptionImpl::received(Message& m) {
}
}
+Demux::QueuePtr SubscriptionImpl::divert()
+{
+ demuxRule = std::auto_ptr<ScopedDivert>(new ScopedDivert(name, manager.getSession().getExecution().getDemux()));
+ return demuxRule->getQueue();
+}
+
+void SubscriptionImpl::cancelDiversion() {
+ demuxRule.reset();
+}
+
}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/SubscriptionImpl.h b/cpp/src/qpid/client/SubscriptionImpl.h
index c4c486daeb..74fbacb951 100644
--- a/cpp/src/qpid/client/SubscriptionImpl.h
+++ b/cpp/src/qpid/client/SubscriptionImpl.h
@@ -25,10 +25,12 @@
#include "qpid/client/SubscriptionSettings.h"
#include "qpid/client/Session.h"
#include "qpid/client/MessageListener.h"
+#include "qpid/client/Demux.h"
#include "qpid/framing/enum.h"
#include "qpid/framing/SequenceSet.h"
#include "qpid/sys/Mutex.h"
#include "qpid/RefCounted.h"
+#include <memory>
namespace qpid {
namespace client {
@@ -93,7 +95,17 @@ class SubscriptionImpl : public RefCounted, public MessageListener {
void grantCredit(framing::message::CreditUnit unit, uint32_t value);
void received(Message&);
-
+
+ /**
+ * Set up demux diversion for messages sent to this subscription
+ */
+ Demux::QueuePtr divert();
+ /**
+ * Cancel any demux diversion that may have been setup for this
+ * subscription
+ */
+ void cancelDiversion();
+
private:
mutable sys::Mutex lock;
@@ -102,6 +114,7 @@ class SubscriptionImpl : public RefCounted, public MessageListener {
SubscriptionSettings settings;
framing::SequenceSet unacquired, unaccepted;
MessageListener* listener;
+ std::auto_ptr<ScopedDivert> demuxRule;
};
}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/SubscriptionManager.cpp b/cpp/src/qpid/client/SubscriptionManager.cpp
index c91ae178ac..f9d0ecbf80 100644
--- a/cpp/src/qpid/client/SubscriptionManager.cpp
+++ b/cpp/src/qpid/client/SubscriptionManager.cpp
@@ -53,8 +53,8 @@ Subscription SubscriptionManager::subscribe(
LocalQueue& lq, const std::string& q, const SubscriptionSettings& ss, const std::string& n)
{
std::string name=n.empty() ? q:n;
- lq.queue=session.getExecution().getDemux().add(name, ByTransferDest(name));
boost::intrusive_ptr<SubscriptionImpl> si = new SubscriptionImpl(*this, q, ss, name, 0);
+ lq.queue=si->divert();
si->subscribe();
lq.subscription = Subscription(si.get());
return subscriptions[name] = lq.subscription;
@@ -74,8 +74,14 @@ Subscription SubscriptionManager::subscribe(
void SubscriptionManager::cancel(const std::string& dest)
{
- sync(session).messageCancel(dest);
- dispatcher.cancel(dest);
+ std::map<std::string, Subscription>::iterator i = subscriptions.find(dest);
+ if (i != subscriptions.end()) {
+ sync(session).messageCancel(dest);
+ dispatcher.cancel(dest);
+ Subscription s = i->second;
+ if (s.isValid()) subscriptions[dest].impl->cancelDiversion();
+ subscriptions.erase(dest);
+ }
}
void SubscriptionManager::setAutoStop(bool set) { autoStop=set; }
diff --git a/cpp/src/tests/ClientSessionTest.cpp b/cpp/src/tests/ClientSessionTest.cpp
index 454632dd39..2d9239131e 100644
--- a/cpp/src/tests/ClientSessionTest.cpp
+++ b/cpp/src/tests/ClientSessionTest.cpp
@@ -468,6 +468,25 @@ QPID_AUTO_TEST_CASE(testExclusiveBinding) {
BOOST_CHECK(!fix.subs.get(got, "queue-2"));
}
+QPID_AUTO_TEST_CASE(testResubscribeWithLocalQueue) {
+ ClientSessionFixture fix;
+ fix.session.queueDeclare(arg::queue="some-queue", arg::exclusive=true, arg::autoDelete=true);
+ LocalQueue p, q;
+ fix.subs.subscribe(p, "some-queue");
+ fix.subs.cancel("some-queue");
+ fix.subs.subscribe(q, "some-queue");
+
+ fix.session.messageTransfer(arg::content=Message("some-data", "some-queue"));
+ fix.session.messageFlush(arg::destination="some-queue");
+
+ Message got;
+ BOOST_CHECK(!p.get(got));
+
+ BOOST_CHECK(q.get(got));
+ BOOST_CHECK_EQUAL("some-data", got.getData());
+ BOOST_CHECK(!q.get(got));
+}
+
QPID_AUTO_TEST_SUITE_END()