summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-10-17 08:59:44 +0000
committerGordon Sim <gsim@apache.org>2007-10-17 08:59:44 +0000
commitc619794e8a903e716bc5117179ea0ab1e24e1254 (patch)
treee4cf22d8de792053a4bb7b594b0e1cc2b2ca8abc /cpp/src/qpid/broker/Queue.cpp
parentde86223091817b091b8f49774853d927c00eed9b (diff)
downloadqpid-python-c619794e8a903e716bc5117179ea0ab1e24e1254.tar.gz
Use shared pointers for consumers (held by queues and sessions) to prevent having to hold lock across deliver() while avoiding invocation on stale pointers.
Ensure auto-deleted queues are properly cleaned up (i.e. are unbound from exchanges) to avoid leaking memory as messages are accumulated in inaccessible queues. (some cleanup to follow on this) git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@585417 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp37
1 files changed, 24 insertions, 13 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index e4a6449e08..8c990795e7 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -22,6 +22,7 @@
#include <boost/format.hpp>
#include "qpid/log/Statement.h"
+#include "Broker.h"
#include "Queue.h"
#include "Exchange.h"
#include "DeliverableMessage.h"
@@ -47,7 +48,6 @@ Queue::Queue(const string& _name, bool _autodelete,
store(_store),
owner(_owner),
next(0),
- exclusive(0),
persistenceId(0),
serializer(false),
dispatchCallback(*this)
@@ -80,7 +80,7 @@ void Queue::deliver(Message::shared_ptr& msg){
}else {
push(msg);
}
- QPID_LOG(debug, "Message Enqueued: " << msg->getApplicationHeaders());
+ QPID_LOG(debug, "Message " << msg << " enqueued on " << name << "[" << this << "]");
serializer.execute(dispatchCallback);
}
}
@@ -124,7 +124,7 @@ bool Queue::acquire(const QueuedMessage& msg) {
return false;
}
-void Queue::requestDispatch(Consumer* c){
+void Queue::requestDispatch(Consumer::ptr c){
if (!c || c->preAcquires()) {
serializer.execute(dispatchCallback);
} else {
@@ -138,12 +138,12 @@ void Queue::flush(DispatchCompletion& completion)
serializer.execute(f);
}
-Consumer* Queue::allocate()
+Consumer::ptr Queue::allocate()
{
RWlock::ScopedWlock locker(consumerLock);
if(acquirers.empty()){
- return 0;
+ return Consumer::ptr();
}else if(exclusive){
return exclusive;
}else{
@@ -154,14 +154,16 @@ Consumer* Queue::allocate()
bool Queue::dispatch(QueuedMessage& msg)
{
- Consumer* c = allocate();
- Consumer* first = c;
+ Consumer::ptr c = allocate();
+ Consumer::ptr first = c;
while(c){
if(c->deliver(msg)) {
return true;
} else {
c = allocate();
- if (c == first) c = 0;
+ if (c == first) {
+ break;
+ }
}
}
return false;
@@ -199,7 +201,7 @@ void Queue::serviceAllBrowsers()
}
}
-void Queue::serviceBrowser(Consumer* browser)
+void Queue::serviceBrowser(Consumer::ptr browser)
{
QueuedMessage msg;
while (seek(msg, browser->position) && browser->deliver(msg)) {
@@ -219,7 +221,7 @@ bool Queue::seek(QueuedMessage& msg, const framing::SequenceNumber& position) {
return false;
}
-void Queue::consume(Consumer* c, bool requestExclusive){
+void Queue::consume(Consumer::ptr c, bool requestExclusive){
RWlock::ScopedWlock locker(consumerLock);
if(exclusive) {
throw ChannelException(
@@ -242,17 +244,17 @@ void Queue::consume(Consumer* c, bool requestExclusive){
}
}
-void Queue::cancel(Consumer* c){
+void Queue::cancel(Consumer::ptr c){
RWlock::ScopedWlock locker(consumerLock);
if (c->preAcquires()) {
cancel(c, acquirers);
} else {
cancel(c, browsers);
}
- if(exclusive == c) exclusive = 0;
+ if(exclusive == c) exclusive.reset();
}
-void Queue::cancel(Consumer* c, Consumers& consumers)
+void Queue::cancel(Consumer::ptr c, Consumers& consumers)
{
Consumers::iterator i = std::find(consumers.begin(), consumers.end(), c);
if (i != consumers.end())
@@ -442,3 +444,12 @@ boost::shared_ptr<Exchange> Queue::getAlternateExchange()
return alternateExchange;
}
+void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue)
+{
+ if (broker.getQueues().destroyIf(queue->getName(),
+ boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue))) {
+ queue->unbind(broker.getExchanges(), queue);
+ queue->destroy();
+ }
+
+}