summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2012-09-24 13:49:13 +0000
committerKim van der Riet <kpvdr@apache.org>2012-09-24 13:49:13 +0000
commitc095a631dcb2c7be5e167ed50f658f7c24330a45 (patch)
treef3c6dc1e3a9f6e12501c1dcb794d18779db477ac
parent0f327ee25b5ab4b9a38a8620a666e6bfc66000e7 (diff)
downloadqpid-python-c095a631dcb2c7be5e167ed50f658f7c24330a45.tar.gz
QPID-3858: WIP: Provisional checkin: Wiring of async store interface to broker. Code compiles, but as persistent transactions are currentl disconnected, not all tests pass.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1389378 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/CMakeLists.txt13
-rw-r--r--cpp/src/asyncstore.cmake3
-rw-r--r--cpp/src/qpid/asyncStore/AsyncOperation.cpp24
-rw-r--r--cpp/src/qpid/asyncStore/AsyncOperation.h12
-rw-r--r--cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp102
-rw-r--r--cpp/src/qpid/asyncStore/AsyncStoreImpl.h26
-rw-r--r--cpp/src/qpid/asyncStore/ConfigHandleImpl.h3
-rw-r--r--cpp/src/qpid/asyncStore/OperationQueue.cpp2
-rw-r--r--cpp/src/qpid/asyncStore/Plugin.cpp2
-rw-r--r--cpp/src/qpid/asyncStore/RecoveryHandleImpl.cpp33
-rw-r--r--cpp/src/qpid/asyncStore/RecoveryHandleImpl.h40
-rw-r--r--cpp/src/qpid/broker/AsyncStore.h28
-rw-r--r--cpp/src/qpid/broker/Broker.cpp90
-rw-r--r--cpp/src/qpid/broker/Broker.h17
-rw-r--r--cpp/src/qpid/broker/ConfigAsyncContext.cpp49
-rw-r--r--cpp/src/qpid/broker/ConfigAsyncContext.h52
-rw-r--r--cpp/src/qpid/broker/DirectExchange.cpp7
-rw-r--r--cpp/src/qpid/broker/DirectExchange.h5
-rw-r--r--cpp/src/qpid/broker/DtxManager.cpp12
-rw-r--r--cpp/src/qpid/broker/DtxManager.h9
-rw-r--r--cpp/src/qpid/broker/DtxWorkRecord.cpp54
-rw-r--r--cpp/src/qpid/broker/DtxWorkRecord.h7
-rw-r--r--cpp/src/qpid/broker/Exchange.h7
-rw-r--r--cpp/src/qpid/broker/ExchangeRegistry.h2
-rw-r--r--cpp/src/qpid/broker/FanOutExchange.cpp8
-rw-r--r--cpp/src/qpid/broker/FanOutExchange.h5
-rw-r--r--cpp/src/qpid/broker/HeadersExchange.cpp6
-rw-r--r--cpp/src/qpid/broker/HeadersExchange.h5
-rw-r--r--cpp/src/qpid/broker/Link.cpp5
-rw-r--r--cpp/src/qpid/broker/LinkRegistry.cpp43
-rw-r--r--cpp/src/qpid/broker/LinkRegistry.h11
-rw-r--r--cpp/src/qpid/broker/LossyQueue.cpp6
-rw-r--r--cpp/src/qpid/broker/LossyQueue.h3
-rw-r--r--cpp/src/qpid/broker/Lvq.cpp6
-rw-r--r--cpp/src/qpid/broker/Lvq.h3
-rw-r--r--cpp/src/qpid/broker/MessageStore.h3
-rw-r--r--cpp/src/qpid/broker/MessageStoreModule.cpp2
-rw-r--r--cpp/src/qpid/broker/MessageStoreModule.h3
-rw-r--r--cpp/src/qpid/broker/NullMessageStore.cpp2
-rw-r--r--cpp/src/qpid/broker/NullMessageStore.h3
-rw-r--r--cpp/src/qpid/broker/PersistableMessage.cpp14
-rw-r--r--cpp/src/qpid/broker/PersistableMessage.h8
-rw-r--r--cpp/src/qpid/broker/Queue.cpp65
-rw-r--r--cpp/src/qpid/broker/Queue.h11
-rw-r--r--cpp/src/qpid/broker/QueueAsyncContext.h8
-rw-r--r--cpp/src/qpid/broker/QueueFactory.cpp22
-rw-r--r--cpp/src/qpid/broker/QueueFactory.h12
-rw-r--r--cpp/src/qpid/broker/QueueHandle.cpp3
-rw-r--r--cpp/src/qpid/broker/QueueRegistry.cpp6
-rw-r--r--cpp/src/qpid/broker/QueueRegistry.h6
-rw-r--r--cpp/src/qpid/broker/RecoveredDequeue.h2
-rw-r--r--cpp/src/qpid/broker/RecoveredEnqueue.h2
-rw-r--r--cpp/src/qpid/broker/RecoveryAsyncContext.cpp57
-rw-r--r--cpp/src/qpid/broker/RecoveryAsyncContext.h54
-rw-r--r--cpp/src/qpid/broker/RecoveryHandle.cpp55
-rw-r--r--cpp/src/qpid/broker/RecoveryHandle.h54
-rw-r--r--cpp/src/qpid/broker/RecoveryManager.h7
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp3
-rw-r--r--cpp/src/qpid/broker/SemanticState.h6
-rw-r--r--cpp/src/qpid/broker/SessionAdapter.cpp4
-rw-r--r--cpp/src/qpid/broker/TopicExchange.cpp7
-rw-r--r--cpp/src/qpid/broker/TopicExchange.h5
-rw-r--r--cpp/src/qpid/broker/TxBuffer.cpp26
-rw-r--r--cpp/src/qpid/broker/TxBuffer.h5
-rw-r--r--cpp/src/qpid/ha/BrokerReplicator.cpp4
-rw-r--r--cpp/src/qpid/ha/BrokerReplicator.h4
-rw-r--r--cpp/src/qpid/ha/QueueReplicator.cpp4
-rw-r--r--cpp/src/qpid/ha/QueueReplicator.h4
-rw-r--r--cpp/src/qpid/management/ManagementDirectExchange.h4
-rw-r--r--cpp/src/qpid/management/ManagementTopicExchange.h5
-rw-r--r--cpp/src/qpid/store/MessageStorePlugin.cpp3
-rw-r--r--cpp/src/qpid/xml/XmlExchange.cpp7
-rw-r--r--cpp/src/qpid/xml/XmlExchange.h4
-rw-r--r--cpp/src/tests/AsyncCompletion.cpp63
-rw-r--r--cpp/src/tests/DtxWorkRecordTest.cpp3
-rw-r--r--cpp/src/tests/QueueTest.cpp2
-rw-r--r--cpp/src/tests/TxBufferTest.cpp3
-rw-r--r--cpp/src/tests/test_store.cpp43
78 files changed, 1031 insertions, 277 deletions
diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt
index ee9291a41b..ee25393f25 100644
--- a/cpp/src/CMakeLists.txt
+++ b/cpp/src/CMakeLists.txt
@@ -1122,7 +1122,12 @@ set (qpidbroker_SOURCES
qpid/broker/FifoDistributor.cpp
qpid/broker/MessageGroupManager.cpp
qpid/broker/PersistableMessage.cpp
+ qpid/broker/AsyncResultHandle.cpp
+ qpid/broker/AsyncResultHandleImpl.cpp
+ qpid/broker/AsyncResultQueueImpl.cpp
qpid/broker/Bridge.cpp
+ qpid/broker/ConfigAsyncContext.cpp
+ qpid/broker/ConfigHandle.cpp
qpid/broker/Connection.cpp
qpid/broker/ConnectionHandler.cpp
qpid/broker/ConnectionFactory.cpp
@@ -1145,9 +1150,9 @@ set (qpidbroker_SOURCES
qpid/broker/MessageAdapter.cpp
qpid/broker/MessageBuilder.cpp
qpid/broker/MessageHandle.cpp
- qpid/broker/MessageStoreModule.cpp
+# qpid/broker/MessageStoreModule.cpp
qpid/broker/NameGenerator.cpp
- qpid/broker/NullMessageStore.cpp
+# qpid/broker/NullMessageStore.cpp
qpid/broker/QueueBindings.cpp
qpid/broker/QueuedMessage.cpp
qpid/broker/QueueCursor.cpp
@@ -1156,6 +1161,8 @@ set (qpidbroker_SOURCES
qpid/broker/QueueRegistry.cpp
qpid/broker/QueueSettings.cpp
qpid/broker/QueueFlowLimit.cpp
+ qpid/broker/RecoveryAsyncContext.cpp
+ qpid/broker/RecoveryHandle.cpp
qpid/broker/RecoveryManagerImpl.cpp
qpid/broker/RecoveredEnqueue.cpp
qpid/broker/RecoveredDequeue.cpp
@@ -1444,7 +1451,7 @@ add_definitions(-DBOOST_FILESYSTEM_VERSION=2)
# Now create the config file from all the info learned above.
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/config.h.cmake
${CMAKE_CURRENT_BINARY_DIR}/config.h)
-add_subdirectory(qpid/store)
+#add_subdirectory(qpid/store)
add_subdirectory(tests)
# Support for pkg-config
diff --git a/cpp/src/asyncstore.cmake b/cpp/src/asyncstore.cmake
index e6230483ea..477091a927 100644
--- a/cpp/src/asyncstore.cmake
+++ b/cpp/src/asyncstore.cmake
@@ -54,6 +54,7 @@ set (asyncStore_SOURCES
qpid/asyncStore/PersistableMessageContext.cpp
qpid/asyncStore/Plugin.cpp
qpid/asyncStore/QueueHandleImpl.cpp
+ qpid/asyncStore/RecoveryHandleImpl.cpp
qpid/asyncStore/RunState.cpp
qpid/asyncStore/TxnHandleImpl.cpp
qpid/broker/AsyncResultHandle.cpp
@@ -65,6 +66,8 @@ set (asyncStore_SOURCES
qpid/broker/MessageHandle.cpp
qpid/broker/QueueAsyncContext.cpp
qpid/broker/QueueHandle.cpp
+ qpid/broker/RecoveryAsyncContext.cpp
+ qpid/broker/RecoveryHandle.cpp
qpid/broker/SimpleDeliverable.cpp
qpid/broker/SimpleDeliveryRecord.cpp
qpid/broker/SimpleMessage.cpp
diff --git a/cpp/src/qpid/asyncStore/AsyncOperation.cpp b/cpp/src/qpid/asyncStore/AsyncOperation.cpp
index 1e3ab51165..2023da2ded 100644
--- a/cpp/src/qpid/asyncStore/AsyncOperation.cpp
+++ b/cpp/src/qpid/asyncStore/AsyncOperation.cpp
@@ -25,6 +25,7 @@
#include "qpid/broker/AsyncResultHandle.h"
#include "qpid/broker/AsyncResultHandleImpl.h"
+#include "qpid/broker/RecoveryAsyncContext.h"
#include "qpid/broker/QueueAsyncContext.h"
#include "qpid/broker/TxnAsyncContext.h"
@@ -132,6 +133,29 @@ AsyncOpTxnAbort::getOpStr() const {
}
+// --- class AsyncOpRecover ---
+
+AsyncOpRecover::AsyncOpRecover(qpid::broker::RecoveryHandle& rcvrHandle,
+ boost::shared_ptr<qpid::broker::RecoveryAsyncContext> rcvrCtxt,
+ qpid::broker::AsyncStore* store) :
+ AsyncOperation(boost::dynamic_pointer_cast<qpid::broker::BrokerAsyncContext>(rcvrCtxt), store),
+ m_rcvrHandle(rcvrHandle)
+{}
+
+AsyncOpRecover::~AsyncOpRecover() {}
+
+void
+AsyncOpRecover::executeOp() const {
+ // TODO: Implement store operation here
+ submitResult();
+}
+
+const char*
+AsyncOpRecover::getOpStr() const {
+ return "RECOVER";
+}
+
+
// --- class AsyncOpConfigCreate ---
AsyncOpConfigCreate::AsyncOpConfigCreate(qpid::broker::ConfigHandle& cfgHandle,
diff --git a/cpp/src/qpid/asyncStore/AsyncOperation.h b/cpp/src/qpid/asyncStore/AsyncOperation.h
index f22cccad07..ffd6ca44a9 100644
--- a/cpp/src/qpid/asyncStore/AsyncOperation.h
+++ b/cpp/src/qpid/asyncStore/AsyncOperation.h
@@ -90,6 +90,18 @@ private:
};
+class AsyncOpRecover: public qpid::asyncStore::AsyncOperation {
+public:
+ AsyncOpRecover(qpid::broker::RecoveryHandle& rcvrHandle,
+ boost::shared_ptr<qpid::broker::RecoveryAsyncContext> rcvrCtxt,
+ qpid::broker::AsyncStore* store);
+ virtual ~AsyncOpRecover();
+ virtual void executeOp() const;
+ virtual const char* getOpStr() const;
+private:
+ qpid::broker::RecoveryHandle& m_rcvrHandle;
+};
+
class AsyncOpConfigCreate: public qpid::asyncStore::AsyncOperation {
public:
AsyncOpConfigCreate(qpid::broker::ConfigHandle& cfgHandle,
diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp
index 9c1ff42fa2..85fd981862 100644
--- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp
+++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp
@@ -29,6 +29,7 @@
#include "qpid/asyncStore/EventHandleImpl.h"
#include "qpid/asyncStore/MessageHandleImpl.h"
#include "qpid/asyncStore/QueueHandleImpl.h"
+#include "qpid/asyncStore/RecoveryHandleImpl.h"
#include "qpid/asyncStore/TxnHandleImpl.h"
#include "qpid/broker/ConfigHandle.h"
#include "qpid/broker/EnqueueHandle.h"
@@ -36,6 +37,8 @@
#include "qpid/broker/MessageHandle.h"
#include "qpid/broker/QueueAsyncContext.h"
#include "qpid/broker/QueueHandle.h"
+#include "qpid/broker/RecoveryAsyncContext.h"
+#include "qpid/broker/RecoveryHandle.h"
#include "qpid/broker/TxnAsyncContext.h"
#include "qpid/broker/TxnHandle.h"
@@ -48,12 +51,17 @@ AsyncStoreImpl::AsyncStoreImpl(boost::shared_ptr<qpid::sys::Poller> poller,
m_opts(opts),
m_runState(),
m_operations(m_poller)
-{}
+{
+ QPID_LOG(info, "AsyncStoreImpl::AsyncStoreImpl()");
+}
AsyncStoreImpl::~AsyncStoreImpl() {}
void
-AsyncStoreImpl::initialize() {}
+AsyncStoreImpl::initialize(bool truncateFlag,
+ bool saveFlag) {
+ QPID_LOG(info, "AsyncStoreImpl::initialize() truncateFlag=" << (truncateFlag?"T":"F") << " saveFlag=" << (saveFlag?"T":"F"));
+}
uint64_t
AsyncStoreImpl::getNextRid() {
@@ -88,25 +96,42 @@ AsyncStoreImpl::createTxnHandle(const std::string& xid,
void
AsyncStoreImpl::submitPrepare(qpid::broker::TxnHandle& txnHandle,
- boost::shared_ptr<qpid::broker::TpcTxnAsyncContext> TxnCtxt) {
- boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnPrepare(txnHandle, TxnCtxt, this));
- TxnCtxt->setOpStr(op->getOpStr());
+ boost::shared_ptr<qpid::broker::TpcTxnAsyncContext> txnCtxt) {
+ assert(txnCtxt.get() != 0);
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnPrepare(txnHandle, txnCtxt, this));
+ txnCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
void
AsyncStoreImpl::submitCommit(qpid::broker::TxnHandle& txnHandle,
- boost::shared_ptr<qpid::broker::TxnAsyncContext> TxnCtxt) {
- boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnCommit(txnHandle, TxnCtxt, this));
- TxnCtxt->setOpStr(op->getOpStr());
+ boost::shared_ptr<qpid::broker::TxnAsyncContext> txnCtxt) {
+ assert(txnCtxt.get() != 0);
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnCommit(txnHandle, txnCtxt, this));
+ txnCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
void
AsyncStoreImpl::submitAbort(qpid::broker::TxnHandle& txnHandle,
- boost::shared_ptr<qpid::broker::TxnAsyncContext> TxnCtxt) {
- boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnAbort(txnHandle, TxnCtxt, this));
- TxnCtxt->setOpStr(op->getOpStr());
+ boost::shared_ptr<qpid::broker::TxnAsyncContext> txnCtxt) {
+ assert(txnCtxt.get() != 0);
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnAbort(txnHandle, txnCtxt, this));
+ txnCtxt->setOpStr(op->getOpStr());
+ m_operations.submit(op);
+}
+
+qpid::broker::RecoveryHandle
+AsyncStoreImpl::createRecoveryHandle() {
+ return qpid::broker::RecoveryHandle(new RecoveryHandleImpl());
+}
+
+void
+AsyncStoreImpl::submitRecover(qpid::broker::RecoveryHandle& rcvrHandle,
+ boost::shared_ptr<qpid::broker::RecoveryAsyncContext> rcvrCtxt) {
+ assert(rcvrCtxt.get() != 0);
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpRecover(rcvrHandle, rcvrCtxt, this));
+ rcvrCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
@@ -144,6 +169,7 @@ void
AsyncStoreImpl::submitCreate(qpid::broker::ConfigHandle& cfgHandle,
const qpid::broker::DataSource* const dataSrc,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) {
+ assert(brokerCtxt.get() != 0);
boost::shared_ptr<const AsyncOperation> op(new AsyncOpConfigCreate(cfgHandle, dataSrc, brokerCtxt, this));
brokerCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
@@ -152,6 +178,7 @@ AsyncStoreImpl::submitCreate(qpid::broker::ConfigHandle& cfgHandle,
void
AsyncStoreImpl::submitDestroy(qpid::broker::ConfigHandle& cfgHandle,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) {
+ assert(brokerCtxt.get() != 0);
boost::shared_ptr<const AsyncOperation> op(new AsyncOpConfigDestroy(cfgHandle, brokerCtxt, this));
brokerCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
@@ -160,25 +187,28 @@ AsyncStoreImpl::submitDestroy(qpid::broker::ConfigHandle& cfgHandle,
void
AsyncStoreImpl::submitCreate(qpid::broker::QueueHandle& queueHandle,
const qpid::broker::DataSource* const dataSrc,
- boost::shared_ptr<qpid::broker::QueueAsyncContext> QueueCtxt) {
- boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueCreate(queueHandle, dataSrc, QueueCtxt, this));
- QueueCtxt->setOpStr(op->getOpStr());
+ boost::shared_ptr<qpid::broker::QueueAsyncContext> queueCtxt) {
+ assert(queueCtxt.get() != 0);
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueCreate(queueHandle, dataSrc, queueCtxt, this));
+ queueCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
void
AsyncStoreImpl::submitDestroy(qpid::broker::QueueHandle& queueHandle,
- boost::shared_ptr<qpid::broker::QueueAsyncContext> QueueCtxt) {
- boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueDestroy(queueHandle, QueueCtxt, this));
- QueueCtxt->setOpStr(op->getOpStr());
+ boost::shared_ptr<qpid::broker::QueueAsyncContext> queueCtxt) {
+ assert(queueCtxt.get() != 0);
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueDestroy(queueHandle, queueCtxt, this));
+ queueCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
void
AsyncStoreImpl::submitFlush(qpid::broker::QueueHandle& queueHandle,
- boost::shared_ptr<qpid::broker::QueueAsyncContext> QueueCtxt) {
- boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueFlush(queueHandle, QueueCtxt, this));
- QueueCtxt->setOpStr(op->getOpStr());
+ boost::shared_ptr<qpid::broker::QueueAsyncContext> queueCtxt) {
+ assert(queueCtxt.get() != 0);
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueFlush(queueHandle, queueCtxt, this));
+ queueCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
@@ -187,6 +217,7 @@ AsyncStoreImpl::submitCreate(qpid::broker::EventHandle& eventHandle,
const qpid::broker::DataSource* const dataSrc,
qpid::broker::TxnHandle& txnHandle,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) {
+ assert(brokerCtxt.get() != 0);
boost::shared_ptr<const AsyncOperation> op(new AsyncOpEventCreate(eventHandle, dataSrc, txnHandle, brokerCtxt, this));
brokerCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
@@ -196,6 +227,7 @@ void
AsyncStoreImpl::submitDestroy(qpid::broker::EventHandle& eventHandle,
qpid::broker::TxnHandle& txnHandle,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) {
+ assert(brokerCtxt.get() != 0);
boost::shared_ptr<const AsyncOperation> op(new AsyncOpEventDestroy(eventHandle, txnHandle, brokerCtxt, this));
brokerCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
@@ -204,28 +236,30 @@ AsyncStoreImpl::submitDestroy(qpid::broker::EventHandle& eventHandle,
void
AsyncStoreImpl::submitEnqueue(qpid::broker::EnqueueHandle& enqHandle,
qpid::broker::TxnHandle& txnHandle,
- boost::shared_ptr<qpid::broker::QueueAsyncContext> QueueCtxt) {
- boost::shared_ptr<const AsyncOperation> op(new AsyncOpMsgEnqueue(enqHandle, txnHandle, QueueCtxt, this));
- QueueCtxt->setOpStr(op->getOpStr());
+ boost::shared_ptr<qpid::broker::QueueAsyncContext> queueCtxt) {
+ assert(queueCtxt.get() != 0);
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpMsgEnqueue(enqHandle, txnHandle, queueCtxt, this));
+ queueCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
void
AsyncStoreImpl::submitDequeue(qpid::broker::EnqueueHandle& enqHandle,
qpid::broker::TxnHandle& txnHandle,
- boost::shared_ptr<qpid::broker::QueueAsyncContext> QueueCtxt) {
- boost::shared_ptr<const AsyncOperation> op(new AsyncOpMsgDequeue(enqHandle, txnHandle, QueueCtxt, this));
- QueueCtxt->setOpStr(op->getOpStr());
+ boost::shared_ptr<qpid::broker::QueueAsyncContext> queueCtxt) {
+ assert(queueCtxt.get() != 0);
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpMsgDequeue(enqHandle, txnHandle, queueCtxt, this));
+ queueCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
-int
-AsyncStoreImpl::loadContent(qpid::broker::MessageHandle& /*msgHandle*/,
- qpid::broker::QueueHandle& /*queueHandle*/,
- char* /*data*/,
- uint64_t /*offset*/,
- const uint64_t /*length*/) {
- return 0;
-}
+//int
+//AsyncStoreImpl::loadContent(qpid::broker::MessageHandle& /*msgHandle*/,
+// qpid::broker::QueueHandle& /*queueHandle*/,
+// char* /*data*/,
+// uint64_t /*offset*/,
+// const uint64_t /*length*/) {
+// return 0;
+//}
}} // namespace qpid::asyncStore
diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.h b/cpp/src/qpid/asyncStore/AsyncStoreImpl.h
index 12a7f62c09..dd2cabaa69 100644
--- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.h
+++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.h
@@ -47,7 +47,7 @@ public:
AsyncStoreImpl(boost::shared_ptr<qpid::sys::Poller> poller,
const AsyncStoreOptions& opts);
virtual ~AsyncStoreImpl();
- void initialize();
+ void initialize(bool truncateFlag = false, bool saveFlag = true);
uint64_t getNextRid(); // Global counter for journal RIDs
// --- Management ---
@@ -65,11 +65,17 @@ public:
qpid::broker::SimpleTxnBuffer* tb);
void submitPrepare(qpid::broker::TxnHandle& txnHandle,
- boost::shared_ptr<qpid::broker::TpcTxnAsyncContext> TxnCtxt);
+ boost::shared_ptr<qpid::broker::TpcTxnAsyncContext> txnCtxt);
void submitCommit(qpid::broker::TxnHandle& txnHandle,
- boost::shared_ptr<qpid::broker::TxnAsyncContext> TxnCtxt);
+ boost::shared_ptr<qpid::broker::TxnAsyncContext> txnCtxt);
void submitAbort(qpid::broker::TxnHandle& txnHandle,
- boost::shared_ptr<qpid::broker::TxnAsyncContext> TxnCtxt);
+ boost::shared_ptr<qpid::broker::TxnAsyncContext> txnCtxt);
+
+
+ // --- Interface from AsyncRecoverable ---
+ qpid::broker::RecoveryHandle createRecoveryHandle();
+ void submitRecover(qpid::broker::RecoveryHandle& rcvrHandle,
+ boost::shared_ptr<qpid::broker::RecoveryAsyncContext> rcvrCtxt);
// --- Interface from AsyncStore ---
@@ -112,12 +118,12 @@ public:
qpid::broker::TxnHandle& txnHandle,
boost::shared_ptr<qpid::broker::QueueAsyncContext> QueueCtxt);
- // Legacy - Restore FTD message, is NOT async!
- virtual int loadContent(qpid::broker::MessageHandle& msgHandle,
- qpid::broker::QueueHandle& queueHandle,
- char* data,
- uint64_t offset,
- const uint64_t length);
+// // Legacy - Restore FTD message, is NOT async!
+// virtual int loadContent(qpid::broker::MessageHandle& msgHandle,
+// qpid::broker::QueueHandle& queueHandle,
+// char* data,
+// uint64_t offset,
+// const uint64_t length);
private:
boost::shared_ptr<qpid::sys::Poller> m_poller;
diff --git a/cpp/src/qpid/asyncStore/ConfigHandleImpl.h b/cpp/src/qpid/asyncStore/ConfigHandleImpl.h
index 17069ec21c..2e49e0adb9 100644
--- a/cpp/src/qpid/asyncStore/ConfigHandleImpl.h
+++ b/cpp/src/qpid/asyncStore/ConfigHandleImpl.h
@@ -29,8 +29,7 @@
namespace qpid {
namespace asyncStore {
-class ConfigHandleImpl : public virtual qpid::RefCounted
-{
+class ConfigHandleImpl : public virtual qpid::RefCounted {
public:
ConfigHandleImpl();
virtual ~ConfigHandleImpl();
diff --git a/cpp/src/qpid/asyncStore/OperationQueue.cpp b/cpp/src/qpid/asyncStore/OperationQueue.cpp
index 4e05fad10d..dff5387827 100644
--- a/cpp/src/qpid/asyncStore/OperationQueue.cpp
+++ b/cpp/src/qpid/asyncStore/OperationQueue.cpp
@@ -50,6 +50,8 @@ OperationQueue::OpQueue::Batch::const_iterator
OperationQueue::handle(const OperationQueue::OpQueue::Batch& e) {
try {
for (OpQueue::Batch::const_iterator i = e.begin(); i != e.end(); ++i) {
+// DEBUG: kpvdr
+std::cout << "#### OperationQueue::handle(): op=" << (*i)->getOpStr() << std::endl << std::flush;
(*i)->executeOp(); // Do store work here
}
} catch (const std::exception& e) {
diff --git a/cpp/src/qpid/asyncStore/Plugin.cpp b/cpp/src/qpid/asyncStore/Plugin.cpp
index f6930272a4..7395fc9b87 100644
--- a/cpp/src/qpid/asyncStore/Plugin.cpp
+++ b/cpp/src/qpid/asyncStore/Plugin.cpp
@@ -43,7 +43,7 @@ Plugin::earlyInitialize(Target& target) {
}
m_store.reset(new qpid::asyncStore::AsyncStoreImpl(broker->getPoller(), m_options));
boost::shared_ptr<qpid::broker::AsyncStore> brokerAsyncStore(m_store);
- broker->setAsyncStore(brokerAsyncStore);
+ broker->setStore(brokerAsyncStore);
boost::function<void()> fn = boost::bind(&Plugin::finalize, this);
target.addFinalizer(fn);
QPID_LOG(info, "asyncStore: Initialized using path " << m_options.m_storeDir);
diff --git a/cpp/src/qpid/asyncStore/RecoveryHandleImpl.cpp b/cpp/src/qpid/asyncStore/RecoveryHandleImpl.cpp
new file mode 100644
index 0000000000..a976f48e17
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/RecoveryHandleImpl.cpp
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file RecoveryHandleImpl.cpp
+ */
+
+#include "RecoveryHandleImpl.h"
+
+namespace qpid {
+namespace asyncStore {
+
+RecoveryHandleImpl::RecoveryHandleImpl() {}
+
+RecoveryHandleImpl::~RecoveryHandleImpl() {}
+
+}} // namespace qpid::asyncStore
diff --git a/cpp/src/qpid/asyncStore/RecoveryHandleImpl.h b/cpp/src/qpid/asyncStore/RecoveryHandleImpl.h
new file mode 100644
index 0000000000..8abd0b6f65
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/RecoveryHandleImpl.h
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file RecoverHandleImpl.h
+ */
+
+#ifndef qpid_asyncStore_RecoveryHandleImpl_h_
+#define qpid_asyncStore_RecoveryHandleImpl_h_
+
+#include "qpid/RefCounted.h"
+
+namespace qpid {
+namespace asyncStore {
+
+class RecoveryHandleImpl: public virtual qpid::RefCounted {
+public:
+ RecoveryHandleImpl();
+ virtual ~RecoveryHandleImpl();
+};
+
+}} // namespace qpid::asyncStore
+
+#endif // qpid_asyncStore_RecoveryHandleImpl_h_
diff --git a/cpp/src/qpid/broker/AsyncStore.h b/cpp/src/qpid/broker/AsyncStore.h
index e274a4e196..781418f81c 100644
--- a/cpp/src/qpid/broker/AsyncStore.h
+++ b/cpp/src/qpid/broker/AsyncStore.h
@@ -20,6 +20,7 @@
#ifndef qpid_broker_AsyncStore_h_
#define qpid_broker_AsyncStore_h_
+#include "qpid/broker/RecoveryManager.h"
#include "qpid/types/Variant.h" // qpid::types::Variant::Map
#include <boost/shared_ptr.hpp>
@@ -65,9 +66,12 @@ class EnqueueHandle;
class EventHandle;
class MessageHandle;
class QueueHandle;
+class RecoveryHandle;
class TxnHandle;
+class InitAsyncContext;
class QueueAsyncContext;
+class RecoveryAsyncContext;
class TpcTxnAsyncContext;
class TxnAsyncContext;
class SimpleTxnBuffer;
@@ -92,10 +96,20 @@ public:
boost::shared_ptr<TxnAsyncContext>) = 0;
};
+class AsyncRecoverable {
+public:
+ virtual ~AsyncRecoverable() {}
+ virtual RecoveryHandle createRecoveryHandle() = 0;
+ virtual void submitRecover(qpid::broker::RecoveryHandle&,
+ boost::shared_ptr<qpid::broker::RecoveryAsyncContext>) = 0;
+};
+
// Subclassed by store:
-class AsyncStore : public AsyncTransactionalStore {
+class AsyncStore : public AsyncTransactionalStore,
+ public AsyncRecoverable {
public:
virtual ~AsyncStore() {}
+ virtual void initialize(bool truncateFlag = false, bool saveFlag = true) = 0;
// --- Factory methods for creating handles ---
@@ -144,12 +158,12 @@ public:
TxnHandle&,
boost::shared_ptr<QueueAsyncContext>) = 0;
- // Legacy - Restore FTD message, is NOT async!
- virtual int loadContent(MessageHandle&,
- QueueHandle&,
- char* data,
- uint64_t offset,
- const uint64_t length) = 0;
+// // Legacy - Restore FTD message, is NOT async!
+// virtual int loadContent(MessageHandle&,
+// QueueHandle&,
+// char* data,
+// uint64_t offset,
+// const uint64_t length) = 0;
};
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index 2411e0520c..2382205268 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -20,12 +20,16 @@
*/
#include "qpid/broker/Broker.h"
+#include "qpid/broker/AsyncResultHandle.h"
+#include "qpid/broker/ConfigAsyncContext.h"
+#include "qpid/broker/ConfigHandle.h"
#include "qpid/broker/ConnectionState.h"
#include "qpid/broker/DirectExchange.h"
#include "qpid/broker/FanOutExchange.h"
#include "qpid/broker/HeadersExchange.h"
-#include "qpid/broker/MessageStoreModule.h"
-#include "qpid/broker/NullMessageStore.h"
+//#include "qpid/broker/MessageStoreModule.h"
+//#include "qpid/broker/NullMessageStore.h"
+//#include "qpid/broker/RecoveryAsyncContext.h"
#include "qpid/broker/RecoveryManagerImpl.h"
#include "qpid/broker/SaslAuthenticator.h"
#include "qpid/broker/SecureConnectionFactory.h"
@@ -34,6 +38,8 @@
#include "qpid/broker/ExpiryPolicy.h"
#include "qpid/broker/QueueFlowLimit.h"
#include "qpid/broker/QueueSettings.h"
+#include "qpid/broker/RecoveryAsyncContext.h"
+#include "qpid/broker/RecoveryHandle.h"
#include "qpid/broker/MessageGroupManager.h"
#include "qmf/org/apache/qpid/broker/Package.h"
@@ -190,7 +196,9 @@ Broker::Broker(const Broker::Options& conf) :
managementAgent(conf.enableMgmt ? new ManagementAgent(conf.qmf1Support,
conf.qmf2Support)
: 0),
- store(new NullMessageStore),
+// store(new NullMessageStore),
+// asyncStore(0),
+ asyncResultQueue(poller),
acl(0),
dataDir(conf.noDataDir ? std::string() : conf.dataDir),
queues(this),
@@ -270,23 +278,31 @@ Broker::Broker(const Broker::Options& conf) :
MessageGroupManager::setDefaults(conf.defaultMsgGroup);
// If no plugin store module registered itself, set up the null store.
- if (NullMessageStore::isNullStore(store.get()))
- setStore();
+// if (NullMessageStore::isNullStore(store.get()))
+// setStore();
exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
- if (store.get() != 0) {
+// if (store.get() != 0) {
+ if (asyncStore.get() != 0) {
// The cluster plug-in will setRecovery(false) on all but the first
// broker to join a cluster.
if (getRecovery()) {
+ QPID_LOG(info, "Store recovery starting")
RecoveryManagerImpl recoverer(queues, exchanges, links, dtxManager);
- store->recover(recoverer);
+ RecoveryHandle rh = asyncStore->createRecoveryHandle();
+ boost::shared_ptr<RecoveryAsyncContext> rac(new RecoveryAsyncContext(recoverer, &recoverComplete, &asyncResultQueue));
+ asyncStore->submitRecover(rh, rac);
+// store->recover(recoverer);
}
else {
QPID_LOG(notice, "Cluster recovery: recovered journal data discarded and journal files pushed down");
- store->truncateInit(true); // save old files in subdir
+// store->truncateInit(true); // save old files in subdir
+ asyncStore->initialize(true, true);
}
}
+// debug
+ else QPID_LOG(info, ">>>> No store!!!!")
//ensure standard exchanges exist (done after recovery from store)
declareStandardExchange(amq_direct, DirectExchange::typeName);
@@ -357,10 +373,14 @@ Broker::Broker(const Broker::Options& conf) :
void Broker::declareStandardExchange(const std::string& name, const std::string& type)
{
- bool storeEnabled = store.get() != NULL;
+// bool storeEnabled = store.get() != NULL;
+ bool storeEnabled = asyncStore.get() != NULL;
std::pair<Exchange::shared_ptr, bool> status = exchanges.declare(name, type, storeEnabled);
if (status.second && storeEnabled) {
- store->create(*status.first, framing::FieldTable ());
+// store->create(*status.first, framing::FieldTable ());
+ ConfigHandle ch = asyncStore->createConfigHandle();
+ boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureComplete, &asyncResultQueue));
+ asyncStore->submitCreate(ch, status.first.get(), bc);
}
}
@@ -377,23 +397,39 @@ boost::intrusive_ptr<Broker> Broker::create(const Options& opts)
return boost::intrusive_ptr<Broker>(new Broker(opts));
}
-void Broker::setStore (boost::shared_ptr<MessageStore>& _store)
+//void Broker::setStore (boost::shared_ptr<MessageStore>& _store)
+//{
+// store.reset(new MessageStoreModule (_store));
+// setStore();
+//}
+
+void Broker::setStore(boost::shared_ptr<AsyncStore>& _asyncStore)
{
- store.reset(new MessageStoreModule (_store));
+// asyncStore.reset(_asyncStore.get());
+ asyncStore = _asyncStore;
setStore();
}
-void Broker::setAsyncStore(boost::shared_ptr<AsyncStore>& /*asyncStore*/)
-{
- // TODO: Provide implementation for async store interface
+void Broker::setStore () {
+// queues.setStore (store.get());
+ queues.setStore(asyncStore.get());
+// dtxManager.setStore (store.get());
+ dtxManager.setStore(asyncStore.get());
+// links.setStore (store.get());
+ links.setStore(asyncStore.get());
}
-void Broker::setStore () {
- queues.setStore (store.get());
- dtxManager.setStore (store.get());
- links.setStore (store.get());
+// static
+void Broker::recoverComplete(const AsyncResultHandle* const arh) {
+ std::cout << "@@@@ Recover complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl;
}
+// static
+void Broker::configureComplete(const AsyncResultHandle* const arh) {
+ std::cout << "@@@@ Configure complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl;
+}
+
+
void Broker::run() {
if (config.workerThreads > 0) {
QPID_LOG(notice, "Broker running");
@@ -926,7 +962,7 @@ Manageable::status_t Broker::queryQueue( const std::string& name,
return Manageable::STATUS_UNKNOWN_OBJECT;
}
q->query( results );
- return Manageable::STATUS_OK;;
+ return Manageable::STATUS_OK;
}
Manageable::status_t Broker::getTimestampConfig(bool& receive,
@@ -1168,7 +1204,10 @@ std::pair<Exchange::shared_ptr, bool> Broker::createExchange(
alternate->incAlternateUsers();
}
if (durable) {
- store->create(*result.first, arguments);
+// store->create(*result.first, arguments);
+ ConfigHandle ch = asyncStore->createConfigHandle();
+ boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureComplete, &asyncResultQueue));
+ asyncStore->submitCreate(ch, result.first.get(), bc);
}
if (managementAgent.get()) {
//TODO: debatable whether we should raise an event here for
@@ -1208,7 +1247,11 @@ void Broker::deleteExchange(const std::string& name, const std::string& userId,
Exchange::shared_ptr exchange(exchanges.get(name));
if (!exchange) throw framing::NotFoundException(QPID_MSG("Delete failed. No such exchange: " << name));
if (exchange->inUseAsAlternate()) throw framing::NotAllowedException(QPID_MSG("Exchange in use as alternate-exchange."));
- if (exchange->isDurable()) store->destroy(*exchange);
+// if (exchange->isDurable()) store->destroy(*exchange);
+ if (exchange->isDurable()) {
+// boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureComplete, &asyncResultQueue));
+// asyncStore->submitDestroy(exchange.getHandle(), bc);
+ }
if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers();
exchanges.destroy(name);
@@ -1285,7 +1328,8 @@ void Broker::unbind(const std::string& queueName,
} else {
if (exchange->unbind(queue, key, 0)) {
if (exchange->isDurable() && queue->isDurable()) {
- store->unbind(*exchange, *queue, key, qpid::framing::FieldTable());
+// store->unbind(*exchange, *queue, key, qpid::framing::FieldTable());
+ // TODO: kpvdr: Async config destroy here
}
getConfigurationObservers().unbind(
exchange, queue, key, framing::FieldTable());
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index d1be0f58da..e4d1d93423 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -22,6 +22,7 @@
*
*/
+#include "qpid/broker/AsyncResultQueueImpl.h"
#include "qpid/broker/AsyncStore.h"
#include "qpid/broker/BrokerImportExport.h"
#include "qpid/broker/ConnectionFactory.h"
@@ -29,7 +30,7 @@
#include "qpid/broker/DirectExchange.h"
#include "qpid/broker/DtxManager.h"
#include "qpid/broker/ExchangeRegistry.h"
-#include "qpid/broker/MessageStore.h"
+//#include "qpid/broker/MessageStore.h"
#include "qpid/broker/QueueRegistry.h"
#include "qpid/broker/LinkRegistry.h"
#include "qpid/broker/SessionManager.h"
@@ -138,6 +139,8 @@ class Broker : public sys::Runnable, public Plugin::Target,
void declareStandardExchange(const std::string& name, const std::string& type);
void setStore ();
+ static void recoverComplete(const AsyncResultHandle* const);
+ static void configureComplete(const AsyncResultHandle* const);
void setLogLevel(const std::string& level);
std::string getLogLevel();
void createObject(const std::string& type, const std::string& name,
@@ -160,7 +163,10 @@ class Broker : public sys::Runnable, public Plugin::Target,
Options config;
std::auto_ptr<management::ManagementAgent> managementAgent;
ProtocolFactoryMap protocolFactories;
- std::auto_ptr<MessageStore> store;
+// std::auto_ptr<MessageStore> store;
+// std::auto_ptr<AsyncStore> asyncStore;
+ boost::shared_ptr<AsyncStore> asyncStore;
+ AsyncResultQueueImpl asyncResultQueue;
AclModule* acl;
DataDir dataDir;
ConnectionObservers connectionObservers;
@@ -213,9 +219,10 @@ class Broker : public sys::Runnable, public Plugin::Target,
/** Shut down the broker */
QPID_BROKER_EXTERN virtual void shutdown();
- QPID_BROKER_EXTERN void setStore (boost::shared_ptr<MessageStore>& store);
- void setAsyncStore(boost::shared_ptr<AsyncStore>& asyncStore);
- MessageStore& getStore() { return *store; }
+// QPID_BROKER_EXTERN void setStore (boost::shared_ptr<MessageStore>& store);
+ void setStore(boost::shared_ptr<AsyncStore>& asyncStore);
+// MessageStore& getStore() { return *store; }
+ AsyncStore& getStore() { return *asyncStore; }
void setAcl (AclModule* _acl) {acl = _acl;}
AclModule* getAcl() { return acl; }
QueueRegistry& getQueues() { return queues; }
diff --git a/cpp/src/qpid/broker/ConfigAsyncContext.cpp b/cpp/src/qpid/broker/ConfigAsyncContext.cpp
new file mode 100644
index 0000000000..ef91f56452
--- /dev/null
+++ b/cpp/src/qpid/broker/ConfigAsyncContext.cpp
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file ConfigAsyncContext.cpp
+ */
+
+#include "ConfigAsyncContext.h"
+
+namespace qpid {
+namespace broker {
+
+ConfigAsyncContext::ConfigAsyncContext(AsyncResultCallback rcb,
+ AsyncResultQueue* const arq) :
+ m_rcb(rcb),
+ m_arq(arq)
+{}
+
+ConfigAsyncContext::~ConfigAsyncContext() {}
+
+AsyncResultQueue*
+ConfigAsyncContext::getAsyncResultQueue() const {
+ return m_arq;
+}
+
+void
+ConfigAsyncContext::invokeCallback(const AsyncResultHandle* const arh) const {
+ if (m_rcb) {
+ m_rcb(arh);
+ }
+}
+
+}} // namespace qpid
diff --git a/cpp/src/qpid/broker/ConfigAsyncContext.h b/cpp/src/qpid/broker/ConfigAsyncContext.h
new file mode 100644
index 0000000000..ad879a09c4
--- /dev/null
+++ b/cpp/src/qpid/broker/ConfigAsyncContext.h
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file ConfigAsyncContext.h
+ */
+
+#ifndef qpid_broker_ConfigAsyncContext_h_
+#define qpid_broker_ConfigAsyncContext_h_
+
+#include "AsyncStore.h"
+
+namespace qpid {
+namespace broker {
+class AsyncResultHandle;
+class AsyncResultQueue;
+
+typedef void (*AsyncResultCallback)(const AsyncResultHandle* const);
+
+class ConfigAsyncContext: public qpid::broker::BrokerAsyncContext
+{
+public:
+ ConfigAsyncContext(AsyncResultCallback rcb,
+ AsyncResultQueue* const arq);
+ virtual ~ConfigAsyncContext();
+ virtual AsyncResultQueue* getAsyncResultQueue() const;
+ virtual void invokeCallback(const AsyncResultHandle* const) const;
+
+private:
+ AsyncResultCallback m_rcb;
+ AsyncResultQueue* const m_arq;
+};
+
+}} // namespace qpid::broker
+
+#endif // qpid_broker_ConfigAsyncContext_h_
diff --git a/cpp/src/qpid/broker/DirectExchange.cpp b/cpp/src/qpid/broker/DirectExchange.cpp
index 2fa7ce0fc5..c56a1da6cc 100644
--- a/cpp/src/qpid/broker/DirectExchange.cpp
+++ b/cpp/src/qpid/broker/DirectExchange.cpp
@@ -199,3 +199,10 @@ bool DirectExchange::isBound(Queue::shared_ptr queue, const string* const routin
DirectExchange::~DirectExchange() {}
const std::string DirectExchange::typeName("direct");
+
+// DataSource interface - used to write persistence data to async store
+// TODO: kpvdr: implement
+uint64_t DirectExchange::getSize() {
+ return 0;
+}
+void DirectExchange::write(char* /*target*/) {}
diff --git a/cpp/src/qpid/broker/DirectExchange.h b/cpp/src/qpid/broker/DirectExchange.h
index 833be52c1c..a0e1477d0c 100644
--- a/cpp/src/qpid/broker/DirectExchange.h
+++ b/cpp/src/qpid/broker/DirectExchange.h
@@ -65,6 +65,11 @@ public:
QPID_BROKER_EXTERN virtual ~DirectExchange();
virtual bool supportsDynamicBinding() { return true; }
+
+ // DataSource interface - used to write persistence data to async store
+ uint64_t getSize();
+ void write(char* target);
+
};
}}
diff --git a/cpp/src/qpid/broker/DtxManager.cpp b/cpp/src/qpid/broker/DtxManager.cpp
index d482c2c327..7e2eb927a3 100644
--- a/cpp/src/qpid/broker/DtxManager.cpp
+++ b/cpp/src/qpid/broker/DtxManager.cpp
@@ -35,7 +35,8 @@ using qpid::ptr_map_ptr;
using namespace qpid::broker;
using namespace qpid::framing;
-DtxManager::DtxManager(qpid::sys::Timer& t) : store(0), timer(&t) {}
+//DtxManager::DtxManager(qpid::sys::Timer& t) : store(0), timer(&t) {}
+DtxManager::DtxManager(qpid::sys::Timer& t) : asyncTxnStore(0), timer(&t) {}
DtxManager::~DtxManager() {}
@@ -124,7 +125,8 @@ DtxWorkRecord* DtxManager::createWork(const std::string& xid)
throw NotAllowedException(QPID_MSG("Xid " << convert(xid) << " is already known (use 'join' to add work to an existing xid)"));
} else {
std::string ncxid = xid; // Work around const correctness problems in ptr_map.
- return ptr_map_ptr(work.insert(ncxid, new DtxWorkRecord(ncxid, store)).first);
+// return ptr_map_ptr(work.insert(ncxid, new DtxWorkRecord(ncxid, store)).first);
+ return ptr_map_ptr(work.insert(ncxid, new DtxWorkRecord(ncxid, asyncTxnStore)).first);
}
}
@@ -172,9 +174,11 @@ void DtxManager::DtxCleanup::fire()
}
}
-void DtxManager::setStore (TransactionalStore* _store)
+//void DtxManager::setStore (TransactionalStore* _store)
+void DtxManager::setStore (AsyncTransactionalStore* _ats)
{
- store = _store;
+// store = _store;
+ asyncTxnStore = _ats;
}
std::string DtxManager::convert(const qpid::framing::Xid& xid)
diff --git a/cpp/src/qpid/broker/DtxManager.h b/cpp/src/qpid/broker/DtxManager.h
index 6f03189f66..fe20a89c32 100644
--- a/cpp/src/qpid/broker/DtxManager.h
+++ b/cpp/src/qpid/broker/DtxManager.h
@@ -22,9 +22,10 @@
#define _DtxManager_
#include <boost/ptr_container/ptr_map.hpp>
+#include "qpid/broker/AsyncStore.h"
#include "qpid/broker/DtxBuffer.h"
#include "qpid/broker/DtxWorkRecord.h"
-#include "qpid/broker/TransactionalStore.h"
+//#include "qpid/broker/TransactionalStore.h"
#include "qpid/framing/amqp_types.h"
#include "qpid/framing/Xid.h"
#include "qpid/sys/Mutex.h"
@@ -46,7 +47,8 @@ class DtxManager{
};
WorkMap work;
- TransactionalStore* store;
+// TransactionalStore* store;
+ AsyncTransactionalStore* asyncTxnStore;
qpid::sys::Mutex lock;
qpid::sys::Timer* timer;
@@ -65,7 +67,8 @@ public:
void setTimeout(const std::string& xid, uint32_t secs);
uint32_t getTimeout(const std::string& xid);
void timedout(const std::string& xid);
- void setStore(TransactionalStore* store);
+// void setStore(TransactionalStore* store);
+ void setStore(AsyncTransactionalStore* ats);
void setTimer(sys::Timer& t) { timer = &t; }
// Used by cluster for replication.
diff --git a/cpp/src/qpid/broker/DtxWorkRecord.cpp b/cpp/src/qpid/broker/DtxWorkRecord.cpp
index 2c26fec49f..924f953eb8 100644
--- a/cpp/src/qpid/broker/DtxWorkRecord.cpp
+++ b/cpp/src/qpid/broker/DtxWorkRecord.cpp
@@ -29,8 +29,8 @@ using qpid::sys::Mutex;
using namespace qpid::broker;
using namespace qpid::framing;
-DtxWorkRecord::DtxWorkRecord(const std::string& _xid, TransactionalStore* const _store) :
- xid(_xid), store(_store), completed(false), rolledback(false), prepared(false), expired(false) {}
+DtxWorkRecord::DtxWorkRecord(const std::string& _xid, AsyncTransactionalStore* const _ats) :
+ xid(_xid), asyncTxnStore(_ats), completed(false), rolledback(false), prepared(false), expired(false) {}
DtxWorkRecord::~DtxWorkRecord()
{
@@ -43,14 +43,15 @@ bool DtxWorkRecord::prepare()
{
Mutex::ScopedLock locker(lock);
if (check()) {
- txn = store->begin(xid);
- if (prepare(txn.get())) {
- store->prepare(*txn);
- prepared = true;
- } else {
- abort();
- //TODO: this should probably be flagged as internal error
- }
+// txn = asyncTxnStore->begin(xid);
+// if (prepare(txn.get())) {
+// asyncTxnStore->prepare(*txn);
+// prepared = true;
+// } else {
+// abort();
+// //TODO: this should probably be flagged as internal error
+// }
+ // TODO: kpvdr: Async transaction prepare here
} else {
//some part of the work has been marked rollback only
abort();
@@ -67,7 +68,7 @@ bool DtxWorkRecord::prepare(TransactionContext* _txn)
return succeeded;
}
-bool DtxWorkRecord::commit(bool onePhase)
+bool DtxWorkRecord::commit(bool onePhase) // why is onePhase necessary if prepared already contains the necessary state?
{
Mutex::ScopedLock locker(lock);
if (check()) {
@@ -77,7 +78,8 @@ bool DtxWorkRecord::commit(bool onePhase)
throw IllegalStateException(QPID_MSG("Branch with xid " << DtxManager::convert(xid) << " has been prepared, one-phase option not valid!"));
}
- store->commit(*txn);
+// asyncTxnStore->commit(*txn);
+ // TODO: kpvdr: Async transaction commit here
txn.reset();
std::for_each(work.begin(), work.end(), mem_fn(&TxBuffer::commit));
@@ -87,17 +89,20 @@ bool DtxWorkRecord::commit(bool onePhase)
if (!onePhase) {
throw IllegalStateException(QPID_MSG("Branch with xid " << DtxManager::convert(xid) << " has not been prepared, one-phase option required!"));
}
- std::auto_ptr<TransactionContext> localtxn = store->begin();
- if (prepare(localtxn.get())) {
- store->commit(*localtxn);
- std::for_each(work.begin(), work.end(), mem_fn(&TxBuffer::commit));
- return true;
- } else {
- store->abort(*localtxn);
- abort();
- //TODO: this should probably be flagged as internal error
- return false;
- }
+// std::auto_ptr<TransactionContext> localtxn = asyncTxnStore->begin();
+// if (prepare(localtxn.get())) {
+// asyncTxnStore->commit(*localtxn);
+// std::for_each(work.begin(), work.end(), mem_fn(&TxBuffer::commit));
+// return true;
+// } else {
+// asyncTxnStore->abort(*localtxn);
+// abort();
+// //TODO: this should probably be flagged as internal error
+// return false;
+// }
+ // TODO: kpvdr: Local transaction async prepare and commit here
+ // temp return value:
+ return true;
}
} else {
//some part of the work has been marked rollback only
@@ -147,7 +152,8 @@ bool DtxWorkRecord::check()
void DtxWorkRecord::abort()
{
if (txn.get()) {
- store->abort(*txn);
+// asyncTxnStore->abort(*txn);
+ // TODO: kpvdr: Async transaction abore here
txn.reset();
}
std::for_each(work.begin(), work.end(), mem_fn(&TxBuffer::rollback));
diff --git a/cpp/src/qpid/broker/DtxWorkRecord.h b/cpp/src/qpid/broker/DtxWorkRecord.h
index 331e42fefd..9dd86bdcad 100644
--- a/cpp/src/qpid/broker/DtxWorkRecord.h
+++ b/cpp/src/qpid/broker/DtxWorkRecord.h
@@ -21,6 +21,7 @@
#ifndef _DtxWorkRecord_
#define _DtxWorkRecord_
+#include "qpid/broker/AsyncStore.h"
#include "qpid/broker/BrokerImportExport.h"
#include "qpid/broker/DtxBuffer.h"
#include "qpid/broker/DtxTimeout.h"
@@ -48,7 +49,8 @@ class DtxWorkRecord
typedef std::vector<DtxBuffer::shared_ptr> Work;
const std::string xid;
- TransactionalStore* const store;
+// TransactionalStore* const store;
+ AsyncTransactionalStore* const asyncTxnStore;
bool completed;
bool rolledback;
bool prepared;
@@ -63,7 +65,8 @@ class DtxWorkRecord
bool prepare(TransactionContext* txn);
public:
QPID_BROKER_EXTERN DtxWorkRecord(const std::string& xid,
- TransactionalStore* const store);
+// TransactionalStore* const store);
+ AsyncTransactionalStore* const store);
QPID_BROKER_EXTERN ~DtxWorkRecord();
QPID_BROKER_EXTERN bool prepare();
QPID_BROKER_EXTERN bool commit(bool onePhase);
diff --git a/cpp/src/qpid/broker/Exchange.h b/cpp/src/qpid/broker/Exchange.h
index 2b2f7db934..b4c6b799a4 100644
--- a/cpp/src/qpid/broker/Exchange.h
+++ b/cpp/src/qpid/broker/Exchange.h
@@ -23,10 +23,11 @@
*/
#include <boost/shared_ptr.hpp>
+#include <qpid/broker/AsyncStore.h>
#include "qpid/broker/BrokerImportExport.h"
#include "qpid/broker/Deliverable.h"
#include "qpid/broker/Message.h"
-#include "qpid/broker/MessageStore.h"
+//#include "qpid/broker/MessageStore.h"
#include "qpid/broker/PersistableExchange.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/sys/Mutex.h"
@@ -35,13 +36,15 @@
#include "qmf/org/apache/qpid/broker/Binding.h"
#include "qmf/org/apache/qpid/broker/Broker.h"
+#include <set>
+
namespace qpid {
namespace broker {
class Broker;
class ExchangeRegistry;
-class QPID_BROKER_CLASS_EXTERN Exchange : public PersistableExchange, public management::Manageable {
+class QPID_BROKER_CLASS_EXTERN Exchange : public PersistableExchange, public DataSource, public management::Manageable {
public:
struct Binding : public management::Manageable {
typedef boost::shared_ptr<Binding> shared_ptr;
diff --git a/cpp/src/qpid/broker/ExchangeRegistry.h b/cpp/src/qpid/broker/ExchangeRegistry.h
index 27b705fbe5..c5d2483a23 100644
--- a/cpp/src/qpid/broker/ExchangeRegistry.h
+++ b/cpp/src/qpid/broker/ExchangeRegistry.h
@@ -24,7 +24,7 @@
#include "qpid/broker/BrokerImportExport.h"
#include "qpid/broker/Exchange.h"
-#include "qpid/broker/MessageStore.h"
+//#include "qpid/broker/MessageStore.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/sys/Monitor.h"
#include "qpid/management/Manageable.h"
diff --git a/cpp/src/qpid/broker/FanOutExchange.cpp b/cpp/src/qpid/broker/FanOutExchange.cpp
index 56c894c129..5b7e0c7324 100644
--- a/cpp/src/qpid/broker/FanOutExchange.cpp
+++ b/cpp/src/qpid/broker/FanOutExchange.cpp
@@ -120,3 +120,11 @@ bool FanOutExchange::isBound(Queue::shared_ptr queue, const string* const, const
FanOutExchange::~FanOutExchange() {}
const std::string FanOutExchange::typeName("fanout");
+
+
+// DataSource interface - used to write persistence data to async store
+// TODO: kpvdr: implement
+uint64_t FanOutExchange::getSize() {
+ return 0;
+}
+void FanOutExchange::write(char* /*target*/) {}
diff --git a/cpp/src/qpid/broker/FanOutExchange.h b/cpp/src/qpid/broker/FanOutExchange.h
index c979fdca25..dc301a4266 100644
--- a/cpp/src/qpid/broker/FanOutExchange.h
+++ b/cpp/src/qpid/broker/FanOutExchange.h
@@ -62,6 +62,11 @@ class FanOutExchange : public virtual Exchange {
QPID_BROKER_EXTERN virtual ~FanOutExchange();
virtual bool supportsDynamicBinding() { return true; }
+
+ // DataSource interface - used to write persistence data to async store
+ uint64_t getSize();
+ void write(char* target);
+
};
}
diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp
index 02c05852ff..ec19765387 100644
--- a/cpp/src/qpid/broker/HeadersExchange.cpp
+++ b/cpp/src/qpid/broker/HeadersExchange.cpp
@@ -418,3 +418,9 @@ bool HeadersExchange::FedUnbindModifier::operator()(BoundKey & bk)
return true;
}
+// DataSource interface - used to write persistence data to async store
+// TODO: kpvdr: implement
+uint64_t HeadersExchange::getSize() {
+ return 0;
+}
+void HeadersExchange::write(char* /*target*/) {}
diff --git a/cpp/src/qpid/broker/HeadersExchange.h b/cpp/src/qpid/broker/HeadersExchange.h
index 2e4669a018..ff0fec4212 100644
--- a/cpp/src/qpid/broker/HeadersExchange.h
+++ b/cpp/src/qpid/broker/HeadersExchange.h
@@ -107,6 +107,11 @@ class HeadersExchange : public virtual Exchange {
static QPID_BROKER_EXTERN bool match(const qpid::framing::FieldTable& bindArgs, const qpid::framing::FieldTable& msgArgs);
static bool equal(const qpid::framing::FieldTable& bindArgs, const qpid::framing::FieldTable& msgArgs);
+
+ // DataSource interface - used to write persistence data to async store
+ uint64_t getSize();
+ void write(char* target);
+
};
diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp
index 6479e47799..416c3b7d34 100644
--- a/cpp/src/qpid/broker/Link.cpp
+++ b/cpp/src/qpid/broker/Link.cpp
@@ -115,6 +115,11 @@ public:
link = _link;
}
+ // DataSource interface - used to write persistence data to async store
+ uint64_t getSize() { return 0; }
+ void write(char* /*target*/) {}
+
+
private:
Link *link;
};
diff --git a/cpp/src/qpid/broker/LinkRegistry.cpp b/cpp/src/qpid/broker/LinkRegistry.cpp
index 6f813554fa..43ed208eba 100644
--- a/cpp/src/qpid/broker/LinkRegistry.cpp
+++ b/cpp/src/qpid/broker/LinkRegistry.cpp
@@ -42,7 +42,8 @@ namespace _qmf = qmf::org::apache::qpid::broker;
// factored: The persistence element should be factored separately
LinkRegistry::LinkRegistry () :
broker(0),
- parent(0), store(0), passive(false),
+// parent(0), store(0), passive(false),
+ parent(0), asyncStore(0), passive(false),
realm("")
{
}
@@ -59,7 +60,7 @@ class LinkRegistryConnectionObserver : public ConnectionObserver {
LinkRegistry::LinkRegistry (Broker* _broker) :
broker(_broker),
- parent(0), store(0), passive(false),
+ parent(0), asyncStore(0), passive(false),
realm(broker->getOptions().realm)
{
broker->getConnectionObservers().add(
@@ -117,7 +118,11 @@ pair<Link::shared_ptr, bool> LinkRegistry::declare(const string& name,
boost::bind(&LinkRegistry::linkDestroyed, this, _1),
durable, authMechanism, username, password, broker,
parent, failover));
- if (durable && store) store->create(*link);
+// if (durable && store) store->create(*link);
+ if (durable && asyncStore) {
+// store->create(*link);
+ // TODO: kpvdr: async create config (link)
+ }
links[name] = link;
pendingLinks[name] = link;
QPID_LOG(debug, "Creating new link; name=" << name );
@@ -213,8 +218,11 @@ pair<Bridge::shared_ptr, bool> LinkRegistry::declare(const std::string& name,
args, init, queueName, altExchange));
bridges[name] = bridge;
link.add(bridge);
- if (durable && store)
- store->create(*bridge);
+// if (durable && store)
+ if (durable && asyncStore) {
+// store->create(*bridge);
+ // TODO: kpvdr: Async create config (bridge)
+ }
QPID_LOG(debug, "Bridge '" << name <<"' declared on link '" << link.getName() <<
"' from " << src << " to " << dest << " (" << key << ")");
@@ -234,8 +242,11 @@ void LinkRegistry::linkDestroyed(Link *link)
LinkMap::iterator i = links.find(link->getName());
if (i != links.end())
{
- if (i->second->isDurable() && store)
- store->destroy(*(i->second));
+// if (i->second->isDurable() && store)
+ if (i->second->isDurable() && asyncStore) {
+// store->destroy(*(i->second));
+ // TODO: kpvdr: Async destroy config (link)
+ }
links.erase(i);
}
}
@@ -254,18 +265,22 @@ void LinkRegistry::destroyBridge(Bridge *bridge)
if (link) {
link->cancel(b->second);
}
- if (b->second->isDurable())
- store->destroy(*(b->second));
+// if (b->second->isDurable())
+ if (b->second->isDurable()) {
+// store->destroy(*(b->second));
+ // TODO: kpvdr: Async destroy config (bridge)
+ }
bridges.erase(b);
}
-void LinkRegistry::setStore (MessageStore* _store)
-{
- store = _store;
+//void LinkRegistry::setStore (MessageStore* _store)
+void LinkRegistry::setStore (AsyncStore* _asyncStore) {
+ asyncStore = _asyncStore;
}
-MessageStore* LinkRegistry::getStore() const {
- return store;
+//MessageStore* LinkRegistry::getStore() const {
+AsyncStore* LinkRegistry::getStore() const {
+ return asyncStore;
}
namespace {
diff --git a/cpp/src/qpid/broker/LinkRegistry.h b/cpp/src/qpid/broker/LinkRegistry.h
index 076ab831c9..a30f91e2a0 100644
--- a/cpp/src/qpid/broker/LinkRegistry.h
+++ b/cpp/src/qpid/broker/LinkRegistry.h
@@ -25,7 +25,7 @@
#include <map>
#include "qpid/broker/BrokerImportExport.h"
#include "qpid/broker/Bridge.h"
-#include "qpid/broker/MessageStore.h"
+//#include "qpid/broker/MessageStore.h"
#include "qpid/Address.h"
#include "qpid/sys/Mutex.h"
#include "qpid/management/Manageable.h"
@@ -52,7 +52,8 @@ namespace broker {
qpid::sys::Mutex lock;
Broker* broker;
management::Manageable* parent;
- MessageStore* store;
+// MessageStore* store;
+ AsyncStore* asyncStore;
bool passive;
std::string realm;
@@ -130,12 +131,14 @@ namespace broker {
/**
* Set the store to use. May only be called once.
*/
- QPID_BROKER_EXTERN void setStore (MessageStore*);
+// QPID_BROKER_EXTERN void setStore (MessageStore*);
+ QPID_BROKER_EXTERN void setStore (AsyncStore*);
/**
* Return the message store used.
*/
- QPID_BROKER_EXTERN MessageStore* getStore() const;
+// QPID_BROKER_EXTERN MessageStore* getStore() const;
+ QPID_BROKER_EXTERN AsyncStore* getStore() const;
QPID_BROKER_EXTERN std::string getAuthMechanism (const std::string& key);
QPID_BROKER_EXTERN std::string getAuthCredentials (const std::string& key);
diff --git a/cpp/src/qpid/broker/LossyQueue.cpp b/cpp/src/qpid/broker/LossyQueue.cpp
index ee2c3ca794..a4e46a2e2f 100644
--- a/cpp/src/qpid/broker/LossyQueue.cpp
+++ b/cpp/src/qpid/broker/LossyQueue.cpp
@@ -33,8 +33,10 @@ bool isLowerPriorityThan(uint8_t priority, const Message& m)
}
}
-LossyQueue::LossyQueue(const std::string& n, const QueueSettings& s, MessageStore* const ms, management::Manageable* p, Broker* b)
- : Queue(n, s, ms, p, b) {}
+//LossyQueue::LossyQueue(const std::string& n, const QueueSettings& s, MessageStore* const ms, management::Manageable* p, Broker* b)
+// : Queue(n, s, ms, p, b) {}
+LossyQueue::LossyQueue(const std::string& n, const QueueSettings& s, AsyncStore* const as, management::Manageable* p, Broker* b)
+ : Queue(n, s, as, p, b) {}
bool LossyQueue::checkDepth(const QueueDepth& increment, const Message& message)
{
diff --git a/cpp/src/qpid/broker/LossyQueue.h b/cpp/src/qpid/broker/LossyQueue.h
index 3e62151d6f..da4e1d52e2 100644
--- a/cpp/src/qpid/broker/LossyQueue.h
+++ b/cpp/src/qpid/broker/LossyQueue.h
@@ -32,7 +32,8 @@ namespace broker {
class LossyQueue : public Queue
{
public:
- LossyQueue(const std::string&, const QueueSettings&, MessageStore* const, management::Manageable*, Broker*);
+// LossyQueue(const std::string&, const QueueSettings&, MessageStore* const, management::Manageable*, Broker*);
+ LossyQueue(const std::string&, const QueueSettings&, AsyncStore* const, management::Manageable*, Broker*);
bool checkDepth(const QueueDepth& increment, const Message&);
private:
};
diff --git a/cpp/src/qpid/broker/Lvq.cpp b/cpp/src/qpid/broker/Lvq.cpp
index d053616c8a..0bededb966 100644
--- a/cpp/src/qpid/broker/Lvq.cpp
+++ b/cpp/src/qpid/broker/Lvq.cpp
@@ -25,8 +25,10 @@
namespace qpid {
namespace broker {
-Lvq::Lvq(const std::string& n, std::auto_ptr<MessageMap> m, const QueueSettings& s, MessageStore* const ms, management::Manageable* p, Broker* b)
- : Queue(n, s, ms, p, b), messageMap(*m)
+//Lvq::Lvq(const std::string& n, std::auto_ptr<MessageMap> m, const QueueSettings& s, MessageStore* const ms, management::Manageable* p, Broker* b)
+// : Queue(n, s, ms, p, b), messageMap(*m)
+Lvq::Lvq(const std::string& n, std::auto_ptr<MessageMap> m, const QueueSettings& s, AsyncStore* const as, management::Manageable* p, Broker* b)
+ : Queue(n, s, as, p, b), messageMap(*m)
{
messages = m;
}
diff --git a/cpp/src/qpid/broker/Lvq.h b/cpp/src/qpid/broker/Lvq.h
index 335270a073..3eba381b81 100644
--- a/cpp/src/qpid/broker/Lvq.h
+++ b/cpp/src/qpid/broker/Lvq.h
@@ -35,7 +35,8 @@ class MessageMap;
class Lvq : public Queue
{
public:
- Lvq(const std::string&, std::auto_ptr<MessageMap>, const QueueSettings&, MessageStore* const, management::Manageable*, Broker*);
+// Lvq(const std::string&, std::auto_ptr<MessageMap>, const QueueSettings&, MessageStore* const, management::Manageable*, Broker*);
+ Lvq(const std::string&, std::auto_ptr<MessageMap>, const QueueSettings&, AsyncStore* const, management::Manageable*, Broker*);
void push(Message& msg, bool isRecovery=false);
private:
MessageMap& messageMap;
diff --git a/cpp/src/qpid/broker/MessageStore.h b/cpp/src/qpid/broker/MessageStore.h
index ab0225ef6b..5e339574fd 100644
--- a/cpp/src/qpid/broker/MessageStore.h
+++ b/cpp/src/qpid/broker/MessageStore.h
@@ -18,6 +18,9 @@
* under the License.
*
*/
+
+#error "deprecated in favor of AsyncStore"
+
#ifndef _MessageStore_
#define _MessageStore_
diff --git a/cpp/src/qpid/broker/MessageStoreModule.cpp b/cpp/src/qpid/broker/MessageStoreModule.cpp
index cd9fd4c933..4309ee8524 100644
--- a/cpp/src/qpid/broker/MessageStoreModule.cpp
+++ b/cpp/src/qpid/broker/MessageStoreModule.cpp
@@ -19,6 +19,8 @@
*
*/
+#error "deprecated in favor of AsyncStore"
+
#include "qpid/broker/MessageStoreModule.h"
#include "qpid/broker/NullMessageStore.h"
#include <iostream>
diff --git a/cpp/src/qpid/broker/MessageStoreModule.h b/cpp/src/qpid/broker/MessageStoreModule.h
index 56b5a3c1ae..e5c271f4fa 100644
--- a/cpp/src/qpid/broker/MessageStoreModule.h
+++ b/cpp/src/qpid/broker/MessageStoreModule.h
@@ -18,6 +18,9 @@
* under the License.
*
*/
+
+#error "deprecated in favor of AsyncStore"
+
#ifndef _MessageStoreModule_
#define _MessageStoreModule_
diff --git a/cpp/src/qpid/broker/NullMessageStore.cpp b/cpp/src/qpid/broker/NullMessageStore.cpp
index 43f600eaf1..209941875a 100644
--- a/cpp/src/qpid/broker/NullMessageStore.cpp
+++ b/cpp/src/qpid/broker/NullMessageStore.cpp
@@ -19,6 +19,8 @@
*
*/
+#error "deprecated in favor of AsyncStore"
+
#include "qpid/broker/NullMessageStore.h"
#include "qpid/broker/MessageStoreModule.h"
#include "qpid/broker/RecoveryManager.h"
diff --git a/cpp/src/qpid/broker/NullMessageStore.h b/cpp/src/qpid/broker/NullMessageStore.h
index c6f402662e..799bf6b368 100644
--- a/cpp/src/qpid/broker/NullMessageStore.h
+++ b/cpp/src/qpid/broker/NullMessageStore.h
@@ -18,6 +18,9 @@
* under the License.
*
*/
+
+#error "deprected in favor of AsyncStore"
+
#ifndef _NullMessageStore_
#define _NullMessageStore_
diff --git a/cpp/src/qpid/broker/PersistableMessage.cpp b/cpp/src/qpid/broker/PersistableMessage.cpp
index 9601b8dcce..2ef9fbfcbb 100644
--- a/cpp/src/qpid/broker/PersistableMessage.cpp
+++ b/cpp/src/qpid/broker/PersistableMessage.cpp
@@ -64,19 +64,19 @@ void PersistableMessage::flush()
//TODO: is this really the right place for this?
}
-// deprecated
-void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr, MessageStore*)
-{
- enqueueStart();
-}
+//// deprecated
+//void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr, MessageStore*)
+//{
+// enqueueStart();
+//}
void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr, AsyncStore*)
{
enqueueStart();
}
-// deprecated
-void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr, MessageStore*) {}
+//// deprecated
+//void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr, MessageStore*) {}
void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr, AsyncStore*) {}
diff --git a/cpp/src/qpid/broker/PersistableMessage.h b/cpp/src/qpid/broker/PersistableMessage.h
index be2910280c..0fd3d169b4 100644
--- a/cpp/src/qpid/broker/PersistableMessage.h
+++ b/cpp/src/qpid/broker/PersistableMessage.h
@@ -80,16 +80,16 @@ class PersistableMessage : public Persistable
QPID_BROKER_INLINE_EXTERN void enqueueStart() { ingressCompletion->startCompleter(); }
QPID_BROKER_INLINE_EXTERN void enqueueComplete() { ingressCompletion->finishCompleter(); }
- QPID_BROKER_EXTERN void enqueueAsync(PersistableQueue::shared_ptr queue, // deprecated
- MessageStore* _store);
+// QPID_BROKER_EXTERN void enqueueAsync(PersistableQueue::shared_ptr queue, // deprecated
+// MessageStore* _store);
QPID_BROKER_EXTERN void enqueueAsync(PersistableQueue::shared_ptr queue,
AsyncStore* _store);
QPID_BROKER_EXTERN bool isDequeueComplete();
QPID_BROKER_EXTERN void dequeueComplete();
- QPID_BROKER_EXTERN void dequeueAsync(PersistableQueue::shared_ptr queue, // deprecated
- MessageStore* _store);
+// QPID_BROKER_EXTERN void dequeueAsync(PersistableQueue::shared_ptr queue, // deprecated
+// MessageStore* _store);
QPID_BROKER_EXTERN void dequeueAsync(PersistableQueue::shared_ptr queue,
AsyncStore* _store);
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 0dd4cb7b10..40574ded3b 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -26,11 +26,11 @@
#include "qpid/broker/QueueSettings.h"
#include "qpid/broker/Exchange.h"
#include "qpid/broker/DeliverableMessage.h"
-#include "qpid/broker/MessageStore.h"
+//#include "qpid/broker/MessageStore.h"
#include "qpid/broker/MessageDeque.h"
#include "qpid/broker/MessageDistributor.h"
#include "qpid/broker/FifoDistributor.h"
-#include "qpid/broker/NullMessageStore.h"
+//#include "qpid/broker/NullMessageStore.h"
#include "qpid/broker/QueueRegistry.h"
//TODO: get rid of this
@@ -165,12 +165,14 @@ void Queue::TxPublish::rollback() throw()
}
Queue::Queue(const string& _name, const QueueSettings& _settings,
- MessageStore* const _store,
+// MessageStore* const _store,
+ AsyncStore* const _asyncStore,
Manageable* parent,
Broker* b) :
name(_name),
- store(_store),
+// store(_store),
+ asyncStore(_asyncStore),
owner(0),
consumerCount(0),
browserCount(0),
@@ -198,9 +200,11 @@ Queue::Queue(const string& _name, const QueueSettings& _settings,
ManagementAgent* agent = broker->getManagementAgent();
if (agent != 0) {
- mgmtObject = new _qmf::Queue(agent, this, parent, _name, _store != 0, settings.autodelete);
+// mgmtObject = new _qmf::Queue(agent, this, parent, _name, _store != 0, settings.autodelete);
+ mgmtObject = new _qmf::Queue(agent, this, parent, _name, _asyncStore != 0, settings.autodelete);
mgmtObject->set_arguments(settings.asMap());
- agent->addObject(mgmtObject, 0, store != 0);
+// agent->addObject(mgmtObject, 0, store != 0);
+ agent->addObject(mgmtObject, 0, asyncStore != 0);
brokerMgmtObject = (qmf::org::apache::qpid::broker::Broker*) broker->GetManagementObject();
if (brokerMgmtObject)
brokerMgmtObject->inc_queueCount();
@@ -787,7 +791,7 @@ void Queue::setLastNodeFailure()
* return true if enqueue succeeded and message should be made
* available; returning false will result in the message being dropped
*/
-bool Queue::enqueue(TransactionContext* ctxt, Message& msg)
+bool Queue::enqueue(TransactionContext* /*ctxt*/, Message& msg)
{
ScopedUse u(barrier);
if (!u.acquired) return false;
@@ -807,13 +811,16 @@ bool Queue::enqueue(TransactionContext* ctxt, Message& msg)
msg.addTraceId(settings.traceId);
}
- if (msg.isPersistent() && store) {
+// if (msg.isPersistent() && store) {
+ if (msg.isPersistent() && asyncStore) {
// mark the message as being enqueued - the store MUST CALL msg->enqueueComplete()
// when it considers the message stored.
boost::intrusive_ptr<PersistableMessage> pmsg = msg.getPersistentContext();
assert(pmsg);
- pmsg->enqueueAsync(shared_from_this(), store);
- store->enqueue(ctxt, pmsg, *this);
+// pmsg->enqueueAsync(shared_from_this(), store);
+ pmsg->enqueueAsync(shared_from_this(), asyncStore);
+// store->enqueue(ctxt, pmsg, *this);
+ // TODO - kpvdr: async enqueue here
}
return true;
}
@@ -858,8 +865,10 @@ void Queue::dequeueCommited(const Message& msg)
void Queue::dequeueFromStore(boost::intrusive_ptr<PersistableMessage> msg)
{
ScopedUse u(barrier);
- if (u.acquired && msg && store) {
- store->dequeue(0, msg, *this);
+// if (u.acquired && msg && store) {
+ if (u.acquired && msg && asyncStore) {
+// store->dequeue(0, msg, *this);
+ // TODO: kpvdr: async dequeue here
}
}
@@ -881,8 +890,10 @@ void Queue::dequeue(TransactionContext* ctxt, const QueueCursor& cursor)
return;
}
}
- if (store && pmsg) {
- store->dequeue(ctxt, pmsg, *this);
+// if (store && pmsg) {
+ if (asyncStore && pmsg) {
+// store->dequeue(ctxt, pmsg, *this);
+ // TODO: kpvdr: async dequeue here
}
}
@@ -983,8 +994,10 @@ void Queue::observeConsumerRemove( const Consumer& c, const qpid::sys::Mutex::Sc
void Queue::create()
{
- if (store) {
- store->create(*this, settings.storeSettings);
+// if (store) {
+ if (asyncStore) {
+// store->create(*this, settings.storeSettings);
+ // TODO: kpvdr: async store create here
}
}
@@ -1051,11 +1064,16 @@ void Queue::destroyed()
alternateExchange->decAlternateUsers();
}
- if (store) {
+// if (store) {
+ if (asyncStore) {
barrier.destroy();
- store->flush(*this);
- store->destroy(*this);
- store = 0;//ensure we make no more calls to the store for this queue
+// store->flush(*this);
+ // TODO: kpvdr: async flush here
+// store->destroy(*this);
+ // TODO: kpvdr: async destroy here
+// store = 0;//ensure we make no more calls to the store for this queue
+ // TODO: kpvdr: cannot set asyncStore to 0 until all async store ops are complete. Rather set flag which
+ // will cause store to be destroyed when all outstanding async ops are complete.
}
if (autoDeleteTask) autoDeleteTask = boost::intrusive_ptr<TimerTask>();
notifyDeleted();
@@ -1444,7 +1462,9 @@ void Queue::removeObserver(boost::shared_ptr<QueueObserver> observer)
void Queue::flush()
{
ScopedUse u(barrier);
- if (u.acquired && store) store->flush(*this);
+// if (u.acquired && store) store->flush(*this);
+ // TODO: kpvdr: Async store flush here
+ if (u.acquired && asyncStore) { /*store->flush(*this);*/ }
}
@@ -1454,7 +1474,8 @@ bool Queue::bind(boost::shared_ptr<Exchange> exchange, const std::string& key,
if (exchange->bind(shared_from_this(), key, &arguments)) {
bound(exchange->getName(), key, arguments);
if (exchange->isDurable() && isDurable()) {
- store->bind(*exchange, *this, key, arguments);
+// store->bind(*exchange, *this, key, arguments);
+ // TODO: kpvdr: Store configuration here
}
return true;
} else {
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index 671a24d53e..28fa8b5ca9 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -59,7 +59,7 @@ namespace qpid {
namespace broker {
class Broker;
class Exchange;
-class MessageStore;
+//class MessageStore;
class QueueDepth;
class QueueEvents;
class QueueRegistry;
@@ -115,7 +115,8 @@ class Queue : public boost::enable_shared_from_this<Queue>,
typedef boost::function1<void, Message&> MessageFunctor;
const std::string name;
- MessageStore* store;
+// MessageStore* store;
+ AsyncStore* asyncStore;
const OwnershipToken* owner;
uint32_t consumerCount; // Actually a count of all subscriptions, acquiring or not.
uint32_t browserCount; // Count of non-acquiring subscriptions.
@@ -201,7 +202,8 @@ class Queue : public boost::enable_shared_from_this<Queue>,
QPID_BROKER_EXTERN Queue(const std::string& name,
const QueueSettings& settings = QueueSettings(),
- MessageStore* const store = 0,
+// MessageStore* const store = 0,
+ AsyncStore* const asyncStore = 0,
management::Manageable* parent = 0,
Broker* broker = 0);
QPID_BROKER_EXTERN virtual ~Queue();
@@ -286,7 +288,8 @@ class Queue : public boost::enable_shared_from_this<Queue>,
QPID_BROKER_EXTERN bool setExclusiveOwner(const OwnershipToken* const o);
QPID_BROKER_EXTERN bool hasExclusiveConsumer() const;
QPID_BROKER_EXTERN bool hasExclusiveOwner() const;
- inline bool isDurable() const { return store != 0; }
+// inline bool isDurable() const { return store != 0; }
+ inline bool isDurable() const { return asyncStore != 0; }
inline const QueueSettings& getSettings() const { return settings; }
inline const qpid::framing::FieldTable& getEncodableSettings() const { return encodableSettings; }
inline bool isAutoDelete() const { return settings.autodelete; }
diff --git a/cpp/src/qpid/broker/QueueAsyncContext.h b/cpp/src/qpid/broker/QueueAsyncContext.h
index 7a159f2639..4988f2af39 100644
--- a/cpp/src/qpid/broker/QueueAsyncContext.h
+++ b/cpp/src/qpid/broker/QueueAsyncContext.h
@@ -24,9 +24,9 @@
#ifndef qpid_broker_QueueAsyncContext_h_
#define qpid_broker_QueueAsyncContext_h_
-#include "AsyncResultHandle.h"
-#include "AsyncStore.h"
-#include "TxnHandle.h"
+#include "qpid/broker/AsyncResultHandle.h"
+#include "qpid/broker/AsyncStore.h"
+#include "qpid/broker/TxnHandle.h"
#include "qpid/asyncStore/AsyncOperation.h"
@@ -36,7 +36,7 @@
namespace qpid {
namespace broker {
-class PersistableMessage;
+//class PersistableMessage;
class PersistableQueue;
typedef void (*AsyncResultCallback)(const AsyncResultHandle* const);
diff --git a/cpp/src/qpid/broker/QueueFactory.cpp b/cpp/src/qpid/broker/QueueFactory.cpp
index efeb9ae53b..6ff3f832e4 100644
--- a/cpp/src/qpid/broker/QueueFactory.cpp
+++ b/cpp/src/qpid/broker/QueueFactory.cpp
@@ -41,7 +41,8 @@ namespace qpid {
namespace broker {
-QueueFactory::QueueFactory() : broker(0), store(0), parent(0) {}
+//QueueFactory::QueueFactory() : broker(0), store(0), parent(0) {}
+QueueFactory::QueueFactory() : broker(0), asyncStore(0), parent(0) {}
boost::shared_ptr<Queue> QueueFactory::create(const std::string& name, const QueueSettings& settings)
{
@@ -51,12 +52,15 @@ boost::shared_ptr<Queue> QueueFactory::create(const std::string& name, const Que
// -> if 'ring' policy is in use then subclass
boost::shared_ptr<Queue> queue;
if (settings.dropMessagesAtLimit) {
- queue = boost::shared_ptr<Queue>(new LossyQueue(name, settings, settings.durable ? store : 0, parent, broker));
+// queue = boost::shared_ptr<Queue>(new LossyQueue(name, settings, settings.durable ? store : 0, parent, broker));
+ queue = boost::shared_ptr<Queue>(new LossyQueue(name, settings, settings.durable ? asyncStore : 0, parent, broker));
} else if (settings.lvqKey.size()) {
std::auto_ptr<MessageMap> map(new MessageMap(settings.lvqKey));
- queue = boost::shared_ptr<Queue>(new Lvq(name, map, settings, settings.durable ? store : 0, parent, broker));
+// queue = boost::shared_ptr<Queue>(new Lvq(name, map, settings, settings.durable ? store : 0, parent, broker));
+ queue = boost::shared_ptr<Queue>(new Lvq(name, map, settings, settings.durable ? asyncStore : 0, parent, broker));
} else {
- queue = boost::shared_ptr<Queue>(new Queue(name, settings, settings.durable ? store : 0, parent, broker));
+// queue = boost::shared_ptr<Queue>(new Queue(name, settings, settings.durable ? store : 0, parent, broker));
+ queue = boost::shared_ptr<Queue>(new Queue(name, settings, settings.durable ? asyncStore : 0, parent, broker));
}
//2. determine Messages type (i.e. structure)
@@ -98,13 +102,15 @@ Broker* QueueFactory::getBroker()
{
return broker;
}
-void QueueFactory::setStore (MessageStore* s)
+//void QueueFactory::setStore (MessageStore* s)
+void QueueFactory::setStore (AsyncStore* as)
{
- store = s;
+ asyncStore = as;
}
-MessageStore* QueueFactory::getStore() const
+//MessageStore* QueueFactory::getStore() const
+AsyncStore* QueueFactory::getStore() const
{
- return store;
+ return asyncStore;
}
void QueueFactory::setParent(management::Manageable* p)
{
diff --git a/cpp/src/qpid/broker/QueueFactory.h b/cpp/src/qpid/broker/QueueFactory.h
index b6a79f1f1a..9d0048e139 100644
--- a/cpp/src/qpid/broker/QueueFactory.h
+++ b/cpp/src/qpid/broker/QueueFactory.h
@@ -31,7 +31,8 @@ class Manageable;
}
namespace broker {
class Broker;
-class MessageStore;
+//class MessageStore;
+class AsyncStore;
class Queue;
struct QueueSettings;
@@ -52,12 +53,14 @@ class QueueFactory
/**
* Set the store to use. May only be called once.
*/
- void setStore (MessageStore*);
+// void setStore (MessageStore*);
+ void setStore (AsyncStore*);
/**
* Return the message store used.
*/
- MessageStore* getStore() const;
+// MessageStore* getStore() const;
+ AsyncStore* getStore() const;
/**
* Register the manageable parent for declared queues
@@ -65,7 +68,8 @@ class QueueFactory
void setParent(management::Manageable*);
private:
Broker* broker;
- MessageStore* store;
+// MessageStore* store;
+ AsyncStore* asyncStore;
management::Manageable* parent;
};
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/QueueHandle.cpp b/cpp/src/qpid/broker/QueueHandle.cpp
index dffb262a3b..9c8d7eba67 100644
--- a/cpp/src/qpid/broker/QueueHandle.cpp
+++ b/cpp/src/qpid/broker/QueueHandle.cpp
@@ -23,9 +23,8 @@
#include "QueueHandle.h"
-#include "PrivateImplRef.h"
-
#include "qpid/asyncStore/QueueHandleImpl.h"
+#include "qpid/broker/PrivateImplRef.h"
namespace qpid {
namespace broker {
diff --git a/cpp/src/qpid/broker/QueueRegistry.cpp b/cpp/src/qpid/broker/QueueRegistry.cpp
index 3521e08325..420a9caa28 100644
--- a/cpp/src/qpid/broker/QueueRegistry.cpp
+++ b/cpp/src/qpid/broker/QueueRegistry.cpp
@@ -100,12 +100,14 @@ Queue::shared_ptr QueueRegistry::get(const string& name) {
return q;
}
-void QueueRegistry::setStore (MessageStore* _store)
+//void QueueRegistry::setStore (MessageStore* _store)
+void QueueRegistry::setStore (AsyncStore* _store)
{
QueueFactory::setStore(_store);
}
-MessageStore* QueueRegistry::getStore() const
+//MessageStore* QueueRegistry::getStore() const
+AsyncStore* QueueRegistry::getStore() const
{
return QueueFactory::getStore();
}
diff --git a/cpp/src/qpid/broker/QueueRegistry.h b/cpp/src/qpid/broker/QueueRegistry.h
index 7fce90c679..b274493f8d 100644
--- a/cpp/src/qpid/broker/QueueRegistry.h
+++ b/cpp/src/qpid/broker/QueueRegistry.h
@@ -97,12 +97,14 @@ class QueueRegistry : QueueFactory {
/**
* Set the store to use. May only be called once.
*/
- void setStore (MessageStore*);
+// void setStore (MessageStore*);
+ void setStore (AsyncStore*);
/**
* Return the message store used.
*/
- MessageStore* getStore() const;
+// MessageStore* getStore() const;
+ AsyncStore* getStore() const;
/**
* Register the manageable parent for declared queues
diff --git a/cpp/src/qpid/broker/RecoveredDequeue.h b/cpp/src/qpid/broker/RecoveredDequeue.h
index 87f768eefd..805041f0b7 100644
--- a/cpp/src/qpid/broker/RecoveredDequeue.h
+++ b/cpp/src/qpid/broker/RecoveredDequeue.h
@@ -23,7 +23,7 @@
#include "qpid/broker/Deliverable.h"
#include "qpid/broker/Message.h"
-#include "qpid/broker/MessageStore.h"
+//#include "qpid/broker/MessageStore.h"
#include "qpid/broker/TxOp.h"
#include <algorithm>
diff --git a/cpp/src/qpid/broker/RecoveredEnqueue.h b/cpp/src/qpid/broker/RecoveredEnqueue.h
index d1f8e1106c..15221a58c0 100644
--- a/cpp/src/qpid/broker/RecoveredEnqueue.h
+++ b/cpp/src/qpid/broker/RecoveredEnqueue.h
@@ -23,7 +23,7 @@
#include "qpid/broker/Deliverable.h"
#include "qpid/broker/Message.h"
-#include "qpid/broker/MessageStore.h"
+//#include "qpid/broker/MessageStore.h"
#include "qpid/broker/TxOp.h"
#include <algorithm>
diff --git a/cpp/src/qpid/broker/RecoveryAsyncContext.cpp b/cpp/src/qpid/broker/RecoveryAsyncContext.cpp
new file mode 100644
index 0000000000..acbe3d5d9c
--- /dev/null
+++ b/cpp/src/qpid/broker/RecoveryAsyncContext.cpp
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file RecoveryAsyncContext.cpp
+ */
+
+#include "RecoveryAsyncContext.h"
+
+namespace qpid {
+namespace broker {
+
+RecoveryAsyncContext::RecoveryAsyncContext(RecoveryManagerImpl& rm,
+ AsyncResultCallback rcb,
+ AsyncResultQueue* const arq) :
+ m_rm(rm),
+ m_rcb(rcb),
+ m_arq(arq)
+{}
+
+RecoveryAsyncContext::~RecoveryAsyncContext() {}
+
+RecoveryManagerImpl&
+RecoveryAsyncContext::getRecoveryManager() const {
+ return m_rm;
+}
+
+
+AsyncResultQueue*
+RecoveryAsyncContext::getAsyncResultQueue() const {
+ return m_arq;
+}
+
+void
+RecoveryAsyncContext::invokeCallback(const AsyncResultHandle* const arh) const {
+ if (m_rcb) {
+ m_rcb(arh);
+ }
+}
+
+}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/RecoveryAsyncContext.h b/cpp/src/qpid/broker/RecoveryAsyncContext.h
new file mode 100644
index 0000000000..f06bf85154
--- /dev/null
+++ b/cpp/src/qpid/broker/RecoveryAsyncContext.h
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file RecoveryAsyncContext.h
+ */
+
+#ifndef qpid_broker_RecoveryAsyncContext_h_
+#define qpid_broker_RecoveryAsyncContext_h_
+
+#include "qpid/broker/AsyncStore.h"
+
+namespace qpid {
+namespace broker {
+class AsyncResultHandle;
+class RecoveryManagerImpl;
+
+typedef void (*AsyncResultCallback)(const AsyncResultHandle* const);
+
+class RecoveryAsyncContext: public qpid::broker::BrokerAsyncContext {
+public:
+ RecoveryAsyncContext(RecoveryManagerImpl& rm,
+ AsyncResultCallback rcb,
+ AsyncResultQueue* const arq);
+ virtual ~RecoveryAsyncContext();
+ RecoveryManagerImpl& getRecoveryManager() const;
+ AsyncResultQueue* getAsyncResultQueue() const;
+ void invokeCallback(const AsyncResultHandle* const) const;
+
+private:
+ RecoveryManagerImpl& m_rm;
+ AsyncResultCallback m_rcb;
+ AsyncResultQueue* const m_arq;
+};
+
+}} // namespace qpid::broker
+
+#endif // qpid_broker_RecoveryAsyncContext_h_
diff --git a/cpp/src/qpid/broker/RecoveryHandle.cpp b/cpp/src/qpid/broker/RecoveryHandle.cpp
new file mode 100644
index 0000000000..f93b7602a3
--- /dev/null
+++ b/cpp/src/qpid/broker/RecoveryHandle.cpp
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file RecoveryHandle.cpp
+ */
+
+#include "RecoveryHandle.h"
+
+#include "qpid/asyncStore/RecoveryHandleImpl.h"
+#include "qpid/broker/PrivateImplRef.h"
+
+namespace qpid {
+namespace broker {
+
+typedef PrivateImplRef<RecoveryHandle> PrivateImpl;
+
+RecoveryHandle::RecoveryHandle(qpid::asyncStore::RecoveryHandleImpl* p) :
+ Handle<qpid::asyncStore::RecoveryHandleImpl>()
+{
+ PrivateImpl::ctor(*this, p);
+}
+
+RecoveryHandle::RecoveryHandle(const RecoveryHandle& r) :
+ Handle<qpid::asyncStore::RecoveryHandleImpl>()
+{
+ PrivateImpl::copy(*this, r);
+}
+
+RecoveryHandle::~RecoveryHandle() {
+ PrivateImpl::dtor(*this);
+}
+
+RecoveryHandle&
+RecoveryHandle::operator=(const RecoveryHandle& r) {
+ return PrivateImpl::assign(*this, r);
+}
+
+}} // namespace qpid */
diff --git a/cpp/src/qpid/broker/RecoveryHandle.h b/cpp/src/qpid/broker/RecoveryHandle.h
new file mode 100644
index 0000000000..4efd674835
--- /dev/null
+++ b/cpp/src/qpid/broker/RecoveryHandle.h
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file RecoveryHandle.h
+ */
+
+#ifndef qpid_broker_RecoveryHandle_h_
+#define qpid_broker_RecoveryHandle_h_
+
+#include "qpid/asyncStore/AsyncStoreHandle.h"
+#include "qpid/broker/Handle.h"
+
+namespace qpid {
+namespace asyncStore {
+class RecoveryHandleImpl;
+}
+namespace broker {
+
+class RecoveryHandle: public qpid::broker::Handle<qpid::asyncStore::RecoveryHandleImpl>,
+ public qpid::asyncStore::AsyncStoreHandle
+{
+public:
+ RecoveryHandle(qpid::asyncStore::RecoveryHandleImpl* p = 0);
+ RecoveryHandle(const RecoveryHandle& r);
+ virtual ~RecoveryHandle();
+ RecoveryHandle& operator=(const RecoveryHandle& r);
+
+ // --- RecoveryHandleImpl methods ---
+ // <none>
+
+private:
+ friend class PrivateImplRef<RecoveryHandle>;
+};
+
+}} // namespace qpid::broker
+
+#endif // qpid_broker_RecoveryHandle_h_
diff --git a/cpp/src/qpid/broker/RecoveryManager.h b/cpp/src/qpid/broker/RecoveryManager.h
index 2929e92250..0cb7c544cd 100644
--- a/cpp/src/qpid/broker/RecoveryManager.h
+++ b/cpp/src/qpid/broker/RecoveryManager.h
@@ -45,15 +45,18 @@ class RecoveryManager{
virtual void recoveryComplete() = 0;
};
+// kpvdr: this has been replaced with AsyncRecoverable defined in AsyncStore.h
+/*
class Recoverable {
public:
virtual ~Recoverable() {}
- /**
+ *
* Request recovery of queue and message state.
- */
+
virtual void recover(RecoveryManager& recoverer) = 0;
};
+*/
}}
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index 5d96467bbf..97d6dc07b0 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -155,7 +155,8 @@ void SemanticState::startTx()
txBuffer = TxBuffer::shared_ptr(new TxBuffer());
}
-void SemanticState::commit(MessageStore* const store)
+//void SemanticState::commit(MessageStore* const store)
+void SemanticState::commit(AsyncStore* const store)
{
if (!txBuffer) throw
CommandInvalidException(QPID_MSG("Session has not been selected for use with transactions"));
diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h
index 67cfe808d0..a30c7e15b7 100644
--- a/cpp/src/qpid/broker/SemanticState.h
+++ b/cpp/src/qpid/broker/SemanticState.h
@@ -55,7 +55,8 @@ namespace qpid {
namespace broker {
class Exchange;
-class MessageStore;
+//class MessageStore;
+class AsyncStore;
class SessionContext;
class SessionState;
@@ -233,7 +234,8 @@ class SemanticState : private boost::noncopyable {
void stop(const std::string& destination);
void startTx();
- void commit(MessageStore* const store);
+// void commit(MessageStore* const store);
+ void commit(AsyncStore* const store);
void rollback();
void selectDtx();
bool getDtxSelected() const { return dtxSelected; }
diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp
index c973098020..a05934ed8e 100644
--- a/cpp/src/qpid/broker/SessionAdapter.cpp
+++ b/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -655,7 +655,9 @@ XaResult SessionAdapter::DtxHandlerImpl::rollback(const Xid& xid)
DtxRecoverResult SessionAdapter::DtxHandlerImpl::recover()
{
std::set<std::string> xids;
- getBroker().getStore().collectPreparedXids(xids);
+// getBroker().getStore().collectPreparedXids(xids);
+ // TODO: kpvdr: When designing async store with gsim, it was decided that this function
+ // would be performed outside the store. Resolve this function.
/*
* create array of long structs
*/
diff --git a/cpp/src/qpid/broker/TopicExchange.cpp b/cpp/src/qpid/broker/TopicExchange.cpp
index c11389bb17..d9871a430b 100644
--- a/cpp/src/qpid/broker/TopicExchange.cpp
+++ b/cpp/src/qpid/broker/TopicExchange.cpp
@@ -337,4 +337,11 @@ TopicExchange::~TopicExchange() {}
const std::string TopicExchange::typeName("topic");
+// DataSource interface - used to write persistence data to async store
+// TODO: kpvdr: implement
+uint64_t TopicExchange::getSize() {
+ return 0;
+}
+void TopicExchange::write(char* /*target*/) {}
+
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/TopicExchange.h b/cpp/src/qpid/broker/TopicExchange.h
index 46871a1c6b..c50ecf1830 100644
--- a/cpp/src/qpid/broker/TopicExchange.h
+++ b/cpp/src/qpid/broker/TopicExchange.h
@@ -109,6 +109,11 @@ public:
QPID_BROKER_EXTERN virtual ~TopicExchange();
virtual bool supportsDynamicBinding() { return true; }
+ // DataSource interface - used to write persistence data to async store
+ uint64_t getSize();
+ void write(char* target);
+
+
class TopicExchangeTester;
friend class TopicExchangeTester;
};
diff --git a/cpp/src/qpid/broker/TxBuffer.cpp b/cpp/src/qpid/broker/TxBuffer.cpp
index 7663cc525f..dad451776d 100644
--- a/cpp/src/qpid/broker/TxBuffer.cpp
+++ b/cpp/src/qpid/broker/TxBuffer.cpp
@@ -53,20 +53,22 @@ void TxBuffer::enlist(TxOp::shared_ptr op)
ops.push_back(op);
}
-bool TxBuffer::commitLocal(TransactionalStore* const store)
+//bool TxBuffer::commitLocal(TransactionalStore* const store)
+bool TxBuffer::commitLocal(AsyncTransactionalStore* const asyncTxnStore)
{
- if (!store) return false;
+ if (!asyncTxnStore) return false;
try {
- std::auto_ptr<TransactionContext> ctxt = store->begin();
- if (prepare(ctxt.get())) {
- store->commit(*ctxt);
- commit();
- return true;
- } else {
- store->abort(*ctxt);
- rollback();
- return false;
- }
+// std::auto_ptr<TransactionContext> ctxt = asyncTxnStore->begin();
+// if (prepare(ctxt.get())) {
+// asyncTxnStore->commit(*ctxt);
+// commit();
+// return true;
+// } else {
+// asyncTxnStore->abort(*ctxt);
+// rollback();
+// return false;
+// }
+ // TODO: kpvdr: add async local transaction commits here
} catch (std::exception& e) {
QPID_LOG(error, "Commit failed with exception: " << e.what());
} catch (...) {
diff --git a/cpp/src/qpid/broker/TxBuffer.h b/cpp/src/qpid/broker/TxBuffer.h
index 22e2f06be1..df878e2b96 100644
--- a/cpp/src/qpid/broker/TxBuffer.h
+++ b/cpp/src/qpid/broker/TxBuffer.h
@@ -59,6 +59,8 @@
*/
namespace qpid {
namespace broker {
+ class AsyncTransactionalStore;
+
class TxBuffer{
typedef std::vector<TxOp::shared_ptr>::iterator op_iterator;
std::vector<TxOp::shared_ptr> ops;
@@ -107,7 +109,8 @@ namespace qpid {
* Helper method for managing the process of server local
* commit
*/
- QPID_BROKER_EXTERN bool commitLocal(TransactionalStore* const store);
+// QPID_BROKER_EXTERN bool commitLocal(TransactionalStore* const store);
+ QPID_BROKER_EXTERN bool commitLocal(AsyncTransactionalStore* const store);
};
}
}
diff --git a/cpp/src/qpid/ha/BrokerReplicator.cpp b/cpp/src/qpid/ha/BrokerReplicator.cpp
index fe32753b4e..0a66527e98 100644
--- a/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -620,6 +620,10 @@ bool BrokerReplicator::bind(boost::shared_ptr<Queue>, const string&, const frami
bool BrokerReplicator::unbind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; }
bool BrokerReplicator::isBound(boost::shared_ptr<Queue>, const string* const, const framing::FieldTable* const) { return false; }
+// DataSource interface - used to write persistence data to async store
+uint64_t BrokerReplicator::getSize() { return 0; }
+void BrokerReplicator::write(char* /*target*/) {}
+
string BrokerReplicator::getType() const { return QPID_CONFIGURATION_REPLICATOR; }
}} // namespace broker
diff --git a/cpp/src/qpid/ha/BrokerReplicator.h b/cpp/src/qpid/ha/BrokerReplicator.h
index 69653b876a..109d8c638e 100644
--- a/cpp/src/qpid/ha/BrokerReplicator.h
+++ b/cpp/src/qpid/ha/BrokerReplicator.h
@@ -76,6 +76,10 @@ class BrokerReplicator : public broker::Exchange,
void route(broker::Deliverable&);
bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const);
+ // DataSource interface - used to write persistence data to async store
+ uint64_t getSize();
+ void write(char* target);
+
private:
typedef boost::shared_ptr<QueueReplicator> QueueReplicatorPtr;
diff --git a/cpp/src/qpid/ha/QueueReplicator.cpp b/cpp/src/qpid/ha/QueueReplicator.cpp
index ae53f89404..8aba7555d4 100644
--- a/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -195,4 +195,8 @@ bool QueueReplicator::unbind(boost::shared_ptr<Queue>, const std::string&, const
bool QueueReplicator::isBound(boost::shared_ptr<Queue>, const std::string* const, const FieldTable* const) { return false; }
std::string QueueReplicator::getType() const { return TYPE_NAME; }
+// DataSource interface - used to write persistence data to async store
+uint64_t QueueReplicator::getSize() { return 0; }
+void QueueReplicator::write(char* /*target*/) {}
+
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/ha/QueueReplicator.h b/cpp/src/qpid/ha/QueueReplicator.h
index f8a68ea38f..a2a158539e 100644
--- a/cpp/src/qpid/ha/QueueReplicator.h
+++ b/cpp/src/qpid/ha/QueueReplicator.h
@@ -77,6 +77,10 @@ class QueueReplicator : public broker::Exchange,
void route(broker::Deliverable&);
bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const);
+ // DataSource interface - used to write persistence data to async store
+ uint64_t getSize();
+ void write(char* target);
+
private:
void initializeBridge(broker::Bridge& bridge, broker::SessionHandler& sessionHandler);
void dequeue(framing::SequenceNumber, sys::Mutex::ScopedLock&);
diff --git a/cpp/src/qpid/management/ManagementDirectExchange.h b/cpp/src/qpid/management/ManagementDirectExchange.h
index 582354d723..481a8c499d 100644
--- a/cpp/src/qpid/management/ManagementDirectExchange.h
+++ b/cpp/src/qpid/management/ManagementDirectExchange.h
@@ -48,6 +48,10 @@ class ManagementDirectExchange : public virtual DirectExchange
void setManagmentAgent(management::ManagementAgent* agent, int qmfVersion);
virtual ~ManagementDirectExchange();
+
+ // DataSource interface - used to write persistence data to async store
+ uint64_t getSize() { return 0; }
+ void write(char* /*target*/) {}
};
diff --git a/cpp/src/qpid/management/ManagementTopicExchange.h b/cpp/src/qpid/management/ManagementTopicExchange.h
index eff01a8552..0d6b6ad50c 100644
--- a/cpp/src/qpid/management/ManagementTopicExchange.h
+++ b/cpp/src/qpid/management/ManagementTopicExchange.h
@@ -52,6 +52,11 @@ class ManagementTopicExchange : public virtual TopicExchange
void setManagmentAgent(management::ManagementAgent* agent, int qmfVersion);
virtual ~ManagementTopicExchange();
+
+ // DataSource interface - used to write persistence data to async store
+ uint64_t getSize() { return 0; }
+ void write(char* /*target*/) {}
+
};
diff --git a/cpp/src/qpid/store/MessageStorePlugin.cpp b/cpp/src/qpid/store/MessageStorePlugin.cpp
index c6b0e1a53a..d72200c2ba 100644
--- a/cpp/src/qpid/store/MessageStorePlugin.cpp
+++ b/cpp/src/qpid/store/MessageStorePlugin.cpp
@@ -101,7 +101,8 @@ MessageStorePlugin::earlyInitialize (qpid::Plugin::Target& target)
provider->second->activate(*this);
NoopDeleter d;
boost::shared_ptr<qpid::broker::MessageStore> sp(this, d);
- broker->setStore(sp);
+// broker->setStore(sp);
+ // TODO: kpvdr: Windows store earlyInitialize()
target.addFinalizer(boost::bind(&MessageStorePlugin::finalizeMe, this));
}
diff --git a/cpp/src/qpid/xml/XmlExchange.cpp b/cpp/src/qpid/xml/XmlExchange.cpp
index f88acb04ee..22eeff41c5 100644
--- a/cpp/src/qpid/xml/XmlExchange.cpp
+++ b/cpp/src/qpid/xml/XmlExchange.cpp
@@ -430,6 +430,11 @@ bool XmlExchange::MatchQueueAndOrigin::operator()(XmlBinding::shared_ptr b)
const std::string XmlExchange::typeName("xml");
-
+
+
+// DataSource interface - used to write persistence data to async store
+uint64_t XmlExchange::getSize() { return 0; }
+void XmlExchange::write(char* /*target*/) {}
+
}
}
diff --git a/cpp/src/qpid/xml/XmlExchange.h b/cpp/src/qpid/xml/XmlExchange.h
index 7b04781ad5..a80588c7ab 100644
--- a/cpp/src/qpid/xml/XmlExchange.h
+++ b/cpp/src/qpid/xml/XmlExchange.h
@@ -94,6 +94,10 @@ class XmlExchange : public virtual Exchange {
virtual ~XmlExchange();
+ // DataSource interface - used to write persistence data to async store
+ uint64_t getSize();
+ void write(char* target);
+
struct MatchOrigin {
const std::string origin;
MatchOrigin(const std::string& origin);
diff --git a/cpp/src/tests/AsyncCompletion.cpp b/cpp/src/tests/AsyncCompletion.cpp
index e32097106f..a864841f07 100644
--- a/cpp/src/tests/AsyncCompletion.cpp
+++ b/cpp/src/tests/AsyncCompletion.cpp
@@ -16,33 +16,34 @@
*
*/
-
-#include "unit_test.h"
-#include "test_tools.h"
-#include "BrokerFixture.h"
-#include "qpid/broker/NullMessageStore.h"
-#include "qpid/sys/BlockingQueue.h"
-#include "qpid/client/AsyncSession.h"
-#include "qpid/sys/Time.h"
-#include "qpid/framing/QueueQueryResult.h"
-#include "qpid/client/TypedResult.h"
-
-using namespace std;
-using namespace qpid;
-using namespace client;
-using namespace framing;
-
-namespace qpid { namespace broker {
-class TransactionContext;
-class PersistableQueue;
-}}
-
-using broker::PersistableMessage;
-using broker::NullMessageStore;
-using broker::TransactionContext;
-using broker::PersistableQueue;
-using sys::TIME_SEC;
-using boost::intrusive_ptr;
+// TODO: kpvdr: Rewrite this test in terms of an Null AsyncStore
+
+//#include "unit_test.h"
+//#include "test_tools.h"
+//#include "BrokerFixture.h"
+//#include "qpid/broker/NullMessageStore.h"
+//#include "qpid/sys/BlockingQueue.h"
+//#include "qpid/client/AsyncSession.h"
+//#include "qpid/sys/Time.h"
+//#include "qpid/framing/QueueQueryResult.h"
+//#include "qpid/client/TypedResult.h"
+//
+//using namespace std;
+//using namespace qpid;
+//using namespace client;
+//using namespace framing;
+//
+//namespace qpid { namespace broker {
+//class TransactionContext;
+//class PersistableQueue;
+//}}
+//
+//using broker::PersistableMessage;
+//using broker::NullMessageStore;
+//using broker::TransactionContext;
+//using broker::PersistableQueue;
+//using sys::TIME_SEC;
+//using boost::intrusive_ptr;
/** @file Unit tests for async completion.
* Using a dummy store, verify that the broker indicates async completion of
@@ -52,6 +53,7 @@ using boost::intrusive_ptr;
namespace qpid {
namespace tests {
+/*
class AsyncCompletionMessageStore : public NullMessageStore {
public:
sys::BlockingQueue<boost::intrusive_ptr<PersistableMessage> > enqueued;
@@ -72,8 +74,10 @@ QPID_AUTO_TEST_SUITE(AsyncCompletionTestSuite)
QPID_AUTO_TEST_CASE(testWaitTillComplete) {
SessionFixture fix;
AsyncCompletionMessageStore* store = new AsyncCompletionMessageStore;
- boost::shared_ptr<qpid::broker::MessageStore> p;
- p.reset(store);
+// boost::shared_ptr<qpid::broker::MessageStore> p;
+ boost::shared_ptr<qpid::broker::AsyncStore> p;
+// p.reset(store);
+ // TODO: kpvdr: Rewrite this test to use AsyncStore
fix.broker->setStore(p);
AsyncSession s = fix.session;
@@ -116,5 +120,6 @@ QPID_AUTO_TEST_CASE(testGetResult) {
}
QPID_AUTO_TEST_SUITE_END()
+*/
}} // namespace qpid::tests
diff --git a/cpp/src/tests/DtxWorkRecordTest.cpp b/cpp/src/tests/DtxWorkRecordTest.cpp
index 9d7666dca4..654342037e 100644
--- a/cpp/src/tests/DtxWorkRecordTest.cpp
+++ b/cpp/src/tests/DtxWorkRecordTest.cpp
@@ -32,6 +32,7 @@ namespace tests {
QPID_AUTO_TEST_SUITE(DtxWorkRecordTestSuite)
+/*
QPID_AUTO_TEST_CASE(testOnePhaseCommit){
MockTransactionalStore store;
store.expectBegin().expectCommit();
@@ -187,6 +188,8 @@ QPID_AUTO_TEST_CASE(testRollback){
opA->check();
opB->check();
}
+*/
+// TODO: kpvdr: Rewrite this test (and TxMocks.h) to use Async store
QPID_AUTO_TEST_SUITE_END()
diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp
index 3dfe3863f4..7b7c653029 100644
--- a/cpp/src/tests/QueueTest.cpp
+++ b/cpp/src/tests/QueueTest.cpp
@@ -29,7 +29,7 @@
#include "qpid/broker/Deliverable.h"
#include "qpid/broker/ExchangeRegistry.h"
#include "qpid/broker/QueueRegistry.h"
-#include "qpid/broker/NullMessageStore.h"
+//#include "qpid/broker/NullMessageStore.h"
#include "qpid/broker/ExpiryPolicy.h"
#include "qpid/framing/DeliveryProperties.h"
#include "qpid/framing/FieldTable.h"
diff --git a/cpp/src/tests/TxBufferTest.cpp b/cpp/src/tests/TxBufferTest.cpp
index 4807026ab7..6c4473a662 100644
--- a/cpp/src/tests/TxBufferTest.cpp
+++ b/cpp/src/tests/TxBufferTest.cpp
@@ -32,6 +32,7 @@ namespace tests {
QPID_AUTO_TEST_SUITE(TxBufferTestSuite)
+/*
QPID_AUTO_TEST_CASE(testCommitLocal)
{
MockTransactionalStore store;
@@ -175,6 +176,8 @@ QPID_AUTO_TEST_CASE(testBufferIsClearedAfterCommit)
opA->check();
opB->check();
}
+*/
+// TODO: kpvdr: Rewrite this test (and TxMocks.h) to use Async store
QPID_AUTO_TEST_SUITE_END()
diff --git a/cpp/src/tests/test_store.cpp b/cpp/src/tests/test_store.cpp
index 83f6a5e4b1..e6bc55c033 100644
--- a/cpp/src/tests/test_store.cpp
+++ b/cpp/src/tests/test_store.cpp
@@ -19,7 +19,7 @@
*
*/
-
+// TODO: kpvdr: Rewrite this test in terms of an Null AsyncStore
/**@file
* Plug-in message store for tests.
*
@@ -32,26 +32,26 @@
* - do async completion after a delay.
*/
-#include "qpid/broker/NullMessageStore.h"
-#include "qpid/broker/Broker.h"
-#include "qpid/broker/amqp_0_10/MessageTransfer.h"
-#include "qpid/framing/AMQFrame.h"
-#include "qpid/log/Statement.h"
-#include "qpid/Plugin.h"
-#include "qpid/Options.h"
-#include <boost/cast.hpp>
-#include <boost/lexical_cast.hpp>
-#include <memory>
-#include <fstream>
-
-using namespace qpid;
-using namespace broker;
-using namespace std;
-using namespace qpid::sys;
+//#include "qpid/broker/NullMessageStore.h"
+//#include "qpid/broker/Broker.h"
+//#include "qpid/broker/amqp_0_10/MessageTransfer.h"
+//#include "qpid/framing/AMQFrame.h"
+//#include "qpid/log/Statement.h"
+//#include "qpid/Plugin.h"
+//#include "qpid/Options.h"
+//#include <boost/cast.hpp>
+//#include <boost/lexical_cast.hpp>
+//#include <memory>
+//#include <fstream>
+//
+//using namespace qpid;
+//using namespace broker;
+//using namespace std;
+//using namespace qpid::sys;
namespace qpid {
namespace tests {
-
+/*
struct TestStoreOptions : public Options {
string name;
@@ -76,6 +76,7 @@ struct Completer : public Runnable {
}
};
+
class TestStore : public NullMessageStore {
public:
TestStore(const TestStoreOptions& opts, Broker& broker_)
@@ -157,6 +158,7 @@ const string TestStore::EXCEPTION = "exception";
const string TestStore::EXIT_PROCESS = "exit_process";
const string TestStore::ASYNC="async ";
+
struct TestStorePlugin : public Plugin {
TestStoreOptions options;
@@ -168,12 +170,13 @@ struct TestStorePlugin : public Plugin {
Broker* broker = dynamic_cast<Broker*>(&target);
if (!broker) return;
boost::shared_ptr<MessageStore> p(new TestStore(options, *broker));
- broker->setStore (p);
+// broker->setStore (p);
+ // TODO: kpvdr: This test will need to be reworked in terms of an AsyncStore.
}
void initialize(qpid::Plugin::Target&) {}
};
static TestStorePlugin pluginInstance;
-
+*/
}} // namespace qpid::tests