summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Broker.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Broker.cpp')
-rw-r--r--cpp/src/qpid/broker/Broker.cpp21
1 files changed, 11 insertions, 10 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index 2382205268..08606516d4 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -421,15 +421,14 @@ void Broker::setStore () {
// static
void Broker::recoverComplete(const AsyncResultHandle* const arh) {
- std::cout << "@@@@ Recover complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl;
+ std::cout << "@@@@ Broker: Recover complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl;
}
// static
void Broker::configureComplete(const AsyncResultHandle* const arh) {
- std::cout << "@@@@ Configure complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl;
+ std::cout << "@@@@ Broker: Configure complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl;
}
-
void Broker::run() {
if (config.workerThreads > 0) {
QPID_LOG(notice, "Broker running");
@@ -1124,7 +1123,7 @@ std::pair<boost::shared_ptr<Queue>, bool> Broker::createQueue(
std::pair<Queue::shared_ptr, bool> result = queues.declare(name, settings, alternate);
if (result.second) {
//add default binding:
- result.first->bind(exchanges.getDefault(), name);
+ result.first->bind(exchanges.getDefault(), name, qpid::framing::FieldTable());
if (managementAgent.get()) {
//TODO: debatable whether we should raise an event here for
@@ -1206,6 +1205,7 @@ std::pair<Exchange::shared_ptr, bool> Broker::createExchange(
if (durable) {
// store->create(*result.first, arguments);
ConfigHandle ch = asyncStore->createConfigHandle();
+ result.first->setHandle(ch);
boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureComplete, &asyncResultQueue));
asyncStore->submitCreate(ch, result.first.get(), bc);
}
@@ -1249,8 +1249,9 @@ void Broker::deleteExchange(const std::string& name, const std::string& userId,
if (exchange->inUseAsAlternate()) throw framing::NotAllowedException(QPID_MSG("Exchange in use as alternate-exchange."));
// if (exchange->isDurable()) store->destroy(*exchange);
if (exchange->isDurable()) {
-// boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureComplete, &asyncResultQueue));
-// asyncStore->submitDestroy(exchange.getHandle(), bc);
+ boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureComplete, &asyncResultQueue));
+ asyncStore->submitDestroy(exchange->getHandle(), bc);
+ exchange->resetHandle();
}
if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers();
exchanges.destroy(name);
@@ -1326,11 +1327,11 @@ void Broker::unbind(const std::string& queueName,
} else if (!exchange) {
throw framing::NotFoundException(QPID_MSG("Unbind failed. No such exchange: " << exchangeName));
} else {
- if (exchange->unbind(queue, key, 0)) {
- if (exchange->isDurable() && queue->isDurable()) {
+ if (exchange->unbind(queue, key, 0, asyncStore.get())) {
+// Move this block into Exchange which keeps the broker context.
+// if (exchange->isDurable() && queue->isDurable()) {
// store->unbind(*exchange, *queue, key, qpid::framing::FieldTable());
- // TODO: kpvdr: Async config destroy here
- }
+// }
getConfigurationObservers().unbind(
exchange, queue, key, framing::FieldTable());
if (managementAgent.get()) {