diff options
author | Gordon Sim <gsim@apache.org> | 2007-10-17 08:59:44 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2007-10-17 08:59:44 +0000 |
commit | c619794e8a903e716bc5117179ea0ab1e24e1254 (patch) | |
tree | e4cf22d8de792053a4bb7b594b0e1cc2b2ca8abc /cpp/src | |
parent | de86223091817b091b8f49774853d927c00eed9b (diff) | |
download | qpid-python-c619794e8a903e716bc5117179ea0ab1e24e1254.tar.gz |
Use shared pointers for consumers (held by queues and sessions) to prevent having to hold lock across deliver() while avoiding invocation on stale pointers.
Ensure auto-deleted queues are properly cleaned up (i.e. are unbound from exchanges) to avoid leaking memory as messages are accumulated in inaccessible queues. (some cleanup to follow on this)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@585417 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/Connection.cpp | 7 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Consumer.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/MessageHandlerImpl.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 37 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.h | 22 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 93 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.h | 16 | ||||
-rw-r--r-- | cpp/src/tests/QueueTest.cpp | 42 |
8 files changed, 120 insertions, 101 deletions
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index 21d759c901..ca0ca20849 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -78,11 +78,8 @@ void Connection::closed(){ while (!exclusiveQueues.empty()) { Queue::shared_ptr q(exclusiveQueues.front()); q->releaseExclusiveOwnership(); - if (q->canAutoDelete() && - broker.getQueues().destroyIf(q->getName(), boost::bind(boost::mem_fn(&Queue::canAutoDelete), q))) { - - q->unbind(broker.getExchanges(), q); - q->destroy(); + if (q->canAutoDelete()) { + Queue::tryAutoDelete(broker, q); } exclusiveQueues.erase(exclusiveQueues.begin()); } diff --git a/cpp/src/qpid/broker/Consumer.h b/cpp/src/qpid/broker/Consumer.h index c482a44ab1..bf46ecbe1f 100644 --- a/cpp/src/qpid/broker/Consumer.h +++ b/cpp/src/qpid/broker/Consumer.h @@ -39,6 +39,8 @@ namespace qpid { class Consumer { const bool acquires; public: + typedef shared_ptr<Consumer> ptr; + framing::SequenceNumber position; Consumer(bool preAcquires = true) : acquires(preAcquires) {} diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp index e6c7b28a49..87b23102e2 100644 --- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp +++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp @@ -17,6 +17,7 @@ */ #include "qpid/QpidError.h" +#include "qpid/log/Statement.h" #include "MessageHandlerImpl.h" #include "qpid/framing/FramingContent.h" #include "Connection.h" @@ -156,7 +157,6 @@ MessageHandlerImpl::reject(const SequenceNumberSet& transfers, uint16_t /*code*/ void MessageHandlerImpl::flow(const std::string& destination, u_int8_t unit, u_int32_t value) { - if (unit == 0) { //message state.addMessageCredit(destination, value); diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index e4a6449e08..8c990795e7 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -22,6 +22,7 @@ #include <boost/format.hpp> #include "qpid/log/Statement.h" +#include "Broker.h" #include "Queue.h" #include "Exchange.h" #include "DeliverableMessage.h" @@ -47,7 +48,6 @@ Queue::Queue(const string& _name, bool _autodelete, store(_store), owner(_owner), next(0), - exclusive(0), persistenceId(0), serializer(false), dispatchCallback(*this) @@ -80,7 +80,7 @@ void Queue::deliver(Message::shared_ptr& msg){ }else { push(msg); } - QPID_LOG(debug, "Message Enqueued: " << msg->getApplicationHeaders()); + QPID_LOG(debug, "Message " << msg << " enqueued on " << name << "[" << this << "]"); serializer.execute(dispatchCallback); } } @@ -124,7 +124,7 @@ bool Queue::acquire(const QueuedMessage& msg) { return false; } -void Queue::requestDispatch(Consumer* c){ +void Queue::requestDispatch(Consumer::ptr c){ if (!c || c->preAcquires()) { serializer.execute(dispatchCallback); } else { @@ -138,12 +138,12 @@ void Queue::flush(DispatchCompletion& completion) serializer.execute(f); } -Consumer* Queue::allocate() +Consumer::ptr Queue::allocate() { RWlock::ScopedWlock locker(consumerLock); if(acquirers.empty()){ - return 0; + return Consumer::ptr(); }else if(exclusive){ return exclusive; }else{ @@ -154,14 +154,16 @@ Consumer* Queue::allocate() bool Queue::dispatch(QueuedMessage& msg) { - Consumer* c = allocate(); - Consumer* first = c; + Consumer::ptr c = allocate(); + Consumer::ptr first = c; while(c){ if(c->deliver(msg)) { return true; } else { c = allocate(); - if (c == first) c = 0; + if (c == first) { + break; + } } } return false; @@ -199,7 +201,7 @@ void Queue::serviceAllBrowsers() } } -void Queue::serviceBrowser(Consumer* browser) +void Queue::serviceBrowser(Consumer::ptr browser) { QueuedMessage msg; while (seek(msg, browser->position) && browser->deliver(msg)) { @@ -219,7 +221,7 @@ bool Queue::seek(QueuedMessage& msg, const framing::SequenceNumber& position) { return false; } -void Queue::consume(Consumer* c, bool requestExclusive){ +void Queue::consume(Consumer::ptr c, bool requestExclusive){ RWlock::ScopedWlock locker(consumerLock); if(exclusive) { throw ChannelException( @@ -242,17 +244,17 @@ void Queue::consume(Consumer* c, bool requestExclusive){ } } -void Queue::cancel(Consumer* c){ +void Queue::cancel(Consumer::ptr c){ RWlock::ScopedWlock locker(consumerLock); if (c->preAcquires()) { cancel(c, acquirers); } else { cancel(c, browsers); } - if(exclusive == c) exclusive = 0; + if(exclusive == c) exclusive.reset(); } -void Queue::cancel(Consumer* c, Consumers& consumers) +void Queue::cancel(Consumer::ptr c, Consumers& consumers) { Consumers::iterator i = std::find(consumers.begin(), consumers.end(), c); if (i != consumers.end()) @@ -442,3 +444,12 @@ boost::shared_ptr<Exchange> Queue::getAlternateExchange() return alternateExchange; } +void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue) +{ + if (broker.getQueues().destroyIf(queue->getName(), + boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue))) { + queue->unbind(broker.getExchanges(), queue); + queue->destroy(); + } + +} diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index 6e859e67bb..d3c8fb7e81 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -36,11 +36,9 @@ #include "QueuePolicy.h" #include "QueueBindings.h" -// TODO aconway 2007-02-06: Use auto_ptr and boost::ptr_vector to -// enforce ownership of Consumers. - namespace qpid { namespace broker { + class Broker; class MessageStore; class QueueRegistry; class TransactionContext; @@ -61,7 +59,7 @@ namespace qpid { * or more consumers registers. */ class Queue : public PersistableQueue { - typedef std::vector<Consumer*> Consumers; + typedef std::vector<Consumer::ptr> Consumers; typedef std::deque<QueuedMessage> Messages; struct DispatchFunctor @@ -86,7 +84,7 @@ namespace qpid { int next; mutable qpid::sys::RWlock consumerLock; mutable qpid::sys::Mutex messageLock; - Consumer* exclusive; + Consumer::ptr exclusive; mutable uint64_t persistenceId; framing::FieldTable settings; std::auto_ptr<QueuePolicy> policy; @@ -104,10 +102,10 @@ namespace qpid { * only called by serilizer */ void dispatch(); - void cancel(Consumer* c, Consumers& set); + void cancel(Consumer::ptr c, Consumers& set); void serviceAllBrowsers(); - void serviceBrowser(Consumer* c); - Consumer* allocate(); + void serviceBrowser(Consumer::ptr c); + Consumer::ptr allocate(); bool seek(QueuedMessage& msg, const framing::SequenceNumber& position); protected: @@ -117,7 +115,6 @@ namespace qpid { virtual void notifyDurableIOComplete(); public: - typedef boost::shared_ptr<Queue> shared_ptr; typedef std::vector<shared_ptr> vector; @@ -162,10 +159,10 @@ namespace qpid { * at any time, so this call schedules the despatch based on * the serilizer policy. */ - void requestDispatch(Consumer* c = 0); + void requestDispatch(Consumer::ptr c = Consumer::ptr()); void flush(DispatchCompletion& callback); - void consume(Consumer* c, bool exclusive = false); - void cancel(Consumer* c); + void consume(Consumer::ptr c, bool exclusive = false); + void cancel(Consumer::ptr c); uint32_t purge(); uint32_t getMessageCount() const; uint32_t getConsumerCount() const; @@ -202,6 +199,7 @@ namespace qpid { uint32_t encodedSize() const; static Queue::shared_ptr decode(QueueRegistry& queues, framing::Buffer& buffer); + static void tryAutoDelete(Broker& broker, Queue::shared_ptr); }; } } diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index d826fef22c..d605e92b72 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -69,7 +69,11 @@ SemanticState::SemanticState(DeliveryAdapter& da, SessionState& ss) } SemanticState::~SemanticState() { - consumers.clear(); + //cancel all consumers + for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { + cancel(i->second); + } + if (dtxBuffer.get()) { dtxBuffer->fail(); } @@ -86,16 +90,15 @@ void SemanticState::consume(DeliveryToken::shared_ptr token, string& tagInOut, { if(tagInOut.empty()) tagInOut = tagGenerator.generate(); - std::auto_ptr<ConsumerImpl> c(new ConsumerImpl(this, token, tagInOut, queue, acks, nolocal, acquire)); - queue->consume(c.get(), exclusive);//may throw exception - consumers.insert(tagInOut, c.release()); + ConsumerImpl::shared_ptr c(new ConsumerImpl(this, token, tagInOut, queue, acks, nolocal, acquire)); + queue->consume(c, exclusive);//may throw exception + consumers[tagInOut] = c; } void SemanticState::cancel(const string& tag){ - // consumers is a ptr_map so erase will delete the consumer - // which will call cancel. ConsumerImplMap::iterator i = consumers.find(tag); if (i != consumers.end()) { + cancel(i->second); consumers.erase(i); //should cancel all unacked messages for this consumer so that //they are not redelivered on recovery @@ -287,28 +290,19 @@ bool SemanticState::ConsumerImpl::checkCredit(Message::shared_ptr& msg) } } -SemanticState::ConsumerImpl::~ConsumerImpl() { - cancel(); -} +SemanticState::ConsumerImpl::~ConsumerImpl() {} -void SemanticState::ConsumerImpl::cancel() +void SemanticState::cancel(ConsumerImpl::shared_ptr c) { + Queue::shared_ptr queue = c->getQueue(); if(queue) { - queue->cancel(this); + queue->cancel(c); if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) { - parent->getSession().getBroker().getQueues().destroyIf( - queue->getName(), - boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue)); + Queue::tryAutoDelete(getSession().getBroker(), queue); } } } -void SemanticState::ConsumerImpl::requestDispatch() -{ - if(blocked) - queue->requestDispatch(this); -} - void SemanticState::handle(Message::shared_ptr msg) { if (txBuffer.get()) { TxPublish* deliverable(new TxPublish(msg)); @@ -389,7 +383,21 @@ void SemanticState::ack(DeliveryId first, DeliveryId last, bool cumulative) //if the prefetch limit had previously been reached, or credit //had expired in windowing mode there may be messages that can //be now be delivered - for_each(consumers.begin(), consumers.end(), boost::bind(&ConsumerImpl::requestDispatch, _1)); + requestDispatch(); +} + +void SemanticState::requestDispatch() +{ + for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { + requestDispatch(i->second); + } +} + +void SemanticState::requestDispatch(ConsumerImpl::shared_ptr c) +{ + if(c->isBlocked()) { + c->getQueue()->requestDispatch(c); + } } void SemanticState::acknowledged(const DeliveryRecord& delivery) @@ -397,7 +405,7 @@ void SemanticState::acknowledged(const DeliveryRecord& delivery) delivery.subtractFrom(outstanding); ConsumerImplMap::iterator i = consumers.find(delivery.getTag()); if (i != consumers.end()) { - i->acknowledged(delivery); + i->second->acknowledged(delivery); } } @@ -458,52 +466,55 @@ void SemanticState::flow(bool active) flowActive = active; if (requestDelivery) { //there may be messages that can be now be delivered - std::for_each(consumers.begin(), consumers.end(), boost::bind(&ConsumerImpl::requestDispatch, _1)); + requestDispatch(); } } -SemanticState::ConsumerImpl& SemanticState::find(const std::string& destination) +SemanticState::ConsumerImpl::shared_ptr SemanticState::find(const std::string& destination) { ConsumerImplMap::iterator i = consumers.find(destination); if (i == consumers.end()) { throw NotFoundException(QPID_MSG("Unknown destination " << destination)); } else { - return *i; + return i->second; } } void SemanticState::setWindowMode(const std::string& destination) { - find(destination).setWindowMode(); + find(destination)->setWindowMode(); } void SemanticState::setCreditMode(const std::string& destination) { - find(destination).setCreditMode(); + find(destination)->setCreditMode(); } void SemanticState::addByteCredit(const std::string& destination, uint32_t value) { - find(destination).addByteCredit(value); + ConsumerImpl::shared_ptr c = find(destination); + c->addByteCredit(value); + requestDispatch(c); } void SemanticState::addMessageCredit(const std::string& destination, uint32_t value) { - find(destination).addMessageCredit(value); + ConsumerImpl::shared_ptr c = find(destination); + c->addMessageCredit(value); + requestDispatch(c); } void SemanticState::flush(const std::string& destination) { - ConsumerImpl& c = find(destination); - c.flush(); + find(destination)->flush(); } void SemanticState::stop(const std::string& destination) { - find(destination).stop(); + find(destination)->stop(); } void SemanticState::ConsumerImpl::setWindowMode() @@ -518,24 +529,18 @@ void SemanticState::ConsumerImpl::setCreditMode() void SemanticState::ConsumerImpl::addByteCredit(uint32_t value) { - { - Mutex::ScopedLock l(lock); - if (byteCredit != 0xFFFFFFFF) { - byteCredit += value; - } + Mutex::ScopedLock l(lock); + if (byteCredit != 0xFFFFFFFF) { + byteCredit += value; } - requestDispatch(); } void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value) { - { - Mutex::ScopedLock l(lock); - if (msgCredit != 0xFFFFFFFF) { - msgCredit += value; - } + Mutex::ScopedLock l(lock); + if (msgCredit != 0xFFFFFFFF) { + msgCredit += value; } - requestDispatch(); } void SemanticState::ConsumerImpl::flush() diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h index 65e67283cc..d2c2d4b188 100644 --- a/cpp/src/qpid/broker/SemanticState.h +++ b/cpp/src/qpid/broker/SemanticState.h @@ -37,9 +37,8 @@ #include "qpid/framing/Uuid.h" #include "qpid/shared_ptr.h" -#include <boost/ptr_container/ptr_map.hpp> - #include <list> +#include <map> #include <vector> namespace qpid { @@ -72,13 +71,13 @@ class SemanticState : public framing::FrameHandler::Chains, bool checkCredit(Message::shared_ptr& msg); public: + typedef shared_ptr<ConsumerImpl> shared_ptr; + ConsumerImpl(SemanticState* parent, DeliveryToken::shared_ptr token, const string& name, Queue::shared_ptr queue, bool ack, bool nolocal, bool acquire); ~ConsumerImpl(); bool deliver(QueuedMessage& msg); - void cancel(); - void requestDispatch(); void setWindowMode(); void setCreditMode(); @@ -87,6 +86,8 @@ class SemanticState : public framing::FrameHandler::Chains, void flush(); void stop(); void acknowledged(const DeliveryRecord&); + Queue::shared_ptr getQueue() { return queue; } + bool isBlocked() const { return blocked; } }; struct FlushCompletion : DispatchCompletion @@ -100,7 +101,7 @@ class SemanticState : public framing::FrameHandler::Chains, void completed(); }; - typedef boost::ptr_map<string,ConsumerImpl> ConsumerImplMap; + typedef std::map<std::string,ConsumerImpl::shared_ptr> ConsumerImplMap; SessionState& session; DeliveryAdapter& deliveryAdapter; @@ -124,10 +125,13 @@ class SemanticState : public framing::FrameHandler::Chains, void record(const DeliveryRecord& delivery); bool checkPrefetch(Message::shared_ptr& msg); void checkDtxTimeout(); - ConsumerImpl& find(const std::string& destination); + ConsumerImpl::shared_ptr find(const std::string& destination); void ack(DeliveryId deliveryTag, DeliveryId endTag, bool cumulative); void acknowledged(const DeliveryRecord&); AckRange findRange(DeliveryId first, DeliveryId last); + void requestDispatch(); + void requestDispatch(ConsumerImpl::shared_ptr); + void cancel(ConsumerImpl::shared_ptr); public: SemanticState(DeliveryAdapter&, SessionState&); diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp index f2f1b3bf84..114e0045f5 100644 --- a/cpp/src/tests/QueueTest.cpp +++ b/cpp/src/tests/QueueTest.cpp @@ -36,6 +36,8 @@ using namespace qpid::sys; class TestConsumer : public virtual Consumer{ public: + typedef shared_ptr<TestConsumer> shared_ptr; + Message::shared_ptr last; bool received; TestConsumer(): received(false) {}; @@ -85,8 +87,8 @@ class QueueTest : public CppUnit::TestCase Queue::shared_ptr queue(new Queue("my_test_queue", true)); Message::shared_ptr received; - TestConsumer c1; - queue->consume(&c1); + TestConsumer::shared_ptr c1(new TestConsumer()); + queue->consume(c1); //Test basic delivery: @@ -95,7 +97,7 @@ class QueueTest : public CppUnit::TestCase queue->process(msg1); sleep(2); - CPPUNIT_ASSERT(!c1.received); + CPPUNIT_ASSERT(!c1->received); msg1->enqueueComplete(); received = queue->dequeue().payload; @@ -124,10 +126,10 @@ class QueueTest : public CppUnit::TestCase Queue::shared_ptr queue(new Queue("my_queue", true)); //Test adding consumers: - TestConsumer c1; - TestConsumer c2; - queue->consume(&c1); - queue->consume(&c2); + TestConsumer::shared_ptr c1(new TestConsumer()); + TestConsumer::shared_ptr c2(new TestConsumer()); + queue->consume(c1); + queue->consume(c2); CPPUNIT_ASSERT_EQUAL(uint32_t(2), queue->getConsumerCount()); @@ -137,25 +139,25 @@ class QueueTest : public CppUnit::TestCase Message::shared_ptr msg3 = message("e", "C"); queue->deliver(msg1); - if (!c1.received) + if (!c1->received) sleep(2); - CPPUNIT_ASSERT_EQUAL(msg1.get(), c1.last.get()); + CPPUNIT_ASSERT_EQUAL(msg1.get(), c1->last.get()); queue->deliver(msg2); - if (!c2.received) + if (!c2->received) sleep(2); - CPPUNIT_ASSERT_EQUAL(msg2.get(), c2.last.get()); + CPPUNIT_ASSERT_EQUAL(msg2.get(), c2->last.get()); - c1.received = false; + c1->received = false; queue->deliver(msg3); - if (!c1.received) + if (!c1->received) sleep(2); - CPPUNIT_ASSERT_EQUAL(msg3.get(), c1.last.get()); + CPPUNIT_ASSERT_EQUAL(msg3.get(), c1->last.get()); //Test cancellation: - queue->cancel(&c1); + queue->cancel(c1); CPPUNIT_ASSERT_EQUAL(uint32_t(1), queue->getConsumerCount()); - queue->cancel(&c2); + queue->cancel(c2); CPPUNIT_ASSERT_EQUAL(uint32_t(0), queue->getConsumerCount()); } @@ -200,13 +202,13 @@ class QueueTest : public CppUnit::TestCase CPPUNIT_ASSERT_EQUAL(msg2.get(), received.get()); CPPUNIT_ASSERT_EQUAL(uint32_t(1), queue->getMessageCount()); - TestConsumer consumer; - queue->consume(&consumer); + TestConsumer::shared_ptr consumer(new TestConsumer()); + queue->consume(consumer); queue->requestDispatch(); - if (!consumer.received) + if (!consumer->received) sleep(2); - CPPUNIT_ASSERT_EQUAL(msg3.get(), consumer.last.get()); + CPPUNIT_ASSERT_EQUAL(msg3.get(), consumer->last.get()); CPPUNIT_ASSERT_EQUAL(uint32_t(0), queue->getMessageCount()); received = queue->dequeue().payload; |