diff options
author | Gordon Sim <gsim@apache.org> | 2007-10-17 08:59:44 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2007-10-17 08:59:44 +0000 |
commit | c619794e8a903e716bc5117179ea0ab1e24e1254 (patch) | |
tree | e4cf22d8de792053a4bb7b594b0e1cc2b2ca8abc /cpp/src/qpid/broker/Queue.cpp | |
parent | de86223091817b091b8f49774853d927c00eed9b (diff) | |
download | qpid-python-c619794e8a903e716bc5117179ea0ab1e24e1254.tar.gz |
Use shared pointers for consumers (held by queues and sessions) to prevent having to hold lock across deliver() while avoiding invocation on stale pointers.
Ensure auto-deleted queues are properly cleaned up (i.e. are unbound from exchanges) to avoid leaking memory as messages are accumulated in inaccessible queues. (some cleanup to follow on this)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@585417 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 37 |
1 files changed, 24 insertions, 13 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index e4a6449e08..8c990795e7 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -22,6 +22,7 @@ #include <boost/format.hpp> #include "qpid/log/Statement.h" +#include "Broker.h" #include "Queue.h" #include "Exchange.h" #include "DeliverableMessage.h" @@ -47,7 +48,6 @@ Queue::Queue(const string& _name, bool _autodelete, store(_store), owner(_owner), next(0), - exclusive(0), persistenceId(0), serializer(false), dispatchCallback(*this) @@ -80,7 +80,7 @@ void Queue::deliver(Message::shared_ptr& msg){ }else { push(msg); } - QPID_LOG(debug, "Message Enqueued: " << msg->getApplicationHeaders()); + QPID_LOG(debug, "Message " << msg << " enqueued on " << name << "[" << this << "]"); serializer.execute(dispatchCallback); } } @@ -124,7 +124,7 @@ bool Queue::acquire(const QueuedMessage& msg) { return false; } -void Queue::requestDispatch(Consumer* c){ +void Queue::requestDispatch(Consumer::ptr c){ if (!c || c->preAcquires()) { serializer.execute(dispatchCallback); } else { @@ -138,12 +138,12 @@ void Queue::flush(DispatchCompletion& completion) serializer.execute(f); } -Consumer* Queue::allocate() +Consumer::ptr Queue::allocate() { RWlock::ScopedWlock locker(consumerLock); if(acquirers.empty()){ - return 0; + return Consumer::ptr(); }else if(exclusive){ return exclusive; }else{ @@ -154,14 +154,16 @@ Consumer* Queue::allocate() bool Queue::dispatch(QueuedMessage& msg) { - Consumer* c = allocate(); - Consumer* first = c; + Consumer::ptr c = allocate(); + Consumer::ptr first = c; while(c){ if(c->deliver(msg)) { return true; } else { c = allocate(); - if (c == first) c = 0; + if (c == first) { + break; + } } } return false; @@ -199,7 +201,7 @@ void Queue::serviceAllBrowsers() } } -void Queue::serviceBrowser(Consumer* browser) +void Queue::serviceBrowser(Consumer::ptr browser) { QueuedMessage msg; while (seek(msg, browser->position) && browser->deliver(msg)) { @@ -219,7 +221,7 @@ bool Queue::seek(QueuedMessage& msg, const framing::SequenceNumber& position) { return false; } -void Queue::consume(Consumer* c, bool requestExclusive){ +void Queue::consume(Consumer::ptr c, bool requestExclusive){ RWlock::ScopedWlock locker(consumerLock); if(exclusive) { throw ChannelException( @@ -242,17 +244,17 @@ void Queue::consume(Consumer* c, bool requestExclusive){ } } -void Queue::cancel(Consumer* c){ +void Queue::cancel(Consumer::ptr c){ RWlock::ScopedWlock locker(consumerLock); if (c->preAcquires()) { cancel(c, acquirers); } else { cancel(c, browsers); } - if(exclusive == c) exclusive = 0; + if(exclusive == c) exclusive.reset(); } -void Queue::cancel(Consumer* c, Consumers& consumers) +void Queue::cancel(Consumer::ptr c, Consumers& consumers) { Consumers::iterator i = std::find(consumers.begin(), consumers.end(), c); if (i != consumers.end()) @@ -442,3 +444,12 @@ boost::shared_ptr<Exchange> Queue::getAlternateExchange() return alternateExchange; } +void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue) +{ + if (broker.getQueues().destroyIf(queue->getName(), + boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue))) { + queue->unbind(broker.getExchanges(), queue); + queue->destroy(); + } + +} |