diff options
author | Gordon Sim <gsim@apache.org> | 2007-10-17 08:59:44 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2007-10-17 08:59:44 +0000 |
commit | c619794e8a903e716bc5117179ea0ab1e24e1254 (patch) | |
tree | e4cf22d8de792053a4bb7b594b0e1cc2b2ca8abc /cpp/src/qpid/broker/SemanticState.h | |
parent | de86223091817b091b8f49774853d927c00eed9b (diff) | |
download | qpid-python-c619794e8a903e716bc5117179ea0ab1e24e1254.tar.gz |
Use shared pointers for consumers (held by queues and sessions) to prevent having to hold lock across deliver() while avoiding invocation on stale pointers.
Ensure auto-deleted queues are properly cleaned up (i.e. are unbound from exchanges) to avoid leaking memory as messages are accumulated in inaccessible queues. (some cleanup to follow on this)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@585417 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/SemanticState.h')
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.h | 16 |
1 files changed, 10 insertions, 6 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h index 65e67283cc..d2c2d4b188 100644 --- a/cpp/src/qpid/broker/SemanticState.h +++ b/cpp/src/qpid/broker/SemanticState.h @@ -37,9 +37,8 @@ #include "qpid/framing/Uuid.h" #include "qpid/shared_ptr.h" -#include <boost/ptr_container/ptr_map.hpp> - #include <list> +#include <map> #include <vector> namespace qpid { @@ -72,13 +71,13 @@ class SemanticState : public framing::FrameHandler::Chains, bool checkCredit(Message::shared_ptr& msg); public: + typedef shared_ptr<ConsumerImpl> shared_ptr; + ConsumerImpl(SemanticState* parent, DeliveryToken::shared_ptr token, const string& name, Queue::shared_ptr queue, bool ack, bool nolocal, bool acquire); ~ConsumerImpl(); bool deliver(QueuedMessage& msg); - void cancel(); - void requestDispatch(); void setWindowMode(); void setCreditMode(); @@ -87,6 +86,8 @@ class SemanticState : public framing::FrameHandler::Chains, void flush(); void stop(); void acknowledged(const DeliveryRecord&); + Queue::shared_ptr getQueue() { return queue; } + bool isBlocked() const { return blocked; } }; struct FlushCompletion : DispatchCompletion @@ -100,7 +101,7 @@ class SemanticState : public framing::FrameHandler::Chains, void completed(); }; - typedef boost::ptr_map<string,ConsumerImpl> ConsumerImplMap; + typedef std::map<std::string,ConsumerImpl::shared_ptr> ConsumerImplMap; SessionState& session; DeliveryAdapter& deliveryAdapter; @@ -124,10 +125,13 @@ class SemanticState : public framing::FrameHandler::Chains, void record(const DeliveryRecord& delivery); bool checkPrefetch(Message::shared_ptr& msg); void checkDtxTimeout(); - ConsumerImpl& find(const std::string& destination); + ConsumerImpl::shared_ptr find(const std::string& destination); void ack(DeliveryId deliveryTag, DeliveryId endTag, bool cumulative); void acknowledged(const DeliveryRecord&); AckRange findRange(DeliveryId first, DeliveryId last); + void requestDispatch(); + void requestDispatch(ConsumerImpl::shared_ptr); + void cancel(ConsumerImpl::shared_ptr); public: SemanticState(DeliveryAdapter&, SessionState&); |