summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2012-06-22 13:18:05 +0000
committerGordon Sim <gsim@apache.org>2012-06-22 13:18:05 +0000
commitff5e1370d72982cb7910affd9e386d3326465205 (patch)
tree13b82a24b406495c263f3361d6576f37da55c7a6
parent9234e7ab4efdb111cf96087faa7fe7bc76c14fe3 (diff)
downloadqpid-python-ff5e1370d72982cb7910affd9e386d3326465205.tar.gz
QPID-4075: Raise delete event for autodeleted queues also
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1352874 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp20
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h2
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp5
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.h2
-rw-r--r--qpid/cpp/src/qpid/broker/SessionAdapter.cpp7
-rw-r--r--qpid/cpp/src/qpid/broker/SessionAdapter.h3
-rw-r--r--qpid/cpp/src/tests/qpid-receive.cpp5
-rw-r--r--qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py1
8 files changed, 33 insertions, 12 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index f3f206d571..3202a2676f 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -49,6 +49,7 @@
#include "qpid/types/Variant.h"
#include "qmf/org/apache/qpid/broker/ArgsQueuePurge.h"
#include "qmf/org/apache/qpid/broker/ArgsQueueReroute.h"
+#include "qmf/org/apache/qpid/broker/EventQueueDelete.h"
#include <iostream>
#include <algorithm>
@@ -1484,12 +1485,15 @@ boost::shared_ptr<Exchange> Queue::getAlternateExchange()
return alternateExchange;
}
-void tryAutoDeleteImpl(Broker& broker, Queue::shared_ptr queue)
+void tryAutoDeleteImpl(Broker& broker, Queue::shared_ptr queue, const std::string& connectionId, const std::string& userId)
{
if (broker.getQueues().destroyIf(queue->getName(),
boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue))) {
QPID_LOG(debug, "Auto-deleting " << queue->getName());
queue->destroyed();
+
+ if (broker.getManagementAgent())
+ broker.getManagementAgent()->raiseEvent(_qmf::EventQueueDelete(connectionId, userId, queue->getName()));
}
}
@@ -1497,9 +1501,11 @@ struct AutoDeleteTask : qpid::sys::TimerTask
{
Broker& broker;
Queue::shared_ptr queue;
+ std::string connectionId;
+ std::string userId;
- AutoDeleteTask(Broker& b, Queue::shared_ptr q, AbsTime fireTime)
- : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion:"+q->getName()), broker(b), queue(q) {}
+ AutoDeleteTask(Broker& b, Queue::shared_ptr q, const std::string& cId, const std::string& uId, AbsTime fireTime)
+ : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion:"+q->getName()), broker(b), queue(q), connectionId(cId), userId(uId) {}
void fire()
{
@@ -1507,19 +1513,19 @@ struct AutoDeleteTask : qpid::sys::TimerTask
//created, but then became unused again before the task fired;
//in this case ignore this request as there will have already
//been a later task added
- tryAutoDeleteImpl(broker, queue);
+ tryAutoDeleteImpl(broker, queue, connectionId, userId);
}
};
-void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue)
+void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue, const std::string& connectionId, const std::string& userId)
{
if (queue->autoDeleteTimeout && queue->canAutoDelete()) {
AbsTime time(now(), Duration(queue->autoDeleteTimeout * TIME_SEC));
- queue->autoDeleteTask = boost::intrusive_ptr<qpid::sys::TimerTask>(new AutoDeleteTask(broker, queue, time));
+ queue->autoDeleteTask = boost::intrusive_ptr<qpid::sys::TimerTask>(new AutoDeleteTask(broker, queue, connectionId, userId, time));
broker.getClusterTimer().add(queue->autoDeleteTask);
QPID_LOG(debug, "Timed auto-delete for " << queue->getName() << " initiated");
} else {
- tryAutoDeleteImpl(broker, queue);
+ tryAutoDeleteImpl(broker, queue, connectionId, userId);
}
}
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index ed1f63504b..a31e0002ea 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -344,7 +344,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
* exclusive owner
*/
static Queue::shared_ptr restore(QueueRegistry& queues, framing::Buffer& buffer);
- static void tryAutoDelete(Broker& broker, Queue::shared_ptr);
+ static void tryAutoDelete(Broker& broker, Queue::shared_ptr, const std::string& connectionId, const std::string& userId);
virtual void setExternalQueueStore(ExternalQueueStore* inst);
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index 5786370598..9a84db547c 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -72,7 +72,8 @@ SemanticState::SemanticState(DeliveryAdapter& da, SessionContext& ss)
dtxSelected(false),
authMsg(getSession().getBroker().getOptions().auth && !getSession().getConnection().isUserProxyAuth()),
userID(getSession().getConnection().getUserId()),
- closeComplete(false)
+ closeComplete(false),
+ connectionId(getSession().getConnection().getUrl())
{}
SemanticState::~SemanticState() {
@@ -428,7 +429,7 @@ void SemanticState::cancel(ConsumerImpl::shared_ptr c)
if(queue) {
queue->cancel(c);
if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) {
- Queue::tryAutoDelete(session.getBroker(), queue);
+ Queue::tryAutoDelete(session.getBroker(), queue, connectionId, userID);
}
}
c->cancel();
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h
index a3cced9c67..15928ce599 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.h
+++ b/qpid/cpp/src/qpid/broker/SemanticState.h
@@ -182,6 +182,8 @@ class SemanticState : private boost::noncopyable {
const bool authMsg;
const std::string userID;
bool closeComplete;
+ //needed for queue delete events in auto-delete:
+ const std::string connectionId;
void route(boost::intrusive_ptr<Message> msg, Deliverable& strategy);
void checkDtxTimeout();
diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
index f7dee0bcab..7469fb3af3 100644
--- a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -206,7 +206,10 @@ ExchangeBoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::string
}
}
-SessionAdapter::QueueHandlerImpl::QueueHandlerImpl(SemanticState& session) : HandlerHelper(session), broker(getBroker())
+SessionAdapter::QueueHandlerImpl::QueueHandlerImpl(SemanticState& session)
+ : HandlerHelper(session), broker(getBroker()),
+ //record connection id and userid for deleting exclsuive queues after session has ended:
+ connectionId(getConnection().getUrl()), userId(getConnection().getUserId())
{}
@@ -225,7 +228,7 @@ void SessionAdapter::QueueHandlerImpl::destroyExclusiveQueues()
Queue::shared_ptr q(exclusiveQueues.front());
q->releaseExclusiveOwnership();
if (q->canAutoDelete()) {
- Queue::tryAutoDelete(broker, q);
+ Queue::tryAutoDelete(broker, q, connectionId, userId);
}
exclusiveQueues.erase(exclusiveQueues.begin());
}
diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.h b/qpid/cpp/src/qpid/broker/SessionAdapter.h
index bc056538b1..3cc745f96c 100644
--- a/qpid/cpp/src/qpid/broker/SessionAdapter.h
+++ b/qpid/cpp/src/qpid/broker/SessionAdapter.h
@@ -121,6 +121,9 @@ class Queue;
{
Broker& broker;
std::vector< boost::shared_ptr<Queue> > exclusiveQueues;
+ //connectionId and userId are needed for queue-delete events for auto deleted, exclusive queues
+ std::string connectionId;
+ std::string userId;
public:
QueueHandlerImpl(SemanticState& session);
diff --git a/qpid/cpp/src/tests/qpid-receive.cpp b/qpid/cpp/src/tests/qpid-receive.cpp
index 6deeb566dc..7a02b871db 100644
--- a/qpid/cpp/src/tests/qpid-receive.cpp
+++ b/qpid/cpp/src/tests/qpid-receive.cpp
@@ -68,6 +68,7 @@ struct Options : public qpid::Options
bool reportHeader;
string readyAddress;
uint receiveRate;
+ std::string replyto;
Options(const std::string& argv0=std::string())
: qpid::Options("Options"),
@@ -114,6 +115,7 @@ struct Options : public qpid::Options
("report-header", qpid::optValue(reportHeader, "yes|no"), "Headers on report.")
("ready-address", qpid::optValue(readyAddress, "ADDRESS"), "send a message to this address when ready to receive")
("receive-rate", qpid::optValue(receiveRate,"N"), "Receive at rate of N messages/second. 0 means receive as fast as possible.")
+ ("reply-to", qpid::optValue(replyto, "REPLY-TO"), "specify reply-to address on response messages")
("help", qpid::optValue(help), "print this usage statement");
add(log);
}
@@ -246,6 +248,9 @@ int main(int argc, char ** argv)
s = session.createSender(msg.getReplyTo());
s.setCapacity(opts.capacity);
}
+ if (!opts.replyto.empty()) {
+ msg.setReplyTo(Address(opts.replyto));
+ }
s.send(msg);
}
if (opts.receiveRate) {
diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py b/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py
index 107b34c82b..312dc22645 100644
--- a/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py
+++ b/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py
@@ -36,3 +36,4 @@ from extensions import *
from msg_groups import *
from new_api import *
from stats import *
+from qmf_events import *