summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp41
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.h4
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.cpp1
3 files changed, 37 insertions, 9 deletions
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index b8981b4877..c91cfba2f8 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -73,21 +73,34 @@ SemanticState::SemanticState(DeliveryAdapter& da, SessionContext& ss)
authMsg(getSession().getBroker().getOptions().auth && !getSession().getConnection().isFederationLink()),
userID(getSession().getConnection().getUserId()),
userName(getSession().getConnection().getUserId().substr(0,getSession().getConnection().getUserId().find('@'))),
- isDefaultRealm(userID.find('@') != std::string::npos && getSession().getBroker().getOptions().realm == userID.substr(userID.find('@')+1,userID.size()))
+ isDefaultRealm(userID.find('@') != std::string::npos && getSession().getBroker().getOptions().realm == userID.substr(userID.find('@')+1,userID.size())),
+ closeComplete(false)
{
acl = getSession().getBroker().getAcl();
}
SemanticState::~SemanticState() {
- //cancel all consumers
- for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
- cancel(i->second);
- }
+ closed();
+}
- if (dtxBuffer.get()) {
- dtxBuffer->fail();
+void SemanticState::closed() {
+ if (!closeComplete) {
+ //prevent requeued messages being redelivered to consumers
+ for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
+ disable(i->second);
+ }
+ if (dtxBuffer.get()) {
+ dtxBuffer->fail();
+ }
+ recover(true);
+
+ //now unsubscribe, which may trigger queue deletion and thus
+ //needs to occur after the requeueing of unacked messages
+ for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
+ unsubscribe(i->second);
+ }
+ closeComplete = true;
}
- recover(true);
}
bool SemanticState::exists(const string& consumerTag){
@@ -389,11 +402,15 @@ SemanticState::ConsumerImpl::~ConsumerImpl()
mgmtObject->resourceDestroy ();
}
-void SemanticState::cancel(ConsumerImpl::shared_ptr c)
+void SemanticState::disable(ConsumerImpl::shared_ptr c)
{
c->disableNotify();
if (session.isAttached())
session.getConnection().outputTasks.removeOutputTask(c.get());
+}
+
+void SemanticState::unsubscribe(ConsumerImpl::shared_ptr c)
+{
Queue::shared_ptr queue = c->getQueue();
if(queue) {
queue->cancel(c);
@@ -403,6 +420,12 @@ void SemanticState::cancel(ConsumerImpl::shared_ptr c)
}
}
+void SemanticState::cancel(ConsumerImpl::shared_ptr c)
+{
+ disable(c);
+ unsubscribe(c);
+}
+
void SemanticState::handle(intrusive_ptr<Message> msg) {
if (txBuffer.get()) {
TxPublish* deliverable(new TxPublish(msg));
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h
index cae852732d..2b314920e6 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.h
+++ b/qpid/cpp/src/qpid/broker/SemanticState.h
@@ -157,6 +157,7 @@ class SemanticState : private boost::noncopyable {
const string userID;
const string userName;
const bool isDefaultRealm;
+ bool closeComplete;
void route(boost::intrusive_ptr<Message> msg, Deliverable& strategy);
void checkDtxTimeout();
@@ -165,6 +166,8 @@ class SemanticState : private boost::noncopyable {
AckRange findRange(DeliveryId first, DeliveryId last);
void requestDispatch();
void cancel(ConsumerImpl::shared_ptr);
+ void unsubscribe(ConsumerImpl::shared_ptr);
+ void disable(ConsumerImpl::shared_ptr);
public:
SemanticState(DeliveryAdapter&, SessionContext&);
@@ -220,6 +223,7 @@ class SemanticState : private boost::noncopyable {
void attached();
void detached();
+ void closed();
// Used by cluster to re-create sessions
template <class F> void eachConsumer(F f) {
diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp
index ddf68cad2f..be4f8c7b40 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionState.cpp
@@ -88,6 +88,7 @@ SessionState::SessionState(
}
SessionState::~SessionState() {
+ semanticState.closed();
if (mgmtObject != 0)
mgmtObject->resourceDestroy ();