diff options
-rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.cpp | 41 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionState.cpp | 1 | ||||
-rw-r--r-- | qpid/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py | 40 |
4 files changed, 77 insertions, 9 deletions
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index b8981b4877..c91cfba2f8 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -73,21 +73,34 @@ SemanticState::SemanticState(DeliveryAdapter& da, SessionContext& ss) authMsg(getSession().getBroker().getOptions().auth && !getSession().getConnection().isFederationLink()), userID(getSession().getConnection().getUserId()), userName(getSession().getConnection().getUserId().substr(0,getSession().getConnection().getUserId().find('@'))), - isDefaultRealm(userID.find('@') != std::string::npos && getSession().getBroker().getOptions().realm == userID.substr(userID.find('@')+1,userID.size())) + isDefaultRealm(userID.find('@') != std::string::npos && getSession().getBroker().getOptions().realm == userID.substr(userID.find('@')+1,userID.size())), + closeComplete(false) { acl = getSession().getBroker().getAcl(); } SemanticState::~SemanticState() { - //cancel all consumers - for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { - cancel(i->second); - } + closed(); +} - if (dtxBuffer.get()) { - dtxBuffer->fail(); +void SemanticState::closed() { + if (!closeComplete) { + //prevent requeued messages being redelivered to consumers + for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { + disable(i->second); + } + if (dtxBuffer.get()) { + dtxBuffer->fail(); + } + recover(true); + + //now unsubscribe, which may trigger queue deletion and thus + //needs to occur after the requeueing of unacked messages + for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { + unsubscribe(i->second); + } + closeComplete = true; } - recover(true); } bool SemanticState::exists(const string& consumerTag){ @@ -389,11 +402,15 @@ SemanticState::ConsumerImpl::~ConsumerImpl() mgmtObject->resourceDestroy (); } -void SemanticState::cancel(ConsumerImpl::shared_ptr c) +void SemanticState::disable(ConsumerImpl::shared_ptr c) { c->disableNotify(); if (session.isAttached()) session.getConnection().outputTasks.removeOutputTask(c.get()); +} + +void SemanticState::unsubscribe(ConsumerImpl::shared_ptr c) +{ Queue::shared_ptr queue = c->getQueue(); if(queue) { queue->cancel(c); @@ -403,6 +420,12 @@ void SemanticState::cancel(ConsumerImpl::shared_ptr c) } } +void SemanticState::cancel(ConsumerImpl::shared_ptr c) +{ + disable(c); + unsubscribe(c); +} + void SemanticState::handle(intrusive_ptr<Message> msg) { if (txBuffer.get()) { TxPublish* deliverable(new TxPublish(msg)); diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h index cae852732d..2b314920e6 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.h +++ b/qpid/cpp/src/qpid/broker/SemanticState.h @@ -157,6 +157,7 @@ class SemanticState : private boost::noncopyable { const string userID; const string userName; const bool isDefaultRealm; + bool closeComplete; void route(boost::intrusive_ptr<Message> msg, Deliverable& strategy); void checkDtxTimeout(); @@ -165,6 +166,8 @@ class SemanticState : private boost::noncopyable { AckRange findRange(DeliveryId first, DeliveryId last); void requestDispatch(); void cancel(ConsumerImpl::shared_ptr); + void unsubscribe(ConsumerImpl::shared_ptr); + void disable(ConsumerImpl::shared_ptr); public: SemanticState(DeliveryAdapter&, SessionContext&); @@ -220,6 +223,7 @@ class SemanticState : private boost::noncopyable { void attached(); void detached(); + void closed(); // Used by cluster to re-create sessions template <class F> void eachConsumer(F f) { diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp index ddf68cad2f..be4f8c7b40 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.cpp +++ b/qpid/cpp/src/qpid/broker/SessionState.cpp @@ -88,6 +88,7 @@ SessionState::SessionState( } SessionState::~SessionState() { + semanticState.closed(); if (mgmtObject != 0) mgmtObject->resourceDestroy (); diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py b/qpid/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py index 4d8617eb8e..0ffeb57172 100644 --- a/qpid/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py +++ b/qpid/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py @@ -196,6 +196,46 @@ class AlternateExchangeTests(TestBase010): session.exchange_delete(exchange="onealternate") session.exchange_delete(exchange="alt1") + def test_queue_autodelete(self): + """ + Test that messages in a queue being auto-deleted are delivered + to the alternate-exchange if specified, including messages + that are acquired but not accepted + """ + session = self.session + #set up a 'dead letter queue': + session.exchange_declare(exchange="dlq", type="fanout") + session.queue_declare(queue="deleted", exclusive=True, auto_delete=True) + session.exchange_bind(exchange="dlq", queue="deleted") + session.message_subscribe(destination="dlq", queue="deleted") + session.message_flow(destination="dlq", unit=session.credit_unit.message, value=0xFFFFFFFFL) + session.message_flow(destination="dlq", unit=session.credit_unit.byte, value=0xFFFFFFFFL) + dlq = session.incoming("dlq") + + #on a separate session, create an auto-deleted queue using the + #dlq as its alternate exchange (handling of auto-delete is + #different for exclusive and non-exclusive queues, so test + #both modes): + for mode in [True, False]: + session2 = self.conn.session("another-session") + session2.queue_declare(queue="my-queue", alternate_exchange="dlq", exclusive=mode, auto_delete=True) + #send it some messages: + dp=session2.delivery_properties(routing_key="my-queue") + session2.message_transfer(message=Message(dp, "One")) + session2.message_transfer(message=Message(dp, "Two")) + session2.message_transfer(message=Message(dp, "Three")) + session2.message_subscribe(destination="incoming", queue="my-queue") + session2.message_flow(destination="incoming", unit=session.credit_unit.message, value=1) + session2.message_flow(destination="incoming", unit=session.credit_unit.byte, value=0xFFFFFFFFL) + self.assertEqual("One", session2.incoming("incoming").get(timeout=1).body) + session2.close() + + #check the messages were delivered to the dlq: + self.assertEqual("One", dlq.get(timeout=1).body) + self.assertEqual("Two", dlq.get(timeout=1).body) + self.assertEqual("Three", dlq.get(timeout=1).body) + self.assertEmpty(dlq) + def assertEmpty(self, queue): try: |