diff options
Diffstat (limited to 'cpp/src/qpid/broker/Broker.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 21 |
1 files changed, 11 insertions, 10 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 2382205268..08606516d4 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -421,15 +421,14 @@ void Broker::setStore () { // static void Broker::recoverComplete(const AsyncResultHandle* const arh) { - std::cout << "@@@@ Recover complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl; + std::cout << "@@@@ Broker: Recover complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl; } // static void Broker::configureComplete(const AsyncResultHandle* const arh) { - std::cout << "@@@@ Configure complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl; + std::cout << "@@@@ Broker: Configure complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl; } - void Broker::run() { if (config.workerThreads > 0) { QPID_LOG(notice, "Broker running"); @@ -1124,7 +1123,7 @@ std::pair<boost::shared_ptr<Queue>, bool> Broker::createQueue( std::pair<Queue::shared_ptr, bool> result = queues.declare(name, settings, alternate); if (result.second) { //add default binding: - result.first->bind(exchanges.getDefault(), name); + result.first->bind(exchanges.getDefault(), name, qpid::framing::FieldTable()); if (managementAgent.get()) { //TODO: debatable whether we should raise an event here for @@ -1206,6 +1205,7 @@ std::pair<Exchange::shared_ptr, bool> Broker::createExchange( if (durable) { // store->create(*result.first, arguments); ConfigHandle ch = asyncStore->createConfigHandle(); + result.first->setHandle(ch); boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureComplete, &asyncResultQueue)); asyncStore->submitCreate(ch, result.first.get(), bc); } @@ -1249,8 +1249,9 @@ void Broker::deleteExchange(const std::string& name, const std::string& userId, if (exchange->inUseAsAlternate()) throw framing::NotAllowedException(QPID_MSG("Exchange in use as alternate-exchange.")); // if (exchange->isDurable()) store->destroy(*exchange); if (exchange->isDurable()) { -// boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureComplete, &asyncResultQueue)); -// asyncStore->submitDestroy(exchange.getHandle(), bc); + boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureComplete, &asyncResultQueue)); + asyncStore->submitDestroy(exchange->getHandle(), bc); + exchange->resetHandle(); } if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers(); exchanges.destroy(name); @@ -1326,11 +1327,11 @@ void Broker::unbind(const std::string& queueName, } else if (!exchange) { throw framing::NotFoundException(QPID_MSG("Unbind failed. No such exchange: " << exchangeName)); } else { - if (exchange->unbind(queue, key, 0)) { - if (exchange->isDurable() && queue->isDurable()) { + if (exchange->unbind(queue, key, 0, asyncStore.get())) { +// Move this block into Exchange which keeps the broker context. +// if (exchange->isDurable() && queue->isDurable()) { // store->unbind(*exchange, *queue, key, qpid::framing::FieldTable()); - // TODO: kpvdr: Async config destroy here - } +// } getConfigurationObservers().unbind( exchange, queue, key, framing::FieldTable()); if (managementAgent.get()) { |