diff options
author | Alan Conway <aconway@apache.org> | 2014-04-07 21:22:32 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2014-04-07 21:22:32 +0000 |
commit | 8ea0f79d78edd0a0825547ecc618e3fa63a2b93f (patch) | |
tree | 1351b13a08a8827f6d64dac1b2700efa93343e56 /qpid | |
parent | 8b0808b498dd5e3fe5e5d04e9e9c9492206036e2 (diff) | |
download | qpid-python-8ea0f79d78edd0a0825547ecc618e3fa63a2b93f.tar.gz |
QPID-5667: C++ broker: QMF subscribe events are not raised with AMQP 1.0
The raise event logic for subscribe/unsubscribe events was in 0-10 specific code.
Moved it into Queue.cpp so events are generated regardless of protocol.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1585587 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid')
-rw-r--r-- | qpid/cpp/src/qpid/broker/Consumer.h | 8 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 16 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.h | 10 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.cpp | 7 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionAdapter.cpp | 19 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/tests/QueueTest.cpp | 2 |
8 files changed, 34 insertions, 32 deletions
diff --git a/qpid/cpp/src/qpid/broker/Consumer.h b/qpid/cpp/src/qpid/broker/Consumer.h index 662b0f937d..e01eff98be 100644 --- a/qpid/cpp/src/qpid/broker/Consumer.h +++ b/qpid/cpp/src/qpid/broker/Consumer.h @@ -49,8 +49,8 @@ class Consumer : public QueueCursor { public: typedef boost::shared_ptr<Consumer> shared_ptr; - Consumer(const std::string& _name, SubscriptionType type) - : QueueCursor(type), acquires(type == CONSUMER), inListeners(false), name(_name) {} + Consumer(const std::string& _name, SubscriptionType type, const std::string& _tag) + : QueueCursor(type), acquires(type == CONSUMER), inListeners(false), name(_name), tag(_tag) {} virtual ~Consumer(){} bool preAcquires() const { return acquires; } @@ -88,8 +88,12 @@ class Consumer : public QueueCursor { QueueCursor getCursor() const { return *this; } void setCursor(const QueueCursor& qc) { static_cast<QueueCursor&>(*this) = qc; } + + const std::string& getTag() const { return tag; } + protected: //framing::SequenceNumber position; + const std::string tag; // <destination> from AMQP 0-10 Message.subscribe command private: friend class QueueListeners; diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index d8bc59eaf1..941c0c88a2 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -53,6 +53,8 @@ #include "qmf/org/apache/qpid/broker/ArgsQueuePurge.h" #include "qmf/org/apache/qpid/broker/ArgsQueueReroute.h" #include "qmf/org/apache/qpid/broker/EventQueueDelete.h" +#include "qmf/org/apache/qpid/broker/EventSubscribe.h" +#include "qmf/org/apache/qpid/broker/EventUnsubscribe.h" #include <iostream> #include <algorithm> @@ -534,7 +536,9 @@ void Queue::releaseFromUse(bool controlling) if (trydelete) scheduleAutoDelete(); } -void Queue::consume(Consumer::shared_ptr c, bool requestExclusive) +void Queue::consume(Consumer::shared_ptr c, bool requestExclusive, + const framing::FieldTable& arguments, + const std::string& connectionId, const std::string& userId) { { Mutex::ScopedLock locker(messageLock); @@ -573,9 +577,15 @@ void Queue::consume(Consumer::shared_ptr c, bool requestExclusive) if (mgmtObject != 0 && c->isCounted()) { mgmtObject->inc_consumerCount(); } + ManagementAgent* agent = broker->getManagementAgent(); + if (agent) { + agent->raiseEvent( + _qmf::EventSubscribe(connectionId, userId, name, + c->getTag(), requestExclusive, ManagementAgent::toMap(arguments))); + } } -void Queue::cancel(Consumer::shared_ptr c) +void Queue::cancel(Consumer::shared_ptr c, const std::string& connectionId, const std::string& userId) { removeListener(c); if(c->isCounted()) @@ -599,6 +609,8 @@ void Queue::cancel(Consumer::shared_ptr c) scheduleAutoDelete(); } } + ManagementAgent* agent = broker->getManagementAgent(); + if (agent) agent->raiseEvent(_qmf::EventUnsubscribe(connectionId, userId, c->getTag())); } /** diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index 21a8b7ef99..683468e9a4 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -348,8 +348,14 @@ class Queue : public boost::enable_shared_from_this<Queue>, QPID_BROKER_EXTERN void recover(Message& msg); QPID_BROKER_EXTERN void consume(Consumer::shared_ptr c, - bool exclusive = false); - QPID_BROKER_EXTERN void cancel(Consumer::shared_ptr c); + bool exclusive = false, + const framing::FieldTable& arguments = framing::FieldTable(), + const std::string& connectionId=std::string(), + const std::string& userId=std::string()); + + QPID_BROKER_EXTERN void cancel(Consumer::shared_ptr c, + const std::string& connectionId=std::string(), + const std::string& userId=std::string()); /** * Used to indicate that the queue is being used in some other * context than by a subscriber. The controlling flag should only diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index 8d7acda673..82fa3a8f19 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -141,7 +141,7 @@ void SemanticState::consume(const string& tag, c = ConsumerImpl::shared_ptr( new ConsumerImpl(this, name, queue, ackRequired, acquire ? CONSUMER : BROWSER, exclusive, tag, resumeId, resumeTtl, arguments)); - queue->consume(c, exclusive);//may throw exception + queue->consume(c, exclusive, arguments, connectionId, userID);//may throw exception consumers[tag] = c; } @@ -323,7 +323,7 @@ SemanticStateConsumerImpl::SemanticStateConsumerImpl(SemanticState* _parent, const framing::FieldTable& _arguments ) : -Consumer(_name, type), + Consumer(_name, type, _tag), parent(_parent), queue(_queue), ackExpected(ack), @@ -331,7 +331,6 @@ Consumer(_name, type), blocked(true), exclusive(_exclusive), resumeId(_resumeId), - tag(_tag), selector(returnSelector(_arguments.getAsString(APACHE_SELECTOR))), resumeTtl(_resumeTtl), arguments(_arguments), @@ -472,7 +471,7 @@ void SemanticState::cancel(ConsumerImpl::shared_ptr c) disable(c); Queue::shared_ptr queue = c->getQueue(); if(queue) { - queue->cancel(c); + queue->cancel(c, connectionId, userID); } c->cancel(); } diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h index 8fb796add7..e5456067a8 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.h +++ b/qpid/cpp/src/qpid/broker/SemanticState.h @@ -209,7 +209,6 @@ class SemanticStateConsumerImpl : public Consumer, public sys::OutputTask, bool blocked; bool exclusive; std::string resumeId; - const std::string tag; // <destination> from AMQP 0-10 Message.subscribe command boost::shared_ptr<Selector> selector; uint64_t resumeTtl; framing::FieldTable arguments; @@ -270,7 +269,6 @@ class SemanticStateConsumerImpl : public Consumer, public sys::OutputTask, bool isAcquire() const { return acquire; } bool isExclusive() const { return exclusive; } std::string getResumeId() const { return resumeId; }; - const std::string& getTag() const { return tag; } uint64_t getResumeTtl() const { return resumeTtl; } uint32_t getDeliveryCount() const { return deliveryCount; } void setDeliveryCount(uint32_t _deliveryCount) { deliveryCount = _deliveryCount; } diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp index 7c2d1cf9f5..888dc68833 100644 --- a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp @@ -29,15 +29,7 @@ #include "qpid/log/Statement.h" #include "qpid/management/ManagementAgent.h" #include "qpid/broker/SessionState.h" -#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h" -#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h" -#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h" -#include "qmf/org/apache/qpid/broker/EventQueueDelete.h" -#include "qmf/org/apache/qpid/broker/EventBind.h" -#include "qmf/org/apache/qpid/broker/EventUnbind.h" -#include "qmf/org/apache/qpid/broker/EventSubscribe.h" -#include "qmf/org/apache/qpid/broker/EventUnsubscribe.h" -#include <boost/format.hpp> + #include <boost/format.hpp> #include <boost/cast.hpp> #include <boost/bind.hpp> @@ -50,7 +42,6 @@ using namespace qpid; using namespace qpid::framing; using namespace qpid::framing::dtx; using namespace qpid::management; -namespace _qmf = qmf::org::apache::qpid::broker; typedef std::vector<Queue::shared_ptr> QueueVector; @@ -425,10 +416,6 @@ SessionAdapter::MessageHandlerImpl::subscribe(const string& queueName, acceptMode == 0, acquireMode == 0, exclusive, resumeId, resumeTtl, arguments); - ManagementAgent* agent = getBroker().getManagementAgent(); - if (agent) - agent->raiseEvent(_qmf::EventSubscribe(getConnection().getMgmtId(), getConnection().getUserId(), - queueName, destination, exclusive, ManagementAgent::toMap(arguments))); QPID_LOG_CAT(debug, model, "Create subscription. queue:" << queueName << " destination:" << destination << " user:" << getConnection().getUserId() @@ -443,10 +430,6 @@ SessionAdapter::MessageHandlerImpl::cancel(const string& destination ) if (!state.cancel(destination)) { throw NotFoundException(QPID_MSG("No such subscription: " << destination)); } - - ManagementAgent* agent = getBroker().getManagementAgent(); - if (agent) - agent->raiseEvent(_qmf::EventUnsubscribe(getConnection().getMgmtId(), getConnection().getUserId(), destination)); QPID_LOG_CAT(debug, model, "Delete subscription. destination:" << destination << " user:" << getConnection().getUserId() << " rhost:" << getConnection().getMgmtId() ); diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp index 2ae676a66f..7a1dbef9db 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp @@ -59,7 +59,7 @@ bool requested_unreliable(pn_link_t* link) OutgoingFromQueue::OutgoingFromQueue(Broker& broker, const std::string& source, const std::string& target, boost::shared_ptr<Queue> q, pn_link_t* l, Session& session, qpid::sys::OutputControl& o, SubscriptionType type, bool e, bool p) : Outgoing(broker, session, source, target, pn_link_name(l)), - Consumer(pn_link_name(l), type), + Consumer(pn_link_name(l), type, target), exclusive(e), isControllingUser(p), queue(q), deliveries(5000), link(l), out(o), diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp index a9769740d6..0fd35a000b 100644 --- a/qpid/cpp/src/tests/QueueTest.cpp +++ b/qpid/cpp/src/tests/QueueTest.cpp @@ -64,7 +64,7 @@ public: QueueCursor lastCursor; Message lastMessage; bool received; - TestConsumer(std::string name="test", bool acquire = true) : Consumer(name, acquire ? CONSUMER : BROWSER), received(false) {}; + TestConsumer(std::string name="test", bool acquire = true) : Consumer(name, acquire ? CONSUMER : BROWSER, ""), received(false) {}; virtual bool deliver(const QueueCursor& cursor, const Message& message){ lastCursor = cursor; |