summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-10-17 08:59:44 +0000
committerGordon Sim <gsim@apache.org>2007-10-17 08:59:44 +0000
commitc619794e8a903e716bc5117179ea0ab1e24e1254 (patch)
treee4cf22d8de792053a4bb7b594b0e1cc2b2ca8abc /cpp/src
parentde86223091817b091b8f49774853d927c00eed9b (diff)
downloadqpid-python-c619794e8a903e716bc5117179ea0ab1e24e1254.tar.gz
Use shared pointers for consumers (held by queues and sessions) to prevent having to hold lock across deliver() while avoiding invocation on stale pointers.
Ensure auto-deleted queues are properly cleaned up (i.e. are unbound from exchanges) to avoid leaking memory as messages are accumulated in inaccessible queues. (some cleanup to follow on this) git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@585417 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/Connection.cpp7
-rw-r--r--cpp/src/qpid/broker/Consumer.h2
-rw-r--r--cpp/src/qpid/broker/MessageHandlerImpl.cpp2
-rw-r--r--cpp/src/qpid/broker/Queue.cpp37
-rw-r--r--cpp/src/qpid/broker/Queue.h22
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp93
-rw-r--r--cpp/src/qpid/broker/SemanticState.h16
-rw-r--r--cpp/src/tests/QueueTest.cpp42
8 files changed, 120 insertions, 101 deletions
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index 21d759c901..ca0ca20849 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -78,11 +78,8 @@ void Connection::closed(){
while (!exclusiveQueues.empty()) {
Queue::shared_ptr q(exclusiveQueues.front());
q->releaseExclusiveOwnership();
- if (q->canAutoDelete() &&
- broker.getQueues().destroyIf(q->getName(), boost::bind(boost::mem_fn(&Queue::canAutoDelete), q))) {
-
- q->unbind(broker.getExchanges(), q);
- q->destroy();
+ if (q->canAutoDelete()) {
+ Queue::tryAutoDelete(broker, q);
}
exclusiveQueues.erase(exclusiveQueues.begin());
}
diff --git a/cpp/src/qpid/broker/Consumer.h b/cpp/src/qpid/broker/Consumer.h
index c482a44ab1..bf46ecbe1f 100644
--- a/cpp/src/qpid/broker/Consumer.h
+++ b/cpp/src/qpid/broker/Consumer.h
@@ -39,6 +39,8 @@ namespace qpid {
class Consumer {
const bool acquires;
public:
+ typedef shared_ptr<Consumer> ptr;
+
framing::SequenceNumber position;
Consumer(bool preAcquires = true) : acquires(preAcquires) {}
diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
index e6c7b28a49..87b23102e2 100644
--- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp
+++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
@@ -17,6 +17,7 @@
*/
#include "qpid/QpidError.h"
+#include "qpid/log/Statement.h"
#include "MessageHandlerImpl.h"
#include "qpid/framing/FramingContent.h"
#include "Connection.h"
@@ -156,7 +157,6 @@ MessageHandlerImpl::reject(const SequenceNumberSet& transfers, uint16_t /*code*/
void MessageHandlerImpl::flow(const std::string& destination, u_int8_t unit, u_int32_t value)
{
-
if (unit == 0) {
//message
state.addMessageCredit(destination, value);
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index e4a6449e08..8c990795e7 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -22,6 +22,7 @@
#include <boost/format.hpp>
#include "qpid/log/Statement.h"
+#include "Broker.h"
#include "Queue.h"
#include "Exchange.h"
#include "DeliverableMessage.h"
@@ -47,7 +48,6 @@ Queue::Queue(const string& _name, bool _autodelete,
store(_store),
owner(_owner),
next(0),
- exclusive(0),
persistenceId(0),
serializer(false),
dispatchCallback(*this)
@@ -80,7 +80,7 @@ void Queue::deliver(Message::shared_ptr& msg){
}else {
push(msg);
}
- QPID_LOG(debug, "Message Enqueued: " << msg->getApplicationHeaders());
+ QPID_LOG(debug, "Message " << msg << " enqueued on " << name << "[" << this << "]");
serializer.execute(dispatchCallback);
}
}
@@ -124,7 +124,7 @@ bool Queue::acquire(const QueuedMessage& msg) {
return false;
}
-void Queue::requestDispatch(Consumer* c){
+void Queue::requestDispatch(Consumer::ptr c){
if (!c || c->preAcquires()) {
serializer.execute(dispatchCallback);
} else {
@@ -138,12 +138,12 @@ void Queue::flush(DispatchCompletion& completion)
serializer.execute(f);
}
-Consumer* Queue::allocate()
+Consumer::ptr Queue::allocate()
{
RWlock::ScopedWlock locker(consumerLock);
if(acquirers.empty()){
- return 0;
+ return Consumer::ptr();
}else if(exclusive){
return exclusive;
}else{
@@ -154,14 +154,16 @@ Consumer* Queue::allocate()
bool Queue::dispatch(QueuedMessage& msg)
{
- Consumer* c = allocate();
- Consumer* first = c;
+ Consumer::ptr c = allocate();
+ Consumer::ptr first = c;
while(c){
if(c->deliver(msg)) {
return true;
} else {
c = allocate();
- if (c == first) c = 0;
+ if (c == first) {
+ break;
+ }
}
}
return false;
@@ -199,7 +201,7 @@ void Queue::serviceAllBrowsers()
}
}
-void Queue::serviceBrowser(Consumer* browser)
+void Queue::serviceBrowser(Consumer::ptr browser)
{
QueuedMessage msg;
while (seek(msg, browser->position) && browser->deliver(msg)) {
@@ -219,7 +221,7 @@ bool Queue::seek(QueuedMessage& msg, const framing::SequenceNumber& position) {
return false;
}
-void Queue::consume(Consumer* c, bool requestExclusive){
+void Queue::consume(Consumer::ptr c, bool requestExclusive){
RWlock::ScopedWlock locker(consumerLock);
if(exclusive) {
throw ChannelException(
@@ -242,17 +244,17 @@ void Queue::consume(Consumer* c, bool requestExclusive){
}
}
-void Queue::cancel(Consumer* c){
+void Queue::cancel(Consumer::ptr c){
RWlock::ScopedWlock locker(consumerLock);
if (c->preAcquires()) {
cancel(c, acquirers);
} else {
cancel(c, browsers);
}
- if(exclusive == c) exclusive = 0;
+ if(exclusive == c) exclusive.reset();
}
-void Queue::cancel(Consumer* c, Consumers& consumers)
+void Queue::cancel(Consumer::ptr c, Consumers& consumers)
{
Consumers::iterator i = std::find(consumers.begin(), consumers.end(), c);
if (i != consumers.end())
@@ -442,3 +444,12 @@ boost::shared_ptr<Exchange> Queue::getAlternateExchange()
return alternateExchange;
}
+void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue)
+{
+ if (broker.getQueues().destroyIf(queue->getName(),
+ boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue))) {
+ queue->unbind(broker.getExchanges(), queue);
+ queue->destroy();
+ }
+
+}
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index 6e859e67bb..d3c8fb7e81 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -36,11 +36,9 @@
#include "QueuePolicy.h"
#include "QueueBindings.h"
-// TODO aconway 2007-02-06: Use auto_ptr and boost::ptr_vector to
-// enforce ownership of Consumers.
-
namespace qpid {
namespace broker {
+ class Broker;
class MessageStore;
class QueueRegistry;
class TransactionContext;
@@ -61,7 +59,7 @@ namespace qpid {
* or more consumers registers.
*/
class Queue : public PersistableQueue {
- typedef std::vector<Consumer*> Consumers;
+ typedef std::vector<Consumer::ptr> Consumers;
typedef std::deque<QueuedMessage> Messages;
struct DispatchFunctor
@@ -86,7 +84,7 @@ namespace qpid {
int next;
mutable qpid::sys::RWlock consumerLock;
mutable qpid::sys::Mutex messageLock;
- Consumer* exclusive;
+ Consumer::ptr exclusive;
mutable uint64_t persistenceId;
framing::FieldTable settings;
std::auto_ptr<QueuePolicy> policy;
@@ -104,10 +102,10 @@ namespace qpid {
* only called by serilizer
*/
void dispatch();
- void cancel(Consumer* c, Consumers& set);
+ void cancel(Consumer::ptr c, Consumers& set);
void serviceAllBrowsers();
- void serviceBrowser(Consumer* c);
- Consumer* allocate();
+ void serviceBrowser(Consumer::ptr c);
+ Consumer::ptr allocate();
bool seek(QueuedMessage& msg, const framing::SequenceNumber& position);
protected:
@@ -117,7 +115,6 @@ namespace qpid {
virtual void notifyDurableIOComplete();
public:
-
typedef boost::shared_ptr<Queue> shared_ptr;
typedef std::vector<shared_ptr> vector;
@@ -162,10 +159,10 @@ namespace qpid {
* at any time, so this call schedules the despatch based on
* the serilizer policy.
*/
- void requestDispatch(Consumer* c = 0);
+ void requestDispatch(Consumer::ptr c = Consumer::ptr());
void flush(DispatchCompletion& callback);
- void consume(Consumer* c, bool exclusive = false);
- void cancel(Consumer* c);
+ void consume(Consumer::ptr c, bool exclusive = false);
+ void cancel(Consumer::ptr c);
uint32_t purge();
uint32_t getMessageCount() const;
uint32_t getConsumerCount() const;
@@ -202,6 +199,7 @@ namespace qpid {
uint32_t encodedSize() const;
static Queue::shared_ptr decode(QueueRegistry& queues, framing::Buffer& buffer);
+ static void tryAutoDelete(Broker& broker, Queue::shared_ptr);
};
}
}
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index d826fef22c..d605e92b72 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -69,7 +69,11 @@ SemanticState::SemanticState(DeliveryAdapter& da, SessionState& ss)
}
SemanticState::~SemanticState() {
- consumers.clear();
+ //cancel all consumers
+ for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
+ cancel(i->second);
+ }
+
if (dtxBuffer.get()) {
dtxBuffer->fail();
}
@@ -86,16 +90,15 @@ void SemanticState::consume(DeliveryToken::shared_ptr token, string& tagInOut,
{
if(tagInOut.empty())
tagInOut = tagGenerator.generate();
- std::auto_ptr<ConsumerImpl> c(new ConsumerImpl(this, token, tagInOut, queue, acks, nolocal, acquire));
- queue->consume(c.get(), exclusive);//may throw exception
- consumers.insert(tagInOut, c.release());
+ ConsumerImpl::shared_ptr c(new ConsumerImpl(this, token, tagInOut, queue, acks, nolocal, acquire));
+ queue->consume(c, exclusive);//may throw exception
+ consumers[tagInOut] = c;
}
void SemanticState::cancel(const string& tag){
- // consumers is a ptr_map so erase will delete the consumer
- // which will call cancel.
ConsumerImplMap::iterator i = consumers.find(tag);
if (i != consumers.end()) {
+ cancel(i->second);
consumers.erase(i);
//should cancel all unacked messages for this consumer so that
//they are not redelivered on recovery
@@ -287,28 +290,19 @@ bool SemanticState::ConsumerImpl::checkCredit(Message::shared_ptr& msg)
}
}
-SemanticState::ConsumerImpl::~ConsumerImpl() {
- cancel();
-}
+SemanticState::ConsumerImpl::~ConsumerImpl() {}
-void SemanticState::ConsumerImpl::cancel()
+void SemanticState::cancel(ConsumerImpl::shared_ptr c)
{
+ Queue::shared_ptr queue = c->getQueue();
if(queue) {
- queue->cancel(this);
+ queue->cancel(c);
if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) {
- parent->getSession().getBroker().getQueues().destroyIf(
- queue->getName(),
- boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue));
+ Queue::tryAutoDelete(getSession().getBroker(), queue);
}
}
}
-void SemanticState::ConsumerImpl::requestDispatch()
-{
- if(blocked)
- queue->requestDispatch(this);
-}
-
void SemanticState::handle(Message::shared_ptr msg) {
if (txBuffer.get()) {
TxPublish* deliverable(new TxPublish(msg));
@@ -389,7 +383,21 @@ void SemanticState::ack(DeliveryId first, DeliveryId last, bool cumulative)
//if the prefetch limit had previously been reached, or credit
//had expired in windowing mode there may be messages that can
//be now be delivered
- for_each(consumers.begin(), consumers.end(), boost::bind(&ConsumerImpl::requestDispatch, _1));
+ requestDispatch();
+}
+
+void SemanticState::requestDispatch()
+{
+ for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
+ requestDispatch(i->second);
+ }
+}
+
+void SemanticState::requestDispatch(ConsumerImpl::shared_ptr c)
+{
+ if(c->isBlocked()) {
+ c->getQueue()->requestDispatch(c);
+ }
}
void SemanticState::acknowledged(const DeliveryRecord& delivery)
@@ -397,7 +405,7 @@ void SemanticState::acknowledged(const DeliveryRecord& delivery)
delivery.subtractFrom(outstanding);
ConsumerImplMap::iterator i = consumers.find(delivery.getTag());
if (i != consumers.end()) {
- i->acknowledged(delivery);
+ i->second->acknowledged(delivery);
}
}
@@ -458,52 +466,55 @@ void SemanticState::flow(bool active)
flowActive = active;
if (requestDelivery) {
//there may be messages that can be now be delivered
- std::for_each(consumers.begin(), consumers.end(), boost::bind(&ConsumerImpl::requestDispatch, _1));
+ requestDispatch();
}
}
-SemanticState::ConsumerImpl& SemanticState::find(const std::string& destination)
+SemanticState::ConsumerImpl::shared_ptr SemanticState::find(const std::string& destination)
{
ConsumerImplMap::iterator i = consumers.find(destination);
if (i == consumers.end()) {
throw NotFoundException(QPID_MSG("Unknown destination " << destination));
} else {
- return *i;
+ return i->second;
}
}
void SemanticState::setWindowMode(const std::string& destination)
{
- find(destination).setWindowMode();
+ find(destination)->setWindowMode();
}
void SemanticState::setCreditMode(const std::string& destination)
{
- find(destination).setCreditMode();
+ find(destination)->setCreditMode();
}
void SemanticState::addByteCredit(const std::string& destination, uint32_t value)
{
- find(destination).addByteCredit(value);
+ ConsumerImpl::shared_ptr c = find(destination);
+ c->addByteCredit(value);
+ requestDispatch(c);
}
void SemanticState::addMessageCredit(const std::string& destination, uint32_t value)
{
- find(destination).addMessageCredit(value);
+ ConsumerImpl::shared_ptr c = find(destination);
+ c->addMessageCredit(value);
+ requestDispatch(c);
}
void SemanticState::flush(const std::string& destination)
{
- ConsumerImpl& c = find(destination);
- c.flush();
+ find(destination)->flush();
}
void SemanticState::stop(const std::string& destination)
{
- find(destination).stop();
+ find(destination)->stop();
}
void SemanticState::ConsumerImpl::setWindowMode()
@@ -518,24 +529,18 @@ void SemanticState::ConsumerImpl::setCreditMode()
void SemanticState::ConsumerImpl::addByteCredit(uint32_t value)
{
- {
- Mutex::ScopedLock l(lock);
- if (byteCredit != 0xFFFFFFFF) {
- byteCredit += value;
- }
+ Mutex::ScopedLock l(lock);
+ if (byteCredit != 0xFFFFFFFF) {
+ byteCredit += value;
}
- requestDispatch();
}
void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value)
{
- {
- Mutex::ScopedLock l(lock);
- if (msgCredit != 0xFFFFFFFF) {
- msgCredit += value;
- }
+ Mutex::ScopedLock l(lock);
+ if (msgCredit != 0xFFFFFFFF) {
+ msgCredit += value;
}
- requestDispatch();
}
void SemanticState::ConsumerImpl::flush()
diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h
index 65e67283cc..d2c2d4b188 100644
--- a/cpp/src/qpid/broker/SemanticState.h
+++ b/cpp/src/qpid/broker/SemanticState.h
@@ -37,9 +37,8 @@
#include "qpid/framing/Uuid.h"
#include "qpid/shared_ptr.h"
-#include <boost/ptr_container/ptr_map.hpp>
-
#include <list>
+#include <map>
#include <vector>
namespace qpid {
@@ -72,13 +71,13 @@ class SemanticState : public framing::FrameHandler::Chains,
bool checkCredit(Message::shared_ptr& msg);
public:
+ typedef shared_ptr<ConsumerImpl> shared_ptr;
+
ConsumerImpl(SemanticState* parent, DeliveryToken::shared_ptr token,
const string& name, Queue::shared_ptr queue,
bool ack, bool nolocal, bool acquire);
~ConsumerImpl();
bool deliver(QueuedMessage& msg);
- void cancel();
- void requestDispatch();
void setWindowMode();
void setCreditMode();
@@ -87,6 +86,8 @@ class SemanticState : public framing::FrameHandler::Chains,
void flush();
void stop();
void acknowledged(const DeliveryRecord&);
+ Queue::shared_ptr getQueue() { return queue; }
+ bool isBlocked() const { return blocked; }
};
struct FlushCompletion : DispatchCompletion
@@ -100,7 +101,7 @@ class SemanticState : public framing::FrameHandler::Chains,
void completed();
};
- typedef boost::ptr_map<string,ConsumerImpl> ConsumerImplMap;
+ typedef std::map<std::string,ConsumerImpl::shared_ptr> ConsumerImplMap;
SessionState& session;
DeliveryAdapter& deliveryAdapter;
@@ -124,10 +125,13 @@ class SemanticState : public framing::FrameHandler::Chains,
void record(const DeliveryRecord& delivery);
bool checkPrefetch(Message::shared_ptr& msg);
void checkDtxTimeout();
- ConsumerImpl& find(const std::string& destination);
+ ConsumerImpl::shared_ptr find(const std::string& destination);
void ack(DeliveryId deliveryTag, DeliveryId endTag, bool cumulative);
void acknowledged(const DeliveryRecord&);
AckRange findRange(DeliveryId first, DeliveryId last);
+ void requestDispatch();
+ void requestDispatch(ConsumerImpl::shared_ptr);
+ void cancel(ConsumerImpl::shared_ptr);
public:
SemanticState(DeliveryAdapter&, SessionState&);
diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp
index f2f1b3bf84..114e0045f5 100644
--- a/cpp/src/tests/QueueTest.cpp
+++ b/cpp/src/tests/QueueTest.cpp
@@ -36,6 +36,8 @@ using namespace qpid::sys;
class TestConsumer : public virtual Consumer{
public:
+ typedef shared_ptr<TestConsumer> shared_ptr;
+
Message::shared_ptr last;
bool received;
TestConsumer(): received(false) {};
@@ -85,8 +87,8 @@ class QueueTest : public CppUnit::TestCase
Queue::shared_ptr queue(new Queue("my_test_queue", true));
Message::shared_ptr received;
- TestConsumer c1;
- queue->consume(&c1);
+ TestConsumer::shared_ptr c1(new TestConsumer());
+ queue->consume(c1);
//Test basic delivery:
@@ -95,7 +97,7 @@ class QueueTest : public CppUnit::TestCase
queue->process(msg1);
sleep(2);
- CPPUNIT_ASSERT(!c1.received);
+ CPPUNIT_ASSERT(!c1->received);
msg1->enqueueComplete();
received = queue->dequeue().payload;
@@ -124,10 +126,10 @@ class QueueTest : public CppUnit::TestCase
Queue::shared_ptr queue(new Queue("my_queue", true));
//Test adding consumers:
- TestConsumer c1;
- TestConsumer c2;
- queue->consume(&c1);
- queue->consume(&c2);
+ TestConsumer::shared_ptr c1(new TestConsumer());
+ TestConsumer::shared_ptr c2(new TestConsumer());
+ queue->consume(c1);
+ queue->consume(c2);
CPPUNIT_ASSERT_EQUAL(uint32_t(2), queue->getConsumerCount());
@@ -137,25 +139,25 @@ class QueueTest : public CppUnit::TestCase
Message::shared_ptr msg3 = message("e", "C");
queue->deliver(msg1);
- if (!c1.received)
+ if (!c1->received)
sleep(2);
- CPPUNIT_ASSERT_EQUAL(msg1.get(), c1.last.get());
+ CPPUNIT_ASSERT_EQUAL(msg1.get(), c1->last.get());
queue->deliver(msg2);
- if (!c2.received)
+ if (!c2->received)
sleep(2);
- CPPUNIT_ASSERT_EQUAL(msg2.get(), c2.last.get());
+ CPPUNIT_ASSERT_EQUAL(msg2.get(), c2->last.get());
- c1.received = false;
+ c1->received = false;
queue->deliver(msg3);
- if (!c1.received)
+ if (!c1->received)
sleep(2);
- CPPUNIT_ASSERT_EQUAL(msg3.get(), c1.last.get());
+ CPPUNIT_ASSERT_EQUAL(msg3.get(), c1->last.get());
//Test cancellation:
- queue->cancel(&c1);
+ queue->cancel(c1);
CPPUNIT_ASSERT_EQUAL(uint32_t(1), queue->getConsumerCount());
- queue->cancel(&c2);
+ queue->cancel(c2);
CPPUNIT_ASSERT_EQUAL(uint32_t(0), queue->getConsumerCount());
}
@@ -200,13 +202,13 @@ class QueueTest : public CppUnit::TestCase
CPPUNIT_ASSERT_EQUAL(msg2.get(), received.get());
CPPUNIT_ASSERT_EQUAL(uint32_t(1), queue->getMessageCount());
- TestConsumer consumer;
- queue->consume(&consumer);
+ TestConsumer::shared_ptr consumer(new TestConsumer());
+ queue->consume(consumer);
queue->requestDispatch();
- if (!consumer.received)
+ if (!consumer->received)
sleep(2);
- CPPUNIT_ASSERT_EQUAL(msg3.get(), consumer.last.get());
+ CPPUNIT_ASSERT_EQUAL(msg3.get(), consumer->last.get());
CPPUNIT_ASSERT_EQUAL(uint32_t(0), queue->getMessageCount());
received = queue->dequeue().payload;