summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cpp/src/qpid/broker/BrokerAdapter.cpp8
-rw-r--r--cpp/src/qpid/broker/Connection.cpp11
-rw-r--r--cpp/src/qpid/broker/Queue.h4
-rw-r--r--cpp/src/qpid/broker/QueueRegistry.h5
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp2
-rw-r--r--python/tests_0-10/alternate-exchange.py12
-rw-r--r--python/tests_0-10/dtx.py26
-rw-r--r--python/tests_0-10/message.py44
-rw-r--r--python/tests_0-10/query.py12
-rw-r--r--python/tests_0-10/queue.py38
-rw-r--r--python/tests_0-10/tx.py6
11 files changed, 91 insertions, 77 deletions
diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp
index 0fb521d626..820cc2f397 100644
--- a/cpp/src/qpid/broker/BrokerAdapter.cpp
+++ b/cpp/src/qpid/broker/BrokerAdapter.cpp
@@ -180,7 +180,7 @@ void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string&
std::pair<Queue::shared_ptr, bool> queue_created =
getBroker().getQueues().declare(
name, durable,
- autoDelete && !exclusive,
+ autoDelete,
exclusive ? &getConnection() : 0);
queue = queue_created.first;
assert(queue);
@@ -202,7 +202,11 @@ void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string&
if (exclusive) {
getConnection().exclusiveQueues.push_back(queue);
}
- }
+ } else {
+ if (exclusive && !queue->hasExclusiveOwner()) {
+ queue->setExclusiveOwner(&getConnection());
+ }
+ }
}
if (exclusive && !queue->isExclusiveOwner(&getConnection()))
throw ResourceLockedException(
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index a21db0f603..21d759c901 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -29,6 +29,7 @@
#include "SemanticHandler.h"
#include <boost/utility/in_place_factory.hpp>
+#include <boost/bind.hpp>
using namespace boost;
using namespace qpid::sys;
@@ -76,10 +77,14 @@ void Connection::closed(){
try {
while (!exclusiveQueues.empty()) {
Queue::shared_ptr q(exclusiveQueues.front());
- broker.getQueues().destroy(q->getName());
+ q->releaseExclusiveOwnership();
+ if (q->canAutoDelete() &&
+ broker.getQueues().destroyIf(q->getName(), boost::bind(boost::mem_fn(&Queue::canAutoDelete), q))) {
+
+ q->unbind(broker.getExchanges(), q);
+ q->destroy();
+ }
exclusiveQueues.erase(exclusiveQueues.begin());
- q->unbind(broker.getExchanges(), q);
- q->destroy();
}
} catch(std::exception& e) {
QPID_LOG(error, " Unhandled exception while closing session: " <<
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index 17ace522c3..7ee9106ef0 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -67,7 +67,7 @@ namespace qpid {
const string name;
const bool autodelete;
MessageStore* const store;
- const ConnectionToken* const owner;
+ const ConnectionToken* owner;
Consumers acquirers;
Consumers browsers;
Messages messages;
@@ -155,6 +155,8 @@ namespace qpid {
uint32_t getConsumerCount() const;
inline const string& getName() const { return name; }
inline const bool isExclusiveOwner(const ConnectionToken* const o) const { return o == owner; }
+ inline void releaseExclusiveOwnership() { owner = 0; }
+ inline void setExclusiveOwner(const ConnectionToken* const o) { owner = o; }
inline bool hasExclusiveConsumer() const { return exclusive; }
inline bool hasExclusiveOwner() const { return owner != 0; }
inline bool isDurable() const { return store != 0; }
diff --git a/cpp/src/qpid/broker/QueueRegistry.h b/cpp/src/qpid/broker/QueueRegistry.h
index 1a766b810a..f73f467945 100644
--- a/cpp/src/qpid/broker/QueueRegistry.h
+++ b/cpp/src/qpid/broker/QueueRegistry.h
@@ -62,11 +62,14 @@ class QueueRegistry{
*
*/
void destroy(const string& name);
- template <class Test> void destroyIf(const string& name, Test test)
+ template <class Test> bool destroyIf(const string& name, Test test)
{
qpid::sys::RWlock::ScopedWlock locker(lock);
if (test()) {
queues.erase(name);
+ return true;
+ } else {
+ return false;
}
}
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index 4214f67bfe..1d49e08eb0 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -290,7 +290,7 @@ void SemanticState::ConsumerImpl::cancel()
{
if(queue) {
queue->cancel(this);
- if (queue->canAutoDelete()) {
+ if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) {
parent->getSession().getBroker().getQueues().destroyIf(
queue->getName(),
boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue));
diff --git a/python/tests_0-10/alternate-exchange.py b/python/tests_0-10/alternate-exchange.py
index a857496a48..83f8d85811 100644
--- a/python/tests_0-10/alternate-exchange.py
+++ b/python/tests_0-10/alternate-exchange.py
@@ -37,13 +37,13 @@ class AlternateExchangeTests(TestBase):
channel.exchange_declare(exchange="primary", type="direct", alternate_exchange="secondary")
#declare, bind (to the alternate exchange) and consume from a queue for 'returned' messages
- channel.queue_declare(queue="returns", exclusive=True)
+ channel.queue_declare(queue="returns", exclusive=True, auto_delete=True)
channel.queue_bind(queue="returns", exchange="secondary")
self.subscribe(destination="a", queue="returns")
returned = self.client.queue("a")
#declare, bind (to the primary exchange) and consume from a queue for 'processed' messages
- channel.queue_declare(queue="processed", exclusive=True)
+ channel.queue_declare(queue="processed", exclusive=True, auto_delete=True)
channel.queue_bind(queue="processed", exchange="primary", routing_key="my-key")
self.subscribe(destination="b", queue="processed")
processed = self.client.queue("b")
@@ -71,7 +71,7 @@ class AlternateExchangeTests(TestBase):
channel = self.channel
#set up a 'dead letter queue':
channel.exchange_declare(exchange="dlq", type="fanout")
- channel.queue_declare(queue="deleted", exclusive=True)
+ channel.queue_declare(queue="deleted", exclusive=True, auto_delete=True)
channel.queue_bind(exchange="dlq", queue="deleted")
self.subscribe(destination="dlq", queue="deleted")
dlq = self.client.queue("dlq")
@@ -101,13 +101,13 @@ class AlternateExchangeTests(TestBase):
channel = self.channel
#set up a 'dead letter queue':
channel.exchange_declare(exchange="dlq", type="fanout")
- channel.queue_declare(queue="immediate", exclusive=True)
+ channel.queue_declare(queue="immediate", exclusive=True, auto_delete=True)
channel.queue_bind(exchange="dlq", queue="immediate")
self.subscribe(destination="dlq", queue="immediate")
dlq = self.client.queue("dlq")
#create a queue using the dlq as its alternate exchange:
- channel.queue_declare(queue="no-consumers", alternate_exchange="dlq", exclusive=True)
+ channel.queue_declare(queue="no-consumers", alternate_exchange="dlq", exclusive=True, auto_delete=True)
#send it some messages:
#TODO: WE HAVE LOST THE IMMEDIATE FLAG; FIX THIS ONCE ITS BACK
channel.message_transfer(content=Content("no one wants me", properties={'routing_key':"no-consumers"}))
@@ -128,7 +128,7 @@ class AlternateExchangeTests(TestBase):
"""
channel = self.channel
channel.exchange_declare(exchange="alternate", type="fanout")
- channel.queue_declare(queue="q", exclusive=True, alternate_exchange="alternate")
+ channel.queue_declare(queue="q", exclusive=True, auto_delete=True, alternate_exchange="alternate")
try:
channel.exchange_delete(exchange="alternate")
self.fail("Expected deletion of in-use alternate-exchange to fail")
diff --git a/python/tests_0-10/dtx.py b/python/tests_0-10/dtx.py
index ce6e467e0a..8fdd32c2f5 100644
--- a/python/tests_0-10/dtx.py
+++ b/python/tests_0-10/dtx.py
@@ -246,8 +246,8 @@ class DtxTests(TestBase):
channel2.dtx_demarcation_select()
#setup
- channel1.queue_declare(queue="one", exclusive=True)
- channel1.queue_declare(queue="two", exclusive=True)
+ channel1.queue_declare(queue="one", exclusive=True, auto_delete=True)
+ channel1.queue_declare(queue="two", exclusive=True, auto_delete=True)
channel1.message_transfer(content=Content(properties={'routing_key':"one", 'message_id':"a"}, body="DtxMessage"))
channel1.message_transfer(content=Content(properties={'routing_key':"two", 'message_id':"b"}, body="DtxMessage"))
@@ -282,8 +282,8 @@ class DtxTests(TestBase):
channel.dtx_demarcation_select()
#setup
- channel.queue_declare(queue="one", exclusive=True)
- channel.queue_declare(queue="two", exclusive=True)
+ channel.queue_declare(queue="one", exclusive=True, auto_delete=True)
+ channel.queue_declare(queue="two", exclusive=True, auto_delete=True)
channel.message_transfer(content=Content(properties={'routing_key':"one", 'message_id':"a"}, body="DtxMessage"))
channel.message_transfer(content=Content(properties={'routing_key':"two", 'message_id':"b"}, body="DtxMessage"))
@@ -352,7 +352,7 @@ class DtxTests(TestBase):
"""
channel = self.client.channel(2)
channel.session_open()
- channel.queue_declare(queue="tx-queue", exclusive=True)
+ channel.queue_declare(queue="tx-queue", exclusive=True, auto_delete=True)
#publish a message under a transaction
channel.dtx_demarcation_select()
@@ -389,7 +389,7 @@ class DtxTests(TestBase):
other = self.connect()
tester = other.channel(1)
tester.session_open()
- tester.queue_declare(queue="dummy", exclusive=True)
+ tester.queue_declare(queue="dummy", exclusive=True, auto_delete=True)
tester.dtx_demarcation_select()
tx = self.xid("dummy")
tester.dtx_demarcation_start(xid=tx)
@@ -423,7 +423,7 @@ class DtxTests(TestBase):
other = self.connect()
tester = other.channel(1)
tester.session_open()
- tester.queue_declare(queue="dummy", exclusive=True)
+ tester.queue_declare(queue="dummy", exclusive=True, auto_delete=True)
tester.dtx_demarcation_select()
tx = self.xid("dummy")
tester.dtx_demarcation_start(xid=tx)
@@ -455,7 +455,7 @@ class DtxTests(TestBase):
channel2.session_open()
#setup:
- channel2.queue_declare(queue="dummy", exclusive=True)
+ channel2.queue_declare(queue="dummy", exclusive=True, auto_delete=True)
channel2.message_transfer(content=Content(properties={'routing_key':"dummy"}, body="whatever"))
tx = self.xid("dummy")
@@ -498,8 +498,8 @@ class DtxTests(TestBase):
channel.session_open()
#setup:
tx = self.xid("dummy")
- channel.queue_declare(queue="queue-a", exclusive=True)
- channel.queue_declare(queue="queue-b", exclusive=True)
+ channel.queue_declare(queue="queue-a", exclusive=True, auto_delete=True)
+ channel.queue_declare(queue="queue-b", exclusive=True, auto_delete=True)
channel.message_transfer(content=Content(properties={'routing_key':"queue-a", 'message_id':"timeout"}, body="DtxMessage"))
channel.dtx_demarcation_select()
@@ -524,7 +524,7 @@ class DtxTests(TestBase):
channel = self.channel
channel.dtx_demarcation_select()
- channel.queue_declare(queue="dummy", exclusive=True)
+ channel.queue_declare(queue="dummy", exclusive=True, auto_delete=True)
prepared = []
for i in range(1, 10):
@@ -575,8 +575,8 @@ class DtxTests(TestBase):
def txswap(self, tx, id):
channel = self.channel
#declare two queues:
- channel.queue_declare(queue="queue-a", exclusive=True)
- channel.queue_declare(queue="queue-b", exclusive=True)
+ channel.queue_declare(queue="queue-a", exclusive=True, auto_delete=True)
+ channel.queue_declare(queue="queue-b", exclusive=True, auto_delete=True)
#put message with specified id on one queue:
channel.message_transfer(content=Content(properties={'routing_key':"queue-a", 'message_id':id}, body="DtxMessage"))
diff --git a/python/tests_0-10/message.py b/python/tests_0-10/message.py
index 3e014838aa..c414b15214 100644
--- a/python/tests_0-10/message.py
+++ b/python/tests_0-10/message.py
@@ -31,8 +31,8 @@ class MessageTests(TestBase):
"""
channel = self.channel
#setup, declare two queues:
- channel.queue_declare(queue="test-queue-1a", exclusive=True)
- channel.queue_declare(queue="test-queue-1b", exclusive=True)
+ channel.queue_declare(queue="test-queue-1a", exclusive=True, auto_delete=True)
+ channel.queue_declare(queue="test-queue-1b", exclusive=True, auto_delete=True)
#establish two consumers one of which excludes delivery of locally sent messages
self.subscribe(destination="local_included", queue="test-queue-1a")
self.subscribe(destination="local_excluded", queue="test-queue-1b", no_local=True)
@@ -58,7 +58,7 @@ class MessageTests(TestBase):
"""
channel = self.channel
#setup, declare a queue:
- channel.queue_declare(queue="test-queue-2", exclusive=True)
+ channel.queue_declare(queue="test-queue-2", exclusive=True, auto_delete=True)
#check that an exclusive consumer prevents other consumer being created:
self.subscribe(destination="first", queue="test-queue-2", exclusive=True)
@@ -107,7 +107,7 @@ class MessageTests(TestBase):
"""
channel = self.channel
#setup, declare a queue:
- channel.queue_declare(queue="test-queue-3", exclusive=True)
+ channel.queue_declare(queue="test-queue-3", exclusive=True, auto_delete=True)
#check that attempts to use duplicate tags are detected and prevented:
self.subscribe(destination="first", queue="test-queue-3")
@@ -123,7 +123,7 @@ class MessageTests(TestBase):
"""
channel = self.channel
#setup, declare a queue:
- channel.queue_declare(queue="test-queue-4", exclusive=True)
+ channel.queue_declare(queue="test-queue-4", exclusive=True, auto_delete=True)
self.subscribe(destination="my-consumer", queue="test-queue-4")
channel.message_transfer(content=Content(properties={'routing_key' : "test-queue-4"}, body="One"))
@@ -148,7 +148,7 @@ class MessageTests(TestBase):
Test basic ack/recover behaviour
"""
channel = self.channel
- channel.queue_declare(queue="test-ack-queue", exclusive=True)
+ channel.queue_declare(queue="test-ack-queue", exclusive=True, auto_delete=True)
self.subscribe(queue="test-ack-queue", destination="consumer_tag", confirm_mode=1)
queue = self.client.queue("consumer_tag")
@@ -193,9 +193,9 @@ class MessageTests(TestBase):
Test recover behaviour
"""
channel = self.channel
- channel.queue_declare(queue="queue-a", exclusive=True)
+ channel.queue_declare(queue="queue-a", exclusive=True, auto_delete=True)
channel.queue_bind(exchange="amq.fanout", queue="queue-a")
- channel.queue_declare(queue="queue-b", exclusive=True)
+ channel.queue_declare(queue="queue-b", exclusive=True, auto_delete=True)
channel.queue_bind(exchange="amq.fanout", queue="queue-b")
self.subscribe(queue="queue-a", destination="unconfirmed", confirm_mode=1)
@@ -233,7 +233,7 @@ class MessageTests(TestBase):
Test requeing on recovery
"""
channel = self.channel
- channel.queue_declare(queue="test-requeue", exclusive=True)
+ channel.queue_declare(queue="test-requeue", exclusive=True, auto_delete=True)
self.subscribe(queue="test-requeue", destination="consumer_tag", confirm_mode=1)
queue = self.client.queue("consumer_tag")
@@ -296,7 +296,7 @@ class MessageTests(TestBase):
"""
#setup: declare queue and subscribe
channel = self.channel
- channel.queue_declare(queue="test-prefetch-count", exclusive=True)
+ channel.queue_declare(queue="test-prefetch-count", exclusive=True, auto_delete=True)
subscription = self.subscribe(queue="test-prefetch-count", destination="consumer_tag", confirm_mode=1)
queue = self.client.queue("consumer_tag")
@@ -338,7 +338,7 @@ class MessageTests(TestBase):
"""
#setup: declare queue and subscribe
channel = self.channel
- channel.queue_declare(queue="test-prefetch-size", exclusive=True)
+ channel.queue_declare(queue="test-prefetch-size", exclusive=True, auto_delete=True)
subscription = self.subscribe(queue="test-prefetch-size", destination="consumer_tag", confirm_mode=1)
queue = self.client.queue("consumer_tag")
@@ -382,8 +382,8 @@ class MessageTests(TestBase):
def test_reject(self):
channel = self.channel
- channel.queue_declare(queue = "q", exclusive=True, alternate_exchange="amq.fanout")
- channel.queue_declare(queue = "r", exclusive=True)
+ channel.queue_declare(queue = "q", exclusive=True, auto_delete=True, alternate_exchange="amq.fanout")
+ channel.queue_declare(queue = "r", exclusive=True, auto_delete=True)
channel.queue_bind(queue = "r", exchange = "amq.fanout")
self.subscribe(queue = "q", destination = "consumer", confirm_mode = 1)
@@ -402,7 +402,7 @@ class MessageTests(TestBase):
"""
#declare an exclusive queue
channel = self.channel
- channel.queue_declare(queue = "q", exclusive=True)
+ channel.queue_declare(queue = "q", exclusive=True, auto_delete=True)
#create consumer (for now that defaults to infinite credit)
channel.message_subscribe(queue = "q", destination = "c")
channel.message_flow_mode(mode = 0, destination = "c")
@@ -432,7 +432,7 @@ class MessageTests(TestBase):
"""
#declare an exclusive queue
channel = self.channel
- channel.queue_declare(queue = "q", exclusive=True)
+ channel.queue_declare(queue = "q", exclusive=True, auto_delete=True)
#create consumer (for now that defaults to infinite credit)
channel.message_subscribe(queue = "q", destination = "c")
channel.message_flow_mode(mode = 0, destination = "c")
@@ -464,7 +464,7 @@ class MessageTests(TestBase):
"""
#declare an exclusive queue
channel = self.channel
- channel.queue_declare(queue = "q", exclusive=True)
+ channel.queue_declare(queue = "q", exclusive=True, auto_delete=True)
#create consumer (for now that defaults to infinite credit)
channel.message_subscribe(queue = "q", destination = "c", confirm_mode = 1)
channel.message_flow_mode(mode = 1, destination = "c")
@@ -496,7 +496,7 @@ class MessageTests(TestBase):
"""
#declare an exclusive queue
channel = self.channel
- channel.queue_declare(queue = "q", exclusive=True)
+ channel.queue_declare(queue = "q", exclusive=True, auto_delete=True)
#create consumer (for now that defaults to infinite credit)
channel.message_subscribe(queue = "q", destination = "c", confirm_mode = 1)
channel.message_flow_mode(mode = 1, destination = "c")
@@ -535,7 +535,7 @@ class MessageTests(TestBase):
#existing tests twice.
channel = self.channel
- channel.queue_declare(queue = "q", exclusive=True)
+ channel.queue_declare(queue = "q", exclusive=True, auto_delete=True)
for i in range(1, 6):
channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %s" % i))
@@ -562,7 +562,7 @@ class MessageTests(TestBase):
Test explicit acquire function
"""
channel = self.channel
- channel.queue_declare(queue = "q", exclusive=True)
+ channel.queue_declare(queue = "q", exclusive=True, auto_delete=True)
channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "acquire me"))
self.subscribe(queue = "q", destination = "a", acquire_mode = 1, confirm_mode = 1)
@@ -578,7 +578,7 @@ class MessageTests(TestBase):
Test explicit release function
"""
channel = self.channel
- channel.queue_declare(queue = "q", exclusive=True)
+ channel.queue_declare(queue = "q", exclusive=True, auto_delete=True)
channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "release me"))
self.subscribe(queue = "q", destination = "a", acquire_mode = 0, confirm_mode = 1)
@@ -595,7 +595,7 @@ class MessageTests(TestBase):
Test order of released messages is as expected
"""
channel = self.channel
- channel.queue_declare(queue = "q", exclusive=True)
+ channel.queue_declare(queue = "q", exclusive=True, auto_delete=True)
for i in range (1, 11):
channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "released message %s" % (i)))
@@ -618,7 +618,7 @@ class MessageTests(TestBase):
Test acking of messages ranges
"""
channel = self.channel
- channel.queue_declare(queue = "q", exclusive=True)
+ channel.queue_declare(queue = "q", exclusive=True, auto_delete=True)
for i in range (1, 11):
channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "message %s" % (i)))
diff --git a/python/tests_0-10/query.py b/python/tests_0-10/query.py
index 06f33be85b..eba2ee6dd1 100644
--- a/python/tests_0-10/query.py
+++ b/python/tests_0-10/query.py
@@ -65,8 +65,8 @@ class QueryTests(TestBase):
def binding_query_with_key(self, exchange_name):
channel = self.channel
#setup: create two queues
- channel.queue_declare(queue="used-queue", exclusive=True)
- channel.queue_declare(queue="unused-queue", exclusive=True)
+ channel.queue_declare(queue="used-queue", exclusive=True, auto_delete=True)
+ channel.queue_declare(queue="unused-queue", exclusive=True, auto_delete=True)
channel.queue_bind(exchange=exchange_name, queue="used-queue", routing_key="used-key")
@@ -135,8 +135,8 @@ class QueryTests(TestBase):
"""
channel = self.channel
#setup
- channel.queue_declare(queue="used-queue", exclusive=True)
- channel.queue_declare(queue="unused-queue", exclusive=True)
+ channel.queue_declare(queue="used-queue", exclusive=True, auto_delete=True)
+ channel.queue_declare(queue="unused-queue", exclusive=True, auto_delete=True)
channel.queue_bind(exchange="amq.fanout", queue="used-queue")
# test detection of any binding to specific queue
@@ -163,8 +163,8 @@ class QueryTests(TestBase):
"""
channel = self.channel
#setup
- channel.queue_declare(queue="used-queue", exclusive=True)
- channel.queue_declare(queue="unused-queue", exclusive=True)
+ channel.queue_declare(queue="used-queue", exclusive=True, auto_delete=True)
+ channel.queue_declare(queue="unused-queue", exclusive=True, auto_delete=True)
channel.queue_bind(exchange="amq.match", queue="used-queue", arguments={"x-match":"all", "a":"A"} )
# test detection of any binding to specific queue
diff --git a/python/tests_0-10/queue.py b/python/tests_0-10/queue.py
index ba017bb286..d72d12f92d 100644
--- a/python/tests_0-10/queue.py
+++ b/python/tests_0-10/queue.py
@@ -31,7 +31,7 @@ class QueueTests(TestBase):
channel = self.channel
#setup, declare a queue and add some messages to it:
channel.exchange_declare(exchange="test-exchange", type="direct")
- channel.queue_declare(queue="test-queue", exclusive=True)
+ channel.queue_declare(queue="test-queue", exclusive=True, auto_delete=True)
channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key")
channel.message_transfer(destination="test-exchange", content=Content("one", properties={'routing_key':"key"}))
channel.message_transfer(destination="test-exchange", content=Content("two", properties={'routing_key':"key"}))
@@ -91,10 +91,10 @@ class QueueTests(TestBase):
c2.session_open()
#declare an exclusive queue:
- c1.queue_declare(queue="exclusive-queue", exclusive="True")
+ c1.queue_declare(queue="exclusive-queue", exclusive=True, auto_delete=True)
try:
#other connection should not be allowed to declare this:
- c2.queue_declare(queue="exclusive-queue", exclusive="True")
+ c2.queue_declare(queue="exclusive-queue", exclusive=True, auto_delete=True)
self.fail("Expected second exclusive queue_declare to raise a channel exception")
except Closed, e:
self.assertChannelException(405, e.args[0])
@@ -106,11 +106,11 @@ class QueueTests(TestBase):
"""
channel = self.channel
#declare an exclusive queue:
- channel.queue_declare(queue="passive-queue-1", exclusive="True")
- channel.queue_declare(queue="passive-queue-1", passive="True")
+ channel.queue_declare(queue="passive-queue-1", exclusive=True, auto_delete=True)
+ channel.queue_declare(queue="passive-queue-1", passive=True)
try:
#other connection should not be allowed to declare this:
- channel.queue_declare(queue="passive-queue-2", passive="True")
+ channel.queue_declare(queue="passive-queue-2", passive=True)
self.fail("Expected passive declaration of non-existant queue to raise a channel exception")
except Closed, e:
self.assertChannelException(404, e.args[0])
@@ -121,7 +121,7 @@ class QueueTests(TestBase):
Test various permutations of the queue.bind method
"""
channel = self.channel
- channel.queue_declare(queue="queue-1", exclusive="True")
+ channel.queue_declare(queue="queue-1", exclusive=True, auto_delete=True)
#straightforward case, both exchange & queue exist so no errors expected:
channel.queue_bind(queue="queue-1", exchange="amq.direct", routing_key="key1")
@@ -166,8 +166,8 @@ class QueueTests(TestBase):
#bind two queues and consume from them
channel = self.channel
- channel.queue_declare(queue="queue-1", exclusive="True")
- channel.queue_declare(queue="queue-2", exclusive="True")
+ channel.queue_declare(queue="queue-1", exclusive=True, auto_delete=True)
+ channel.queue_declare(queue="queue-2", exclusive=True, auto_delete=True)
self.subscribe(queue="queue-1", destination="queue-1")
self.subscribe(queue="queue-2", destination="queue-2")
@@ -218,7 +218,7 @@ class QueueTests(TestBase):
channel.queue_delete(queue="delete-me")
#check that it has gone be declaring passively
try:
- channel.queue_declare(queue="delete-me", passive="True")
+ channel.queue_declare(queue="delete-me", passive=True)
self.fail("Queue has not been deleted")
except Closed, e:
self.assertChannelException(404, e.args[0])
@@ -227,7 +227,7 @@ class QueueTests(TestBase):
channel = self.client.channel(2)
channel.session_open()
try:
- channel.queue_delete(queue="i-dont-exist", if_empty="True")
+ channel.queue_delete(queue="i-dont-exist", if_empty=True)
self.fail("Expected delete of non-existant queue to fail")
except Closed, e:
self.assertChannelException(404, e.args[0])
@@ -242,12 +242,12 @@ class QueueTests(TestBase):
#create a queue and add a message to it (use default binding):
channel.queue_declare(queue="delete-me-2")
- channel.queue_declare(queue="delete-me-2", passive="True")
+ channel.queue_declare(queue="delete-me-2", passive=True)
channel.message_transfer(content=Content("message", properties={'routing_key':"delete-me-2"}))
#try to delete, but only if empty:
try:
- channel.queue_delete(queue="delete-me-2", if_empty="True")
+ channel.queue_delete(queue="delete-me-2", if_empty=True)
self.fail("Expected delete if_empty to fail for non-empty queue")
except Closed, e:
self.assertChannelException(406, e.args[0])
@@ -264,11 +264,11 @@ class QueueTests(TestBase):
channel.message_cancel(destination="consumer_tag")
#retry deletion on empty queue:
- channel.queue_delete(queue="delete-me-2", if_empty="True")
+ channel.queue_delete(queue="delete-me-2", if_empty=True)
#check that it has gone by declaring passively:
try:
- channel.queue_declare(queue="delete-me-2", passive="True")
+ channel.queue_declare(queue="delete-me-2", passive=True)
self.fail("Queue has not been deleted")
except Closed, e:
self.assertChannelException(404, e.args[0])
@@ -281,7 +281,7 @@ class QueueTests(TestBase):
#create a queue and register a consumer:
channel.queue_declare(queue="delete-me-3")
- channel.queue_declare(queue="delete-me-3", passive="True")
+ channel.queue_declare(queue="delete-me-3", passive=True)
self.subscribe(destination="consumer_tag", queue="delete-me-3")
#need new channel now:
@@ -289,17 +289,17 @@ class QueueTests(TestBase):
channel2.session_open()
#try to delete, but only if empty:
try:
- channel2.queue_delete(queue="delete-me-3", if_unused="True")
+ channel2.queue_delete(queue="delete-me-3", if_unused=True)
self.fail("Expected delete if_unused to fail for queue with existing consumer")
except Closed, e:
self.assertChannelException(406, e.args[0])
channel.message_cancel(destination="consumer_tag")
- channel.queue_delete(queue="delete-me-3", if_unused="True")
+ channel.queue_delete(queue="delete-me-3", if_unused=True)
#check that it has gone by declaring passively:
try:
- channel.queue_declare(queue="delete-me-3", passive="True")
+ channel.queue_declare(queue="delete-me-3", passive=True)
self.fail("Queue has not been deleted")
except Closed, e:
self.assertChannelException(404, e.args[0])
diff --git a/python/tests_0-10/tx.py b/python/tests_0-10/tx.py
index 84c07d51c1..6a682e9ae7 100644
--- a/python/tests_0-10/tx.py
+++ b/python/tests_0-10/tx.py
@@ -155,9 +155,9 @@ class TxTests(TestBase):
commit and rollback
"""
#setup:
- channel.queue_declare(queue=name_a, exclusive=True)
- channel.queue_declare(queue=name_b, exclusive=True)
- channel.queue_declare(queue=name_c, exclusive=True)
+ channel.queue_declare(queue=name_a, exclusive=True, auto_delete=True)
+ channel.queue_declare(queue=name_b, exclusive=True, auto_delete=True)
+ channel.queue_declare(queue=name_c, exclusive=True, auto_delete=True)
key = "my_key_" + name_b
topic = "my_topic_" + name_c