summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2013-03-13 18:34:58 +0000
committerTed Ross <tross@apache.org>2013-03-13 18:34:58 +0000
commit89eadd5da45b77e8c6eba79cffacc222334a13f7 (patch)
tree64efa8188ce7c2f1d98376d8e437c0bcae91f0ea
parenta5fe06779d2c196ea954fc91a79dcba18203e38e (diff)
downloadqpid-python-89eadd5da45b77e8c6eba79cffacc222334a13f7.tar.gz
QPID-4559 - Added handling of the queue-delete preconditions in the qmf broker method.
Patch contributed by Ernie Allen. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1456081 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp23
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.h1
-rwxr-xr-xqpid/cpp/src/tests/cli_tests.py39
-rwxr-xr-xqpid/tools/src/py/qpid-config3
-rw-r--r--qpid/tools/src/py/qpidtoollibs/broker.py9
5 files changed, 70 insertions, 5 deletions
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index 47296d94b0..a9887f9c35 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -624,6 +624,10 @@ const std::string SRC_IS_QUEUE("srcIsQueue");
const std::string SRC_IS_LOCAL("srcIsLocal");
const std::string DYNAMIC("dynamic");
const std::string SYNC("sync");
+
+// parameters for deleting a Queue object
+const std::string IF_EMPTY("if_empty");
+const std::string IF_UNUSED("if_unused");
}
struct InvalidBindingIdentifier : public qpid::Exception
@@ -890,7 +894,14 @@ void Broker::deleteObject(const std::string& type, const std::string& name,
}
QPID_LOG (debug, "Broker::delete(" << type << ", " << name << "," << options << ")");
if (type == TYPE_QUEUE) {
- deleteQueue(name, userId, connectionId);
+ // extract ifEmpty and ifUnused from options
+ bool ifUnused = false, ifEmpty = false;
+ for (Variant::Map::const_iterator i = options.begin(); i != options.end(); ++i) {
+ if (i->first == IF_UNUSED) ifUnused = i->second.asBool();
+ else if (i->first == IF_EMPTY) ifEmpty = i->second.asBool();
+ }
+ deleteQueue(name, userId, connectionId,
+ boost::bind(&Broker::checkDeleteQueue, this, _1, ifUnused, ifEmpty));
} else if (type == TYPE_EXCHANGE || type == TYPE_TOPIC) {
deleteExchange(name, userId, connectionId);
} else if (type == TYPE_BINDING) {
@@ -909,7 +920,17 @@ void Broker::deleteObject(const std::string& type, const std::string& name,
} else {
throw UnknownObjectType(type);
}
+}
+void Broker::checkDeleteQueue(Queue::shared_ptr queue, bool ifUnused, bool ifEmpty)
+{
+ if(ifEmpty && queue->getMessageCount() > 0) {
+ throw qpid::framing::PreconditionFailedException(QPID_MSG("Cannot delete queue "
+ << queue->getName() << "; queue not empty"));
+ } else if(ifUnused && queue->getConsumerCount() > 0) {
+ throw qpid::framing::PreconditionFailedException(QPID_MSG("Cannot delete queue "
+ << queue->getName() << "; queue in use"));
+ }
}
Manageable::status_t Broker::queryObject(const std::string& type,
diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h
index be2b9bef75..cfd96c9913 100644
--- a/qpid/cpp/src/qpid/broker/Broker.h
+++ b/qpid/cpp/src/qpid/broker/Broker.h
@@ -152,6 +152,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
const qpid::types::Variant::Map& properties, bool strict, const ConnectionState* context);
void deleteObject(const std::string& type, const std::string& name,
const qpid::types::Variant::Map& options, const ConnectionState* context);
+ void checkDeleteQueue(boost::shared_ptr<Queue> queue, bool ifUnused, bool ifEmpty);
Manageable::status_t queryObject(const std::string& type, const std::string& name,
qpid::types::Variant::Map& results, const ConnectionState* context);
Manageable::status_t queryQueue( const std::string& name,
diff --git a/qpid/cpp/src/tests/cli_tests.py b/qpid/cpp/src/tests/cli_tests.py
index 7ac5b1deed..dceafc5427 100755
--- a/qpid/cpp/src/tests/cli_tests.py
+++ b/qpid/cpp/src/tests/cli_tests.py
@@ -135,6 +135,44 @@ class CliTests(TestBase010):
found = True
self.assertEqual(found, False)
+ def test_qpid_config_del_nonempty_queue(self):
+ self.startBrokerAccess();
+ qname = "test_qpid_config_del"
+
+ ret = os.system(self.qpid_config_command(" add queue " + qname))
+ self.assertEqual(ret, 0)
+ queues = self.broker_access.getAllQueues()
+ found = False
+ for queue in queues:
+ if queue.name == qname:
+ self.assertEqual(queue.durable, False)
+ found = True
+ self.assertEqual(found, True)
+
+ self.startBrokerAccess()
+
+ sess = self.broker_conn.session()
+ tx = sess.sender(qname)
+ tx.send("MESSAGE")
+
+ ret = os.system(self.qpid_config_command(" del queue " + qname))
+ queues = self.broker_access.getAllQueues()
+ found = False
+ for queue in queues:
+ if queue.name == qname:
+ found = True
+ self.assertEqual(found, True)
+
+ ret = os.system(self.qpid_config_command(" del queue " + qname + " --force"))
+ self.assertEqual(ret, 0)
+ queues = self.broker_access.getAllQueues()
+ found = False
+ for queue in queues:
+ if queue.name == qname:
+ found = True
+ self.assertEqual(found, False)
+
+
def test_qpid_config_api(self):
self.startBrokerAccess();
qname = "test_qpid_config_api"
@@ -222,7 +260,6 @@ class CliTests(TestBase010):
self.assertEqual(ret, 0)
self.helper_find_queue(qname, False)
-
# test the bind-queue-to-header-exchange functionality
def test_qpid_config_headers(self):
self.startBrokerAccess();
diff --git a/qpid/tools/src/py/qpid-config b/qpid/tools/src/py/qpid-config
index 2bab892c95..66567d0246 100755
--- a/qpid/tools/src/py/qpid-config
+++ b/qpid/tools/src/py/qpid-config
@@ -624,7 +624,8 @@ class BrokerManager:
if len(args) < 1:
Usage()
qname = args[0]
- self.broker.delQueue(qname)
+ self.broker.delQueue(qname, if_empty=config._if_empty, if_unused=config._if_unused)
+
def Bind(self, args):
diff --git a/qpid/tools/src/py/qpidtoollibs/broker.py b/qpid/tools/src/py/qpidtoollibs/broker.py
index ea31aeabb0..d8b75c3c60 100644
--- a/qpid/tools/src/py/qpidtoollibs/broker.py
+++ b/qpid/tools/src/py/qpidtoollibs/broker.py
@@ -250,8 +250,13 @@ class BrokerAgent(object):
'strict': True}
self._method('create', args)
- def delQueue(self, name):
- args = {'type': 'queue', 'name': name}
+ def delQueue(self, name, if_empty=True, if_unused=True):
+ options = {'if_empty': if_empty,
+ 'if_unused': if_unused}
+
+ args = {'type': 'queue',
+ 'name': name,
+ 'options': options}
self._method('delete', args)
def bind(self, exchange, queue, key, options={}, **kwargs):