diff options
author | Gordon Sim <gsim@apache.org> | 2012-06-22 13:18:05 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2012-06-22 13:18:05 +0000 |
commit | ff5e1370d72982cb7910affd9e386d3326465205 (patch) | |
tree | 13b82a24b406495c263f3361d6576f37da55c7a6 | |
parent | 9234e7ab4efdb111cf96087faa7fe7bc76c14fe3 (diff) | |
download | qpid-python-ff5e1370d72982cb7910affd9e386d3326465205.tar.gz |
QPID-4075: Raise delete event for autodeleted queues also
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1352874 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 20 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.cpp | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionAdapter.cpp | 7 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionAdapter.h | 3 | ||||
-rw-r--r-- | qpid/cpp/src/tests/qpid-receive.cpp | 5 | ||||
-rw-r--r-- | qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py | 1 |
8 files changed, 33 insertions, 12 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index f3f206d571..3202a2676f 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -49,6 +49,7 @@ #include "qpid/types/Variant.h" #include "qmf/org/apache/qpid/broker/ArgsQueuePurge.h" #include "qmf/org/apache/qpid/broker/ArgsQueueReroute.h" +#include "qmf/org/apache/qpid/broker/EventQueueDelete.h" #include <iostream> #include <algorithm> @@ -1484,12 +1485,15 @@ boost::shared_ptr<Exchange> Queue::getAlternateExchange() return alternateExchange; } -void tryAutoDeleteImpl(Broker& broker, Queue::shared_ptr queue) +void tryAutoDeleteImpl(Broker& broker, Queue::shared_ptr queue, const std::string& connectionId, const std::string& userId) { if (broker.getQueues().destroyIf(queue->getName(), boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue))) { QPID_LOG(debug, "Auto-deleting " << queue->getName()); queue->destroyed(); + + if (broker.getManagementAgent()) + broker.getManagementAgent()->raiseEvent(_qmf::EventQueueDelete(connectionId, userId, queue->getName())); } } @@ -1497,9 +1501,11 @@ struct AutoDeleteTask : qpid::sys::TimerTask { Broker& broker; Queue::shared_ptr queue; + std::string connectionId; + std::string userId; - AutoDeleteTask(Broker& b, Queue::shared_ptr q, AbsTime fireTime) - : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion:"+q->getName()), broker(b), queue(q) {} + AutoDeleteTask(Broker& b, Queue::shared_ptr q, const std::string& cId, const std::string& uId, AbsTime fireTime) + : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion:"+q->getName()), broker(b), queue(q), connectionId(cId), userId(uId) {} void fire() { @@ -1507,19 +1513,19 @@ struct AutoDeleteTask : qpid::sys::TimerTask //created, but then became unused again before the task fired; //in this case ignore this request as there will have already //been a later task added - tryAutoDeleteImpl(broker, queue); + tryAutoDeleteImpl(broker, queue, connectionId, userId); } }; -void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue) +void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue, const std::string& connectionId, const std::string& userId) { if (queue->autoDeleteTimeout && queue->canAutoDelete()) { AbsTime time(now(), Duration(queue->autoDeleteTimeout * TIME_SEC)); - queue->autoDeleteTask = boost::intrusive_ptr<qpid::sys::TimerTask>(new AutoDeleteTask(broker, queue, time)); + queue->autoDeleteTask = boost::intrusive_ptr<qpid::sys::TimerTask>(new AutoDeleteTask(broker, queue, connectionId, userId, time)); broker.getClusterTimer().add(queue->autoDeleteTask); QPID_LOG(debug, "Timed auto-delete for " << queue->getName() << " initiated"); } else { - tryAutoDeleteImpl(broker, queue); + tryAutoDeleteImpl(broker, queue, connectionId, userId); } } diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index ed1f63504b..a31e0002ea 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -344,7 +344,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, * exclusive owner */ static Queue::shared_ptr restore(QueueRegistry& queues, framing::Buffer& buffer); - static void tryAutoDelete(Broker& broker, Queue::shared_ptr); + static void tryAutoDelete(Broker& broker, Queue::shared_ptr, const std::string& connectionId, const std::string& userId); virtual void setExternalQueueStore(ExternalQueueStore* inst); diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index 5786370598..9a84db547c 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -72,7 +72,8 @@ SemanticState::SemanticState(DeliveryAdapter& da, SessionContext& ss) dtxSelected(false), authMsg(getSession().getBroker().getOptions().auth && !getSession().getConnection().isUserProxyAuth()), userID(getSession().getConnection().getUserId()), - closeComplete(false) + closeComplete(false), + connectionId(getSession().getConnection().getUrl()) {} SemanticState::~SemanticState() { @@ -428,7 +429,7 @@ void SemanticState::cancel(ConsumerImpl::shared_ptr c) if(queue) { queue->cancel(c); if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) { - Queue::tryAutoDelete(session.getBroker(), queue); + Queue::tryAutoDelete(session.getBroker(), queue, connectionId, userID); } } c->cancel(); diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h index a3cced9c67..15928ce599 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.h +++ b/qpid/cpp/src/qpid/broker/SemanticState.h @@ -182,6 +182,8 @@ class SemanticState : private boost::noncopyable { const bool authMsg; const std::string userID; bool closeComplete; + //needed for queue delete events in auto-delete: + const std::string connectionId; void route(boost::intrusive_ptr<Message> msg, Deliverable& strategy); void checkDtxTimeout(); diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp index f7dee0bcab..7469fb3af3 100644 --- a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp @@ -206,7 +206,10 @@ ExchangeBoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::string } } -SessionAdapter::QueueHandlerImpl::QueueHandlerImpl(SemanticState& session) : HandlerHelper(session), broker(getBroker()) +SessionAdapter::QueueHandlerImpl::QueueHandlerImpl(SemanticState& session) + : HandlerHelper(session), broker(getBroker()), + //record connection id and userid for deleting exclsuive queues after session has ended: + connectionId(getConnection().getUrl()), userId(getConnection().getUserId()) {} @@ -225,7 +228,7 @@ void SessionAdapter::QueueHandlerImpl::destroyExclusiveQueues() Queue::shared_ptr q(exclusiveQueues.front()); q->releaseExclusiveOwnership(); if (q->canAutoDelete()) { - Queue::tryAutoDelete(broker, q); + Queue::tryAutoDelete(broker, q, connectionId, userId); } exclusiveQueues.erase(exclusiveQueues.begin()); } diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.h b/qpid/cpp/src/qpid/broker/SessionAdapter.h index bc056538b1..3cc745f96c 100644 --- a/qpid/cpp/src/qpid/broker/SessionAdapter.h +++ b/qpid/cpp/src/qpid/broker/SessionAdapter.h @@ -121,6 +121,9 @@ class Queue; { Broker& broker; std::vector< boost::shared_ptr<Queue> > exclusiveQueues; + //connectionId and userId are needed for queue-delete events for auto deleted, exclusive queues + std::string connectionId; + std::string userId; public: QueueHandlerImpl(SemanticState& session); diff --git a/qpid/cpp/src/tests/qpid-receive.cpp b/qpid/cpp/src/tests/qpid-receive.cpp index 6deeb566dc..7a02b871db 100644 --- a/qpid/cpp/src/tests/qpid-receive.cpp +++ b/qpid/cpp/src/tests/qpid-receive.cpp @@ -68,6 +68,7 @@ struct Options : public qpid::Options bool reportHeader; string readyAddress; uint receiveRate; + std::string replyto; Options(const std::string& argv0=std::string()) : qpid::Options("Options"), @@ -114,6 +115,7 @@ struct Options : public qpid::Options ("report-header", qpid::optValue(reportHeader, "yes|no"), "Headers on report.") ("ready-address", qpid::optValue(readyAddress, "ADDRESS"), "send a message to this address when ready to receive") ("receive-rate", qpid::optValue(receiveRate,"N"), "Receive at rate of N messages/second. 0 means receive as fast as possible.") + ("reply-to", qpid::optValue(replyto, "REPLY-TO"), "specify reply-to address on response messages") ("help", qpid::optValue(help), "print this usage statement"); add(log); } @@ -246,6 +248,9 @@ int main(int argc, char ** argv) s = session.createSender(msg.getReplyTo()); s.setCapacity(opts.capacity); } + if (!opts.replyto.empty()) { + msg.setReplyTo(Address(opts.replyto)); + } s.send(msg); } if (opts.receiveRate) { diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py b/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py index 107b34c82b..312dc22645 100644 --- a/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py +++ b/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py @@ -36,3 +36,4 @@ from extensions import * from msg_groups import * from new_api import * from stats import * +from qmf_events import * |