summaryrefslogtreecommitdiff
path: root/qpid
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2014-04-07 21:22:32 +0000
committerAlan Conway <aconway@apache.org>2014-04-07 21:22:32 +0000
commit8ea0f79d78edd0a0825547ecc618e3fa63a2b93f (patch)
tree1351b13a08a8827f6d64dac1b2700efa93343e56 /qpid
parent8b0808b498dd5e3fe5e5d04e9e9c9492206036e2 (diff)
downloadqpid-python-8ea0f79d78edd0a0825547ecc618e3fa63a2b93f.tar.gz
QPID-5667: C++ broker: QMF subscribe events are not raised with AMQP 1.0
The raise event logic for subscribe/unsubscribe events was in 0-10 specific code. Moved it into Queue.cpp so events are generated regardless of protocol. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1585587 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid')
-rw-r--r--qpid/cpp/src/qpid/broker/Consumer.h8
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp16
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h10
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp7
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.h2
-rw-r--r--qpid/cpp/src/qpid/broker/SessionAdapter.cpp19
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp2
-rw-r--r--qpid/cpp/src/tests/QueueTest.cpp2
8 files changed, 34 insertions, 32 deletions
diff --git a/qpid/cpp/src/qpid/broker/Consumer.h b/qpid/cpp/src/qpid/broker/Consumer.h
index 662b0f937d..e01eff98be 100644
--- a/qpid/cpp/src/qpid/broker/Consumer.h
+++ b/qpid/cpp/src/qpid/broker/Consumer.h
@@ -49,8 +49,8 @@ class Consumer : public QueueCursor {
public:
typedef boost::shared_ptr<Consumer> shared_ptr;
- Consumer(const std::string& _name, SubscriptionType type)
- : QueueCursor(type), acquires(type == CONSUMER), inListeners(false), name(_name) {}
+ Consumer(const std::string& _name, SubscriptionType type, const std::string& _tag)
+ : QueueCursor(type), acquires(type == CONSUMER), inListeners(false), name(_name), tag(_tag) {}
virtual ~Consumer(){}
bool preAcquires() const { return acquires; }
@@ -88,8 +88,12 @@ class Consumer : public QueueCursor {
QueueCursor getCursor() const { return *this; }
void setCursor(const QueueCursor& qc) { static_cast<QueueCursor&>(*this) = qc; }
+
+ const std::string& getTag() const { return tag; }
+
protected:
//framing::SequenceNumber position;
+ const std::string tag; // <destination> from AMQP 0-10 Message.subscribe command
private:
friend class QueueListeners;
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index d8bc59eaf1..941c0c88a2 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -53,6 +53,8 @@
#include "qmf/org/apache/qpid/broker/ArgsQueuePurge.h"
#include "qmf/org/apache/qpid/broker/ArgsQueueReroute.h"
#include "qmf/org/apache/qpid/broker/EventQueueDelete.h"
+#include "qmf/org/apache/qpid/broker/EventSubscribe.h"
+#include "qmf/org/apache/qpid/broker/EventUnsubscribe.h"
#include <iostream>
#include <algorithm>
@@ -534,7 +536,9 @@ void Queue::releaseFromUse(bool controlling)
if (trydelete) scheduleAutoDelete();
}
-void Queue::consume(Consumer::shared_ptr c, bool requestExclusive)
+void Queue::consume(Consumer::shared_ptr c, bool requestExclusive,
+ const framing::FieldTable& arguments,
+ const std::string& connectionId, const std::string& userId)
{
{
Mutex::ScopedLock locker(messageLock);
@@ -573,9 +577,15 @@ void Queue::consume(Consumer::shared_ptr c, bool requestExclusive)
if (mgmtObject != 0 && c->isCounted()) {
mgmtObject->inc_consumerCount();
}
+ ManagementAgent* agent = broker->getManagementAgent();
+ if (agent) {
+ agent->raiseEvent(
+ _qmf::EventSubscribe(connectionId, userId, name,
+ c->getTag(), requestExclusive, ManagementAgent::toMap(arguments)));
+ }
}
-void Queue::cancel(Consumer::shared_ptr c)
+void Queue::cancel(Consumer::shared_ptr c, const std::string& connectionId, const std::string& userId)
{
removeListener(c);
if(c->isCounted())
@@ -599,6 +609,8 @@ void Queue::cancel(Consumer::shared_ptr c)
scheduleAutoDelete();
}
}
+ ManagementAgent* agent = broker->getManagementAgent();
+ if (agent) agent->raiseEvent(_qmf::EventUnsubscribe(connectionId, userId, c->getTag()));
}
/**
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index 21a8b7ef99..683468e9a4 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -348,8 +348,14 @@ class Queue : public boost::enable_shared_from_this<Queue>,
QPID_BROKER_EXTERN void recover(Message& msg);
QPID_BROKER_EXTERN void consume(Consumer::shared_ptr c,
- bool exclusive = false);
- QPID_BROKER_EXTERN void cancel(Consumer::shared_ptr c);
+ bool exclusive = false,
+ const framing::FieldTable& arguments = framing::FieldTable(),
+ const std::string& connectionId=std::string(),
+ const std::string& userId=std::string());
+
+ QPID_BROKER_EXTERN void cancel(Consumer::shared_ptr c,
+ const std::string& connectionId=std::string(),
+ const std::string& userId=std::string());
/**
* Used to indicate that the queue is being used in some other
* context than by a subscriber. The controlling flag should only
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index 8d7acda673..82fa3a8f19 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -141,7 +141,7 @@ void SemanticState::consume(const string& tag,
c = ConsumerImpl::shared_ptr(
new ConsumerImpl(this, name, queue, ackRequired, acquire ? CONSUMER : BROWSER, exclusive, tag,
resumeId, resumeTtl, arguments));
- queue->consume(c, exclusive);//may throw exception
+ queue->consume(c, exclusive, arguments, connectionId, userID);//may throw exception
consumers[tag] = c;
}
@@ -323,7 +323,7 @@ SemanticStateConsumerImpl::SemanticStateConsumerImpl(SemanticState* _parent,
const framing::FieldTable& _arguments
) :
-Consumer(_name, type),
+ Consumer(_name, type, _tag),
parent(_parent),
queue(_queue),
ackExpected(ack),
@@ -331,7 +331,6 @@ Consumer(_name, type),
blocked(true),
exclusive(_exclusive),
resumeId(_resumeId),
- tag(_tag),
selector(returnSelector(_arguments.getAsString(APACHE_SELECTOR))),
resumeTtl(_resumeTtl),
arguments(_arguments),
@@ -472,7 +471,7 @@ void SemanticState::cancel(ConsumerImpl::shared_ptr c)
disable(c);
Queue::shared_ptr queue = c->getQueue();
if(queue) {
- queue->cancel(c);
+ queue->cancel(c, connectionId, userID);
}
c->cancel();
}
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h
index 8fb796add7..e5456067a8 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.h
+++ b/qpid/cpp/src/qpid/broker/SemanticState.h
@@ -209,7 +209,6 @@ class SemanticStateConsumerImpl : public Consumer, public sys::OutputTask,
bool blocked;
bool exclusive;
std::string resumeId;
- const std::string tag; // <destination> from AMQP 0-10 Message.subscribe command
boost::shared_ptr<Selector> selector;
uint64_t resumeTtl;
framing::FieldTable arguments;
@@ -270,7 +269,6 @@ class SemanticStateConsumerImpl : public Consumer, public sys::OutputTask,
bool isAcquire() const { return acquire; }
bool isExclusive() const { return exclusive; }
std::string getResumeId() const { return resumeId; };
- const std::string& getTag() const { return tag; }
uint64_t getResumeTtl() const { return resumeTtl; }
uint32_t getDeliveryCount() const { return deliveryCount; }
void setDeliveryCount(uint32_t _deliveryCount) { deliveryCount = _deliveryCount; }
diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
index 7c2d1cf9f5..888dc68833 100644
--- a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -29,15 +29,7 @@
#include "qpid/log/Statement.h"
#include "qpid/management/ManagementAgent.h"
#include "qpid/broker/SessionState.h"
-#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
-#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h"
-#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h"
-#include "qmf/org/apache/qpid/broker/EventQueueDelete.h"
-#include "qmf/org/apache/qpid/broker/EventBind.h"
-#include "qmf/org/apache/qpid/broker/EventUnbind.h"
-#include "qmf/org/apache/qpid/broker/EventSubscribe.h"
-#include "qmf/org/apache/qpid/broker/EventUnsubscribe.h"
-#include <boost/format.hpp>
+ #include <boost/format.hpp>
#include <boost/cast.hpp>
#include <boost/bind.hpp>
@@ -50,7 +42,6 @@ using namespace qpid;
using namespace qpid::framing;
using namespace qpid::framing::dtx;
using namespace qpid::management;
-namespace _qmf = qmf::org::apache::qpid::broker;
typedef std::vector<Queue::shared_ptr> QueueVector;
@@ -425,10 +416,6 @@ SessionAdapter::MessageHandlerImpl::subscribe(const string& queueName,
acceptMode == 0, acquireMode == 0, exclusive,
resumeId, resumeTtl, arguments);
- ManagementAgent* agent = getBroker().getManagementAgent();
- if (agent)
- agent->raiseEvent(_qmf::EventSubscribe(getConnection().getMgmtId(), getConnection().getUserId(),
- queueName, destination, exclusive, ManagementAgent::toMap(arguments)));
QPID_LOG_CAT(debug, model, "Create subscription. queue:" << queueName
<< " destination:" << destination
<< " user:" << getConnection().getUserId()
@@ -443,10 +430,6 @@ SessionAdapter::MessageHandlerImpl::cancel(const string& destination )
if (!state.cancel(destination)) {
throw NotFoundException(QPID_MSG("No such subscription: " << destination));
}
-
- ManagementAgent* agent = getBroker().getManagementAgent();
- if (agent)
- agent->raiseEvent(_qmf::EventUnsubscribe(getConnection().getMgmtId(), getConnection().getUserId(), destination));
QPID_LOG_CAT(debug, model, "Delete subscription. destination:" << destination
<< " user:" << getConnection().getUserId()
<< " rhost:" << getConnection().getMgmtId() );
diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
index 2ae676a66f..7a1dbef9db 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
@@ -59,7 +59,7 @@ bool requested_unreliable(pn_link_t* link)
OutgoingFromQueue::OutgoingFromQueue(Broker& broker, const std::string& source, const std::string& target, boost::shared_ptr<Queue> q, pn_link_t* l, Session& session,
qpid::sys::OutputControl& o, SubscriptionType type, bool e, bool p)
: Outgoing(broker, session, source, target, pn_link_name(l)),
- Consumer(pn_link_name(l), type),
+ Consumer(pn_link_name(l), type, target),
exclusive(e),
isControllingUser(p),
queue(q), deliveries(5000), link(l), out(o),
diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp
index a9769740d6..0fd35a000b 100644
--- a/qpid/cpp/src/tests/QueueTest.cpp
+++ b/qpid/cpp/src/tests/QueueTest.cpp
@@ -64,7 +64,7 @@ public:
QueueCursor lastCursor;
Message lastMessage;
bool received;
- TestConsumer(std::string name="test", bool acquire = true) : Consumer(name, acquire ? CONSUMER : BROWSER), received(false) {};
+ TestConsumer(std::string name="test", bool acquire = true) : Consumer(name, acquire ? CONSUMER : BROWSER, ""), received(false) {};
virtual bool deliver(const QueueCursor& cursor, const Message& message){
lastCursor = cursor;