summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Broker.cpp
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2012-10-18 13:37:42 +0000
committerKim van der Riet <kpvdr@apache.org>2012-10-18 13:37:42 +0000
commit172d9b2a16cfb817bbe632d050acba7e31401cd2 (patch)
tree7c5dd5ccba8734a455f20bccaae1cb80a5483b91 /cpp/src/qpid/broker/Broker.cpp
parentc095a631dcb2c7be5e167ed50f658f7c24330a45 (diff)
downloadqpid-python-172d9b2a16cfb817bbe632d050acba7e31401cd2.tar.gz
WIP - async store interface working for configuration (adding and removing queues, links and exchanges) and for enqueues and dequeues of messages. Transactions are not yet included, and hence some tests will fail.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1399662 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Broker.cpp')
-rw-r--r--cpp/src/qpid/broker/Broker.cpp21
1 files changed, 11 insertions, 10 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index 2382205268..08606516d4 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -421,15 +421,14 @@ void Broker::setStore () {
// static
void Broker::recoverComplete(const AsyncResultHandle* const arh) {
- std::cout << "@@@@ Recover complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl;
+ std::cout << "@@@@ Broker: Recover complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl;
}
// static
void Broker::configureComplete(const AsyncResultHandle* const arh) {
- std::cout << "@@@@ Configure complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl;
+ std::cout << "@@@@ Broker: Configure complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl;
}
-
void Broker::run() {
if (config.workerThreads > 0) {
QPID_LOG(notice, "Broker running");
@@ -1124,7 +1123,7 @@ std::pair<boost::shared_ptr<Queue>, bool> Broker::createQueue(
std::pair<Queue::shared_ptr, bool> result = queues.declare(name, settings, alternate);
if (result.second) {
//add default binding:
- result.first->bind(exchanges.getDefault(), name);
+ result.first->bind(exchanges.getDefault(), name, qpid::framing::FieldTable());
if (managementAgent.get()) {
//TODO: debatable whether we should raise an event here for
@@ -1206,6 +1205,7 @@ std::pair<Exchange::shared_ptr, bool> Broker::createExchange(
if (durable) {
// store->create(*result.first, arguments);
ConfigHandle ch = asyncStore->createConfigHandle();
+ result.first->setHandle(ch);
boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureComplete, &asyncResultQueue));
asyncStore->submitCreate(ch, result.first.get(), bc);
}
@@ -1249,8 +1249,9 @@ void Broker::deleteExchange(const std::string& name, const std::string& userId,
if (exchange->inUseAsAlternate()) throw framing::NotAllowedException(QPID_MSG("Exchange in use as alternate-exchange."));
// if (exchange->isDurable()) store->destroy(*exchange);
if (exchange->isDurable()) {
-// boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureComplete, &asyncResultQueue));
-// asyncStore->submitDestroy(exchange.getHandle(), bc);
+ boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureComplete, &asyncResultQueue));
+ asyncStore->submitDestroy(exchange->getHandle(), bc);
+ exchange->resetHandle();
}
if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers();
exchanges.destroy(name);
@@ -1326,11 +1327,11 @@ void Broker::unbind(const std::string& queueName,
} else if (!exchange) {
throw framing::NotFoundException(QPID_MSG("Unbind failed. No such exchange: " << exchangeName));
} else {
- if (exchange->unbind(queue, key, 0)) {
- if (exchange->isDurable() && queue->isDurable()) {
+ if (exchange->unbind(queue, key, 0, asyncStore.get())) {
+// Move this block into Exchange which keeps the broker context.
+// if (exchange->isDurable() && queue->isDurable()) {
// store->unbind(*exchange, *queue, key, qpid::framing::FieldTable());
- // TODO: kpvdr: Async config destroy here
- }
+// }
getConfigurationObservers().unbind(
exchange, queue, key, framing::FieldTable());
if (managementAgent.get()) {