summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/CMakeLists.txt6
-rw-r--r--cpp/src/qpid/SessionState.cpp2
-rw-r--r--cpp/src/qpid/broker/Bridge.cpp4
-rw-r--r--cpp/src/qpid/broker/Bridge.h2
-rw-r--r--cpp/src/qpid/broker/Broker.cpp21
-rw-r--r--cpp/src/qpid/broker/Broker.h4
-rw-r--r--cpp/src/qpid/broker/DirectExchange.cpp14
-rw-r--r--cpp/src/qpid/broker/DirectExchange.h9
-rw-r--r--cpp/src/qpid/broker/DtxManager.cpp2
-rw-r--r--cpp/src/qpid/broker/DtxManager.h2
-rw-r--r--cpp/src/qpid/broker/DtxWorkRecord.h4
-rw-r--r--cpp/src/qpid/broker/Exchange.cpp34
-rw-r--r--cpp/src/qpid/broker/Exchange.h25
-rw-r--r--cpp/src/qpid/broker/FanOutExchange.cpp6
-rw-r--r--cpp/src/qpid/broker/FanOutExchange.h8
-rw-r--r--cpp/src/qpid/broker/Handle.h2
-rw-r--r--cpp/src/qpid/broker/HeadersExchange.cpp6
-rw-r--r--cpp/src/qpid/broker/HeadersExchange.h8
-rw-r--r--cpp/src/qpid/broker/Link.cpp6
-rw-r--r--cpp/src/qpid/broker/LinkRegistry.cpp2
-rw-r--r--cpp/src/qpid/broker/LinkRegistry.h2
-rw-r--r--cpp/src/qpid/broker/Lvq.cpp2
-rw-r--r--cpp/src/qpid/broker/PersistableExchange.h10
-rw-r--r--cpp/src/qpid/broker/PersistableMessage.cpp39
-rw-r--r--cpp/src/qpid/broker/PersistableMessage.h13
-rw-r--r--cpp/src/qpid/broker/PersistableQueue.h13
-rw-r--r--cpp/src/qpid/broker/Queue.cpp92
-rw-r--r--cpp/src/qpid/broker/Queue.h14
-rw-r--r--cpp/src/qpid/broker/QueueAsyncContext.cpp52
-rw-r--r--cpp/src/qpid/broker/QueueAsyncContext.h14
-rw-r--r--cpp/src/qpid/broker/QueueBindings.cpp4
-rw-r--r--cpp/src/qpid/broker/QueueBindings.h2
-rw-r--r--cpp/src/qpid/broker/QueueFactory.cpp4
-rw-r--r--cpp/src/qpid/broker/QueueFactory.h2
-rw-r--r--cpp/src/qpid/broker/QueueHandle.cpp8
-rw-r--r--cpp/src/qpid/broker/QueueHandle.h3
-rw-r--r--cpp/src/qpid/broker/QueueRegistry.cpp2
-rw-r--r--cpp/src/qpid/broker/QueueRegistry.h2
-rw-r--r--cpp/src/qpid/broker/RecoverableExchange.h4
-rw-r--r--cpp/src/qpid/broker/RecoveryManager.h3
-rw-r--r--cpp/src/qpid/broker/RecoveryManagerImpl.cpp11
-rw-r--r--cpp/src/qpid/broker/RecoveryManagerImpl.h2
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp2
-rw-r--r--cpp/src/qpid/broker/SemanticState.h2
-rw-r--r--cpp/src/qpid/broker/SessionAdapter.cpp2
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp2
-rw-r--r--cpp/src/qpid/broker/SimpleQueue.h3
-rw-r--r--cpp/src/qpid/broker/TopicExchange.cpp4
-rw-r--r--cpp/src/qpid/broker/TopicExchange.h8
-rw-r--r--cpp/src/qpid/broker/TransactionalStore.h1
-rw-r--r--cpp/src/qpid/ha/BrokerReplicator.cpp10
-rw-r--r--cpp/src/qpid/ha/BrokerReplicator.h5
-rw-r--r--cpp/src/qpid/ha/QueueReplicator.cpp4
-rw-r--r--cpp/src/qpid/ha/QueueReplicator.h5
-rw-r--r--cpp/src/qpid/management/ManagementTopicExchange.cpp5
-rw-r--r--cpp/src/qpid/management/ManagementTopicExchange.h3
-rw-r--r--cpp/src/qpid/xml/XmlExchange.cpp10
-rw-r--r--cpp/src/qpid/xml/XmlExchange.h6
-rw-r--r--cpp/src/tests/ExchangeTest.cpp46
-rw-r--r--cpp/src/tests/HeadersExchangeTest.cpp2
-rw-r--r--cpp/src/tests/QueueTest.cpp6
61 files changed, 393 insertions, 198 deletions
diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt
index ee25393f25..9b641558f8 100644
--- a/cpp/src/CMakeLists.txt
+++ b/cpp/src/CMakeLists.txt
@@ -1139,6 +1139,7 @@ set (qpidbroker_SOURCES
qpid/broker/DtxManager.cpp
qpid/broker/DtxTimeout.cpp
qpid/broker/DtxWorkRecord.cpp
+ qpid/broker/EnqueueHandle.cpp
qpid/broker/ExchangeRegistry.cpp
qpid/broker/FanOutExchange.cpp
qpid/broker/HeadersExchange.cpp
@@ -1153,11 +1154,13 @@ set (qpidbroker_SOURCES
# qpid/broker/MessageStoreModule.cpp
qpid/broker/NameGenerator.cpp
# qpid/broker/NullMessageStore.cpp
+ qpid/broker/QueueAsyncContext.cpp
qpid/broker/QueueBindings.cpp
qpid/broker/QueuedMessage.cpp
qpid/broker/QueueCursor.cpp
qpid/broker/QueueDepth.cpp
qpid/broker/QueueFactory.cpp
+ qpid/broker/QueueHandle.cpp
qpid/broker/QueueRegistry.cpp
qpid/broker/QueueSettings.cpp
qpid/broker/QueueFlowLimit.cpp
@@ -1184,6 +1187,7 @@ set (qpidbroker_SOURCES
qpid/broker/TopicExchange.cpp
qpid/broker/TxAccept.cpp
qpid/broker/TxBuffer.cpp
+ qpid/broker/TxnHandle.cpp
qpid/broker/Vhost.cpp
qpid/broker/amqp_0_10/MessageTransfer.cpp
qpid/management/ManagementAgent.cpp
@@ -1191,8 +1195,6 @@ set (qpidbroker_SOURCES
qpid/management/ManagementTopicExchange.cpp
qpid/sys/TCPIOPlugin.cpp
# New async store objects and new versions of broker objects
-# qpid/broker/AsyncResultHandle.cpp
-# qpid/broker/AsyncResultHandleImpl.cpp
# qpid/broker/IdHandle.cpp
# qpid/broker/TxnAsyncContext.cpp
# qpid/broker/TxnBuffer.cpp
diff --git a/cpp/src/qpid/SessionState.cpp b/cpp/src/qpid/SessionState.cpp
index e5019604d2..3acf9fe715 100644
--- a/cpp/src/qpid/SessionState.cpp
+++ b/cpp/src/qpid/SessionState.cpp
@@ -126,7 +126,7 @@ void SessionState::senderRecord(const AMQFrame& f) {
sender.incomplete += sender.sendPoint.command;
sender.sendPoint.advance(f);
if (config.replayHardLimit && config.replayHardLimit < sender.replaySize)
- throw ResourceLimitExceededException("Replay buffer exceeeded hard limit");
+ throw ResourceLimitExceededException("Replay buffer exceeded hard limit");
}
static const uint32_t SPONTANEOUS_REQUEST_INTERVAL = 65536;
diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp
index d1706b5907..4604ac643f 100644
--- a/cpp/src/qpid/broker/Bridge.cpp
+++ b/cpp/src/qpid/broker/Bridge.cpp
@@ -86,7 +86,7 @@ Bridge::~Bridge()
mgmtObject->resourceDestroy();
}
-void Bridge::create(Connection& c)
+void Bridge::create(Connection& c, AsyncStore* const store)
{
detached = false; // Reset detached in case we are recovering.
connState = &c;
@@ -153,7 +153,7 @@ void Bridge::create(Connection& c)
Exchange::shared_ptr exchange = link->getBroker()->getExchanges().get(args.i_src);
if (exchange.get() == 0)
throw Exception("Exchange not found for dynamic route");
- exchange->registerDynamicBridge(this);
+ exchange->registerDynamicBridge(this, store);
QPID_LOG(debug, "Activated bridge " << name << " for dynamic route for exchange " << args.i_src);
} else {
QPID_LOG(debug, "Activated bridge " << name << " for static route from exchange " << args.i_src << " to " << args.i_dest);
diff --git a/cpp/src/qpid/broker/Bridge.h b/cpp/src/qpid/broker/Bridge.h
index ee298afd45..04ac585d80 100644
--- a/cpp/src/qpid/broker/Bridge.h
+++ b/cpp/src/qpid/broker/Bridge.h
@@ -141,7 +141,7 @@ class Bridge : public PersistableConfig,
bool resetProxy();
// connection Management (called by owning Link)
- void create(Connection& c);
+ void create(Connection& c, AsyncStore* const store);
void cancel(Connection& c);
void closed();
friend class Link; // to call create, cancel, closed()
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index 2382205268..08606516d4 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -421,15 +421,14 @@ void Broker::setStore () {
// static
void Broker::recoverComplete(const AsyncResultHandle* const arh) {
- std::cout << "@@@@ Recover complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl;
+ std::cout << "@@@@ Broker: Recover complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl;
}
// static
void Broker::configureComplete(const AsyncResultHandle* const arh) {
- std::cout << "@@@@ Configure complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl;
+ std::cout << "@@@@ Broker: Configure complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl;
}
-
void Broker::run() {
if (config.workerThreads > 0) {
QPID_LOG(notice, "Broker running");
@@ -1124,7 +1123,7 @@ std::pair<boost::shared_ptr<Queue>, bool> Broker::createQueue(
std::pair<Queue::shared_ptr, bool> result = queues.declare(name, settings, alternate);
if (result.second) {
//add default binding:
- result.first->bind(exchanges.getDefault(), name);
+ result.first->bind(exchanges.getDefault(), name, qpid::framing::FieldTable());
if (managementAgent.get()) {
//TODO: debatable whether we should raise an event here for
@@ -1206,6 +1205,7 @@ std::pair<Exchange::shared_ptr, bool> Broker::createExchange(
if (durable) {
// store->create(*result.first, arguments);
ConfigHandle ch = asyncStore->createConfigHandle();
+ result.first->setHandle(ch);
boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureComplete, &asyncResultQueue));
asyncStore->submitCreate(ch, result.first.get(), bc);
}
@@ -1249,8 +1249,9 @@ void Broker::deleteExchange(const std::string& name, const std::string& userId,
if (exchange->inUseAsAlternate()) throw framing::NotAllowedException(QPID_MSG("Exchange in use as alternate-exchange."));
// if (exchange->isDurable()) store->destroy(*exchange);
if (exchange->isDurable()) {
-// boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureComplete, &asyncResultQueue));
-// asyncStore->submitDestroy(exchange.getHandle(), bc);
+ boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureComplete, &asyncResultQueue));
+ asyncStore->submitDestroy(exchange->getHandle(), bc);
+ exchange->resetHandle();
}
if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers();
exchanges.destroy(name);
@@ -1326,11 +1327,11 @@ void Broker::unbind(const std::string& queueName,
} else if (!exchange) {
throw framing::NotFoundException(QPID_MSG("Unbind failed. No such exchange: " << exchangeName));
} else {
- if (exchange->unbind(queue, key, 0)) {
- if (exchange->isDurable() && queue->isDurable()) {
+ if (exchange->unbind(queue, key, 0, asyncStore.get())) {
+// Move this block into Exchange which keeps the broker context.
+// if (exchange->isDurable() && queue->isDurable()) {
// store->unbind(*exchange, *queue, key, qpid::framing::FieldTable());
- // TODO: kpvdr: Async config destroy here
- }
+// }
getConfigurationObservers().unbind(
exchange, queue, key, framing::FieldTable());
if (managementAgent.get()) {
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index e4d1d93423..698d446bca 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -222,7 +222,8 @@ class Broker : public sys::Runnable, public Plugin::Target,
// QPID_BROKER_EXTERN void setStore (boost::shared_ptr<MessageStore>& store);
void setStore(boost::shared_ptr<AsyncStore>& asyncStore);
// MessageStore& getStore() { return *store; }
- AsyncStore& getStore() { return *asyncStore; }
+// AsyncStore& getStore() { return *asyncStore; }
+ AsyncStore* getStore() { return asyncStore.get(); }
void setAcl (AclModule* _acl) {acl = _acl;}
AclModule* getAcl() { return acl; }
QueueRegistry& getQueues() { return queues; }
@@ -231,6 +232,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
DtxManager& getDtxManager() { return dtxManager; }
DataDir& getDataDir() { return dataDir; }
Options& getOptions() { return config; }
+ AsyncResultQueueImpl& getAsyncResultQueue() { return asyncResultQueue; }
void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) { expiryPolicy = e; }
boost::intrusive_ptr<ExpiryPolicy> getExpiryPolicy() { return expiryPolicy; }
diff --git a/cpp/src/qpid/broker/DirectExchange.cpp b/cpp/src/qpid/broker/DirectExchange.cpp
index c56a1da6cc..b1130c3ec0 100644
--- a/cpp/src/qpid/broker/DirectExchange.cpp
+++ b/cpp/src/qpid/broker/DirectExchange.cpp
@@ -53,7 +53,7 @@ DirectExchange::DirectExchange(const string& _name, bool _durable,
mgmtExchange->set_type(typeName);
}
-bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args)
+bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args, AsyncStore* const store)
{
string fedOp(fedOpBind);
string fedTags;
@@ -88,6 +88,8 @@ bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, con
bk.fedBinding.addOrigin(queue->getName(), fedOrigin);
return false;
}
+ persistBind(b, store);
+
} else if (fedOp == fedOpUnbind) {
Mutex::ScopedLock l(lock);
BoundKey& bk = bindings[routingKey];
@@ -97,7 +99,7 @@ bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, con
propagate = bk.fedBinding.delOrigin(queue->getName(), fedOrigin);
if (bk.fedBinding.countFedBindings(queue->getName()) == 0)
- unbind(queue, routingKey, args);
+ unbind(queue, routingKey, args, store);
} else if (fedOp == fedOpReorigin) {
/** gather up all the keys that need rebinding in a local vector
@@ -127,7 +129,7 @@ bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, con
return true;
}
-bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args)
+bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args, AsyncStore* const /*store*/)
{
string fedOrigin(args ? args->getAsString(qpidFedOrigin) : "");
bool propagate = false;
@@ -156,6 +158,12 @@ bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, c
return true;
}
+//boost::shared_ptr<Exchange::Binding> DirectExchange::getBinding(boost::shared_ptr<Queue> queue, const std::string& routingKey) {
+// Mutex::ScopedLock l(lock);
+// BoundKey& bk = bindings[routingKey];
+// return bk.queues[routingKey];
+//}
+
void DirectExchange::route(Deliverable& msg)
{
const string& routingKey = msg.getMessage().getRoutingKey();
diff --git a/cpp/src/qpid/broker/DirectExchange.h b/cpp/src/qpid/broker/DirectExchange.h
index a0e1477d0c..6f0d470c54 100644
--- a/cpp/src/qpid/broker/DirectExchange.h
+++ b/cpp/src/qpid/broker/DirectExchange.h
@@ -55,12 +55,17 @@ public:
QPID_BROKER_EXTERN virtual bool bind(boost::shared_ptr<Queue> queue,
const std::string& routingKey,
- const qpid::framing::FieldTable* args);
- virtual bool unbind(boost::shared_ptr<Queue> queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
+ const qpid::framing::FieldTable* args,
+ AsyncStore* const store);
+ virtual bool unbind(boost::shared_ptr<Queue> queue,
+ const std::string& routingKey,
+ const qpid::framing::FieldTable* args,
+ AsyncStore* const store);
QPID_BROKER_EXTERN virtual void route(Deliverable& msg);
QPID_BROKER_EXTERN virtual bool isBound(boost::shared_ptr<Queue> queue,
const std::string* const routingKey,
const qpid::framing::FieldTable* const args);
+// boost::shared_ptr<Binding> getBinding(boost::shared_ptr<Queue> queue, const std::string& routingKey);
QPID_BROKER_EXTERN virtual ~DirectExchange();
diff --git a/cpp/src/qpid/broker/DtxManager.cpp b/cpp/src/qpid/broker/DtxManager.cpp
index 7e2eb927a3..c55771c4e6 100644
--- a/cpp/src/qpid/broker/DtxManager.cpp
+++ b/cpp/src/qpid/broker/DtxManager.cpp
@@ -175,7 +175,7 @@ void DtxManager::DtxCleanup::fire()
}
//void DtxManager::setStore (TransactionalStore* _store)
-void DtxManager::setStore (AsyncTransactionalStore* _ats)
+void DtxManager::setStore (AsyncTransactionalStore* const _ats)
{
// store = _store;
asyncTxnStore = _ats;
diff --git a/cpp/src/qpid/broker/DtxManager.h b/cpp/src/qpid/broker/DtxManager.h
index fe20a89c32..cbc66d6391 100644
--- a/cpp/src/qpid/broker/DtxManager.h
+++ b/cpp/src/qpid/broker/DtxManager.h
@@ -68,7 +68,7 @@ public:
uint32_t getTimeout(const std::string& xid);
void timedout(const std::string& xid);
// void setStore(TransactionalStore* store);
- void setStore(AsyncTransactionalStore* ats);
+ void setStore(AsyncTransactionalStore* const ats);
void setTimer(sys::Timer& t) { timer = &t; }
// Used by cluster for replication.
diff --git a/cpp/src/qpid/broker/DtxWorkRecord.h b/cpp/src/qpid/broker/DtxWorkRecord.h
index 9dd86bdcad..579579df2d 100644
--- a/cpp/src/qpid/broker/DtxWorkRecord.h
+++ b/cpp/src/qpid/broker/DtxWorkRecord.h
@@ -50,7 +50,7 @@ class DtxWorkRecord
const std::string xid;
// TransactionalStore* const store;
- AsyncTransactionalStore* const asyncTxnStore;
+ AsyncTransactionalStore* asyncTxnStore;
bool completed;
bool rolledback;
bool prepared;
@@ -66,7 +66,7 @@ class DtxWorkRecord
public:
QPID_BROKER_EXTERN DtxWorkRecord(const std::string& xid,
// TransactionalStore* const store);
- AsyncTransactionalStore* const store);
+ AsyncTransactionalStore* const store);
QPID_BROKER_EXTERN ~DtxWorkRecord();
QPID_BROKER_EXTERN bool prepare();
QPID_BROKER_EXTERN bool commit(bool onePhase);
diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp
index bb5dc2b807..2414981481 100644
--- a/cpp/src/qpid/broker/Exchange.cpp
+++ b/cpp/src/qpid/broker/Exchange.cpp
@@ -19,7 +19,9 @@
*
*/
+#include "qpid/broker/AsyncResultHandle.h"
#include "qpid/broker/Broker.h"
+#include "qpid/broker/ConfigAsyncContext.h"
#include "qpid/broker/DeliverableMessage.h"
#include "qpid/broker/Exchange.h"
#include "qpid/broker/ExchangeRegistry.h"
@@ -299,7 +301,7 @@ ManagementObject* Exchange::GetManagementObject (void) const
return (ManagementObject*) mgmtExchange;
}
-void Exchange::registerDynamicBridge(DynamicBridge* db)
+void Exchange::registerDynamicBridge(DynamicBridge* db, AsyncStore* const store)
{
if (!supportsDynamicBinding())
throw Exception("Exchange type does not support dynamic binding");
@@ -315,7 +317,7 @@ void Exchange::registerDynamicBridge(DynamicBridge* db)
FieldTable args;
args.setString(qpidFedOp, fedOpReorigin);
- bind(Queue::shared_ptr(), string(), &args);
+ bind(Queue::shared_ptr(), string(), &args, store);
}
void Exchange::removeDynamicBridge(DynamicBridge* db)
@@ -344,8 +346,8 @@ void Exchange::propagateFedOp(const string& routingKey, const string& tags, cons
}
Exchange::Binding::Binding(const string& _key, Queue::shared_ptr _queue, Exchange* _parent,
- FieldTable _args, const string& _origin)
- : parent(_parent), queue(_queue), key(_key), args(_args), origin(_origin), mgmtBinding(0)
+ FieldTable _args, const string& _origin, ConfigHandle _cfgHandle)
+ : parent(_parent), queue(_queue), key(_key), args(_args), origin(_origin), cfgHandle(_cfgHandle), mgmtBinding(0)
{
}
@@ -388,6 +390,30 @@ ManagementObject* Exchange::Binding::GetManagementObject () const
return (ManagementObject*) mgmtBinding;
}
+uint64_t Exchange::Binding::getSize() { return 0; } // TODO: kpvdr: implement persistence
+void Exchange::Binding::write(char* /*target*/) {} // TODO: kpvdr: implement persistence
+
+void Exchange::persistBind(Binding::shared_ptr b, AsyncStore* const s) {
+ if (s && broker != 0 && b->queue->isDurable() && isDurable()) {
+ b->cfgHandle = s->createConfigHandle();
+ boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureComplete, &broker->getAsyncResultQueue()));
+ s->submitCreate(b->cfgHandle, b.get(), bc);
+ }
+}
+
+void Exchange::persistUnbind(Binding::shared_ptr b, AsyncStore* const s) {
+ if (s && broker != 0 && b->queue->isDurable() && isDurable()) {
+ boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureComplete, &broker->getAsyncResultQueue()));
+ s->submitDestroy(b->cfgHandle, bc);
+ b->cfgHandle.reset();
+ }
+}
+
+// static
+void Exchange::configureComplete(const AsyncResultHandle* const arh) {
+ std::cout << "@@@@ Exchange: Configure complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl;
+}
+
Exchange::MatchQueue::MatchQueue(Queue::shared_ptr q) : queue(q) {}
bool Exchange::MatchQueue::operator()(Exchange::Binding::shared_ptr b)
diff --git a/cpp/src/qpid/broker/Exchange.h b/cpp/src/qpid/broker/Exchange.h
index b4c6b799a4..df6d5d05a4 100644
--- a/cpp/src/qpid/broker/Exchange.h
+++ b/cpp/src/qpid/broker/Exchange.h
@@ -23,7 +23,6 @@
*/
#include <boost/shared_ptr.hpp>
-#include <qpid/broker/AsyncStore.h>
#include "qpid/broker/BrokerImportExport.h"
#include "qpid/broker/Deliverable.h"
#include "qpid/broker/Message.h"
@@ -36,7 +35,7 @@
#include "qmf/org/apache/qpid/broker/Binding.h"
#include "qmf/org/apache/qpid/broker/Broker.h"
-#include <set>
+//#include <set>
namespace qpid {
namespace broker {
@@ -44,9 +43,9 @@ namespace broker {
class Broker;
class ExchangeRegistry;
-class QPID_BROKER_CLASS_EXTERN Exchange : public PersistableExchange, public DataSource, public management::Manageable {
+class QPID_BROKER_CLASS_EXTERN Exchange : public PersistableExchange, public management::Manageable {
public:
- struct Binding : public management::Manageable {
+ struct Binding : public DataSource, public management::Manageable {
typedef boost::shared_ptr<Binding> shared_ptr;
typedef std::vector<Binding::shared_ptr> vector;
@@ -55,13 +54,19 @@ public:
const std::string key;
const framing::FieldTable args;
std::string origin;
+ ConfigHandle cfgHandle;
qmf::org::apache::qpid::broker::Binding* mgmtBinding;
Binding(const std::string& key, boost::shared_ptr<Queue> queue, Exchange* parent = 0,
- framing::FieldTable args = framing::FieldTable(), const std::string& origin = std::string());
+ framing::FieldTable args = framing::FieldTable(), const std::string& origin = std::string(),
+ ConfigHandle cfgHandle = ConfigHandle());
~Binding();
void startManagement();
management::ManagementObject* GetManagementObject() const;
+
+ // DataSource implementation - allows for persistence
+ uint64_t getSize();
+ void write(char* target);
};
private:
@@ -71,6 +76,7 @@ private:
boost::shared_ptr<Exchange> alternate;
uint32_t alternateUsers;
mutable uint64_t persistenceId;
+ static void configureComplete(const AsyncResultHandle* const);
protected:
mutable qpid::framing::FieldTable args;
@@ -92,6 +98,8 @@ protected:
typedef boost::shared_ptr< std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> > > BindingList;
void doRoute(Deliverable& msg, ConstBindingList b);
void routeIVE();
+ void persistBind(Binding::shared_ptr b, AsyncStore* const s);
+ void persistUnbind(Binding::shared_ptr b, AsyncStore* const s);
struct MatchQueue {
@@ -197,9 +205,10 @@ public:
*
*/
- virtual bool bind(boost::shared_ptr<Queue> queue, const std::string& routingKey, const qpid::framing::FieldTable* args) = 0;
- virtual bool unbind(boost::shared_ptr<Queue> queue, const std::string& routingKey, const qpid::framing::FieldTable* args) = 0;
+ virtual bool bind(boost::shared_ptr<Queue> queue, const std::string& routingKey, const qpid::framing::FieldTable* args, AsyncStore* const store) = 0;
+ virtual bool unbind(boost::shared_ptr<Queue> queue, const std::string& routingKey, const qpid::framing::FieldTable* args, AsyncStore* const store) = 0;
virtual bool isBound(boost::shared_ptr<Queue> queue, const std::string* const routingKey, const qpid::framing::FieldTable* const args) = 0;
+// virtual boost::shared_ptr<Binding> getBinding(boost::shared_ptr<Queue> queue, const std::string& routingKey) = 0;
//QPID_BROKER_EXTERN virtual void setProperties(Message&);
virtual void route(Deliverable& msg) = 0;
@@ -224,7 +233,7 @@ public:
virtual const std::string& getLocalTag() const = 0;
};
- void registerDynamicBridge(DynamicBridge* db);
+ void registerDynamicBridge(DynamicBridge* db, AsyncStore* const store);
void removeDynamicBridge(DynamicBridge* db);
virtual bool supportsDynamicBinding() { return false; }
Broker* getBroker() const { return broker; }
diff --git a/cpp/src/qpid/broker/FanOutExchange.cpp b/cpp/src/qpid/broker/FanOutExchange.cpp
index 5b7e0c7324..941d909778 100644
--- a/cpp/src/qpid/broker/FanOutExchange.cpp
+++ b/cpp/src/qpid/broker/FanOutExchange.cpp
@@ -46,7 +46,7 @@ FanOutExchange::FanOutExchange(const std::string& _name, bool _durable,
mgmtExchange->set_type (typeName);
}
-bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*key*/, const FieldTable* args)
+bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*key*/, const FieldTable* args, AsyncStore* const store)
{
string fedOp(args ? args->getAsString(qpidFedOp) : fedOpBind);
string fedTags(args ? args->getAsString(qpidFedTags) : "");
@@ -69,7 +69,7 @@ bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*key*/, const
} else if (fedOp == fedOpUnbind) {
propagate = fedBinding.delOrigin(queue->getName(), fedOrigin);
if (fedBinding.countFedBindings(queue->getName()) == 0)
- unbind(queue, "", args);
+ unbind(queue, "", args, store);
} else if (fedOp == fedOpReorigin) {
if (fedBinding.hasLocal()) {
propagateFedOp(string(), string(), fedOpBind, string());
@@ -82,7 +82,7 @@ bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*key*/, const
return true;
}
-bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*key*/, const FieldTable* args)
+bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*key*/, const FieldTable* args, AsyncStore* const /*store*/)
{
string fedOrigin(args ? args->getAsString(qpidFedOrigin) : "");
bool propagate = false;
diff --git a/cpp/src/qpid/broker/FanOutExchange.h b/cpp/src/qpid/broker/FanOutExchange.h
index dc301a4266..e6a8f726d6 100644
--- a/cpp/src/qpid/broker/FanOutExchange.h
+++ b/cpp/src/qpid/broker/FanOutExchange.h
@@ -50,9 +50,13 @@ class FanOutExchange : public virtual Exchange {
QPID_BROKER_EXTERN virtual bool bind(Queue::shared_ptr queue,
const std::string& routingKey,
- const qpid::framing::FieldTable* args);
+ const qpid::framing::FieldTable* args,
+ AsyncStore* const store);
- virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
+ virtual bool unbind(Queue::shared_ptr queue,
+ const std::string& routingKey,
+ const qpid::framing::FieldTable* args,
+ AsyncStore* const store);
QPID_BROKER_EXTERN virtual void route(Deliverable& msg);
diff --git a/cpp/src/qpid/broker/Handle.h b/cpp/src/qpid/broker/Handle.h
index 397f58f2e7..31c029a512 100644
--- a/cpp/src/qpid/broker/Handle.h
+++ b/cpp/src/qpid/broker/Handle.h
@@ -65,6 +65,8 @@ template <class T> class Handle {
void swap(Handle<T>& h) { T* t = h.impl; h.impl = impl; impl = t; }
+ void reset() { impl = 0; }
+
protected:
typedef T Impl;
Handle() :impl() {}
diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp
index ec19765387..76ffa7a922 100644
--- a/cpp/src/qpid/broker/HeadersExchange.cpp
+++ b/cpp/src/qpid/broker/HeadersExchange.cpp
@@ -167,7 +167,7 @@ HeadersExchange::HeadersExchange(const std::string& _name, bool _durable,
mgmtExchange->set_type (typeName);
}
-bool HeadersExchange::bind(Queue::shared_ptr queue, const string& bindingKey, const FieldTable* args)
+bool HeadersExchange::bind(Queue::shared_ptr queue, const string& bindingKey, const FieldTable* args, AsyncStore* const store)
{
string fedOp(fedOpBind);
string fedTags;
@@ -221,7 +221,7 @@ bool HeadersExchange::bind(Queue::shared_ptr queue, const string& bindingKey, co
bindings.modify_if(MatchKey(queue, bindingKey), modifier);
propagate = modifier.shouldPropagate;
if (modifier.shouldUnbind) {
- unbind(queue, bindingKey, args);
+ unbind(queue, bindingKey, args, store);
}
} else if (fedOp == fedOpReorigin) {
@@ -246,7 +246,7 @@ bool HeadersExchange::bind(Queue::shared_ptr queue, const string& bindingKey, co
return true;
}
-bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& bindingKey, const FieldTable *args){
+bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& bindingKey, const FieldTable *args, AsyncStore* const /*store*/){
bool propagate = false;
string fedOrigin(args ? args->getAsString(qpidFedOrigin) : "");
{
diff --git a/cpp/src/qpid/broker/HeadersExchange.h b/cpp/src/qpid/broker/HeadersExchange.h
index ff0fec4212..10bf9c8c0b 100644
--- a/cpp/src/qpid/broker/HeadersExchange.h
+++ b/cpp/src/qpid/broker/HeadersExchange.h
@@ -91,9 +91,13 @@ class HeadersExchange : public virtual Exchange {
QPID_BROKER_EXTERN virtual bool bind(Queue::shared_ptr queue,
const std::string& routingKey,
- const qpid::framing::FieldTable* args);
+ const qpid::framing::FieldTable* args,
+ AsyncStore* const store);
- virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
+ virtual bool unbind(Queue::shared_ptr queue,
+ const std::string& routingKey,
+ const qpid::framing::FieldTable* args,
+ AsyncStore* const store);
QPID_BROKER_EXTERN virtual void route(Deliverable& msg);
diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp
index 416c3b7d34..9727040c9b 100644
--- a/cpp/src/qpid/broker/Link.cpp
+++ b/cpp/src/qpid/broker/Link.cpp
@@ -86,8 +86,8 @@ public:
std::string getType() const { return Link::exchangeTypeName; }
// Exchange methods - set up to prevent binding/unbinding etc from clients!
- bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*) { return false; }
- bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*) { return false; }
+ bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*, AsyncStore* const) { return false; }
+ bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*, AsyncStore* const) { return false; }
bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const) {return false;}
// Process messages sent from the remote's amq.failover exchange by extracting the failover URLs
@@ -480,7 +480,7 @@ void Link::ioThreadProcessing()
if (!created.empty()) {
for (Bridges::iterator i = created.begin(); i != created.end(); ++i) {
active.push_back(*i);
- (*i)->create(*connection);
+ (*i)->create(*connection, broker->getStore());
}
created.clear();
}
diff --git a/cpp/src/qpid/broker/LinkRegistry.cpp b/cpp/src/qpid/broker/LinkRegistry.cpp
index 43ed208eba..a79081b8ed 100644
--- a/cpp/src/qpid/broker/LinkRegistry.cpp
+++ b/cpp/src/qpid/broker/LinkRegistry.cpp
@@ -274,7 +274,7 @@ void LinkRegistry::destroyBridge(Bridge *bridge)
}
//void LinkRegistry::setStore (MessageStore* _store)
-void LinkRegistry::setStore (AsyncStore* _asyncStore) {
+void LinkRegistry::setStore (AsyncStore* const _asyncStore) {
asyncStore = _asyncStore;
}
diff --git a/cpp/src/qpid/broker/LinkRegistry.h b/cpp/src/qpid/broker/LinkRegistry.h
index a30f91e2a0..17f45a60c8 100644
--- a/cpp/src/qpid/broker/LinkRegistry.h
+++ b/cpp/src/qpid/broker/LinkRegistry.h
@@ -132,7 +132,7 @@ namespace broker {
* Set the store to use. May only be called once.
*/
// QPID_BROKER_EXTERN void setStore (MessageStore*);
- QPID_BROKER_EXTERN void setStore (AsyncStore*);
+ QPID_BROKER_EXTERN void setStore (AsyncStore* const);
/**
* Return the message store used.
diff --git a/cpp/src/qpid/broker/Lvq.cpp b/cpp/src/qpid/broker/Lvq.cpp
index 0bededb966..f5e66c8a74 100644
--- a/cpp/src/qpid/broker/Lvq.cpp
+++ b/cpp/src/qpid/broker/Lvq.cpp
@@ -27,7 +27,7 @@ namespace qpid {
namespace broker {
//Lvq::Lvq(const std::string& n, std::auto_ptr<MessageMap> m, const QueueSettings& s, MessageStore* const ms, management::Manageable* p, Broker* b)
// : Queue(n, s, ms, p, b), messageMap(*m)
-Lvq::Lvq(const std::string& n, std::auto_ptr<MessageMap> m, const QueueSettings& s, AsyncStore* const as, management::Manageable* p, Broker* b)
+Lvq::Lvq(const std::string& n, std::auto_ptr<MessageMap> m, const QueueSettings& s, AsyncStore* as, management::Manageable* p, Broker* b)
: Queue(n, s, as, p, b), messageMap(*m)
{
messages = m;
diff --git a/cpp/src/qpid/broker/PersistableExchange.h b/cpp/src/qpid/broker/PersistableExchange.h
index e1a0853247..ffea40493d 100644
--- a/cpp/src/qpid/broker/PersistableExchange.h
+++ b/cpp/src/qpid/broker/PersistableExchange.h
@@ -23,6 +23,8 @@
*/
#include <string>
+#include "qpid/broker/AsyncStore.h"
+#include "qpid/broker/ConfigHandle.h"
#include "qpid/broker/Persistable.h"
namespace qpid {
@@ -32,11 +34,17 @@ namespace broker {
* The interface exchanges must expose to the MessageStore in order to be
* persistable.
*/
-class PersistableExchange : public Persistable
+class PersistableExchange : public Persistable, public DataSource
{
public:
virtual const std::string& getName() const = 0;
virtual ~PersistableExchange() {};
+ ConfigHandle& getHandle() { return configHandle; }
+ void setHandle(ConfigHandle& ch) { configHandle = ch; }
+ void resetHandle() { configHandle.reset(); }
+
+protected:
+ ConfigHandle configHandle;
};
}}
diff --git a/cpp/src/qpid/broker/PersistableMessage.cpp b/cpp/src/qpid/broker/PersistableMessage.cpp
index 2ef9fbfcbb..4645e5526d 100644
--- a/cpp/src/qpid/broker/PersistableMessage.cpp
+++ b/cpp/src/qpid/broker/PersistableMessage.cpp
@@ -23,6 +23,7 @@
#include "qpid/broker/PersistableMessage.h"
//#include "qpid/broker/MessageStore.h"
//#include "qpid/broker/AsyncStore.h"
+#include "qpid/broker/EnqueueHandle.h"
#include <iostream>
using namespace qpid::broker;
@@ -83,6 +84,44 @@ void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr, AsyncStore*)
bool PersistableMessage::isDequeueComplete() { return false; }
void PersistableMessage::dequeueComplete() {}
+MessageHandle& PersistableMessage::createMessageHandle(AsyncStore* const store) {
+ assert (store != 0);
+ msgHandle = store->createMessageHandle(this);
+ return msgHandle;
+}
+
+EnqueueHandle& PersistableMessage::createEnqueueHandle(QueueHandle& queueHandle, AsyncStore* const asyncStore) {
+ std::map<QueueHandle, EnqueueHandle>::iterator ehi = enqueueHandles.find(queueHandle);
+ if (ehi == enqueueHandles.end()) {
+ assert (asyncStore != 0);
+ ehi = enqueueHandles.insert(std::pair<QueueHandle, EnqueueHandle>(queueHandle,
+ asyncStore->createEnqueueHandle(msgHandle, queueHandle))).first;
+ }
+ return ehi->second;
+}
+
+void PersistableMessage::removeEnqueueHandle(QueueHandle& queueHandle) {
+ std::map<QueueHandle, EnqueueHandle>::iterator ehi = enqueueHandles.find(queueHandle);
+ if (ehi != enqueueHandles.end()) {
+ enqueueHandles.erase(ehi);
+ }
+}
+
+EnqueueHandle& PersistableMessage::getEnqueueHandle(QueueHandle& queueHandle) {
+ std::map<QueueHandle, EnqueueHandle>::iterator ehi = enqueueHandles.find(queueHandle);
+ assert (ehi != enqueueHandles.end());
+ return ehi->second;
+}
+
+const EnqueueHandle& PersistableMessage::getEnqueueHandle(QueueHandle& queueHandle) const {
+ std::map<QueueHandle, EnqueueHandle>::const_iterator ehci = enqueueHandles.find(queueHandle);
+ assert (ehci != enqueueHandles.end());
+ return ehci->second;
+}
+
+uint64_t PersistableMessage::getSize() { return 0; } // TODO: kpvdr: implement
+void PersistableMessage::write(char* /*target*/) {} // TODO: kpvdr: implement
+
}}
diff --git a/cpp/src/qpid/broker/PersistableMessage.h b/cpp/src/qpid/broker/PersistableMessage.h
index 0fd3d169b4..a69d00ca71 100644
--- a/cpp/src/qpid/broker/PersistableMessage.h
+++ b/cpp/src/qpid/broker/PersistableMessage.h
@@ -41,14 +41,16 @@ class Variant;
}
namespace broker {
+class EnqueueHandle;
class MessageStore;
class AsyncStore;
class Queue;
+class QueueHandle;
/**
* Base class for persistable messages.
*/
-class PersistableMessage : public Persistable
+class PersistableMessage : public Persistable, public DataSource
{
/**
* "Ingress" messages == messages sent _to_ the broker.
@@ -63,6 +65,7 @@ class PersistableMessage : public Persistable
boost::intrusive_ptr<AsyncCompletion> holder;
mutable uint64_t persistenceId;
MessageHandle msgHandle;
+ std::map<QueueHandle, EnqueueHandle> enqueueHandles;
public:
PersistableMessage();
@@ -96,9 +99,17 @@ class PersistableMessage : public Persistable
uint64_t getPersistenceId() const { return persistenceId; }
void setPersistenceId(uint64_t _persistenceId) const { persistenceId = _persistenceId; }
+ MessageHandle& createMessageHandle(AsyncStore* const store);
MessageHandle& getMessageHandle() { return msgHandle; }
const MessageHandle& getMessagehandle() const { return msgHandle; }
+ EnqueueHandle& createEnqueueHandle(QueueHandle& queueHandle, AsyncStore* const asyncStore);
+ void removeEnqueueHandle(QueueHandle& queueHandle);
+ EnqueueHandle& getEnqueueHandle(QueueHandle& queueHandle);
+ const EnqueueHandle& getEnqueueHandle(QueueHandle& queueHandle) const;
+
+ uint64_t getSize();
+ void write(char* target);
virtual void decodeHeader(framing::Buffer& buffer) = 0;
virtual void decodeContent(framing::Buffer& buffer) = 0;
diff --git a/cpp/src/qpid/broker/PersistableQueue.h b/cpp/src/qpid/broker/PersistableQueue.h
index 655d26bc74..ed8c193245 100644
--- a/cpp/src/qpid/broker/PersistableQueue.h
+++ b/cpp/src/qpid/broker/PersistableQueue.h
@@ -23,12 +23,15 @@
*/
#include <string>
+#include "qpid/broker/AsyncStore.h"
#include "qpid/broker/Persistable.h"
+#include "qpid/broker/QueueHandle.h"
#include "qpid/management/Manageable.h"
#include <boost/shared_ptr.hpp>
namespace qpid {
namespace broker {
+class AsyncResultHandle;
/**
@@ -48,7 +51,7 @@ public:
* The interface queues must expose to the MessageStore in order to be
* persistable.
*/
-class PersistableQueue : public Persistable
+class PersistableQueue : public Persistable, public DataSource
{
public:
typedef boost::shared_ptr<PersistableQueue> shared_ptr;
@@ -62,14 +65,14 @@ public:
virtual void setExternalQueueStore(ExternalQueueStore* inst) = 0;
virtual void flush() = 0;
- inline ExternalQueueStore* getExternalQueueStore() const {return externalQueueStore;};
+ inline ExternalQueueStore* getExternalQueueStore() const {return externalQueueStore;}
+ inline QueueHandle& getQueueHandle() { return queueHandle; }
- PersistableQueue():externalQueueStore(NULL){
- };
+ PersistableQueue():externalQueueStore(NULL) {}
protected:
ExternalQueueStore* externalQueueStore;
-
+ QueueHandle queueHandle;
};
}}
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 40574ded3b..f595b81724 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -31,6 +31,7 @@
#include "qpid/broker/MessageDistributor.h"
#include "qpid/broker/FifoDistributor.h"
//#include "qpid/broker/NullMessageStore.h"
+#include "qpid/broker/QueueAsyncContext.h"
#include "qpid/broker/QueueRegistry.h"
//TODO: get rid of this
@@ -818,9 +819,17 @@ bool Queue::enqueue(TransactionContext* /*ctxt*/, Message& msg)
boost::intrusive_ptr<PersistableMessage> pmsg = msg.getPersistentContext();
assert(pmsg);
// pmsg->enqueueAsync(shared_from_this(), store);
- pmsg->enqueueAsync(shared_from_this(), asyncStore);
// store->enqueue(ctxt, pmsg, *this);
- // TODO - kpvdr: async enqueue here
+ pmsg->enqueueAsync(shared_from_this(), asyncStore);
+ pmsg->createMessageHandle(asyncStore);
+ EnqueueHandle& eh = pmsg->createEnqueueHandle(queueHandle, asyncStore);
+ TxnHandle th; // TODO: kpvdr: Impement transactions
+ boost::shared_ptr<QueueAsyncContext> qac(
+ new QueueAsyncContext(boost::dynamic_pointer_cast<PersistableQueue>(shared_from_this()),
+ pmsg,
+ &enqueueComplete,
+ &broker->getAsyncResultQueue()));
+ asyncStore->submitEnqueue(eh, th, qac);
}
return true;
}
@@ -893,7 +902,15 @@ void Queue::dequeue(TransactionContext* ctxt, const QueueCursor& cursor)
// if (store && pmsg) {
if (asyncStore && pmsg) {
// store->dequeue(ctxt, pmsg, *this);
- // TODO: kpvdr: async dequeue here
+ pmsg->dequeueAsync(shared_from_this(), asyncStore);
+ TxnHandle th; // TODO: kpvdr: Impement transactions
+ EnqueueHandle& eh = pmsg->getEnqueueHandle(queueHandle);
+ boost::shared_ptr<QueueAsyncContext> qac(
+ new QueueAsyncContext(boost::dynamic_pointer_cast<PersistableQueue>(shared_from_this()),
+ pmsg,
+ &dequeueComplete,
+ &broker->getAsyncResultQueue()));
+ asyncStore->submitDequeue(eh, th, qac);
}
}
@@ -908,6 +925,7 @@ void Queue::dequeueCommitted(const QueueCursor& cursor)
mgmtObject->inc_msgTxnDequeues();
mgmtObject->inc_byteTxnDequeues(contentSize);
}
+
if (brokerMgmtObject) {
_qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics();
bStats->msgTxnDequeues += 1;
@@ -997,7 +1015,9 @@ void Queue::create()
// if (store) {
if (asyncStore) {
// store->create(*this, settings.storeSettings);
- // TODO: kpvdr: async store create here
+ queueHandle = asyncStore->createQueueHandle(name, qpid::types::Variant::Map());
+ boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(boost::dynamic_pointer_cast<PersistableQueue>(shared_from_this()), &createComplete, &broker->getAsyncResultQueue()));
+ asyncStore->submitCreate(queueHandle, this, qac);
}
}
@@ -1068,12 +1088,14 @@ void Queue::destroyed()
if (asyncStore) {
barrier.destroy();
// store->flush(*this);
- // TODO: kpvdr: async flush here
+ boost::shared_ptr<QueueAsyncContext> flush_qac(new QueueAsyncContext(boost::dynamic_pointer_cast<PersistableQueue>(shared_from_this()), &flushComplete, &broker->getAsyncResultQueue()));
+ asyncStore->submitFlush(queueHandle, flush_qac);
// store->destroy(*this);
- // TODO: kpvdr: async destroy here
+ boost::shared_ptr<QueueAsyncContext> destroy_qac(new QueueAsyncContext(boost::dynamic_pointer_cast<PersistableQueue>(shared_from_this()), &destroyComplete, &broker->getAsyncResultQueue()));
+ asyncStore->submitDestroy(queueHandle, destroy_qac);
// store = 0;//ensure we make no more calls to the store for this queue
// TODO: kpvdr: cannot set asyncStore to 0 until all async store ops are complete. Rather set flag which
- // will cause store to be destroyed when all outstanding async ops are complete.
+ // will prevent new calls from succeeding and cause store to be destroyed when all outstanding async ops are complete.
}
if (autoDeleteTask) autoDeleteTask = boost::intrusive_ptr<TimerTask>();
notifyDeleted();
@@ -1102,7 +1124,7 @@ void Queue::bound(const string& exchange, const string& key,
void Queue::unbind(ExchangeRegistry& exchanges)
{
- bindings.unbind(exchanges, shared_from_this());
+ bindings.unbind(exchanges, shared_from_this(), asyncStore);
}
uint64_t Queue::getPersistenceId() const
@@ -1274,6 +1296,46 @@ void Queue::setExternalQueueStore(ExternalQueueStore* inst) {
}
}
+uint64_t Queue::getSize() { return 0; } // TODO: kpvdr: implement
+void Queue::write(char* /*target*/) {} // TODO: kpvdr: implement
+
+// static
+void Queue::createComplete(const AsyncResultHandle* const arh) {
+ boost::shared_ptr<QueueAsyncContext> qac = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext());
+// std::cout << "@@@@ Queue \"" << qac->getQueue()->getName() << "\": Create complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl;
+}
+
+//static
+void Queue::dequeueComplete(const AsyncResultHandle* const arh) {
+ boost::shared_ptr<QueueAsyncContext> qac = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext());
+ boost::shared_ptr<PersistableQueue> pq = qac->getQueue();
+ boost::intrusive_ptr<PersistableMessage> pmsg = qac->getMessage();
+ QueueHandle& qh = pq->getQueueHandle();
+ pmsg->dequeueComplete();
+ pmsg->removeEnqueueHandle(qh);
+// std::cout << "@@@@ Queue \"" << pq->getName() << "\": Dequeue complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl;
+}
+
+//static
+void Queue::destroyComplete(const AsyncResultHandle* const arh) {
+ boost::shared_ptr<QueueAsyncContext> qac = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext());
+ // TODO: kpvdr: set Queue::asyncStore = 0 from here.
+// std::cout << "@@@@ Queue \"" << qac->getQueue()->getName() << "\": Destroy complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl;
+}
+
+//static
+void Queue::enqueueComplete(const AsyncResultHandle* const arh) {
+ boost::shared_ptr<QueueAsyncContext> qac = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext());
+ qac->getMessage()->enqueueComplete();
+// std::cout << "@@@@ Queue \"" << qac->getQueue()->getName() << "\": Enqueue complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl;
+}
+
+//static
+void Queue::flushComplete(const AsyncResultHandle* const arh) {
+ boost::shared_ptr<QueueAsyncContext> qac = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext());
+// std::cout << "@@@@ Queue \"" << qac->getQueue()->getName() << "\": Flush complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl;
+}
+
void Queue::countRejected() const
{
if (mgmtObject) {
@@ -1464,19 +1526,23 @@ void Queue::flush()
ScopedUse u(barrier);
// if (u.acquired && store) store->flush(*this);
// TODO: kpvdr: Async store flush here
- if (u.acquired && asyncStore) { /*store->flush(*this);*/ }
+ if (u.acquired && asyncStore) {
+ //store->flush(*this);
+ std::cout << "&&&& Queue::flush(): Queue=\"" << name << "\"" << std::endl << std::flush;
+ }
}
bool Queue::bind(boost::shared_ptr<Exchange> exchange, const std::string& key,
const qpid::framing::FieldTable& arguments)
{
- if (exchange->bind(shared_from_this(), key, &arguments)) {
+ if (exchange->bind(shared_from_this(), key, &arguments, asyncStore)) {
bound(exchange->getName(), key, arguments);
- if (exchange->isDurable() && isDurable()) {
+// Move this to Exchange::bind() which keeps the binding context
+// if (exchange->isDurable() && isDurable()) {
// store->bind(*exchange, *this, key, arguments);
- // TODO: kpvdr: Store configuration here
- }
+// // TODO: kpvdr: Store configuration here
+// }
return true;
} else {
return false;
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index 28fa8b5ca9..1294f813aa 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -116,7 +116,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
const std::string name;
// MessageStore* store;
- AsyncStore* asyncStore;
+ AsyncStore* const asyncStore;
const OwnershipToken* owner;
uint32_t consumerCount; // Actually a count of all subscriptions, acquiring or not.
uint32_t browserCount; // Count of non-acquiring subscriptions.
@@ -194,6 +194,12 @@ class Queue : public boost::enable_shared_from_this<Queue>,
uint32_t remove(uint32_t maxCount, MessagePredicate, MessageFunctor, SubscriptionType);
virtual bool checkDepth(const QueueDepth& increment, const Message&);
+ static void createComplete(const AsyncResultHandle* const arh);
+ static void dequeueComplete(const AsyncResultHandle* const arh);
+ static void destroyComplete(const AsyncResultHandle* const arh);
+ static void enqueueComplete(const AsyncResultHandle* const arh);
+ static void flushComplete(const AsyncResultHandle* const arh);
+
public:
typedef boost::shared_ptr<Queue> shared_ptr;
@@ -233,7 +239,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
*/
QPID_BROKER_EXTERN bool bind(
boost::shared_ptr<Exchange> exchange, const std::string& key,
- const qpid::framing::FieldTable& arguments=qpid::framing::FieldTable());
+ const qpid::framing::FieldTable& arguments/*=qpid::framing::FieldTable()*/);
/**
* Removes (and dequeues) a message by its sequence number (used
@@ -336,6 +342,10 @@ class Queue : public boost::enable_shared_from_this<Queue>,
virtual void setExternalQueueStore(ExternalQueueStore* inst);
+ // Implement DataStore, allows Queue to persist its configuration
+ uint64_t getSize();
+ void write(char* target);
+
// Increment the rejected-by-consumer counter.
QPID_BROKER_EXTERN void countRejected() const;
QPID_BROKER_EXTERN void countFlowedToDisk(uint64_t size) const;
diff --git a/cpp/src/qpid/broker/QueueAsyncContext.cpp b/cpp/src/qpid/broker/QueueAsyncContext.cpp
index 1bad5387a3..24ecaf6b5d 100644
--- a/cpp/src/qpid/broker/QueueAsyncContext.cpp
+++ b/cpp/src/qpid/broker/QueueAsyncContext.cpp
@@ -33,67 +33,67 @@ QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<PersistableQueue> q,
AsyncResultCallback rcb,
AsyncResultQueue* const arq) :
m_q(q),
+ m_pmsg(0),
+ m_tb(0),
m_rcb(rcb),
m_arq(arq)
-{}
+{
+ //assert(m_q.get() != 0);
+}
-/*
QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<PersistableQueue> q,
boost::intrusive_ptr<PersistableMessage> msg,
AsyncResultCallback rcb,
AsyncResultQueue* const arq) :
m_q(q),
- m_msg(msg),
+ m_pmsg(msg),
+ m_tb(0),
m_rcb(rcb),
m_arq(arq)
-{}
-*/
+{
+ //assert(m_q.get() != 0);
+ //assert(m_pmsg.get() != 0);
+}
QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<PersistableQueue> q,
SimpleTxnBuffer* tb,
AsyncResultCallback rcb,
AsyncResultQueue* const arq) :
m_q(q),
+ m_pmsg(0),
m_tb(tb),
m_rcb(rcb),
m_arq(arq)
{
- assert(m_q.get() != 0);
+ //assert(m_q.get() != 0);
}
-/*
QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<PersistableQueue> q,
boost::intrusive_ptr<PersistableMessage> msg,
SimpleTxnBuffer* tb,
AsyncResultCallback rcb,
AsyncResultQueue* const arq) :
m_q(q),
- m_msg(msg),
+ m_pmsg(msg),
m_tb(tb),
m_rcb(rcb),
m_arq(arq)
{
- assert(m_q.get() != 0);
- assert(m_msg.get() != 0);
+ //assert(m_q.get() != 0);
+ //assert(m_pmsg.get() != 0);
}
-*/
-QueueAsyncContext::~QueueAsyncContext()
-{}
+QueueAsyncContext::~QueueAsyncContext() {}
boost::shared_ptr<PersistableQueue>
-QueueAsyncContext::getQueue() const
-{
+QueueAsyncContext::getQueue() const {
return m_q;
}
-/*
boost::intrusive_ptr<PersistableMessage>
-QueueAsyncContext::getMessage() const
-{
- return m_msg;
+QueueAsyncContext::getMessage() const {
+ return m_pmsg;
}
-*/
SimpleTxnBuffer*
QueueAsyncContext::getTxnBuffer() const {
@@ -101,28 +101,24 @@ QueueAsyncContext::getTxnBuffer() const {
}
AsyncResultQueue*
-QueueAsyncContext::getAsyncResultQueue() const
-{
+QueueAsyncContext::getAsyncResultQueue() const {
return m_arq;
}
AsyncResultCallback
-QueueAsyncContext::getAsyncResultCallback() const
-{
+QueueAsyncContext::getAsyncResultCallback() const {
return m_rcb;
}
void
-QueueAsyncContext::invokeCallback(const AsyncResultHandle* const arh) const
-{
+QueueAsyncContext::invokeCallback(const AsyncResultHandle* const arh) const {
if (m_rcb) {
m_rcb(arh);
}
}
void
-QueueAsyncContext::destroy()
-{
+QueueAsyncContext::destroy() {
delete this;
}
diff --git a/cpp/src/qpid/broker/QueueAsyncContext.h b/cpp/src/qpid/broker/QueueAsyncContext.h
index 4988f2af39..2ce77232b9 100644
--- a/cpp/src/qpid/broker/QueueAsyncContext.h
+++ b/cpp/src/qpid/broker/QueueAsyncContext.h
@@ -36,7 +36,7 @@
namespace qpid {
namespace broker {
-//class PersistableMessage;
+class PersistableMessage;
class PersistableQueue;
typedef void (*AsyncResultCallback)(const AsyncResultHandle* const);
@@ -47,22 +47,22 @@ public:
QueueAsyncContext(boost::shared_ptr<PersistableQueue> q,
AsyncResultCallback rcb,
AsyncResultQueue* const arq);
-/* QueueAsyncContext(boost::shared_ptr<PersistableQueue> q,
+ QueueAsyncContext(boost::shared_ptr<PersistableQueue> q,
boost::intrusive_ptr<PersistableMessage> msg,
AsyncResultCallback rcb,
- AsyncResultQueue* const arq);*/
+ AsyncResultQueue* const arq);
QueueAsyncContext(boost::shared_ptr<PersistableQueue> q,
SimpleTxnBuffer* tb,
AsyncResultCallback rcb,
AsyncResultQueue* const arq);
-/* QueueAsyncContext(boost::shared_ptr<PersistableQueue> q,
+ QueueAsyncContext(boost::shared_ptr<PersistableQueue> q,
boost::intrusive_ptr<PersistableMessage> msg,
SimpleTxnBuffer* tb,
AsyncResultCallback rcb,
- AsyncResultQueue* const arq);*/
+ AsyncResultQueue* const arq);
virtual ~QueueAsyncContext();
boost::shared_ptr<PersistableQueue> getQueue() const;
-// boost::intrusive_ptr<PersistableMessage> getMessage() const;
+ boost::intrusive_ptr<PersistableMessage> getMessage() const;
SimpleTxnBuffer* getTxnBuffer() const;
AsyncResultQueue* getAsyncResultQueue() const;
AsyncResultCallback getAsyncResultCallback() const;
@@ -71,7 +71,7 @@ public:
private:
boost::shared_ptr<PersistableQueue> m_q;
-// boost::intrusive_ptr<PersistableMessage> m_msg;
+ boost::intrusive_ptr<PersistableMessage> m_pmsg;
SimpleTxnBuffer* m_tb;
AsyncResultCallback m_rcb;
AsyncResultQueue* const m_arq;
diff --git a/cpp/src/qpid/broker/QueueBindings.cpp b/cpp/src/qpid/broker/QueueBindings.cpp
index 1cc3486d9a..3d04b3123c 100644
--- a/cpp/src/qpid/broker/QueueBindings.cpp
+++ b/cpp/src/qpid/broker/QueueBindings.cpp
@@ -34,7 +34,7 @@ void QueueBindings::add(const string& exchange, const string& key, const FieldTa
bindings.push_back(QueueBinding(exchange, key, args));
}
-void QueueBindings::unbind(ExchangeRegistry& exchanges, Queue::shared_ptr queue)
+void QueueBindings::unbind(ExchangeRegistry& exchanges, Queue::shared_ptr queue, AsyncStore* const store)
{
Bindings local;
{
@@ -44,7 +44,7 @@ void QueueBindings::unbind(ExchangeRegistry& exchanges, Queue::shared_ptr queue)
for (Bindings::iterator i = local.begin(); i != local.end(); i++) {
Exchange::shared_ptr ex = exchanges.find(i->exchange);
- if (ex) ex->unbind(queue, i->key, &(i->args));
+ if (ex) ex->unbind(queue, i->key, &(i->args), store);
}
}
diff --git a/cpp/src/qpid/broker/QueueBindings.h b/cpp/src/qpid/broker/QueueBindings.h
index f9b07e7431..7e49c783e5 100644
--- a/cpp/src/qpid/broker/QueueBindings.h
+++ b/cpp/src/qpid/broker/QueueBindings.h
@@ -55,7 +55,7 @@ class QueueBindings
}
void add(const std::string& exchange, const std::string& key, const qpid::framing::FieldTable& args);
- void unbind(ExchangeRegistry& exchanges, boost::shared_ptr<Queue> queue);
+ void unbind(ExchangeRegistry& exchanges, boost::shared_ptr<Queue> queue, AsyncStore* const store);
private:
mutable sys::Mutex lock;
diff --git a/cpp/src/qpid/broker/QueueFactory.cpp b/cpp/src/qpid/broker/QueueFactory.cpp
index 6ff3f832e4..f2d7fb8d35 100644
--- a/cpp/src/qpid/broker/QueueFactory.cpp
+++ b/cpp/src/qpid/broker/QueueFactory.cpp
@@ -42,7 +42,7 @@ namespace broker {
//QueueFactory::QueueFactory() : broker(0), store(0), parent(0) {}
-QueueFactory::QueueFactory() : broker(0), asyncStore(0), parent(0) {}
+QueueFactory::QueueFactory() : broker(0), asyncStore(), parent(0) {}
boost::shared_ptr<Queue> QueueFactory::create(const std::string& name, const QueueSettings& settings)
{
@@ -103,7 +103,7 @@ Broker* QueueFactory::getBroker()
return broker;
}
//void QueueFactory::setStore (MessageStore* s)
-void QueueFactory::setStore (AsyncStore* as)
+void QueueFactory::setStore (AsyncStore* const as)
{
asyncStore = as;
}
diff --git a/cpp/src/qpid/broker/QueueFactory.h b/cpp/src/qpid/broker/QueueFactory.h
index 9d0048e139..cc28356982 100644
--- a/cpp/src/qpid/broker/QueueFactory.h
+++ b/cpp/src/qpid/broker/QueueFactory.h
@@ -54,7 +54,7 @@ class QueueFactory
* Set the store to use. May only be called once.
*/
// void setStore (MessageStore*);
- void setStore (AsyncStore*);
+ void setStore (AsyncStore* const);
/**
* Return the message store used.
diff --git a/cpp/src/qpid/broker/QueueHandle.cpp b/cpp/src/qpid/broker/QueueHandle.cpp
index 9c8d7eba67..3c647cb66a 100644
--- a/cpp/src/qpid/broker/QueueHandle.cpp
+++ b/cpp/src/qpid/broker/QueueHandle.cpp
@@ -54,12 +54,4 @@ QueueHandle::operator=(const QueueHandle& r)
return PrivateImpl::assign(*this, r);
}
-// --- QueueHandleImpl methods ---
-
-const std::string&
-QueueHandle::getName() const
-{
- return impl->getName();
-}
-
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/QueueHandle.h b/cpp/src/qpid/broker/QueueHandle.h
index 1110367418..b25064d229 100644
--- a/cpp/src/qpid/broker/QueueHandle.h
+++ b/cpp/src/qpid/broker/QueueHandle.h
@@ -45,9 +45,6 @@ public:
~QueueHandle();
QueueHandle& operator=(const QueueHandle& r);
- // --- QueueHandleImpl methods ---
- const std::string& getName() const;
-
private:
friend class PrivateImplRef<QueueHandle>;
};
diff --git a/cpp/src/qpid/broker/QueueRegistry.cpp b/cpp/src/qpid/broker/QueueRegistry.cpp
index 420a9caa28..eb525b6727 100644
--- a/cpp/src/qpid/broker/QueueRegistry.cpp
+++ b/cpp/src/qpid/broker/QueueRegistry.cpp
@@ -101,7 +101,7 @@ Queue::shared_ptr QueueRegistry::get(const string& name) {
}
//void QueueRegistry::setStore (MessageStore* _store)
-void QueueRegistry::setStore (AsyncStore* _store)
+void QueueRegistry::setStore (AsyncStore* const _store)
{
QueueFactory::setStore(_store);
}
diff --git a/cpp/src/qpid/broker/QueueRegistry.h b/cpp/src/qpid/broker/QueueRegistry.h
index b274493f8d..ada76f9cca 100644
--- a/cpp/src/qpid/broker/QueueRegistry.h
+++ b/cpp/src/qpid/broker/QueueRegistry.h
@@ -98,7 +98,7 @@ class QueueRegistry : QueueFactory {
* Set the store to use. May only be called once.
*/
// void setStore (MessageStore*);
- void setStore (AsyncStore*);
+ void setStore (AsyncStore* const);
/**
* Return the message store used.
diff --git a/cpp/src/qpid/broker/RecoverableExchange.h b/cpp/src/qpid/broker/RecoverableExchange.h
index ca6cc1541e..6bda1e2617 100644
--- a/cpp/src/qpid/broker/RecoverableExchange.h
+++ b/cpp/src/qpid/broker/RecoverableExchange.h
@@ -27,6 +27,7 @@
namespace qpid {
namespace broker {
+class AsyncStore;
/**
* The interface through which bindings are recovered.
@@ -42,7 +43,8 @@ public:
*/
virtual void bind(const std::string& queue,
const std::string& routingKey,
- qpid::framing::FieldTable& args) = 0;
+ qpid::framing::FieldTable&,
+ AsyncStore* const store) = 0;
virtual ~RecoverableExchange() {};
};
diff --git a/cpp/src/qpid/broker/RecoveryManager.h b/cpp/src/qpid/broker/RecoveryManager.h
index 0cb7c544cd..f2d28c0328 100644
--- a/cpp/src/qpid/broker/RecoveryManager.h
+++ b/cpp/src/qpid/broker/RecoveryManager.h
@@ -31,12 +31,13 @@
namespace qpid {
namespace broker {
+class AsyncStore;
class RecoveryManager{
public:
virtual ~RecoveryManager(){}
virtual RecoverableExchange::shared_ptr recoverExchange(framing::Buffer& buffer) = 0;
- virtual RecoverableQueue::shared_ptr recoverQueue(framing::Buffer& buffer) = 0;
+ virtual RecoverableQueue::shared_ptr recoverQueue(framing::Buffer& buffer, AsyncStore* const store) = 0;
virtual RecoverableMessage::shared_ptr recoverMessage(framing::Buffer& buffer) = 0;
virtual RecoverableTransaction::shared_ptr recoverTransaction(const std::string& xid,
std::auto_ptr<TPCTransactionContext> txn) = 0;
diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
index 7deeba5e65..f3e1639ca5 100644
--- a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
+++ b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
@@ -81,7 +81,7 @@ class RecoverableExchangeImpl : public RecoverableExchange
public:
RecoverableExchangeImpl(Exchange::shared_ptr _exchange, QueueRegistry& _queues) : exchange(_exchange), queues(_queues) {}
void setPersistenceId(uint64_t id);
- void bind(const std::string& queue, const std::string& routingKey, qpid::framing::FieldTable& args);
+ void bind(const std::string& queue, const std::string& routingKey, qpid::framing::FieldTable& args, AsyncStore* const store);
};
class RecoverableConfigImpl : public RecoverableConfig
@@ -113,13 +113,13 @@ RecoverableExchange::shared_ptr RecoveryManagerImpl::recoverExchange(framing::Bu
}
}
-RecoverableQueue::shared_ptr RecoveryManagerImpl::recoverQueue(framing::Buffer& buffer)
+RecoverableQueue::shared_ptr RecoveryManagerImpl::recoverQueue(framing::Buffer& buffer, AsyncStore* const store)
{
Queue::shared_ptr queue = Queue::restore(queues, buffer);
try {
Exchange::shared_ptr exchange = exchanges.getDefault();
if (exchange) {
- exchange->bind(queue, queue->getName(), 0);
+ exchange->bind(queue, queue->getName(), 0, store);
queue->bound(exchange->getName(), queue->getName(), framing::FieldTable());
}
} catch (const framing::NotFoundException& /*e*/) {
@@ -238,10 +238,11 @@ void RecoverableConfigImpl::setPersistenceId(uint64_t id)
void RecoverableExchangeImpl::bind(const string& queueName,
const string& key,
- framing::FieldTable& args)
+ framing::FieldTable& args,
+ AsyncStore* const store)
{
Queue::shared_ptr queue = queues.find(queueName);
- exchange->bind(queue, key, &args);
+ exchange->bind(queue, key, &args, store);
queue->bound(exchange->getName(), key, args);
}
diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.h b/cpp/src/qpid/broker/RecoveryManagerImpl.h
index 1ad7892b13..7fca0be194 100644
--- a/cpp/src/qpid/broker/RecoveryManagerImpl.h
+++ b/cpp/src/qpid/broker/RecoveryManagerImpl.h
@@ -42,7 +42,7 @@ namespace broker {
~RecoveryManagerImpl();
RecoverableExchange::shared_ptr recoverExchange(framing::Buffer& buffer);
- RecoverableQueue::shared_ptr recoverQueue(framing::Buffer& buffer);
+ RecoverableQueue::shared_ptr recoverQueue(framing::Buffer& buffer, AsyncStore* const store);
RecoverableMessage::shared_ptr recoverMessage(framing::Buffer& buffer);
RecoverableTransaction::shared_ptr recoverTransaction(const std::string& xid,
std::auto_ptr<TPCTransactionContext> txn);
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index 97d6dc07b0..5fc9a1a932 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -156,7 +156,7 @@ void SemanticState::startTx()
}
//void SemanticState::commit(MessageStore* const store)
-void SemanticState::commit(AsyncStore* const store)
+void SemanticState::commit(AsyncTransactionalStore* const store)
{
if (!txBuffer) throw
CommandInvalidException(QPID_MSG("Session has not been selected for use with transactions"));
diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h
index a30c7e15b7..9add663e24 100644
--- a/cpp/src/qpid/broker/SemanticState.h
+++ b/cpp/src/qpid/broker/SemanticState.h
@@ -235,7 +235,7 @@ class SemanticState : private boost::noncopyable {
void startTx();
// void commit(MessageStore* const store);
- void commit(AsyncStore* const store);
+ void commit(AsyncTransactionalStore* const store);
void rollback();
void selectDtx();
bool getDtxSelected() const { return dtxSelected; }
diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp
index a05934ed8e..cb2fe15b58 100644
--- a/cpp/src/qpid/broker/SessionAdapter.cpp
+++ b/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -563,7 +563,7 @@ void SessionAdapter::TxHandlerImpl::select()
void SessionAdapter::TxHandlerImpl::commit()
{
- state.commit(&getBroker().getStore());
+ state.commit(getBroker().getStore());
}
void SessionAdapter::TxHandlerImpl::rollback()
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index 88cdf7e03a..944cbad0aa 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -401,7 +401,7 @@ void SessionState::IncompleteIngressMsgXfer::completed(bool sync)
completerContext->scheduleMsgCompletion(id, requiresAccept, requiresSync);
} else {
// this path runs directly from the ac->end() call in handleContent() above,
- // so *session is definately valid.
+ // so *session is definitely valid.
if (session->isAttached()) {
QPID_LOG(debug, ": receive completed for msg seq=" << id);
session->completeRcvMsg(id, requiresAccept, requiresSync);
diff --git a/cpp/src/qpid/broker/SimpleQueue.h b/cpp/src/qpid/broker/SimpleQueue.h
index c2f21076cd..da5c2d9ad6 100644
--- a/cpp/src/qpid/broker/SimpleQueue.h
+++ b/cpp/src/qpid/broker/SimpleQueue.h
@@ -51,8 +51,7 @@ class SimpleMessage;
class SimpleTxnBuffer;
class SimpleQueue : public boost::enable_shared_from_this<SimpleQueue>,
- public PersistableQueue,
- public DataSource
+ public PersistableQueue
{
public:
SimpleQueue(const std::string& name,
diff --git a/cpp/src/qpid/broker/TopicExchange.cpp b/cpp/src/qpid/broker/TopicExchange.cpp
index d9871a430b..38d8f255ac 100644
--- a/cpp/src/qpid/broker/TopicExchange.cpp
+++ b/cpp/src/qpid/broker/TopicExchange.cpp
@@ -156,7 +156,7 @@ TopicExchange::TopicExchange(const std::string& _name, bool _durable,
mgmtExchange->set_type (typeName);
}
-bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args)
+bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args, AsyncStore* const /*store*/)
{
ClearCache cc(&cacheLock,&bindingCache); // clear the cache on function exit.
string fedOp(args ? args->getAsString(qpidFedOp) : fedOpBind);
@@ -226,7 +226,7 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons
return true;
}
-bool TopicExchange::unbind(Queue::shared_ptr queue, const string& constRoutingKey, const FieldTable* args)
+bool TopicExchange::unbind(Queue::shared_ptr queue, const string& constRoutingKey, const FieldTable* args, AsyncStore* const /*store*/)
{
string fedOrigin(args ? args->getAsString(qpidFedOrigin) : "");
QPID_LOG(debug, "Unbinding key [" << constRoutingKey << "] from queue " << queue->getName()
diff --git a/cpp/src/qpid/broker/TopicExchange.h b/cpp/src/qpid/broker/TopicExchange.h
index c50ecf1830..329c2f408f 100644
--- a/cpp/src/qpid/broker/TopicExchange.h
+++ b/cpp/src/qpid/broker/TopicExchange.h
@@ -96,9 +96,13 @@ public:
QPID_BROKER_EXTERN virtual bool bind(Queue::shared_ptr queue,
const std::string& routingKey,
- const qpid::framing::FieldTable* args);
+ const qpid::framing::FieldTable* args,
+ AsyncStore* const store);
- virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
+ virtual bool unbind(Queue::shared_ptr queue,
+ const std::string& routingKey,
+ const qpid::framing::FieldTable* args,
+ AsyncStore* const store);
QPID_BROKER_EXTERN virtual void route(Deliverable& msg);
diff --git a/cpp/src/qpid/broker/TransactionalStore.h b/cpp/src/qpid/broker/TransactionalStore.h
index 2a2bac0c51..9c844c1ee1 100644
--- a/cpp/src/qpid/broker/TransactionalStore.h
+++ b/cpp/src/qpid/broker/TransactionalStore.h
@@ -18,6 +18,7 @@
* under the License.
*
*/
+
#ifndef _TransactionalStore_
#define _TransactionalStore_
diff --git a/cpp/src/qpid/ha/BrokerReplicator.cpp b/cpp/src/qpid/ha/BrokerReplicator.cpp
index 0a66527e98..3a3c9c2954 100644
--- a/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -398,7 +398,7 @@ void BrokerReplicator::doEventBind(Variant::Map& values) {
QPID_LOG(debug, logPrefix << "Bind event: exchange=" << exchange->getName()
<< " queue=" << queue->getName()
<< " key=" << key);
- exchange->bind(queue, key, &args);
+ exchange->bind(queue, key, &args, 0);
}
}
@@ -418,7 +418,7 @@ void BrokerReplicator::doEventUnbind(Variant::Map& values) {
QPID_LOG(debug, logPrefix << "Unbind event: exchange=" << exchange->getName()
<< " queue=" << queue->getName()
<< " key=" << key);
- exchange->unbind(queue, key, &args);
+ exchange->unbind(queue, key, &args, 0);
}
}
@@ -514,7 +514,7 @@ void BrokerReplicator::doResponseBind(Variant::Map& values) {
<< " key:" << key);
framing::FieldTable args;
qpid::amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
- exchange->bind(queue, key, &args);
+ exchange->bind(queue, key, &args, 0);
}
}
@@ -616,8 +616,8 @@ boost::shared_ptr<Exchange> BrokerReplicator::createExchange(
else return boost::shared_ptr<Exchange>();
}
-bool BrokerReplicator::bind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; }
-bool BrokerReplicator::unbind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; }
+bool BrokerReplicator::bind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*, qpid::broker::AsyncStore* const) { return false; }
+bool BrokerReplicator::unbind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*, qpid::broker::AsyncStore* const) { return false; }
bool BrokerReplicator::isBound(boost::shared_ptr<Queue>, const string* const, const framing::FieldTable* const) { return false; }
// DataSource interface - used to write persistence data to async store
diff --git a/cpp/src/qpid/ha/BrokerReplicator.h b/cpp/src/qpid/ha/BrokerReplicator.h
index 109d8c638e..f6983e8719 100644
--- a/cpp/src/qpid/ha/BrokerReplicator.h
+++ b/cpp/src/qpid/ha/BrokerReplicator.h
@@ -35,6 +35,7 @@
namespace qpid {
namespace broker {
+class AsyncStore;
class Broker;
class Link;
class Bridge;
@@ -71,8 +72,8 @@ class BrokerReplicator : public broker::Exchange,
// Exchange methods
std::string getType() const;
- bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
- bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
+ bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*, qpid::broker::AsyncStore* const store);
+ bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*, qpid::broker::AsyncStore* const store);
void route(broker::Deliverable&);
bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const);
diff --git a/cpp/src/qpid/ha/QueueReplicator.cpp b/cpp/src/qpid/ha/QueueReplicator.cpp
index 8aba7555d4..cac1fdac29 100644
--- a/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -190,8 +190,8 @@ void QueueReplicator::route(Deliverable& msg)
}
// Unused Exchange methods.
-bool QueueReplicator::bind(boost::shared_ptr<Queue>, const std::string&, const FieldTable*) { return false; }
-bool QueueReplicator::unbind(boost::shared_ptr<Queue>, const std::string&, const FieldTable*) { return false; }
+bool QueueReplicator::bind(boost::shared_ptr<Queue>, const std::string&, const FieldTable*, qpid::broker::AsyncStore* const) { return false; }
+bool QueueReplicator::unbind(boost::shared_ptr<Queue>, const std::string&, const FieldTable*, qpid::broker::AsyncStore* const) { return false; }
bool QueueReplicator::isBound(boost::shared_ptr<Queue>, const std::string* const, const FieldTable* const) { return false; }
std::string QueueReplicator::getType() const { return TYPE_NAME; }
diff --git a/cpp/src/qpid/ha/QueueReplicator.h b/cpp/src/qpid/ha/QueueReplicator.h
index a2a158539e..8d8a41a5ba 100644
--- a/cpp/src/qpid/ha/QueueReplicator.h
+++ b/cpp/src/qpid/ha/QueueReplicator.h
@@ -71,9 +71,8 @@ class QueueReplicator : public broker::Exchange,
void deactivate(); // Call before dtor
std::string getType() const;
- bool bind(boost::shared_ptr<broker::Queue
- >, const std::string&, const framing::FieldTable*);
- bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
+ bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*, qpid::broker::AsyncStore* const);
+ bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*, qpid::broker::AsyncStore* const);
void route(broker::Deliverable&);
bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const);
diff --git a/cpp/src/qpid/management/ManagementTopicExchange.cpp b/cpp/src/qpid/management/ManagementTopicExchange.cpp
index c8bfef3785..d1c2b1c34f 100644
--- a/cpp/src/qpid/management/ManagementTopicExchange.cpp
+++ b/cpp/src/qpid/management/ManagementTopicExchange.cpp
@@ -53,11 +53,12 @@ void ManagementTopicExchange::route(Deliverable& msg)
bool ManagementTopicExchange::bind(Queue::shared_ptr queue,
const std::string& routingKey,
- const qpid::framing::FieldTable* args)
+ const qpid::framing::FieldTable* args,
+ AsyncStore* const store)
{
if (qmfVersion == 1)
managementAgent->clientAdded(routingKey);
- return TopicExchange::bind(queue, routingKey, args);
+ return TopicExchange::bind(queue, routingKey, args, store);
}
void ManagementTopicExchange::setManagmentAgent(ManagementAgent* agent, int qv)
diff --git a/cpp/src/qpid/management/ManagementTopicExchange.h b/cpp/src/qpid/management/ManagementTopicExchange.h
index 0d6b6ad50c..92894855fd 100644
--- a/cpp/src/qpid/management/ManagementTopicExchange.h
+++ b/cpp/src/qpid/management/ManagementTopicExchange.h
@@ -47,7 +47,8 @@ class ManagementTopicExchange : public virtual TopicExchange
virtual bool bind(Queue::shared_ptr queue,
const std::string& routingKey,
- const qpid::framing::FieldTable* args);
+ const qpid::framing::FieldTable* args,
+ AsyncStore* const store);
void setManagmentAgent(management::ManagementAgent* agent, int qmfVersion);
diff --git a/cpp/src/qpid/xml/XmlExchange.cpp b/cpp/src/qpid/xml/XmlExchange.cpp
index 22eeff41c5..55800ac464 100644
--- a/cpp/src/qpid/xml/XmlExchange.cpp
+++ b/cpp/src/qpid/xml/XmlExchange.cpp
@@ -116,7 +116,7 @@ XmlExchange::XmlExchange(const std::string& _name, bool _durable,
mgmtExchange->set_type (typeName);
}
-bool XmlExchange::bind(Queue::shared_ptr queue, const std::string& bindingKey, const FieldTable* args)
+bool XmlExchange::bind(Queue::shared_ptr queue, const std::string& bindingKey, const FieldTable* args, AsyncStore* const store)
{
// Federation uses bind for unbind and reorigin comands as well as for binds.
@@ -136,7 +136,7 @@ bool XmlExchange::bind(Queue::shared_ptr queue, const std::string& bindingKey, c
}
if (fedOp == fedOpUnbind) {
- return fedUnbind(fedOrigin, fedTags, queue, bindingKey, args);
+ return fedUnbind(fedOrigin, fedTags, queue, bindingKey, args, store);
}
else if (fedOp == fedOpReorigin) {
fedReorigin();
@@ -176,7 +176,7 @@ bool XmlExchange::bind(Queue::shared_ptr queue, const std::string& bindingKey, c
return true;
}
-bool XmlExchange::unbind(Queue::shared_ptr queue, const std::string& bindingKey, const FieldTable* args)
+bool XmlExchange::unbind(Queue::shared_ptr queue, const std::string& bindingKey, const FieldTable* args, AsyncStore* const /*store*/)
{
/*
* When called directly, no qpidFedOrigin argument will be
@@ -383,11 +383,11 @@ void XmlExchange::propagateFedOp(const std::string& bindingKey, const std::strin
Exchange::propagateFedOp(bindingKey, fedTags, fedOp, fedOrigin, propArgs);
}
-bool XmlExchange::fedUnbind(const std::string& fedOrigin, const std::string& fedTags, Queue::shared_ptr queue, const std::string& bindingKey, const FieldTable* args)
+bool XmlExchange::fedUnbind(const std::string& fedOrigin, const std::string& fedTags, Queue::shared_ptr queue, const std::string& bindingKey, const FieldTable* args, AsyncStore* const store)
{
RWlock::ScopedRlock l(lock);
- if (unbind(queue, bindingKey, args)) {
+ if (unbind(queue, bindingKey, args, store)) {
propagateFedOp(bindingKey, fedTags, fedOpUnbind, fedOrigin);
return true;
}
diff --git a/cpp/src/qpid/xml/XmlExchange.h b/cpp/src/qpid/xml/XmlExchange.h
index a80588c7ab..1fb9439b9b 100644
--- a/cpp/src/qpid/xml/XmlExchange.h
+++ b/cpp/src/qpid/xml/XmlExchange.h
@@ -76,9 +76,9 @@ class XmlExchange : public virtual Exchange {
virtual std::string getType() const { return typeName; }
- virtual bool bind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
+ virtual bool bind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args, AsyncStore* const store);
- virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
+ virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args, AsyncStore* const store);
virtual void route(Deliverable& msg);
@@ -86,7 +86,7 @@ class XmlExchange : public virtual Exchange {
virtual void propagateFedOp(const std::string& bindingKey, const std::string& fedTags, const std::string& fedOp, const std::string& fedOrigin, const qpid::framing::FieldTable* args=0);
- virtual bool fedUnbind(const std::string& fedOrigin, const std::string& fedTags, Queue::shared_ptr queue, const std::string& bindingKey, const qpid::framing::FieldTable* args);
+ virtual bool fedUnbind(const std::string& fedOrigin, const std::string& fedTags, Queue::shared_ptr queue, const std::string& bindingKey, const qpid::framing::FieldTable* args, AsyncStore* const store);
virtual void fedReorigin();
diff --git a/cpp/src/tests/ExchangeTest.cpp b/cpp/src/tests/ExchangeTest.cpp
index 4f18b91b5a..259860e6cd 100644
--- a/cpp/src/tests/ExchangeTest.cpp
+++ b/cpp/src/tests/ExchangeTest.cpp
@@ -51,12 +51,12 @@ QPID_AUTO_TEST_CASE(testMe)
Queue::shared_ptr queue2(new Queue("queue2", true));
TopicExchange topic("topic");
- topic.bind(queue, "abc", 0);
- topic.bind(queue2, "abc", 0);
+ topic.bind(queue, "abc", 0, 0);
+ topic.bind(queue2, "abc", 0, 0);
DirectExchange direct("direct");
- direct.bind(queue, "abc", 0);
- direct.bind(queue2, "abc", 0);
+ direct.bind(queue, "abc", 0, 0);
+ direct.bind(queue2, "abc", 0, 0);
queue.reset();
queue2.reset();
@@ -78,9 +78,9 @@ QPID_AUTO_TEST_CASE(testIsBound)
string k3("xyz");
FanOutExchange fanout("fanout");
- BOOST_CHECK(fanout.bind(a, "", 0));
- BOOST_CHECK(fanout.bind(b, "", 0));
- BOOST_CHECK(fanout.bind(c, "", 0));
+ BOOST_CHECK(fanout.bind(a, "", 0, 0));
+ BOOST_CHECK(fanout.bind(b, "", 0, 0));
+ BOOST_CHECK(fanout.bind(c, "", 0, 0));
BOOST_CHECK(fanout.isBound(a, 0, 0));
BOOST_CHECK(fanout.isBound(b, 0, 0));
@@ -88,10 +88,10 @@ QPID_AUTO_TEST_CASE(testIsBound)
BOOST_CHECK(!fanout.isBound(d, 0, 0));
DirectExchange direct("direct");
- BOOST_CHECK(direct.bind(a, k1, 0));
- BOOST_CHECK(direct.bind(a, k3, 0));
- BOOST_CHECK(direct.bind(b, k2, 0));
- BOOST_CHECK(direct.bind(c, k1, 0));
+ BOOST_CHECK(direct.bind(a, k1, 0, 0));
+ BOOST_CHECK(direct.bind(a, k3, 0, 0));
+ BOOST_CHECK(direct.bind(b, k2, 0, 0));
+ BOOST_CHECK(direct.bind(c, k1, 0, 0));
BOOST_CHECK(direct.isBound(a, 0, 0));
BOOST_CHECK(direct.isBound(a, &k1, 0));
@@ -106,10 +106,10 @@ QPID_AUTO_TEST_CASE(testIsBound)
BOOST_CHECK(!direct.isBound(d, &k3, 0));
TopicExchange topic("topic");
- BOOST_CHECK(topic.bind(a, k1, 0));
- BOOST_CHECK(topic.bind(a, k3, 0));
- BOOST_CHECK(topic.bind(b, k2, 0));
- BOOST_CHECK(topic.bind(c, k1, 0));
+ BOOST_CHECK(topic.bind(a, k1, 0, 0));
+ BOOST_CHECK(topic.bind(a, k3, 0, 0));
+ BOOST_CHECK(topic.bind(b, k2, 0, 0));
+ BOOST_CHECK(topic.bind(c, k1, 0, 0));
BOOST_CHECK(topic.isBound(a, 0, 0));
BOOST_CHECK(topic.isBound(a, &k1, 0));
@@ -137,10 +137,10 @@ QPID_AUTO_TEST_CASE(testIsBound)
args3.setString("c", "C");
args3.setInt("b", 6);
- headers.bind(a, "", &args1);
- headers.bind(a, "", &args3);
- headers.bind(b, "", &args2);
- headers.bind(c, "", &args1);
+ headers.bind(a, "", &args1, 0);
+ headers.bind(a, "", &args3, 0);
+ headers.bind(b, "", &args2, 0);
+ headers.bind(c, "", &args1, 0);
BOOST_CHECK(headers.isBound(a, 0, 0));
BOOST_CHECK(headers.isBound(a, 0, &args1));
@@ -250,10 +250,10 @@ QPID_AUTO_TEST_CASE(testIVEOption)
Queue::shared_ptr queue2(new Queue("queue2", true));
Queue::shared_ptr queue3(new Queue("queue3", true));
- BOOST_CHECK(direct.bind(queue, "abc", 0));
- BOOST_CHECK(fanout.bind(queue1, "abc", 0));
- BOOST_CHECK(header.bind(queue2, "", &args2));
- BOOST_CHECK(topic.bind(queue3, "abc", 0));
+ BOOST_CHECK(direct.bind(queue, "abc", 0, 0));
+ BOOST_CHECK(fanout.bind(queue1, "abc", 0, 0));
+ BOOST_CHECK(header.bind(queue2, "", &args2, 0));
+ BOOST_CHECK(topic.bind(queue3, "abc", 0, 0));
BOOST_CHECK_EQUAL(1u,queue->getMessageCount());
BOOST_CHECK_EQUAL(1u,queue1->getMessageCount());
diff --git a/cpp/src/tests/HeadersExchangeTest.cpp b/cpp/src/tests/HeadersExchangeTest.cpp
index 40deb59c86..7c8ee4a2d9 100644
--- a/cpp/src/tests/HeadersExchangeTest.cpp
+++ b/cpp/src/tests/HeadersExchangeTest.cpp
@@ -109,7 +109,7 @@ QPID_AUTO_TEST_CASE(testBindNoXMatch)
FieldTable args;
try {
//just checking this doesn't cause assertion etc
- exchange.bind(queue, key, &args);
+ exchange.bind(queue, key, &args, 0);
} catch(qpid::Exception&) {
//expected
}
diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp
index 7b7c653029..d86c18c38d 100644
--- a/cpp/src/tests/QueueTest.cpp
+++ b/cpp/src/tests/QueueTest.cpp
@@ -100,15 +100,15 @@ QPID_AUTO_TEST_CASE(testBound){
ExchangeRegistry exchanges;
//establish bindings from exchange->queue and notify the queue as it is bound:
Exchange::shared_ptr exchange1 = exchanges.declare("my-exchange-1", "direct").first;
- exchange1->bind(queue, key, &args);
+ exchange1->bind(queue, key, &args, 0);
queue->bound(exchange1->getName(), key, args);
Exchange::shared_ptr exchange2 = exchanges.declare("my-exchange-2", "fanout").first;
- exchange2->bind(queue, key, &args);
+ exchange2->bind(queue, key, &args, 0);
queue->bound(exchange2->getName(), key, args);
Exchange::shared_ptr exchange3 = exchanges.declare("my-exchange-3", "topic").first;
- exchange3->bind(queue, key, &args);
+ exchange3->bind(queue, key, &args, 0);
queue->bound(exchange3->getName(), key, args);
//delete one of the exchanges: