summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/BrokerAdapter.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/BrokerAdapter.cpp')
-rw-r--r--cpp/src/qpid/broker/BrokerAdapter.cpp57
1 files changed, 29 insertions, 28 deletions
diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp
index 77030855ff..024516fb7b 100644
--- a/cpp/src/qpid/broker/BrokerAdapter.cpp
+++ b/cpp/src/qpid/broker/BrokerAdapter.cpp
@@ -137,17 +137,17 @@ void BrokerAdapter::ExchangeHandlerImpl::delete_(uint16_t /*ticket*/, const stri
broker.getExchanges().destroy(name);
}
-void BrokerAdapter::ExchangeHandlerImpl::query(u_int16_t /*ticket*/, const string& name)
+ExchangeQueryResult BrokerAdapter::ExchangeHandlerImpl::query(u_int16_t /*ticket*/, const string& name)
{
try {
Exchange::shared_ptr exchange(broker.getExchanges().get(name));
- client.queryOk(exchange->getType(), exchange->isDurable(), false, exchange->getArgs());
+ return ExchangeQueryResult(exchange->getType(), exchange->isDurable(), false, exchange->getArgs());
} catch (const ChannelException& e) {
- client.queryOk("", false, true, FieldTable());
+ return ExchangeQueryResult("", false, true, FieldTable());
}
}
-void BrokerAdapter::BindingHandlerImpl::query(u_int16_t /*ticket*/,
+BindingQueryResult BrokerAdapter::BindingHandlerImpl::query(u_int16_t /*ticket*/,
const std::string& exchangeName,
const std::string& queueName,
const std::string& key,
@@ -164,24 +164,40 @@ void BrokerAdapter::BindingHandlerImpl::query(u_int16_t /*ticket*/,
}
if (!exchange) {
- client.queryOk(true, false, false, false, false);
+ return BindingQueryResult(true, false, false, false, false);
} else if (!queueName.empty() && !queue) {
- client.queryOk(false, true, false, false, false);
+ return BindingQueryResult(false, true, false, false, false);
} else if (exchange->isBound(queue, key.empty() ? 0 : &key, args.count() > 0 ? &args : &args)) {
- client.queryOk(false, false, false, false, false);
+ return BindingQueryResult(false, false, false, false, false);
} else {
//need to test each specified option individually
bool queueMatched = queueName.empty() || exchange->isBound(queue, 0, 0);
bool keyMatched = key.empty() || exchange->isBound(Queue::shared_ptr(), &key, 0);
bool argsMatched = args.count() == 0 || exchange->isBound(Queue::shared_ptr(), 0, &args);
- client.queryOk(false, false, !queueMatched, !keyMatched, !argsMatched);
+ return BindingQueryResult(false, false, !queueMatched, !keyMatched, !argsMatched);
}
}
+QueueQueryResult BrokerAdapter::QueueHandlerImpl::query(const string& name)
+{
+ Queue::shared_ptr queue = getQueue(name);
+ Exchange::shared_ptr alternateExchange = queue->getAlternateExchange();
+
+ return QueueQueryResult(queue->getName(),
+ alternateExchange ? alternateExchange->getName() : "",
+ queue->isDurable(),
+ queue->hasExclusiveOwner(),
+ queue->isAutoDelete(),
+ queue->getSettings(),
+ queue->getMessageCount(),
+ queue->getConsumerCount());
+
+}
+
void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string& name, const string& alternateExchange,
bool passive, bool durable, bool exclusive,
- bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){
+ bool autoDelete, const qpid::framing::FieldTable& arguments){
Exchange::shared_ptr alternate;
if (!alternateExchange.empty()) {
alternate = broker.getExchanges().get(alternateExchange);
@@ -223,11 +239,6 @@ void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string&
405,
format("Cannot grant exclusive access to queue '%s'")
% queue->getName());
- if (!nowait) {
- string queueName = queue->getName();
- client.declareOk(
- queueName, queue->getMessageCount(), queue->getConsumerCount());
- }
}
void BrokerAdapter::QueueHandlerImpl::bind(uint16_t /*ticket*/, const string& queueName,
@@ -269,17 +280,13 @@ BrokerAdapter::QueueHandlerImpl::unbind(uint16_t /*ticket*/,
}
-void BrokerAdapter::QueueHandlerImpl::purge(uint16_t /*ticket*/, const string& queueName, bool nowait){
-
- Queue::shared_ptr queue = getQueue(queueName);
- int count = queue->purge();
- if(!nowait) client.purgeOk( count);
+void BrokerAdapter::QueueHandlerImpl::purge(uint16_t /*ticket*/, const string& queue){
+ getQueue(queue)->purge();
}
void BrokerAdapter::QueueHandlerImpl::delete_(uint16_t /*ticket*/, const string& queue,
- bool ifUnused, bool ifEmpty, bool nowait){
+ bool ifUnused, bool ifEmpty){
ChannelException error(0, "");
- int count(0);
Queue::shared_ptr q = getQueue(queue);
if(ifEmpty && q->getMessageCount() > 0){
throw ChannelException(406, "Queue not empty.");
@@ -291,14 +298,10 @@ void BrokerAdapter::QueueHandlerImpl::delete_(uint16_t /*ticket*/, const string&
QueueVector::iterator i = find(connection.exclusiveQueues.begin(), connection.exclusiveQueues.end(), q);
if(i < connection.exclusiveQueues.end()) connection.exclusiveQueues.erase(i);
}
- count = q->getMessageCount();
q->destroy();
broker.getQueues().destroy(queue);
q->unbind(broker.getExchanges(), q);
}
-
- if(!nowait)
- client.deleteOk(count);
}
@@ -333,10 +336,8 @@ void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/,
queue->requestDispatch();
}
-void BrokerAdapter::BasicHandlerImpl::cancel(const string& consumerTag, bool nowait){
+void BrokerAdapter::BasicHandlerImpl::cancel(const string& consumerTag){
channel.cancel(consumerTag);
-
- if(!nowait) client.cancelOk(consumerTag);
}
void BrokerAdapter::BasicHandlerImpl::publish(uint16_t /*ticket*/,