summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/SemanticState.h
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-10-17 08:59:44 +0000
committerGordon Sim <gsim@apache.org>2007-10-17 08:59:44 +0000
commitc619794e8a903e716bc5117179ea0ab1e24e1254 (patch)
treee4cf22d8de792053a4bb7b594b0e1cc2b2ca8abc /cpp/src/qpid/broker/SemanticState.h
parentde86223091817b091b8f49774853d927c00eed9b (diff)
downloadqpid-python-c619794e8a903e716bc5117179ea0ab1e24e1254.tar.gz
Use shared pointers for consumers (held by queues and sessions) to prevent having to hold lock across deliver() while avoiding invocation on stale pointers.
Ensure auto-deleted queues are properly cleaned up (i.e. are unbound from exchanges) to avoid leaking memory as messages are accumulated in inaccessible queues. (some cleanup to follow on this) git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@585417 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/SemanticState.h')
-rw-r--r--cpp/src/qpid/broker/SemanticState.h16
1 files changed, 10 insertions, 6 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h
index 65e67283cc..d2c2d4b188 100644
--- a/cpp/src/qpid/broker/SemanticState.h
+++ b/cpp/src/qpid/broker/SemanticState.h
@@ -37,9 +37,8 @@
#include "qpid/framing/Uuid.h"
#include "qpid/shared_ptr.h"
-#include <boost/ptr_container/ptr_map.hpp>
-
#include <list>
+#include <map>
#include <vector>
namespace qpid {
@@ -72,13 +71,13 @@ class SemanticState : public framing::FrameHandler::Chains,
bool checkCredit(Message::shared_ptr& msg);
public:
+ typedef shared_ptr<ConsumerImpl> shared_ptr;
+
ConsumerImpl(SemanticState* parent, DeliveryToken::shared_ptr token,
const string& name, Queue::shared_ptr queue,
bool ack, bool nolocal, bool acquire);
~ConsumerImpl();
bool deliver(QueuedMessage& msg);
- void cancel();
- void requestDispatch();
void setWindowMode();
void setCreditMode();
@@ -87,6 +86,8 @@ class SemanticState : public framing::FrameHandler::Chains,
void flush();
void stop();
void acknowledged(const DeliveryRecord&);
+ Queue::shared_ptr getQueue() { return queue; }
+ bool isBlocked() const { return blocked; }
};
struct FlushCompletion : DispatchCompletion
@@ -100,7 +101,7 @@ class SemanticState : public framing::FrameHandler::Chains,
void completed();
};
- typedef boost::ptr_map<string,ConsumerImpl> ConsumerImplMap;
+ typedef std::map<std::string,ConsumerImpl::shared_ptr> ConsumerImplMap;
SessionState& session;
DeliveryAdapter& deliveryAdapter;
@@ -124,10 +125,13 @@ class SemanticState : public framing::FrameHandler::Chains,
void record(const DeliveryRecord& delivery);
bool checkPrefetch(Message::shared_ptr& msg);
void checkDtxTimeout();
- ConsumerImpl& find(const std::string& destination);
+ ConsumerImpl::shared_ptr find(const std::string& destination);
void ack(DeliveryId deliveryTag, DeliveryId endTag, bool cumulative);
void acknowledged(const DeliveryRecord&);
AckRange findRange(DeliveryId first, DeliveryId last);
+ void requestDispatch();
+ void requestDispatch(ConsumerImpl::shared_ptr);
+ void cancel(ConsumerImpl::shared_ptr);
public:
SemanticState(DeliveryAdapter&, SessionState&);