summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp41
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.h4
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.cpp1
-rw-r--r--qpid/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py40
4 files changed, 77 insertions, 9 deletions
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index b8981b4877..c91cfba2f8 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -73,21 +73,34 @@ SemanticState::SemanticState(DeliveryAdapter& da, SessionContext& ss)
authMsg(getSession().getBroker().getOptions().auth && !getSession().getConnection().isFederationLink()),
userID(getSession().getConnection().getUserId()),
userName(getSession().getConnection().getUserId().substr(0,getSession().getConnection().getUserId().find('@'))),
- isDefaultRealm(userID.find('@') != std::string::npos && getSession().getBroker().getOptions().realm == userID.substr(userID.find('@')+1,userID.size()))
+ isDefaultRealm(userID.find('@') != std::string::npos && getSession().getBroker().getOptions().realm == userID.substr(userID.find('@')+1,userID.size())),
+ closeComplete(false)
{
acl = getSession().getBroker().getAcl();
}
SemanticState::~SemanticState() {
- //cancel all consumers
- for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
- cancel(i->second);
- }
+ closed();
+}
- if (dtxBuffer.get()) {
- dtxBuffer->fail();
+void SemanticState::closed() {
+ if (!closeComplete) {
+ //prevent requeued messages being redelivered to consumers
+ for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
+ disable(i->second);
+ }
+ if (dtxBuffer.get()) {
+ dtxBuffer->fail();
+ }
+ recover(true);
+
+ //now unsubscribe, which may trigger queue deletion and thus
+ //needs to occur after the requeueing of unacked messages
+ for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
+ unsubscribe(i->second);
+ }
+ closeComplete = true;
}
- recover(true);
}
bool SemanticState::exists(const string& consumerTag){
@@ -389,11 +402,15 @@ SemanticState::ConsumerImpl::~ConsumerImpl()
mgmtObject->resourceDestroy ();
}
-void SemanticState::cancel(ConsumerImpl::shared_ptr c)
+void SemanticState::disable(ConsumerImpl::shared_ptr c)
{
c->disableNotify();
if (session.isAttached())
session.getConnection().outputTasks.removeOutputTask(c.get());
+}
+
+void SemanticState::unsubscribe(ConsumerImpl::shared_ptr c)
+{
Queue::shared_ptr queue = c->getQueue();
if(queue) {
queue->cancel(c);
@@ -403,6 +420,12 @@ void SemanticState::cancel(ConsumerImpl::shared_ptr c)
}
}
+void SemanticState::cancel(ConsumerImpl::shared_ptr c)
+{
+ disable(c);
+ unsubscribe(c);
+}
+
void SemanticState::handle(intrusive_ptr<Message> msg) {
if (txBuffer.get()) {
TxPublish* deliverable(new TxPublish(msg));
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h
index cae852732d..2b314920e6 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.h
+++ b/qpid/cpp/src/qpid/broker/SemanticState.h
@@ -157,6 +157,7 @@ class SemanticState : private boost::noncopyable {
const string userID;
const string userName;
const bool isDefaultRealm;
+ bool closeComplete;
void route(boost::intrusive_ptr<Message> msg, Deliverable& strategy);
void checkDtxTimeout();
@@ -165,6 +166,8 @@ class SemanticState : private boost::noncopyable {
AckRange findRange(DeliveryId first, DeliveryId last);
void requestDispatch();
void cancel(ConsumerImpl::shared_ptr);
+ void unsubscribe(ConsumerImpl::shared_ptr);
+ void disable(ConsumerImpl::shared_ptr);
public:
SemanticState(DeliveryAdapter&, SessionContext&);
@@ -220,6 +223,7 @@ class SemanticState : private boost::noncopyable {
void attached();
void detached();
+ void closed();
// Used by cluster to re-create sessions
template <class F> void eachConsumer(F f) {
diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp
index ddf68cad2f..be4f8c7b40 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionState.cpp
@@ -88,6 +88,7 @@ SessionState::SessionState(
}
SessionState::~SessionState() {
+ semanticState.closed();
if (mgmtObject != 0)
mgmtObject->resourceDestroy ();
diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py b/qpid/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py
index 4d8617eb8e..0ffeb57172 100644
--- a/qpid/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py
+++ b/qpid/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py
@@ -196,6 +196,46 @@ class AlternateExchangeTests(TestBase010):
session.exchange_delete(exchange="onealternate")
session.exchange_delete(exchange="alt1")
+ def test_queue_autodelete(self):
+ """
+ Test that messages in a queue being auto-deleted are delivered
+ to the alternate-exchange if specified, including messages
+ that are acquired but not accepted
+ """
+ session = self.session
+ #set up a 'dead letter queue':
+ session.exchange_declare(exchange="dlq", type="fanout")
+ session.queue_declare(queue="deleted", exclusive=True, auto_delete=True)
+ session.exchange_bind(exchange="dlq", queue="deleted")
+ session.message_subscribe(destination="dlq", queue="deleted")
+ session.message_flow(destination="dlq", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+ session.message_flow(destination="dlq", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+ dlq = session.incoming("dlq")
+
+ #on a separate session, create an auto-deleted queue using the
+ #dlq as its alternate exchange (handling of auto-delete is
+ #different for exclusive and non-exclusive queues, so test
+ #both modes):
+ for mode in [True, False]:
+ session2 = self.conn.session("another-session")
+ session2.queue_declare(queue="my-queue", alternate_exchange="dlq", exclusive=mode, auto_delete=True)
+ #send it some messages:
+ dp=session2.delivery_properties(routing_key="my-queue")
+ session2.message_transfer(message=Message(dp, "One"))
+ session2.message_transfer(message=Message(dp, "Two"))
+ session2.message_transfer(message=Message(dp, "Three"))
+ session2.message_subscribe(destination="incoming", queue="my-queue")
+ session2.message_flow(destination="incoming", unit=session.credit_unit.message, value=1)
+ session2.message_flow(destination="incoming", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+ self.assertEqual("One", session2.incoming("incoming").get(timeout=1).body)
+ session2.close()
+
+ #check the messages were delivered to the dlq:
+ self.assertEqual("One", dlq.get(timeout=1).body)
+ self.assertEqual("Two", dlq.get(timeout=1).body)
+ self.assertEqual("Three", dlq.get(timeout=1).body)
+ self.assertEmpty(dlq)
+
def assertEmpty(self, queue):
try: