diff options
Diffstat (limited to 'cpp/src/qpid/broker/BrokerAdapter.cpp')
-rw-r--r-- | cpp/src/qpid/broker/BrokerAdapter.cpp | 57 |
1 files changed, 29 insertions, 28 deletions
diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp index 77030855ff..024516fb7b 100644 --- a/cpp/src/qpid/broker/BrokerAdapter.cpp +++ b/cpp/src/qpid/broker/BrokerAdapter.cpp @@ -137,17 +137,17 @@ void BrokerAdapter::ExchangeHandlerImpl::delete_(uint16_t /*ticket*/, const stri broker.getExchanges().destroy(name); } -void BrokerAdapter::ExchangeHandlerImpl::query(u_int16_t /*ticket*/, const string& name) +ExchangeQueryResult BrokerAdapter::ExchangeHandlerImpl::query(u_int16_t /*ticket*/, const string& name) { try { Exchange::shared_ptr exchange(broker.getExchanges().get(name)); - client.queryOk(exchange->getType(), exchange->isDurable(), false, exchange->getArgs()); + return ExchangeQueryResult(exchange->getType(), exchange->isDurable(), false, exchange->getArgs()); } catch (const ChannelException& e) { - client.queryOk("", false, true, FieldTable()); + return ExchangeQueryResult("", false, true, FieldTable()); } } -void BrokerAdapter::BindingHandlerImpl::query(u_int16_t /*ticket*/, +BindingQueryResult BrokerAdapter::BindingHandlerImpl::query(u_int16_t /*ticket*/, const std::string& exchangeName, const std::string& queueName, const std::string& key, @@ -164,24 +164,40 @@ void BrokerAdapter::BindingHandlerImpl::query(u_int16_t /*ticket*/, } if (!exchange) { - client.queryOk(true, false, false, false, false); + return BindingQueryResult(true, false, false, false, false); } else if (!queueName.empty() && !queue) { - client.queryOk(false, true, false, false, false); + return BindingQueryResult(false, true, false, false, false); } else if (exchange->isBound(queue, key.empty() ? 0 : &key, args.count() > 0 ? &args : &args)) { - client.queryOk(false, false, false, false, false); + return BindingQueryResult(false, false, false, false, false); } else { //need to test each specified option individually bool queueMatched = queueName.empty() || exchange->isBound(queue, 0, 0); bool keyMatched = key.empty() || exchange->isBound(Queue::shared_ptr(), &key, 0); bool argsMatched = args.count() == 0 || exchange->isBound(Queue::shared_ptr(), 0, &args); - client.queryOk(false, false, !queueMatched, !keyMatched, !argsMatched); + return BindingQueryResult(false, false, !queueMatched, !keyMatched, !argsMatched); } } +QueueQueryResult BrokerAdapter::QueueHandlerImpl::query(const string& name) +{ + Queue::shared_ptr queue = getQueue(name); + Exchange::shared_ptr alternateExchange = queue->getAlternateExchange(); + + return QueueQueryResult(queue->getName(), + alternateExchange ? alternateExchange->getName() : "", + queue->isDurable(), + queue->hasExclusiveOwner(), + queue->isAutoDelete(), + queue->getSettings(), + queue->getMessageCount(), + queue->getConsumerCount()); + +} + void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string& name, const string& alternateExchange, bool passive, bool durable, bool exclusive, - bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){ + bool autoDelete, const qpid::framing::FieldTable& arguments){ Exchange::shared_ptr alternate; if (!alternateExchange.empty()) { alternate = broker.getExchanges().get(alternateExchange); @@ -223,11 +239,6 @@ void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string& 405, format("Cannot grant exclusive access to queue '%s'") % queue->getName()); - if (!nowait) { - string queueName = queue->getName(); - client.declareOk( - queueName, queue->getMessageCount(), queue->getConsumerCount()); - } } void BrokerAdapter::QueueHandlerImpl::bind(uint16_t /*ticket*/, const string& queueName, @@ -269,17 +280,13 @@ BrokerAdapter::QueueHandlerImpl::unbind(uint16_t /*ticket*/, } -void BrokerAdapter::QueueHandlerImpl::purge(uint16_t /*ticket*/, const string& queueName, bool nowait){ - - Queue::shared_ptr queue = getQueue(queueName); - int count = queue->purge(); - if(!nowait) client.purgeOk( count); +void BrokerAdapter::QueueHandlerImpl::purge(uint16_t /*ticket*/, const string& queue){ + getQueue(queue)->purge(); } void BrokerAdapter::QueueHandlerImpl::delete_(uint16_t /*ticket*/, const string& queue, - bool ifUnused, bool ifEmpty, bool nowait){ + bool ifUnused, bool ifEmpty){ ChannelException error(0, ""); - int count(0); Queue::shared_ptr q = getQueue(queue); if(ifEmpty && q->getMessageCount() > 0){ throw ChannelException(406, "Queue not empty."); @@ -291,14 +298,10 @@ void BrokerAdapter::QueueHandlerImpl::delete_(uint16_t /*ticket*/, const string& QueueVector::iterator i = find(connection.exclusiveQueues.begin(), connection.exclusiveQueues.end(), q); if(i < connection.exclusiveQueues.end()) connection.exclusiveQueues.erase(i); } - count = q->getMessageCount(); q->destroy(); broker.getQueues().destroy(queue); q->unbind(broker.getExchanges(), q); } - - if(!nowait) - client.deleteOk(count); } @@ -333,10 +336,8 @@ void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/, queue->requestDispatch(); } -void BrokerAdapter::BasicHandlerImpl::cancel(const string& consumerTag, bool nowait){ +void BrokerAdapter::BasicHandlerImpl::cancel(const string& consumerTag){ channel.cancel(consumerTag); - - if(!nowait) client.cancelOk(consumerTag); } void BrokerAdapter::BasicHandlerImpl::publish(uint16_t /*ticket*/, |