summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Exchange.cpp
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2012-10-18 13:37:42 +0000
committerKim van der Riet <kpvdr@apache.org>2012-10-18 13:37:42 +0000
commit172d9b2a16cfb817bbe632d050acba7e31401cd2 (patch)
tree7c5dd5ccba8734a455f20bccaae1cb80a5483b91 /cpp/src/qpid/broker/Exchange.cpp
parentc095a631dcb2c7be5e167ed50f658f7c24330a45 (diff)
downloadqpid-python-172d9b2a16cfb817bbe632d050acba7e31401cd2.tar.gz
WIP - async store interface working for configuration (adding and removing queues, links and exchanges) and for enqueues and dequeues of messages. Transactions are not yet included, and hence some tests will fail.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1399662 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Exchange.cpp')
-rw-r--r--cpp/src/qpid/broker/Exchange.cpp34
1 files changed, 30 insertions, 4 deletions
diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp
index bb5dc2b807..2414981481 100644
--- a/cpp/src/qpid/broker/Exchange.cpp
+++ b/cpp/src/qpid/broker/Exchange.cpp
@@ -19,7 +19,9 @@
*
*/
+#include "qpid/broker/AsyncResultHandle.h"
#include "qpid/broker/Broker.h"
+#include "qpid/broker/ConfigAsyncContext.h"
#include "qpid/broker/DeliverableMessage.h"
#include "qpid/broker/Exchange.h"
#include "qpid/broker/ExchangeRegistry.h"
@@ -299,7 +301,7 @@ ManagementObject* Exchange::GetManagementObject (void) const
return (ManagementObject*) mgmtExchange;
}
-void Exchange::registerDynamicBridge(DynamicBridge* db)
+void Exchange::registerDynamicBridge(DynamicBridge* db, AsyncStore* const store)
{
if (!supportsDynamicBinding())
throw Exception("Exchange type does not support dynamic binding");
@@ -315,7 +317,7 @@ void Exchange::registerDynamicBridge(DynamicBridge* db)
FieldTable args;
args.setString(qpidFedOp, fedOpReorigin);
- bind(Queue::shared_ptr(), string(), &args);
+ bind(Queue::shared_ptr(), string(), &args, store);
}
void Exchange::removeDynamicBridge(DynamicBridge* db)
@@ -344,8 +346,8 @@ void Exchange::propagateFedOp(const string& routingKey, const string& tags, cons
}
Exchange::Binding::Binding(const string& _key, Queue::shared_ptr _queue, Exchange* _parent,
- FieldTable _args, const string& _origin)
- : parent(_parent), queue(_queue), key(_key), args(_args), origin(_origin), mgmtBinding(0)
+ FieldTable _args, const string& _origin, ConfigHandle _cfgHandle)
+ : parent(_parent), queue(_queue), key(_key), args(_args), origin(_origin), cfgHandle(_cfgHandle), mgmtBinding(0)
{
}
@@ -388,6 +390,30 @@ ManagementObject* Exchange::Binding::GetManagementObject () const
return (ManagementObject*) mgmtBinding;
}
+uint64_t Exchange::Binding::getSize() { return 0; } // TODO: kpvdr: implement persistence
+void Exchange::Binding::write(char* /*target*/) {} // TODO: kpvdr: implement persistence
+
+void Exchange::persistBind(Binding::shared_ptr b, AsyncStore* const s) {
+ if (s && broker != 0 && b->queue->isDurable() && isDurable()) {
+ b->cfgHandle = s->createConfigHandle();
+ boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureComplete, &broker->getAsyncResultQueue()));
+ s->submitCreate(b->cfgHandle, b.get(), bc);
+ }
+}
+
+void Exchange::persistUnbind(Binding::shared_ptr b, AsyncStore* const s) {
+ if (s && broker != 0 && b->queue->isDurable() && isDurable()) {
+ boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureComplete, &broker->getAsyncResultQueue()));
+ s->submitDestroy(b->cfgHandle, bc);
+ b->cfgHandle.reset();
+ }
+}
+
+// static
+void Exchange::configureComplete(const AsyncResultHandle* const arh) {
+ std::cout << "@@@@ Exchange: Configure complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl;
+}
+
Exchange::MatchQueue::MatchQueue(Queue::shared_ptr q) : queue(q) {}
bool Exchange::MatchQueue::operator()(Exchange::Binding::shared_ptr b)