diff options
author | Kim van der Riet <kpvdr@apache.org> | 2012-09-24 13:49:13 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2012-09-24 13:49:13 +0000 |
commit | c095a631dcb2c7be5e167ed50f658f7c24330a45 (patch) | |
tree | f3c6dc1e3a9f6e12501c1dcb794d18779db477ac | |
parent | 0f327ee25b5ab4b9a38a8620a666e6bfc66000e7 (diff) | |
download | qpid-python-c095a631dcb2c7be5e167ed50f658f7c24330a45.tar.gz |
QPID-3858: WIP: Provisional checkin: Wiring of async store interface to broker. Code compiles, but as persistent transactions are currentl disconnected, not all tests pass.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1389378 13f79535-47bb-0310-9956-ffa450edef68
78 files changed, 1031 insertions, 277 deletions
diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt index ee9291a41b..ee25393f25 100644 --- a/cpp/src/CMakeLists.txt +++ b/cpp/src/CMakeLists.txt @@ -1122,7 +1122,12 @@ set (qpidbroker_SOURCES qpid/broker/FifoDistributor.cpp qpid/broker/MessageGroupManager.cpp qpid/broker/PersistableMessage.cpp + qpid/broker/AsyncResultHandle.cpp + qpid/broker/AsyncResultHandleImpl.cpp + qpid/broker/AsyncResultQueueImpl.cpp qpid/broker/Bridge.cpp + qpid/broker/ConfigAsyncContext.cpp + qpid/broker/ConfigHandle.cpp qpid/broker/Connection.cpp qpid/broker/ConnectionHandler.cpp qpid/broker/ConnectionFactory.cpp @@ -1145,9 +1150,9 @@ set (qpidbroker_SOURCES qpid/broker/MessageAdapter.cpp qpid/broker/MessageBuilder.cpp qpid/broker/MessageHandle.cpp - qpid/broker/MessageStoreModule.cpp +# qpid/broker/MessageStoreModule.cpp qpid/broker/NameGenerator.cpp - qpid/broker/NullMessageStore.cpp +# qpid/broker/NullMessageStore.cpp qpid/broker/QueueBindings.cpp qpid/broker/QueuedMessage.cpp qpid/broker/QueueCursor.cpp @@ -1156,6 +1161,8 @@ set (qpidbroker_SOURCES qpid/broker/QueueRegistry.cpp qpid/broker/QueueSettings.cpp qpid/broker/QueueFlowLimit.cpp + qpid/broker/RecoveryAsyncContext.cpp + qpid/broker/RecoveryHandle.cpp qpid/broker/RecoveryManagerImpl.cpp qpid/broker/RecoveredEnqueue.cpp qpid/broker/RecoveredDequeue.cpp @@ -1444,7 +1451,7 @@ add_definitions(-DBOOST_FILESYSTEM_VERSION=2) # Now create the config file from all the info learned above. configure_file(${CMAKE_CURRENT_SOURCE_DIR}/config.h.cmake ${CMAKE_CURRENT_BINARY_DIR}/config.h) -add_subdirectory(qpid/store) +#add_subdirectory(qpid/store) add_subdirectory(tests) # Support for pkg-config diff --git a/cpp/src/asyncstore.cmake b/cpp/src/asyncstore.cmake index e6230483ea..477091a927 100644 --- a/cpp/src/asyncstore.cmake +++ b/cpp/src/asyncstore.cmake @@ -54,6 +54,7 @@ set (asyncStore_SOURCES qpid/asyncStore/PersistableMessageContext.cpp qpid/asyncStore/Plugin.cpp qpid/asyncStore/QueueHandleImpl.cpp + qpid/asyncStore/RecoveryHandleImpl.cpp qpid/asyncStore/RunState.cpp qpid/asyncStore/TxnHandleImpl.cpp qpid/broker/AsyncResultHandle.cpp @@ -65,6 +66,8 @@ set (asyncStore_SOURCES qpid/broker/MessageHandle.cpp qpid/broker/QueueAsyncContext.cpp qpid/broker/QueueHandle.cpp + qpid/broker/RecoveryAsyncContext.cpp + qpid/broker/RecoveryHandle.cpp qpid/broker/SimpleDeliverable.cpp qpid/broker/SimpleDeliveryRecord.cpp qpid/broker/SimpleMessage.cpp diff --git a/cpp/src/qpid/asyncStore/AsyncOperation.cpp b/cpp/src/qpid/asyncStore/AsyncOperation.cpp index 1e3ab51165..2023da2ded 100644 --- a/cpp/src/qpid/asyncStore/AsyncOperation.cpp +++ b/cpp/src/qpid/asyncStore/AsyncOperation.cpp @@ -25,6 +25,7 @@ #include "qpid/broker/AsyncResultHandle.h" #include "qpid/broker/AsyncResultHandleImpl.h" +#include "qpid/broker/RecoveryAsyncContext.h" #include "qpid/broker/QueueAsyncContext.h" #include "qpid/broker/TxnAsyncContext.h" @@ -132,6 +133,29 @@ AsyncOpTxnAbort::getOpStr() const { } +// --- class AsyncOpRecover --- + +AsyncOpRecover::AsyncOpRecover(qpid::broker::RecoveryHandle& rcvrHandle, + boost::shared_ptr<qpid::broker::RecoveryAsyncContext> rcvrCtxt, + qpid::broker::AsyncStore* store) : + AsyncOperation(boost::dynamic_pointer_cast<qpid::broker::BrokerAsyncContext>(rcvrCtxt), store), + m_rcvrHandle(rcvrHandle) +{} + +AsyncOpRecover::~AsyncOpRecover() {} + +void +AsyncOpRecover::executeOp() const { + // TODO: Implement store operation here + submitResult(); +} + +const char* +AsyncOpRecover::getOpStr() const { + return "RECOVER"; +} + + // --- class AsyncOpConfigCreate --- AsyncOpConfigCreate::AsyncOpConfigCreate(qpid::broker::ConfigHandle& cfgHandle, diff --git a/cpp/src/qpid/asyncStore/AsyncOperation.h b/cpp/src/qpid/asyncStore/AsyncOperation.h index f22cccad07..ffd6ca44a9 100644 --- a/cpp/src/qpid/asyncStore/AsyncOperation.h +++ b/cpp/src/qpid/asyncStore/AsyncOperation.h @@ -90,6 +90,18 @@ private: }; +class AsyncOpRecover: public qpid::asyncStore::AsyncOperation { +public: + AsyncOpRecover(qpid::broker::RecoveryHandle& rcvrHandle, + boost::shared_ptr<qpid::broker::RecoveryAsyncContext> rcvrCtxt, + qpid::broker::AsyncStore* store); + virtual ~AsyncOpRecover(); + virtual void executeOp() const; + virtual const char* getOpStr() const; +private: + qpid::broker::RecoveryHandle& m_rcvrHandle; +}; + class AsyncOpConfigCreate: public qpid::asyncStore::AsyncOperation { public: AsyncOpConfigCreate(qpid::broker::ConfigHandle& cfgHandle, diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp index 9c1ff42fa2..85fd981862 100644 --- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp +++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp @@ -29,6 +29,7 @@ #include "qpid/asyncStore/EventHandleImpl.h" #include "qpid/asyncStore/MessageHandleImpl.h" #include "qpid/asyncStore/QueueHandleImpl.h" +#include "qpid/asyncStore/RecoveryHandleImpl.h" #include "qpid/asyncStore/TxnHandleImpl.h" #include "qpid/broker/ConfigHandle.h" #include "qpid/broker/EnqueueHandle.h" @@ -36,6 +37,8 @@ #include "qpid/broker/MessageHandle.h" #include "qpid/broker/QueueAsyncContext.h" #include "qpid/broker/QueueHandle.h" +#include "qpid/broker/RecoveryAsyncContext.h" +#include "qpid/broker/RecoveryHandle.h" #include "qpid/broker/TxnAsyncContext.h" #include "qpid/broker/TxnHandle.h" @@ -48,12 +51,17 @@ AsyncStoreImpl::AsyncStoreImpl(boost::shared_ptr<qpid::sys::Poller> poller, m_opts(opts), m_runState(), m_operations(m_poller) -{} +{ + QPID_LOG(info, "AsyncStoreImpl::AsyncStoreImpl()"); +} AsyncStoreImpl::~AsyncStoreImpl() {} void -AsyncStoreImpl::initialize() {} +AsyncStoreImpl::initialize(bool truncateFlag, + bool saveFlag) { + QPID_LOG(info, "AsyncStoreImpl::initialize() truncateFlag=" << (truncateFlag?"T":"F") << " saveFlag=" << (saveFlag?"T":"F")); +} uint64_t AsyncStoreImpl::getNextRid() { @@ -88,25 +96,42 @@ AsyncStoreImpl::createTxnHandle(const std::string& xid, void AsyncStoreImpl::submitPrepare(qpid::broker::TxnHandle& txnHandle, - boost::shared_ptr<qpid::broker::TpcTxnAsyncContext> TxnCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnPrepare(txnHandle, TxnCtxt, this)); - TxnCtxt->setOpStr(op->getOpStr()); + boost::shared_ptr<qpid::broker::TpcTxnAsyncContext> txnCtxt) { + assert(txnCtxt.get() != 0); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnPrepare(txnHandle, txnCtxt, this)); + txnCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } void AsyncStoreImpl::submitCommit(qpid::broker::TxnHandle& txnHandle, - boost::shared_ptr<qpid::broker::TxnAsyncContext> TxnCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnCommit(txnHandle, TxnCtxt, this)); - TxnCtxt->setOpStr(op->getOpStr()); + boost::shared_ptr<qpid::broker::TxnAsyncContext> txnCtxt) { + assert(txnCtxt.get() != 0); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnCommit(txnHandle, txnCtxt, this)); + txnCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } void AsyncStoreImpl::submitAbort(qpid::broker::TxnHandle& txnHandle, - boost::shared_ptr<qpid::broker::TxnAsyncContext> TxnCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnAbort(txnHandle, TxnCtxt, this)); - TxnCtxt->setOpStr(op->getOpStr()); + boost::shared_ptr<qpid::broker::TxnAsyncContext> txnCtxt) { + assert(txnCtxt.get() != 0); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnAbort(txnHandle, txnCtxt, this)); + txnCtxt->setOpStr(op->getOpStr()); + m_operations.submit(op); +} + +qpid::broker::RecoveryHandle +AsyncStoreImpl::createRecoveryHandle() { + return qpid::broker::RecoveryHandle(new RecoveryHandleImpl()); +} + +void +AsyncStoreImpl::submitRecover(qpid::broker::RecoveryHandle& rcvrHandle, + boost::shared_ptr<qpid::broker::RecoveryAsyncContext> rcvrCtxt) { + assert(rcvrCtxt.get() != 0); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpRecover(rcvrHandle, rcvrCtxt, this)); + rcvrCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } @@ -144,6 +169,7 @@ void AsyncStoreImpl::submitCreate(qpid::broker::ConfigHandle& cfgHandle, const qpid::broker::DataSource* const dataSrc, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { + assert(brokerCtxt.get() != 0); boost::shared_ptr<const AsyncOperation> op(new AsyncOpConfigCreate(cfgHandle, dataSrc, brokerCtxt, this)); brokerCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); @@ -152,6 +178,7 @@ AsyncStoreImpl::submitCreate(qpid::broker::ConfigHandle& cfgHandle, void AsyncStoreImpl::submitDestroy(qpid::broker::ConfigHandle& cfgHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { + assert(brokerCtxt.get() != 0); boost::shared_ptr<const AsyncOperation> op(new AsyncOpConfigDestroy(cfgHandle, brokerCtxt, this)); brokerCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); @@ -160,25 +187,28 @@ AsyncStoreImpl::submitDestroy(qpid::broker::ConfigHandle& cfgHandle, void AsyncStoreImpl::submitCreate(qpid::broker::QueueHandle& queueHandle, const qpid::broker::DataSource* const dataSrc, - boost::shared_ptr<qpid::broker::QueueAsyncContext> QueueCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueCreate(queueHandle, dataSrc, QueueCtxt, this)); - QueueCtxt->setOpStr(op->getOpStr()); + boost::shared_ptr<qpid::broker::QueueAsyncContext> queueCtxt) { + assert(queueCtxt.get() != 0); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueCreate(queueHandle, dataSrc, queueCtxt, this)); + queueCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } void AsyncStoreImpl::submitDestroy(qpid::broker::QueueHandle& queueHandle, - boost::shared_ptr<qpid::broker::QueueAsyncContext> QueueCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueDestroy(queueHandle, QueueCtxt, this)); - QueueCtxt->setOpStr(op->getOpStr()); + boost::shared_ptr<qpid::broker::QueueAsyncContext> queueCtxt) { + assert(queueCtxt.get() != 0); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueDestroy(queueHandle, queueCtxt, this)); + queueCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } void AsyncStoreImpl::submitFlush(qpid::broker::QueueHandle& queueHandle, - boost::shared_ptr<qpid::broker::QueueAsyncContext> QueueCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueFlush(queueHandle, QueueCtxt, this)); - QueueCtxt->setOpStr(op->getOpStr()); + boost::shared_ptr<qpid::broker::QueueAsyncContext> queueCtxt) { + assert(queueCtxt.get() != 0); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueFlush(queueHandle, queueCtxt, this)); + queueCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } @@ -187,6 +217,7 @@ AsyncStoreImpl::submitCreate(qpid::broker::EventHandle& eventHandle, const qpid::broker::DataSource* const dataSrc, qpid::broker::TxnHandle& txnHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { + assert(brokerCtxt.get() != 0); boost::shared_ptr<const AsyncOperation> op(new AsyncOpEventCreate(eventHandle, dataSrc, txnHandle, brokerCtxt, this)); brokerCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); @@ -196,6 +227,7 @@ void AsyncStoreImpl::submitDestroy(qpid::broker::EventHandle& eventHandle, qpid::broker::TxnHandle& txnHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { + assert(brokerCtxt.get() != 0); boost::shared_ptr<const AsyncOperation> op(new AsyncOpEventDestroy(eventHandle, txnHandle, brokerCtxt, this)); brokerCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); @@ -204,28 +236,30 @@ AsyncStoreImpl::submitDestroy(qpid::broker::EventHandle& eventHandle, void AsyncStoreImpl::submitEnqueue(qpid::broker::EnqueueHandle& enqHandle, qpid::broker::TxnHandle& txnHandle, - boost::shared_ptr<qpid::broker::QueueAsyncContext> QueueCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOpMsgEnqueue(enqHandle, txnHandle, QueueCtxt, this)); - QueueCtxt->setOpStr(op->getOpStr()); + boost::shared_ptr<qpid::broker::QueueAsyncContext> queueCtxt) { + assert(queueCtxt.get() != 0); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpMsgEnqueue(enqHandle, txnHandle, queueCtxt, this)); + queueCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } void AsyncStoreImpl::submitDequeue(qpid::broker::EnqueueHandle& enqHandle, qpid::broker::TxnHandle& txnHandle, - boost::shared_ptr<qpid::broker::QueueAsyncContext> QueueCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOpMsgDequeue(enqHandle, txnHandle, QueueCtxt, this)); - QueueCtxt->setOpStr(op->getOpStr()); + boost::shared_ptr<qpid::broker::QueueAsyncContext> queueCtxt) { + assert(queueCtxt.get() != 0); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpMsgDequeue(enqHandle, txnHandle, queueCtxt, this)); + queueCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } -int -AsyncStoreImpl::loadContent(qpid::broker::MessageHandle& /*msgHandle*/, - qpid::broker::QueueHandle& /*queueHandle*/, - char* /*data*/, - uint64_t /*offset*/, - const uint64_t /*length*/) { - return 0; -} +//int +//AsyncStoreImpl::loadContent(qpid::broker::MessageHandle& /*msgHandle*/, +// qpid::broker::QueueHandle& /*queueHandle*/, +// char* /*data*/, +// uint64_t /*offset*/, +// const uint64_t /*length*/) { +// return 0; +//} }} // namespace qpid::asyncStore diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.h b/cpp/src/qpid/asyncStore/AsyncStoreImpl.h index 12a7f62c09..dd2cabaa69 100644 --- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.h +++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.h @@ -47,7 +47,7 @@ public: AsyncStoreImpl(boost::shared_ptr<qpid::sys::Poller> poller, const AsyncStoreOptions& opts); virtual ~AsyncStoreImpl(); - void initialize(); + void initialize(bool truncateFlag = false, bool saveFlag = true); uint64_t getNextRid(); // Global counter for journal RIDs // --- Management --- @@ -65,11 +65,17 @@ public: qpid::broker::SimpleTxnBuffer* tb); void submitPrepare(qpid::broker::TxnHandle& txnHandle, - boost::shared_ptr<qpid::broker::TpcTxnAsyncContext> TxnCtxt); + boost::shared_ptr<qpid::broker::TpcTxnAsyncContext> txnCtxt); void submitCommit(qpid::broker::TxnHandle& txnHandle, - boost::shared_ptr<qpid::broker::TxnAsyncContext> TxnCtxt); + boost::shared_ptr<qpid::broker::TxnAsyncContext> txnCtxt); void submitAbort(qpid::broker::TxnHandle& txnHandle, - boost::shared_ptr<qpid::broker::TxnAsyncContext> TxnCtxt); + boost::shared_ptr<qpid::broker::TxnAsyncContext> txnCtxt); + + + // --- Interface from AsyncRecoverable --- + qpid::broker::RecoveryHandle createRecoveryHandle(); + void submitRecover(qpid::broker::RecoveryHandle& rcvrHandle, + boost::shared_ptr<qpid::broker::RecoveryAsyncContext> rcvrCtxt); // --- Interface from AsyncStore --- @@ -112,12 +118,12 @@ public: qpid::broker::TxnHandle& txnHandle, boost::shared_ptr<qpid::broker::QueueAsyncContext> QueueCtxt); - // Legacy - Restore FTD message, is NOT async! - virtual int loadContent(qpid::broker::MessageHandle& msgHandle, - qpid::broker::QueueHandle& queueHandle, - char* data, - uint64_t offset, - const uint64_t length); +// // Legacy - Restore FTD message, is NOT async! +// virtual int loadContent(qpid::broker::MessageHandle& msgHandle, +// qpid::broker::QueueHandle& queueHandle, +// char* data, +// uint64_t offset, +// const uint64_t length); private: boost::shared_ptr<qpid::sys::Poller> m_poller; diff --git a/cpp/src/qpid/asyncStore/ConfigHandleImpl.h b/cpp/src/qpid/asyncStore/ConfigHandleImpl.h index 17069ec21c..2e49e0adb9 100644 --- a/cpp/src/qpid/asyncStore/ConfigHandleImpl.h +++ b/cpp/src/qpid/asyncStore/ConfigHandleImpl.h @@ -29,8 +29,7 @@ namespace qpid { namespace asyncStore { -class ConfigHandleImpl : public virtual qpid::RefCounted -{ +class ConfigHandleImpl : public virtual qpid::RefCounted { public: ConfigHandleImpl(); virtual ~ConfigHandleImpl(); diff --git a/cpp/src/qpid/asyncStore/OperationQueue.cpp b/cpp/src/qpid/asyncStore/OperationQueue.cpp index 4e05fad10d..dff5387827 100644 --- a/cpp/src/qpid/asyncStore/OperationQueue.cpp +++ b/cpp/src/qpid/asyncStore/OperationQueue.cpp @@ -50,6 +50,8 @@ OperationQueue::OpQueue::Batch::const_iterator OperationQueue::handle(const OperationQueue::OpQueue::Batch& e) { try { for (OpQueue::Batch::const_iterator i = e.begin(); i != e.end(); ++i) { +// DEBUG: kpvdr +std::cout << "#### OperationQueue::handle(): op=" << (*i)->getOpStr() << std::endl << std::flush; (*i)->executeOp(); // Do store work here } } catch (const std::exception& e) { diff --git a/cpp/src/qpid/asyncStore/Plugin.cpp b/cpp/src/qpid/asyncStore/Plugin.cpp index f6930272a4..7395fc9b87 100644 --- a/cpp/src/qpid/asyncStore/Plugin.cpp +++ b/cpp/src/qpid/asyncStore/Plugin.cpp @@ -43,7 +43,7 @@ Plugin::earlyInitialize(Target& target) { } m_store.reset(new qpid::asyncStore::AsyncStoreImpl(broker->getPoller(), m_options)); boost::shared_ptr<qpid::broker::AsyncStore> brokerAsyncStore(m_store); - broker->setAsyncStore(brokerAsyncStore); + broker->setStore(brokerAsyncStore); boost::function<void()> fn = boost::bind(&Plugin::finalize, this); target.addFinalizer(fn); QPID_LOG(info, "asyncStore: Initialized using path " << m_options.m_storeDir); diff --git a/cpp/src/qpid/asyncStore/RecoveryHandleImpl.cpp b/cpp/src/qpid/asyncStore/RecoveryHandleImpl.cpp new file mode 100644 index 0000000000..a976f48e17 --- /dev/null +++ b/cpp/src/qpid/asyncStore/RecoveryHandleImpl.cpp @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file RecoveryHandleImpl.cpp + */ + +#include "RecoveryHandleImpl.h" + +namespace qpid { +namespace asyncStore { + +RecoveryHandleImpl::RecoveryHandleImpl() {} + +RecoveryHandleImpl::~RecoveryHandleImpl() {} + +}} // namespace qpid::asyncStore diff --git a/cpp/src/qpid/asyncStore/RecoveryHandleImpl.h b/cpp/src/qpid/asyncStore/RecoveryHandleImpl.h new file mode 100644 index 0000000000..8abd0b6f65 --- /dev/null +++ b/cpp/src/qpid/asyncStore/RecoveryHandleImpl.h @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file RecoverHandleImpl.h + */ + +#ifndef qpid_asyncStore_RecoveryHandleImpl_h_ +#define qpid_asyncStore_RecoveryHandleImpl_h_ + +#include "qpid/RefCounted.h" + +namespace qpid { +namespace asyncStore { + +class RecoveryHandleImpl: public virtual qpid::RefCounted { +public: + RecoveryHandleImpl(); + virtual ~RecoveryHandleImpl(); +}; + +}} // namespace qpid::asyncStore + +#endif // qpid_asyncStore_RecoveryHandleImpl_h_ diff --git a/cpp/src/qpid/broker/AsyncStore.h b/cpp/src/qpid/broker/AsyncStore.h index e274a4e196..781418f81c 100644 --- a/cpp/src/qpid/broker/AsyncStore.h +++ b/cpp/src/qpid/broker/AsyncStore.h @@ -20,6 +20,7 @@ #ifndef qpid_broker_AsyncStore_h_ #define qpid_broker_AsyncStore_h_ +#include "qpid/broker/RecoveryManager.h" #include "qpid/types/Variant.h" // qpid::types::Variant::Map #include <boost/shared_ptr.hpp> @@ -65,9 +66,12 @@ class EnqueueHandle; class EventHandle; class MessageHandle; class QueueHandle; +class RecoveryHandle; class TxnHandle; +class InitAsyncContext; class QueueAsyncContext; +class RecoveryAsyncContext; class TpcTxnAsyncContext; class TxnAsyncContext; class SimpleTxnBuffer; @@ -92,10 +96,20 @@ public: boost::shared_ptr<TxnAsyncContext>) = 0; }; +class AsyncRecoverable { +public: + virtual ~AsyncRecoverable() {} + virtual RecoveryHandle createRecoveryHandle() = 0; + virtual void submitRecover(qpid::broker::RecoveryHandle&, + boost::shared_ptr<qpid::broker::RecoveryAsyncContext>) = 0; +}; + // Subclassed by store: -class AsyncStore : public AsyncTransactionalStore { +class AsyncStore : public AsyncTransactionalStore, + public AsyncRecoverable { public: virtual ~AsyncStore() {} + virtual void initialize(bool truncateFlag = false, bool saveFlag = true) = 0; // --- Factory methods for creating handles --- @@ -144,12 +158,12 @@ public: TxnHandle&, boost::shared_ptr<QueueAsyncContext>) = 0; - // Legacy - Restore FTD message, is NOT async! - virtual int loadContent(MessageHandle&, - QueueHandle&, - char* data, - uint64_t offset, - const uint64_t length) = 0; +// // Legacy - Restore FTD message, is NOT async! +// virtual int loadContent(MessageHandle&, +// QueueHandle&, +// char* data, +// uint64_t offset, +// const uint64_t length) = 0; }; diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 2411e0520c..2382205268 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -20,12 +20,16 @@ */ #include "qpid/broker/Broker.h" +#include "qpid/broker/AsyncResultHandle.h" +#include "qpid/broker/ConfigAsyncContext.h" +#include "qpid/broker/ConfigHandle.h" #include "qpid/broker/ConnectionState.h" #include "qpid/broker/DirectExchange.h" #include "qpid/broker/FanOutExchange.h" #include "qpid/broker/HeadersExchange.h" -#include "qpid/broker/MessageStoreModule.h" -#include "qpid/broker/NullMessageStore.h" +//#include "qpid/broker/MessageStoreModule.h" +//#include "qpid/broker/NullMessageStore.h" +//#include "qpid/broker/RecoveryAsyncContext.h" #include "qpid/broker/RecoveryManagerImpl.h" #include "qpid/broker/SaslAuthenticator.h" #include "qpid/broker/SecureConnectionFactory.h" @@ -34,6 +38,8 @@ #include "qpid/broker/ExpiryPolicy.h" #include "qpid/broker/QueueFlowLimit.h" #include "qpid/broker/QueueSettings.h" +#include "qpid/broker/RecoveryAsyncContext.h" +#include "qpid/broker/RecoveryHandle.h" #include "qpid/broker/MessageGroupManager.h" #include "qmf/org/apache/qpid/broker/Package.h" @@ -190,7 +196,9 @@ Broker::Broker(const Broker::Options& conf) : managementAgent(conf.enableMgmt ? new ManagementAgent(conf.qmf1Support, conf.qmf2Support) : 0), - store(new NullMessageStore), +// store(new NullMessageStore), +// asyncStore(0), + asyncResultQueue(poller), acl(0), dataDir(conf.noDataDir ? std::string() : conf.dataDir), queues(this), @@ -270,23 +278,31 @@ Broker::Broker(const Broker::Options& conf) : MessageGroupManager::setDefaults(conf.defaultMsgGroup); // If no plugin store module registered itself, set up the null store. - if (NullMessageStore::isNullStore(store.get())) - setStore(); +// if (NullMessageStore::isNullStore(store.get())) +// setStore(); exchanges.declare(empty, DirectExchange::typeName); // Default exchange. - if (store.get() != 0) { +// if (store.get() != 0) { + if (asyncStore.get() != 0) { // The cluster plug-in will setRecovery(false) on all but the first // broker to join a cluster. if (getRecovery()) { + QPID_LOG(info, "Store recovery starting") RecoveryManagerImpl recoverer(queues, exchanges, links, dtxManager); - store->recover(recoverer); + RecoveryHandle rh = asyncStore->createRecoveryHandle(); + boost::shared_ptr<RecoveryAsyncContext> rac(new RecoveryAsyncContext(recoverer, &recoverComplete, &asyncResultQueue)); + asyncStore->submitRecover(rh, rac); +// store->recover(recoverer); } else { QPID_LOG(notice, "Cluster recovery: recovered journal data discarded and journal files pushed down"); - store->truncateInit(true); // save old files in subdir +// store->truncateInit(true); // save old files in subdir + asyncStore->initialize(true, true); } } +// debug + else QPID_LOG(info, ">>>> No store!!!!") //ensure standard exchanges exist (done after recovery from store) declareStandardExchange(amq_direct, DirectExchange::typeName); @@ -357,10 +373,14 @@ Broker::Broker(const Broker::Options& conf) : void Broker::declareStandardExchange(const std::string& name, const std::string& type) { - bool storeEnabled = store.get() != NULL; +// bool storeEnabled = store.get() != NULL; + bool storeEnabled = asyncStore.get() != NULL; std::pair<Exchange::shared_ptr, bool> status = exchanges.declare(name, type, storeEnabled); if (status.second && storeEnabled) { - store->create(*status.first, framing::FieldTable ()); +// store->create(*status.first, framing::FieldTable ()); + ConfigHandle ch = asyncStore->createConfigHandle(); + boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureComplete, &asyncResultQueue)); + asyncStore->submitCreate(ch, status.first.get(), bc); } } @@ -377,23 +397,39 @@ boost::intrusive_ptr<Broker> Broker::create(const Options& opts) return boost::intrusive_ptr<Broker>(new Broker(opts)); } -void Broker::setStore (boost::shared_ptr<MessageStore>& _store) +//void Broker::setStore (boost::shared_ptr<MessageStore>& _store) +//{ +// store.reset(new MessageStoreModule (_store)); +// setStore(); +//} + +void Broker::setStore(boost::shared_ptr<AsyncStore>& _asyncStore) { - store.reset(new MessageStoreModule (_store)); +// asyncStore.reset(_asyncStore.get()); + asyncStore = _asyncStore; setStore(); } -void Broker::setAsyncStore(boost::shared_ptr<AsyncStore>& /*asyncStore*/) -{ - // TODO: Provide implementation for async store interface +void Broker::setStore () { +// queues.setStore (store.get()); + queues.setStore(asyncStore.get()); +// dtxManager.setStore (store.get()); + dtxManager.setStore(asyncStore.get()); +// links.setStore (store.get()); + links.setStore(asyncStore.get()); } -void Broker::setStore () { - queues.setStore (store.get()); - dtxManager.setStore (store.get()); - links.setStore (store.get()); +// static +void Broker::recoverComplete(const AsyncResultHandle* const arh) { + std::cout << "@@@@ Recover complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl; } +// static +void Broker::configureComplete(const AsyncResultHandle* const arh) { + std::cout << "@@@@ Configure complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl; +} + + void Broker::run() { if (config.workerThreads > 0) { QPID_LOG(notice, "Broker running"); @@ -926,7 +962,7 @@ Manageable::status_t Broker::queryQueue( const std::string& name, return Manageable::STATUS_UNKNOWN_OBJECT; } q->query( results ); - return Manageable::STATUS_OK;; + return Manageable::STATUS_OK; } Manageable::status_t Broker::getTimestampConfig(bool& receive, @@ -1168,7 +1204,10 @@ std::pair<Exchange::shared_ptr, bool> Broker::createExchange( alternate->incAlternateUsers(); } if (durable) { - store->create(*result.first, arguments); +// store->create(*result.first, arguments); + ConfigHandle ch = asyncStore->createConfigHandle(); + boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureComplete, &asyncResultQueue)); + asyncStore->submitCreate(ch, result.first.get(), bc); } if (managementAgent.get()) { //TODO: debatable whether we should raise an event here for @@ -1208,7 +1247,11 @@ void Broker::deleteExchange(const std::string& name, const std::string& userId, Exchange::shared_ptr exchange(exchanges.get(name)); if (!exchange) throw framing::NotFoundException(QPID_MSG("Delete failed. No such exchange: " << name)); if (exchange->inUseAsAlternate()) throw framing::NotAllowedException(QPID_MSG("Exchange in use as alternate-exchange.")); - if (exchange->isDurable()) store->destroy(*exchange); +// if (exchange->isDurable()) store->destroy(*exchange); + if (exchange->isDurable()) { +// boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureComplete, &asyncResultQueue)); +// asyncStore->submitDestroy(exchange.getHandle(), bc); + } if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers(); exchanges.destroy(name); @@ -1285,7 +1328,8 @@ void Broker::unbind(const std::string& queueName, } else { if (exchange->unbind(queue, key, 0)) { if (exchange->isDurable() && queue->isDurable()) { - store->unbind(*exchange, *queue, key, qpid::framing::FieldTable()); +// store->unbind(*exchange, *queue, key, qpid::framing::FieldTable()); + // TODO: kpvdr: Async config destroy here } getConfigurationObservers().unbind( exchange, queue, key, framing::FieldTable()); diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index d1be0f58da..e4d1d93423 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -22,6 +22,7 @@ * */ +#include "qpid/broker/AsyncResultQueueImpl.h" #include "qpid/broker/AsyncStore.h" #include "qpid/broker/BrokerImportExport.h" #include "qpid/broker/ConnectionFactory.h" @@ -29,7 +30,7 @@ #include "qpid/broker/DirectExchange.h" #include "qpid/broker/DtxManager.h" #include "qpid/broker/ExchangeRegistry.h" -#include "qpid/broker/MessageStore.h" +//#include "qpid/broker/MessageStore.h" #include "qpid/broker/QueueRegistry.h" #include "qpid/broker/LinkRegistry.h" #include "qpid/broker/SessionManager.h" @@ -138,6 +139,8 @@ class Broker : public sys::Runnable, public Plugin::Target, void declareStandardExchange(const std::string& name, const std::string& type); void setStore (); + static void recoverComplete(const AsyncResultHandle* const); + static void configureComplete(const AsyncResultHandle* const); void setLogLevel(const std::string& level); std::string getLogLevel(); void createObject(const std::string& type, const std::string& name, @@ -160,7 +163,10 @@ class Broker : public sys::Runnable, public Plugin::Target, Options config; std::auto_ptr<management::ManagementAgent> managementAgent; ProtocolFactoryMap protocolFactories; - std::auto_ptr<MessageStore> store; +// std::auto_ptr<MessageStore> store; +// std::auto_ptr<AsyncStore> asyncStore; + boost::shared_ptr<AsyncStore> asyncStore; + AsyncResultQueueImpl asyncResultQueue; AclModule* acl; DataDir dataDir; ConnectionObservers connectionObservers; @@ -213,9 +219,10 @@ class Broker : public sys::Runnable, public Plugin::Target, /** Shut down the broker */ QPID_BROKER_EXTERN virtual void shutdown(); - QPID_BROKER_EXTERN void setStore (boost::shared_ptr<MessageStore>& store); - void setAsyncStore(boost::shared_ptr<AsyncStore>& asyncStore); - MessageStore& getStore() { return *store; } +// QPID_BROKER_EXTERN void setStore (boost::shared_ptr<MessageStore>& store); + void setStore(boost::shared_ptr<AsyncStore>& asyncStore); +// MessageStore& getStore() { return *store; } + AsyncStore& getStore() { return *asyncStore; } void setAcl (AclModule* _acl) {acl = _acl;} AclModule* getAcl() { return acl; } QueueRegistry& getQueues() { return queues; } diff --git a/cpp/src/qpid/broker/ConfigAsyncContext.cpp b/cpp/src/qpid/broker/ConfigAsyncContext.cpp new file mode 100644 index 0000000000..ef91f56452 --- /dev/null +++ b/cpp/src/qpid/broker/ConfigAsyncContext.cpp @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file ConfigAsyncContext.cpp + */ + +#include "ConfigAsyncContext.h" + +namespace qpid { +namespace broker { + +ConfigAsyncContext::ConfigAsyncContext(AsyncResultCallback rcb, + AsyncResultQueue* const arq) : + m_rcb(rcb), + m_arq(arq) +{} + +ConfigAsyncContext::~ConfigAsyncContext() {} + +AsyncResultQueue* +ConfigAsyncContext::getAsyncResultQueue() const { + return m_arq; +} + +void +ConfigAsyncContext::invokeCallback(const AsyncResultHandle* const arh) const { + if (m_rcb) { + m_rcb(arh); + } +} + +}} // namespace qpid diff --git a/cpp/src/qpid/broker/ConfigAsyncContext.h b/cpp/src/qpid/broker/ConfigAsyncContext.h new file mode 100644 index 0000000000..ad879a09c4 --- /dev/null +++ b/cpp/src/qpid/broker/ConfigAsyncContext.h @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file ConfigAsyncContext.h + */ + +#ifndef qpid_broker_ConfigAsyncContext_h_ +#define qpid_broker_ConfigAsyncContext_h_ + +#include "AsyncStore.h" + +namespace qpid { +namespace broker { +class AsyncResultHandle; +class AsyncResultQueue; + +typedef void (*AsyncResultCallback)(const AsyncResultHandle* const); + +class ConfigAsyncContext: public qpid::broker::BrokerAsyncContext +{ +public: + ConfigAsyncContext(AsyncResultCallback rcb, + AsyncResultQueue* const arq); + virtual ~ConfigAsyncContext(); + virtual AsyncResultQueue* getAsyncResultQueue() const; + virtual void invokeCallback(const AsyncResultHandle* const) const; + +private: + AsyncResultCallback m_rcb; + AsyncResultQueue* const m_arq; +}; + +}} // namespace qpid::broker + +#endif // qpid_broker_ConfigAsyncContext_h_ diff --git a/cpp/src/qpid/broker/DirectExchange.cpp b/cpp/src/qpid/broker/DirectExchange.cpp index 2fa7ce0fc5..c56a1da6cc 100644 --- a/cpp/src/qpid/broker/DirectExchange.cpp +++ b/cpp/src/qpid/broker/DirectExchange.cpp @@ -199,3 +199,10 @@ bool DirectExchange::isBound(Queue::shared_ptr queue, const string* const routin DirectExchange::~DirectExchange() {} const std::string DirectExchange::typeName("direct"); + +// DataSource interface - used to write persistence data to async store +// TODO: kpvdr: implement +uint64_t DirectExchange::getSize() { + return 0; +} +void DirectExchange::write(char* /*target*/) {} diff --git a/cpp/src/qpid/broker/DirectExchange.h b/cpp/src/qpid/broker/DirectExchange.h index 833be52c1c..a0e1477d0c 100644 --- a/cpp/src/qpid/broker/DirectExchange.h +++ b/cpp/src/qpid/broker/DirectExchange.h @@ -65,6 +65,11 @@ public: QPID_BROKER_EXTERN virtual ~DirectExchange(); virtual bool supportsDynamicBinding() { return true; } + + // DataSource interface - used to write persistence data to async store + uint64_t getSize(); + void write(char* target); + }; }} diff --git a/cpp/src/qpid/broker/DtxManager.cpp b/cpp/src/qpid/broker/DtxManager.cpp index d482c2c327..7e2eb927a3 100644 --- a/cpp/src/qpid/broker/DtxManager.cpp +++ b/cpp/src/qpid/broker/DtxManager.cpp @@ -35,7 +35,8 @@ using qpid::ptr_map_ptr; using namespace qpid::broker; using namespace qpid::framing; -DtxManager::DtxManager(qpid::sys::Timer& t) : store(0), timer(&t) {} +//DtxManager::DtxManager(qpid::sys::Timer& t) : store(0), timer(&t) {} +DtxManager::DtxManager(qpid::sys::Timer& t) : asyncTxnStore(0), timer(&t) {} DtxManager::~DtxManager() {} @@ -124,7 +125,8 @@ DtxWorkRecord* DtxManager::createWork(const std::string& xid) throw NotAllowedException(QPID_MSG("Xid " << convert(xid) << " is already known (use 'join' to add work to an existing xid)")); } else { std::string ncxid = xid; // Work around const correctness problems in ptr_map. - return ptr_map_ptr(work.insert(ncxid, new DtxWorkRecord(ncxid, store)).first); +// return ptr_map_ptr(work.insert(ncxid, new DtxWorkRecord(ncxid, store)).first); + return ptr_map_ptr(work.insert(ncxid, new DtxWorkRecord(ncxid, asyncTxnStore)).first); } } @@ -172,9 +174,11 @@ void DtxManager::DtxCleanup::fire() } } -void DtxManager::setStore (TransactionalStore* _store) +//void DtxManager::setStore (TransactionalStore* _store) +void DtxManager::setStore (AsyncTransactionalStore* _ats) { - store = _store; +// store = _store; + asyncTxnStore = _ats; } std::string DtxManager::convert(const qpid::framing::Xid& xid) diff --git a/cpp/src/qpid/broker/DtxManager.h b/cpp/src/qpid/broker/DtxManager.h index 6f03189f66..fe20a89c32 100644 --- a/cpp/src/qpid/broker/DtxManager.h +++ b/cpp/src/qpid/broker/DtxManager.h @@ -22,9 +22,10 @@ #define _DtxManager_ #include <boost/ptr_container/ptr_map.hpp> +#include "qpid/broker/AsyncStore.h" #include "qpid/broker/DtxBuffer.h" #include "qpid/broker/DtxWorkRecord.h" -#include "qpid/broker/TransactionalStore.h" +//#include "qpid/broker/TransactionalStore.h" #include "qpid/framing/amqp_types.h" #include "qpid/framing/Xid.h" #include "qpid/sys/Mutex.h" @@ -46,7 +47,8 @@ class DtxManager{ }; WorkMap work; - TransactionalStore* store; +// TransactionalStore* store; + AsyncTransactionalStore* asyncTxnStore; qpid::sys::Mutex lock; qpid::sys::Timer* timer; @@ -65,7 +67,8 @@ public: void setTimeout(const std::string& xid, uint32_t secs); uint32_t getTimeout(const std::string& xid); void timedout(const std::string& xid); - void setStore(TransactionalStore* store); +// void setStore(TransactionalStore* store); + void setStore(AsyncTransactionalStore* ats); void setTimer(sys::Timer& t) { timer = &t; } // Used by cluster for replication. diff --git a/cpp/src/qpid/broker/DtxWorkRecord.cpp b/cpp/src/qpid/broker/DtxWorkRecord.cpp index 2c26fec49f..924f953eb8 100644 --- a/cpp/src/qpid/broker/DtxWorkRecord.cpp +++ b/cpp/src/qpid/broker/DtxWorkRecord.cpp @@ -29,8 +29,8 @@ using qpid::sys::Mutex; using namespace qpid::broker; using namespace qpid::framing; -DtxWorkRecord::DtxWorkRecord(const std::string& _xid, TransactionalStore* const _store) : - xid(_xid), store(_store), completed(false), rolledback(false), prepared(false), expired(false) {} +DtxWorkRecord::DtxWorkRecord(const std::string& _xid, AsyncTransactionalStore* const _ats) : + xid(_xid), asyncTxnStore(_ats), completed(false), rolledback(false), prepared(false), expired(false) {} DtxWorkRecord::~DtxWorkRecord() { @@ -43,14 +43,15 @@ bool DtxWorkRecord::prepare() { Mutex::ScopedLock locker(lock); if (check()) { - txn = store->begin(xid); - if (prepare(txn.get())) { - store->prepare(*txn); - prepared = true; - } else { - abort(); - //TODO: this should probably be flagged as internal error - } +// txn = asyncTxnStore->begin(xid); +// if (prepare(txn.get())) { +// asyncTxnStore->prepare(*txn); +// prepared = true; +// } else { +// abort(); +// //TODO: this should probably be flagged as internal error +// } + // TODO: kpvdr: Async transaction prepare here } else { //some part of the work has been marked rollback only abort(); @@ -67,7 +68,7 @@ bool DtxWorkRecord::prepare(TransactionContext* _txn) return succeeded; } -bool DtxWorkRecord::commit(bool onePhase) +bool DtxWorkRecord::commit(bool onePhase) // why is onePhase necessary if prepared already contains the necessary state? { Mutex::ScopedLock locker(lock); if (check()) { @@ -77,7 +78,8 @@ bool DtxWorkRecord::commit(bool onePhase) throw IllegalStateException(QPID_MSG("Branch with xid " << DtxManager::convert(xid) << " has been prepared, one-phase option not valid!")); } - store->commit(*txn); +// asyncTxnStore->commit(*txn); + // TODO: kpvdr: Async transaction commit here txn.reset(); std::for_each(work.begin(), work.end(), mem_fn(&TxBuffer::commit)); @@ -87,17 +89,20 @@ bool DtxWorkRecord::commit(bool onePhase) if (!onePhase) { throw IllegalStateException(QPID_MSG("Branch with xid " << DtxManager::convert(xid) << " has not been prepared, one-phase option required!")); } - std::auto_ptr<TransactionContext> localtxn = store->begin(); - if (prepare(localtxn.get())) { - store->commit(*localtxn); - std::for_each(work.begin(), work.end(), mem_fn(&TxBuffer::commit)); - return true; - } else { - store->abort(*localtxn); - abort(); - //TODO: this should probably be flagged as internal error - return false; - } +// std::auto_ptr<TransactionContext> localtxn = asyncTxnStore->begin(); +// if (prepare(localtxn.get())) { +// asyncTxnStore->commit(*localtxn); +// std::for_each(work.begin(), work.end(), mem_fn(&TxBuffer::commit)); +// return true; +// } else { +// asyncTxnStore->abort(*localtxn); +// abort(); +// //TODO: this should probably be flagged as internal error +// return false; +// } + // TODO: kpvdr: Local transaction async prepare and commit here + // temp return value: + return true; } } else { //some part of the work has been marked rollback only @@ -147,7 +152,8 @@ bool DtxWorkRecord::check() void DtxWorkRecord::abort() { if (txn.get()) { - store->abort(*txn); +// asyncTxnStore->abort(*txn); + // TODO: kpvdr: Async transaction abore here txn.reset(); } std::for_each(work.begin(), work.end(), mem_fn(&TxBuffer::rollback)); diff --git a/cpp/src/qpid/broker/DtxWorkRecord.h b/cpp/src/qpid/broker/DtxWorkRecord.h index 331e42fefd..9dd86bdcad 100644 --- a/cpp/src/qpid/broker/DtxWorkRecord.h +++ b/cpp/src/qpid/broker/DtxWorkRecord.h @@ -21,6 +21,7 @@ #ifndef _DtxWorkRecord_ #define _DtxWorkRecord_ +#include "qpid/broker/AsyncStore.h" #include "qpid/broker/BrokerImportExport.h" #include "qpid/broker/DtxBuffer.h" #include "qpid/broker/DtxTimeout.h" @@ -48,7 +49,8 @@ class DtxWorkRecord typedef std::vector<DtxBuffer::shared_ptr> Work; const std::string xid; - TransactionalStore* const store; +// TransactionalStore* const store; + AsyncTransactionalStore* const asyncTxnStore; bool completed; bool rolledback; bool prepared; @@ -63,7 +65,8 @@ class DtxWorkRecord bool prepare(TransactionContext* txn); public: QPID_BROKER_EXTERN DtxWorkRecord(const std::string& xid, - TransactionalStore* const store); +// TransactionalStore* const store); + AsyncTransactionalStore* const store); QPID_BROKER_EXTERN ~DtxWorkRecord(); QPID_BROKER_EXTERN bool prepare(); QPID_BROKER_EXTERN bool commit(bool onePhase); diff --git a/cpp/src/qpid/broker/Exchange.h b/cpp/src/qpid/broker/Exchange.h index 2b2f7db934..b4c6b799a4 100644 --- a/cpp/src/qpid/broker/Exchange.h +++ b/cpp/src/qpid/broker/Exchange.h @@ -23,10 +23,11 @@ */ #include <boost/shared_ptr.hpp> +#include <qpid/broker/AsyncStore.h> #include "qpid/broker/BrokerImportExport.h" #include "qpid/broker/Deliverable.h" #include "qpid/broker/Message.h" -#include "qpid/broker/MessageStore.h" +//#include "qpid/broker/MessageStore.h" #include "qpid/broker/PersistableExchange.h" #include "qpid/framing/FieldTable.h" #include "qpid/sys/Mutex.h" @@ -35,13 +36,15 @@ #include "qmf/org/apache/qpid/broker/Binding.h" #include "qmf/org/apache/qpid/broker/Broker.h" +#include <set> + namespace qpid { namespace broker { class Broker; class ExchangeRegistry; -class QPID_BROKER_CLASS_EXTERN Exchange : public PersistableExchange, public management::Manageable { +class QPID_BROKER_CLASS_EXTERN Exchange : public PersistableExchange, public DataSource, public management::Manageable { public: struct Binding : public management::Manageable { typedef boost::shared_ptr<Binding> shared_ptr; diff --git a/cpp/src/qpid/broker/ExchangeRegistry.h b/cpp/src/qpid/broker/ExchangeRegistry.h index 27b705fbe5..c5d2483a23 100644 --- a/cpp/src/qpid/broker/ExchangeRegistry.h +++ b/cpp/src/qpid/broker/ExchangeRegistry.h @@ -24,7 +24,7 @@ #include "qpid/broker/BrokerImportExport.h" #include "qpid/broker/Exchange.h" -#include "qpid/broker/MessageStore.h" +//#include "qpid/broker/MessageStore.h" #include "qpid/framing/FieldTable.h" #include "qpid/sys/Monitor.h" #include "qpid/management/Manageable.h" diff --git a/cpp/src/qpid/broker/FanOutExchange.cpp b/cpp/src/qpid/broker/FanOutExchange.cpp index 56c894c129..5b7e0c7324 100644 --- a/cpp/src/qpid/broker/FanOutExchange.cpp +++ b/cpp/src/qpid/broker/FanOutExchange.cpp @@ -120,3 +120,11 @@ bool FanOutExchange::isBound(Queue::shared_ptr queue, const string* const, const FanOutExchange::~FanOutExchange() {} const std::string FanOutExchange::typeName("fanout"); + + +// DataSource interface - used to write persistence data to async store +// TODO: kpvdr: implement +uint64_t FanOutExchange::getSize() { + return 0; +} +void FanOutExchange::write(char* /*target*/) {} diff --git a/cpp/src/qpid/broker/FanOutExchange.h b/cpp/src/qpid/broker/FanOutExchange.h index c979fdca25..dc301a4266 100644 --- a/cpp/src/qpid/broker/FanOutExchange.h +++ b/cpp/src/qpid/broker/FanOutExchange.h @@ -62,6 +62,11 @@ class FanOutExchange : public virtual Exchange { QPID_BROKER_EXTERN virtual ~FanOutExchange(); virtual bool supportsDynamicBinding() { return true; } + + // DataSource interface - used to write persistence data to async store + uint64_t getSize(); + void write(char* target); + }; } diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp index 02c05852ff..ec19765387 100644 --- a/cpp/src/qpid/broker/HeadersExchange.cpp +++ b/cpp/src/qpid/broker/HeadersExchange.cpp @@ -418,3 +418,9 @@ bool HeadersExchange::FedUnbindModifier::operator()(BoundKey & bk) return true; } +// DataSource interface - used to write persistence data to async store +// TODO: kpvdr: implement +uint64_t HeadersExchange::getSize() { + return 0; +} +void HeadersExchange::write(char* /*target*/) {} diff --git a/cpp/src/qpid/broker/HeadersExchange.h b/cpp/src/qpid/broker/HeadersExchange.h index 2e4669a018..ff0fec4212 100644 --- a/cpp/src/qpid/broker/HeadersExchange.h +++ b/cpp/src/qpid/broker/HeadersExchange.h @@ -107,6 +107,11 @@ class HeadersExchange : public virtual Exchange { static QPID_BROKER_EXTERN bool match(const qpid::framing::FieldTable& bindArgs, const qpid::framing::FieldTable& msgArgs); static bool equal(const qpid::framing::FieldTable& bindArgs, const qpid::framing::FieldTable& msgArgs); + + // DataSource interface - used to write persistence data to async store + uint64_t getSize(); + void write(char* target); + }; diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp index 6479e47799..416c3b7d34 100644 --- a/cpp/src/qpid/broker/Link.cpp +++ b/cpp/src/qpid/broker/Link.cpp @@ -115,6 +115,11 @@ public: link = _link; } + // DataSource interface - used to write persistence data to async store + uint64_t getSize() { return 0; } + void write(char* /*target*/) {} + + private: Link *link; }; diff --git a/cpp/src/qpid/broker/LinkRegistry.cpp b/cpp/src/qpid/broker/LinkRegistry.cpp index 6f813554fa..43ed208eba 100644 --- a/cpp/src/qpid/broker/LinkRegistry.cpp +++ b/cpp/src/qpid/broker/LinkRegistry.cpp @@ -42,7 +42,8 @@ namespace _qmf = qmf::org::apache::qpid::broker; // factored: The persistence element should be factored separately LinkRegistry::LinkRegistry () : broker(0), - parent(0), store(0), passive(false), +// parent(0), store(0), passive(false), + parent(0), asyncStore(0), passive(false), realm("") { } @@ -59,7 +60,7 @@ class LinkRegistryConnectionObserver : public ConnectionObserver { LinkRegistry::LinkRegistry (Broker* _broker) : broker(_broker), - parent(0), store(0), passive(false), + parent(0), asyncStore(0), passive(false), realm(broker->getOptions().realm) { broker->getConnectionObservers().add( @@ -117,7 +118,11 @@ pair<Link::shared_ptr, bool> LinkRegistry::declare(const string& name, boost::bind(&LinkRegistry::linkDestroyed, this, _1), durable, authMechanism, username, password, broker, parent, failover)); - if (durable && store) store->create(*link); +// if (durable && store) store->create(*link); + if (durable && asyncStore) { +// store->create(*link); + // TODO: kpvdr: async create config (link) + } links[name] = link; pendingLinks[name] = link; QPID_LOG(debug, "Creating new link; name=" << name ); @@ -213,8 +218,11 @@ pair<Bridge::shared_ptr, bool> LinkRegistry::declare(const std::string& name, args, init, queueName, altExchange)); bridges[name] = bridge; link.add(bridge); - if (durable && store) - store->create(*bridge); +// if (durable && store) + if (durable && asyncStore) { +// store->create(*bridge); + // TODO: kpvdr: Async create config (bridge) + } QPID_LOG(debug, "Bridge '" << name <<"' declared on link '" << link.getName() << "' from " << src << " to " << dest << " (" << key << ")"); @@ -234,8 +242,11 @@ void LinkRegistry::linkDestroyed(Link *link) LinkMap::iterator i = links.find(link->getName()); if (i != links.end()) { - if (i->second->isDurable() && store) - store->destroy(*(i->second)); +// if (i->second->isDurable() && store) + if (i->second->isDurable() && asyncStore) { +// store->destroy(*(i->second)); + // TODO: kpvdr: Async destroy config (link) + } links.erase(i); } } @@ -254,18 +265,22 @@ void LinkRegistry::destroyBridge(Bridge *bridge) if (link) { link->cancel(b->second); } - if (b->second->isDurable()) - store->destroy(*(b->second)); +// if (b->second->isDurable()) + if (b->second->isDurable()) { +// store->destroy(*(b->second)); + // TODO: kpvdr: Async destroy config (bridge) + } bridges.erase(b); } -void LinkRegistry::setStore (MessageStore* _store) -{ - store = _store; +//void LinkRegistry::setStore (MessageStore* _store) +void LinkRegistry::setStore (AsyncStore* _asyncStore) { + asyncStore = _asyncStore; } -MessageStore* LinkRegistry::getStore() const { - return store; +//MessageStore* LinkRegistry::getStore() const { +AsyncStore* LinkRegistry::getStore() const { + return asyncStore; } namespace { diff --git a/cpp/src/qpid/broker/LinkRegistry.h b/cpp/src/qpid/broker/LinkRegistry.h index 076ab831c9..a30f91e2a0 100644 --- a/cpp/src/qpid/broker/LinkRegistry.h +++ b/cpp/src/qpid/broker/LinkRegistry.h @@ -25,7 +25,7 @@ #include <map> #include "qpid/broker/BrokerImportExport.h" #include "qpid/broker/Bridge.h" -#include "qpid/broker/MessageStore.h" +//#include "qpid/broker/MessageStore.h" #include "qpid/Address.h" #include "qpid/sys/Mutex.h" #include "qpid/management/Manageable.h" @@ -52,7 +52,8 @@ namespace broker { qpid::sys::Mutex lock; Broker* broker; management::Manageable* parent; - MessageStore* store; +// MessageStore* store; + AsyncStore* asyncStore; bool passive; std::string realm; @@ -130,12 +131,14 @@ namespace broker { /** * Set the store to use. May only be called once. */ - QPID_BROKER_EXTERN void setStore (MessageStore*); +// QPID_BROKER_EXTERN void setStore (MessageStore*); + QPID_BROKER_EXTERN void setStore (AsyncStore*); /** * Return the message store used. */ - QPID_BROKER_EXTERN MessageStore* getStore() const; +// QPID_BROKER_EXTERN MessageStore* getStore() const; + QPID_BROKER_EXTERN AsyncStore* getStore() const; QPID_BROKER_EXTERN std::string getAuthMechanism (const std::string& key); QPID_BROKER_EXTERN std::string getAuthCredentials (const std::string& key); diff --git a/cpp/src/qpid/broker/LossyQueue.cpp b/cpp/src/qpid/broker/LossyQueue.cpp index ee2c3ca794..a4e46a2e2f 100644 --- a/cpp/src/qpid/broker/LossyQueue.cpp +++ b/cpp/src/qpid/broker/LossyQueue.cpp @@ -33,8 +33,10 @@ bool isLowerPriorityThan(uint8_t priority, const Message& m) } } -LossyQueue::LossyQueue(const std::string& n, const QueueSettings& s, MessageStore* const ms, management::Manageable* p, Broker* b) - : Queue(n, s, ms, p, b) {} +//LossyQueue::LossyQueue(const std::string& n, const QueueSettings& s, MessageStore* const ms, management::Manageable* p, Broker* b) +// : Queue(n, s, ms, p, b) {} +LossyQueue::LossyQueue(const std::string& n, const QueueSettings& s, AsyncStore* const as, management::Manageable* p, Broker* b) + : Queue(n, s, as, p, b) {} bool LossyQueue::checkDepth(const QueueDepth& increment, const Message& message) { diff --git a/cpp/src/qpid/broker/LossyQueue.h b/cpp/src/qpid/broker/LossyQueue.h index 3e62151d6f..da4e1d52e2 100644 --- a/cpp/src/qpid/broker/LossyQueue.h +++ b/cpp/src/qpid/broker/LossyQueue.h @@ -32,7 +32,8 @@ namespace broker { class LossyQueue : public Queue { public: - LossyQueue(const std::string&, const QueueSettings&, MessageStore* const, management::Manageable*, Broker*); +// LossyQueue(const std::string&, const QueueSettings&, MessageStore* const, management::Manageable*, Broker*); + LossyQueue(const std::string&, const QueueSettings&, AsyncStore* const, management::Manageable*, Broker*); bool checkDepth(const QueueDepth& increment, const Message&); private: }; diff --git a/cpp/src/qpid/broker/Lvq.cpp b/cpp/src/qpid/broker/Lvq.cpp index d053616c8a..0bededb966 100644 --- a/cpp/src/qpid/broker/Lvq.cpp +++ b/cpp/src/qpid/broker/Lvq.cpp @@ -25,8 +25,10 @@ namespace qpid { namespace broker { -Lvq::Lvq(const std::string& n, std::auto_ptr<MessageMap> m, const QueueSettings& s, MessageStore* const ms, management::Manageable* p, Broker* b) - : Queue(n, s, ms, p, b), messageMap(*m) +//Lvq::Lvq(const std::string& n, std::auto_ptr<MessageMap> m, const QueueSettings& s, MessageStore* const ms, management::Manageable* p, Broker* b) +// : Queue(n, s, ms, p, b), messageMap(*m) +Lvq::Lvq(const std::string& n, std::auto_ptr<MessageMap> m, const QueueSettings& s, AsyncStore* const as, management::Manageable* p, Broker* b) + : Queue(n, s, as, p, b), messageMap(*m) { messages = m; } diff --git a/cpp/src/qpid/broker/Lvq.h b/cpp/src/qpid/broker/Lvq.h index 335270a073..3eba381b81 100644 --- a/cpp/src/qpid/broker/Lvq.h +++ b/cpp/src/qpid/broker/Lvq.h @@ -35,7 +35,8 @@ class MessageMap; class Lvq : public Queue { public: - Lvq(const std::string&, std::auto_ptr<MessageMap>, const QueueSettings&, MessageStore* const, management::Manageable*, Broker*); +// Lvq(const std::string&, std::auto_ptr<MessageMap>, const QueueSettings&, MessageStore* const, management::Manageable*, Broker*); + Lvq(const std::string&, std::auto_ptr<MessageMap>, const QueueSettings&, AsyncStore* const, management::Manageable*, Broker*); void push(Message& msg, bool isRecovery=false); private: MessageMap& messageMap; diff --git a/cpp/src/qpid/broker/MessageStore.h b/cpp/src/qpid/broker/MessageStore.h index ab0225ef6b..5e339574fd 100644 --- a/cpp/src/qpid/broker/MessageStore.h +++ b/cpp/src/qpid/broker/MessageStore.h @@ -18,6 +18,9 @@ * under the License. * */ + +#error "deprecated in favor of AsyncStore" + #ifndef _MessageStore_ #define _MessageStore_ diff --git a/cpp/src/qpid/broker/MessageStoreModule.cpp b/cpp/src/qpid/broker/MessageStoreModule.cpp index cd9fd4c933..4309ee8524 100644 --- a/cpp/src/qpid/broker/MessageStoreModule.cpp +++ b/cpp/src/qpid/broker/MessageStoreModule.cpp @@ -19,6 +19,8 @@ * */ +#error "deprecated in favor of AsyncStore" + #include "qpid/broker/MessageStoreModule.h" #include "qpid/broker/NullMessageStore.h" #include <iostream> diff --git a/cpp/src/qpid/broker/MessageStoreModule.h b/cpp/src/qpid/broker/MessageStoreModule.h index 56b5a3c1ae..e5c271f4fa 100644 --- a/cpp/src/qpid/broker/MessageStoreModule.h +++ b/cpp/src/qpid/broker/MessageStoreModule.h @@ -18,6 +18,9 @@ * under the License. * */ + +#error "deprecated in favor of AsyncStore" + #ifndef _MessageStoreModule_ #define _MessageStoreModule_ diff --git a/cpp/src/qpid/broker/NullMessageStore.cpp b/cpp/src/qpid/broker/NullMessageStore.cpp index 43f600eaf1..209941875a 100644 --- a/cpp/src/qpid/broker/NullMessageStore.cpp +++ b/cpp/src/qpid/broker/NullMessageStore.cpp @@ -19,6 +19,8 @@ * */ +#error "deprecated in favor of AsyncStore" + #include "qpid/broker/NullMessageStore.h" #include "qpid/broker/MessageStoreModule.h" #include "qpid/broker/RecoveryManager.h" diff --git a/cpp/src/qpid/broker/NullMessageStore.h b/cpp/src/qpid/broker/NullMessageStore.h index c6f402662e..799bf6b368 100644 --- a/cpp/src/qpid/broker/NullMessageStore.h +++ b/cpp/src/qpid/broker/NullMessageStore.h @@ -18,6 +18,9 @@ * under the License. * */ + +#error "deprected in favor of AsyncStore" + #ifndef _NullMessageStore_ #define _NullMessageStore_ diff --git a/cpp/src/qpid/broker/PersistableMessage.cpp b/cpp/src/qpid/broker/PersistableMessage.cpp index 9601b8dcce..2ef9fbfcbb 100644 --- a/cpp/src/qpid/broker/PersistableMessage.cpp +++ b/cpp/src/qpid/broker/PersistableMessage.cpp @@ -64,19 +64,19 @@ void PersistableMessage::flush() //TODO: is this really the right place for this? } -// deprecated -void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr, MessageStore*) -{ - enqueueStart(); -} +//// deprecated +//void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr, MessageStore*) +//{ +// enqueueStart(); +//} void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr, AsyncStore*) { enqueueStart(); } -// deprecated -void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr, MessageStore*) {} +//// deprecated +//void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr, MessageStore*) {} void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr, AsyncStore*) {} diff --git a/cpp/src/qpid/broker/PersistableMessage.h b/cpp/src/qpid/broker/PersistableMessage.h index be2910280c..0fd3d169b4 100644 --- a/cpp/src/qpid/broker/PersistableMessage.h +++ b/cpp/src/qpid/broker/PersistableMessage.h @@ -80,16 +80,16 @@ class PersistableMessage : public Persistable QPID_BROKER_INLINE_EXTERN void enqueueStart() { ingressCompletion->startCompleter(); } QPID_BROKER_INLINE_EXTERN void enqueueComplete() { ingressCompletion->finishCompleter(); } - QPID_BROKER_EXTERN void enqueueAsync(PersistableQueue::shared_ptr queue, // deprecated - MessageStore* _store); +// QPID_BROKER_EXTERN void enqueueAsync(PersistableQueue::shared_ptr queue, // deprecated +// MessageStore* _store); QPID_BROKER_EXTERN void enqueueAsync(PersistableQueue::shared_ptr queue, AsyncStore* _store); QPID_BROKER_EXTERN bool isDequeueComplete(); QPID_BROKER_EXTERN void dequeueComplete(); - QPID_BROKER_EXTERN void dequeueAsync(PersistableQueue::shared_ptr queue, // deprecated - MessageStore* _store); +// QPID_BROKER_EXTERN void dequeueAsync(PersistableQueue::shared_ptr queue, // deprecated +// MessageStore* _store); QPID_BROKER_EXTERN void dequeueAsync(PersistableQueue::shared_ptr queue, AsyncStore* _store); diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 0dd4cb7b10..40574ded3b 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -26,11 +26,11 @@ #include "qpid/broker/QueueSettings.h" #include "qpid/broker/Exchange.h" #include "qpid/broker/DeliverableMessage.h" -#include "qpid/broker/MessageStore.h" +//#include "qpid/broker/MessageStore.h" #include "qpid/broker/MessageDeque.h" #include "qpid/broker/MessageDistributor.h" #include "qpid/broker/FifoDistributor.h" -#include "qpid/broker/NullMessageStore.h" +//#include "qpid/broker/NullMessageStore.h" #include "qpid/broker/QueueRegistry.h" //TODO: get rid of this @@ -165,12 +165,14 @@ void Queue::TxPublish::rollback() throw() } Queue::Queue(const string& _name, const QueueSettings& _settings, - MessageStore* const _store, +// MessageStore* const _store, + AsyncStore* const _asyncStore, Manageable* parent, Broker* b) : name(_name), - store(_store), +// store(_store), + asyncStore(_asyncStore), owner(0), consumerCount(0), browserCount(0), @@ -198,9 +200,11 @@ Queue::Queue(const string& _name, const QueueSettings& _settings, ManagementAgent* agent = broker->getManagementAgent(); if (agent != 0) { - mgmtObject = new _qmf::Queue(agent, this, parent, _name, _store != 0, settings.autodelete); +// mgmtObject = new _qmf::Queue(agent, this, parent, _name, _store != 0, settings.autodelete); + mgmtObject = new _qmf::Queue(agent, this, parent, _name, _asyncStore != 0, settings.autodelete); mgmtObject->set_arguments(settings.asMap()); - agent->addObject(mgmtObject, 0, store != 0); +// agent->addObject(mgmtObject, 0, store != 0); + agent->addObject(mgmtObject, 0, asyncStore != 0); brokerMgmtObject = (qmf::org::apache::qpid::broker::Broker*) broker->GetManagementObject(); if (brokerMgmtObject) brokerMgmtObject->inc_queueCount(); @@ -787,7 +791,7 @@ void Queue::setLastNodeFailure() * return true if enqueue succeeded and message should be made * available; returning false will result in the message being dropped */ -bool Queue::enqueue(TransactionContext* ctxt, Message& msg) +bool Queue::enqueue(TransactionContext* /*ctxt*/, Message& msg) { ScopedUse u(barrier); if (!u.acquired) return false; @@ -807,13 +811,16 @@ bool Queue::enqueue(TransactionContext* ctxt, Message& msg) msg.addTraceId(settings.traceId); } - if (msg.isPersistent() && store) { +// if (msg.isPersistent() && store) { + if (msg.isPersistent() && asyncStore) { // mark the message as being enqueued - the store MUST CALL msg->enqueueComplete() // when it considers the message stored. boost::intrusive_ptr<PersistableMessage> pmsg = msg.getPersistentContext(); assert(pmsg); - pmsg->enqueueAsync(shared_from_this(), store); - store->enqueue(ctxt, pmsg, *this); +// pmsg->enqueueAsync(shared_from_this(), store); + pmsg->enqueueAsync(shared_from_this(), asyncStore); +// store->enqueue(ctxt, pmsg, *this); + // TODO - kpvdr: async enqueue here } return true; } @@ -858,8 +865,10 @@ void Queue::dequeueCommited(const Message& msg) void Queue::dequeueFromStore(boost::intrusive_ptr<PersistableMessage> msg) { ScopedUse u(barrier); - if (u.acquired && msg && store) { - store->dequeue(0, msg, *this); +// if (u.acquired && msg && store) { + if (u.acquired && msg && asyncStore) { +// store->dequeue(0, msg, *this); + // TODO: kpvdr: async dequeue here } } @@ -881,8 +890,10 @@ void Queue::dequeue(TransactionContext* ctxt, const QueueCursor& cursor) return; } } - if (store && pmsg) { - store->dequeue(ctxt, pmsg, *this); +// if (store && pmsg) { + if (asyncStore && pmsg) { +// store->dequeue(ctxt, pmsg, *this); + // TODO: kpvdr: async dequeue here } } @@ -983,8 +994,10 @@ void Queue::observeConsumerRemove( const Consumer& c, const qpid::sys::Mutex::Sc void Queue::create() { - if (store) { - store->create(*this, settings.storeSettings); +// if (store) { + if (asyncStore) { +// store->create(*this, settings.storeSettings); + // TODO: kpvdr: async store create here } } @@ -1051,11 +1064,16 @@ void Queue::destroyed() alternateExchange->decAlternateUsers(); } - if (store) { +// if (store) { + if (asyncStore) { barrier.destroy(); - store->flush(*this); - store->destroy(*this); - store = 0;//ensure we make no more calls to the store for this queue +// store->flush(*this); + // TODO: kpvdr: async flush here +// store->destroy(*this); + // TODO: kpvdr: async destroy here +// store = 0;//ensure we make no more calls to the store for this queue + // TODO: kpvdr: cannot set asyncStore to 0 until all async store ops are complete. Rather set flag which + // will cause store to be destroyed when all outstanding async ops are complete. } if (autoDeleteTask) autoDeleteTask = boost::intrusive_ptr<TimerTask>(); notifyDeleted(); @@ -1444,7 +1462,9 @@ void Queue::removeObserver(boost::shared_ptr<QueueObserver> observer) void Queue::flush() { ScopedUse u(barrier); - if (u.acquired && store) store->flush(*this); +// if (u.acquired && store) store->flush(*this); + // TODO: kpvdr: Async store flush here + if (u.acquired && asyncStore) { /*store->flush(*this);*/ } } @@ -1454,7 +1474,8 @@ bool Queue::bind(boost::shared_ptr<Exchange> exchange, const std::string& key, if (exchange->bind(shared_from_this(), key, &arguments)) { bound(exchange->getName(), key, arguments); if (exchange->isDurable() && isDurable()) { - store->bind(*exchange, *this, key, arguments); +// store->bind(*exchange, *this, key, arguments); + // TODO: kpvdr: Store configuration here } return true; } else { diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index 671a24d53e..28fa8b5ca9 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -59,7 +59,7 @@ namespace qpid { namespace broker { class Broker; class Exchange; -class MessageStore; +//class MessageStore; class QueueDepth; class QueueEvents; class QueueRegistry; @@ -115,7 +115,8 @@ class Queue : public boost::enable_shared_from_this<Queue>, typedef boost::function1<void, Message&> MessageFunctor; const std::string name; - MessageStore* store; +// MessageStore* store; + AsyncStore* asyncStore; const OwnershipToken* owner; uint32_t consumerCount; // Actually a count of all subscriptions, acquiring or not. uint32_t browserCount; // Count of non-acquiring subscriptions. @@ -201,7 +202,8 @@ class Queue : public boost::enable_shared_from_this<Queue>, QPID_BROKER_EXTERN Queue(const std::string& name, const QueueSettings& settings = QueueSettings(), - MessageStore* const store = 0, +// MessageStore* const store = 0, + AsyncStore* const asyncStore = 0, management::Manageable* parent = 0, Broker* broker = 0); QPID_BROKER_EXTERN virtual ~Queue(); @@ -286,7 +288,8 @@ class Queue : public boost::enable_shared_from_this<Queue>, QPID_BROKER_EXTERN bool setExclusiveOwner(const OwnershipToken* const o); QPID_BROKER_EXTERN bool hasExclusiveConsumer() const; QPID_BROKER_EXTERN bool hasExclusiveOwner() const; - inline bool isDurable() const { return store != 0; } +// inline bool isDurable() const { return store != 0; } + inline bool isDurable() const { return asyncStore != 0; } inline const QueueSettings& getSettings() const { return settings; } inline const qpid::framing::FieldTable& getEncodableSettings() const { return encodableSettings; } inline bool isAutoDelete() const { return settings.autodelete; } diff --git a/cpp/src/qpid/broker/QueueAsyncContext.h b/cpp/src/qpid/broker/QueueAsyncContext.h index 7a159f2639..4988f2af39 100644 --- a/cpp/src/qpid/broker/QueueAsyncContext.h +++ b/cpp/src/qpid/broker/QueueAsyncContext.h @@ -24,9 +24,9 @@ #ifndef qpid_broker_QueueAsyncContext_h_ #define qpid_broker_QueueAsyncContext_h_ -#include "AsyncResultHandle.h" -#include "AsyncStore.h" -#include "TxnHandle.h" +#include "qpid/broker/AsyncResultHandle.h" +#include "qpid/broker/AsyncStore.h" +#include "qpid/broker/TxnHandle.h" #include "qpid/asyncStore/AsyncOperation.h" @@ -36,7 +36,7 @@ namespace qpid { namespace broker { -class PersistableMessage; +//class PersistableMessage; class PersistableQueue; typedef void (*AsyncResultCallback)(const AsyncResultHandle* const); diff --git a/cpp/src/qpid/broker/QueueFactory.cpp b/cpp/src/qpid/broker/QueueFactory.cpp index efeb9ae53b..6ff3f832e4 100644 --- a/cpp/src/qpid/broker/QueueFactory.cpp +++ b/cpp/src/qpid/broker/QueueFactory.cpp @@ -41,7 +41,8 @@ namespace qpid { namespace broker { -QueueFactory::QueueFactory() : broker(0), store(0), parent(0) {} +//QueueFactory::QueueFactory() : broker(0), store(0), parent(0) {} +QueueFactory::QueueFactory() : broker(0), asyncStore(0), parent(0) {} boost::shared_ptr<Queue> QueueFactory::create(const std::string& name, const QueueSettings& settings) { @@ -51,12 +52,15 @@ boost::shared_ptr<Queue> QueueFactory::create(const std::string& name, const Que // -> if 'ring' policy is in use then subclass boost::shared_ptr<Queue> queue; if (settings.dropMessagesAtLimit) { - queue = boost::shared_ptr<Queue>(new LossyQueue(name, settings, settings.durable ? store : 0, parent, broker)); +// queue = boost::shared_ptr<Queue>(new LossyQueue(name, settings, settings.durable ? store : 0, parent, broker)); + queue = boost::shared_ptr<Queue>(new LossyQueue(name, settings, settings.durable ? asyncStore : 0, parent, broker)); } else if (settings.lvqKey.size()) { std::auto_ptr<MessageMap> map(new MessageMap(settings.lvqKey)); - queue = boost::shared_ptr<Queue>(new Lvq(name, map, settings, settings.durable ? store : 0, parent, broker)); +// queue = boost::shared_ptr<Queue>(new Lvq(name, map, settings, settings.durable ? store : 0, parent, broker)); + queue = boost::shared_ptr<Queue>(new Lvq(name, map, settings, settings.durable ? asyncStore : 0, parent, broker)); } else { - queue = boost::shared_ptr<Queue>(new Queue(name, settings, settings.durable ? store : 0, parent, broker)); +// queue = boost::shared_ptr<Queue>(new Queue(name, settings, settings.durable ? store : 0, parent, broker)); + queue = boost::shared_ptr<Queue>(new Queue(name, settings, settings.durable ? asyncStore : 0, parent, broker)); } //2. determine Messages type (i.e. structure) @@ -98,13 +102,15 @@ Broker* QueueFactory::getBroker() { return broker; } -void QueueFactory::setStore (MessageStore* s) +//void QueueFactory::setStore (MessageStore* s) +void QueueFactory::setStore (AsyncStore* as) { - store = s; + asyncStore = as; } -MessageStore* QueueFactory::getStore() const +//MessageStore* QueueFactory::getStore() const +AsyncStore* QueueFactory::getStore() const { - return store; + return asyncStore; } void QueueFactory::setParent(management::Manageable* p) { diff --git a/cpp/src/qpid/broker/QueueFactory.h b/cpp/src/qpid/broker/QueueFactory.h index b6a79f1f1a..9d0048e139 100644 --- a/cpp/src/qpid/broker/QueueFactory.h +++ b/cpp/src/qpid/broker/QueueFactory.h @@ -31,7 +31,8 @@ class Manageable; } namespace broker { class Broker; -class MessageStore; +//class MessageStore; +class AsyncStore; class Queue; struct QueueSettings; @@ -52,12 +53,14 @@ class QueueFactory /** * Set the store to use. May only be called once. */ - void setStore (MessageStore*); +// void setStore (MessageStore*); + void setStore (AsyncStore*); /** * Return the message store used. */ - MessageStore* getStore() const; +// MessageStore* getStore() const; + AsyncStore* getStore() const; /** * Register the manageable parent for declared queues @@ -65,7 +68,8 @@ class QueueFactory void setParent(management::Manageable*); private: Broker* broker; - MessageStore* store; +// MessageStore* store; + AsyncStore* asyncStore; management::Manageable* parent; }; }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/QueueHandle.cpp b/cpp/src/qpid/broker/QueueHandle.cpp index dffb262a3b..9c8d7eba67 100644 --- a/cpp/src/qpid/broker/QueueHandle.cpp +++ b/cpp/src/qpid/broker/QueueHandle.cpp @@ -23,9 +23,8 @@ #include "QueueHandle.h" -#include "PrivateImplRef.h" - #include "qpid/asyncStore/QueueHandleImpl.h" +#include "qpid/broker/PrivateImplRef.h" namespace qpid { namespace broker { diff --git a/cpp/src/qpid/broker/QueueRegistry.cpp b/cpp/src/qpid/broker/QueueRegistry.cpp index 3521e08325..420a9caa28 100644 --- a/cpp/src/qpid/broker/QueueRegistry.cpp +++ b/cpp/src/qpid/broker/QueueRegistry.cpp @@ -100,12 +100,14 @@ Queue::shared_ptr QueueRegistry::get(const string& name) { return q; } -void QueueRegistry::setStore (MessageStore* _store) +//void QueueRegistry::setStore (MessageStore* _store) +void QueueRegistry::setStore (AsyncStore* _store) { QueueFactory::setStore(_store); } -MessageStore* QueueRegistry::getStore() const +//MessageStore* QueueRegistry::getStore() const +AsyncStore* QueueRegistry::getStore() const { return QueueFactory::getStore(); } diff --git a/cpp/src/qpid/broker/QueueRegistry.h b/cpp/src/qpid/broker/QueueRegistry.h index 7fce90c679..b274493f8d 100644 --- a/cpp/src/qpid/broker/QueueRegistry.h +++ b/cpp/src/qpid/broker/QueueRegistry.h @@ -97,12 +97,14 @@ class QueueRegistry : QueueFactory { /** * Set the store to use. May only be called once. */ - void setStore (MessageStore*); +// void setStore (MessageStore*); + void setStore (AsyncStore*); /** * Return the message store used. */ - MessageStore* getStore() const; +// MessageStore* getStore() const; + AsyncStore* getStore() const; /** * Register the manageable parent for declared queues diff --git a/cpp/src/qpid/broker/RecoveredDequeue.h b/cpp/src/qpid/broker/RecoveredDequeue.h index 87f768eefd..805041f0b7 100644 --- a/cpp/src/qpid/broker/RecoveredDequeue.h +++ b/cpp/src/qpid/broker/RecoveredDequeue.h @@ -23,7 +23,7 @@ #include "qpid/broker/Deliverable.h" #include "qpid/broker/Message.h" -#include "qpid/broker/MessageStore.h" +//#include "qpid/broker/MessageStore.h" #include "qpid/broker/TxOp.h" #include <algorithm> diff --git a/cpp/src/qpid/broker/RecoveredEnqueue.h b/cpp/src/qpid/broker/RecoveredEnqueue.h index d1f8e1106c..15221a58c0 100644 --- a/cpp/src/qpid/broker/RecoveredEnqueue.h +++ b/cpp/src/qpid/broker/RecoveredEnqueue.h @@ -23,7 +23,7 @@ #include "qpid/broker/Deliverable.h" #include "qpid/broker/Message.h" -#include "qpid/broker/MessageStore.h" +//#include "qpid/broker/MessageStore.h" #include "qpid/broker/TxOp.h" #include <algorithm> diff --git a/cpp/src/qpid/broker/RecoveryAsyncContext.cpp b/cpp/src/qpid/broker/RecoveryAsyncContext.cpp new file mode 100644 index 0000000000..acbe3d5d9c --- /dev/null +++ b/cpp/src/qpid/broker/RecoveryAsyncContext.cpp @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file RecoveryAsyncContext.cpp + */ + +#include "RecoveryAsyncContext.h" + +namespace qpid { +namespace broker { + +RecoveryAsyncContext::RecoveryAsyncContext(RecoveryManagerImpl& rm, + AsyncResultCallback rcb, + AsyncResultQueue* const arq) : + m_rm(rm), + m_rcb(rcb), + m_arq(arq) +{} + +RecoveryAsyncContext::~RecoveryAsyncContext() {} + +RecoveryManagerImpl& +RecoveryAsyncContext::getRecoveryManager() const { + return m_rm; +} + + +AsyncResultQueue* +RecoveryAsyncContext::getAsyncResultQueue() const { + return m_arq; +} + +void +RecoveryAsyncContext::invokeCallback(const AsyncResultHandle* const arh) const { + if (m_rcb) { + m_rcb(arh); + } +} + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/RecoveryAsyncContext.h b/cpp/src/qpid/broker/RecoveryAsyncContext.h new file mode 100644 index 0000000000..f06bf85154 --- /dev/null +++ b/cpp/src/qpid/broker/RecoveryAsyncContext.h @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file RecoveryAsyncContext.h + */ + +#ifndef qpid_broker_RecoveryAsyncContext_h_ +#define qpid_broker_RecoveryAsyncContext_h_ + +#include "qpid/broker/AsyncStore.h" + +namespace qpid { +namespace broker { +class AsyncResultHandle; +class RecoveryManagerImpl; + +typedef void (*AsyncResultCallback)(const AsyncResultHandle* const); + +class RecoveryAsyncContext: public qpid::broker::BrokerAsyncContext { +public: + RecoveryAsyncContext(RecoveryManagerImpl& rm, + AsyncResultCallback rcb, + AsyncResultQueue* const arq); + virtual ~RecoveryAsyncContext(); + RecoveryManagerImpl& getRecoveryManager() const; + AsyncResultQueue* getAsyncResultQueue() const; + void invokeCallback(const AsyncResultHandle* const) const; + +private: + RecoveryManagerImpl& m_rm; + AsyncResultCallback m_rcb; + AsyncResultQueue* const m_arq; +}; + +}} // namespace qpid::broker + +#endif // qpid_broker_RecoveryAsyncContext_h_ diff --git a/cpp/src/qpid/broker/RecoveryHandle.cpp b/cpp/src/qpid/broker/RecoveryHandle.cpp new file mode 100644 index 0000000000..f93b7602a3 --- /dev/null +++ b/cpp/src/qpid/broker/RecoveryHandle.cpp @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file RecoveryHandle.cpp + */ + +#include "RecoveryHandle.h" + +#include "qpid/asyncStore/RecoveryHandleImpl.h" +#include "qpid/broker/PrivateImplRef.h" + +namespace qpid { +namespace broker { + +typedef PrivateImplRef<RecoveryHandle> PrivateImpl; + +RecoveryHandle::RecoveryHandle(qpid::asyncStore::RecoveryHandleImpl* p) : + Handle<qpid::asyncStore::RecoveryHandleImpl>() +{ + PrivateImpl::ctor(*this, p); +} + +RecoveryHandle::RecoveryHandle(const RecoveryHandle& r) : + Handle<qpid::asyncStore::RecoveryHandleImpl>() +{ + PrivateImpl::copy(*this, r); +} + +RecoveryHandle::~RecoveryHandle() { + PrivateImpl::dtor(*this); +} + +RecoveryHandle& +RecoveryHandle::operator=(const RecoveryHandle& r) { + return PrivateImpl::assign(*this, r); +} + +}} // namespace qpid */ diff --git a/cpp/src/qpid/broker/RecoveryHandle.h b/cpp/src/qpid/broker/RecoveryHandle.h new file mode 100644 index 0000000000..4efd674835 --- /dev/null +++ b/cpp/src/qpid/broker/RecoveryHandle.h @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file RecoveryHandle.h + */ + +#ifndef qpid_broker_RecoveryHandle_h_ +#define qpid_broker_RecoveryHandle_h_ + +#include "qpid/asyncStore/AsyncStoreHandle.h" +#include "qpid/broker/Handle.h" + +namespace qpid { +namespace asyncStore { +class RecoveryHandleImpl; +} +namespace broker { + +class RecoveryHandle: public qpid::broker::Handle<qpid::asyncStore::RecoveryHandleImpl>, + public qpid::asyncStore::AsyncStoreHandle +{ +public: + RecoveryHandle(qpid::asyncStore::RecoveryHandleImpl* p = 0); + RecoveryHandle(const RecoveryHandle& r); + virtual ~RecoveryHandle(); + RecoveryHandle& operator=(const RecoveryHandle& r); + + // --- RecoveryHandleImpl methods --- + // <none> + +private: + friend class PrivateImplRef<RecoveryHandle>; +}; + +}} // namespace qpid::broker + +#endif // qpid_broker_RecoveryHandle_h_ diff --git a/cpp/src/qpid/broker/RecoveryManager.h b/cpp/src/qpid/broker/RecoveryManager.h index 2929e92250..0cb7c544cd 100644 --- a/cpp/src/qpid/broker/RecoveryManager.h +++ b/cpp/src/qpid/broker/RecoveryManager.h @@ -45,15 +45,18 @@ class RecoveryManager{ virtual void recoveryComplete() = 0; }; +// kpvdr: this has been replaced with AsyncRecoverable defined in AsyncStore.h +/* class Recoverable { public: virtual ~Recoverable() {} - /** + * * Request recovery of queue and message state. - */ + virtual void recover(RecoveryManager& recoverer) = 0; }; +*/ }} diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index 5d96467bbf..97d6dc07b0 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -155,7 +155,8 @@ void SemanticState::startTx() txBuffer = TxBuffer::shared_ptr(new TxBuffer()); } -void SemanticState::commit(MessageStore* const store) +//void SemanticState::commit(MessageStore* const store) +void SemanticState::commit(AsyncStore* const store) { if (!txBuffer) throw CommandInvalidException(QPID_MSG("Session has not been selected for use with transactions")); diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h index 67cfe808d0..a30c7e15b7 100644 --- a/cpp/src/qpid/broker/SemanticState.h +++ b/cpp/src/qpid/broker/SemanticState.h @@ -55,7 +55,8 @@ namespace qpid { namespace broker { class Exchange; -class MessageStore; +//class MessageStore; +class AsyncStore; class SessionContext; class SessionState; @@ -233,7 +234,8 @@ class SemanticState : private boost::noncopyable { void stop(const std::string& destination); void startTx(); - void commit(MessageStore* const store); +// void commit(MessageStore* const store); + void commit(AsyncStore* const store); void rollback(); void selectDtx(); bool getDtxSelected() const { return dtxSelected; } diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp index c973098020..a05934ed8e 100644 --- a/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/cpp/src/qpid/broker/SessionAdapter.cpp @@ -655,7 +655,9 @@ XaResult SessionAdapter::DtxHandlerImpl::rollback(const Xid& xid) DtxRecoverResult SessionAdapter::DtxHandlerImpl::recover() { std::set<std::string> xids; - getBroker().getStore().collectPreparedXids(xids); +// getBroker().getStore().collectPreparedXids(xids); + // TODO: kpvdr: When designing async store with gsim, it was decided that this function + // would be performed outside the store. Resolve this function. /* * create array of long structs */ diff --git a/cpp/src/qpid/broker/TopicExchange.cpp b/cpp/src/qpid/broker/TopicExchange.cpp index c11389bb17..d9871a430b 100644 --- a/cpp/src/qpid/broker/TopicExchange.cpp +++ b/cpp/src/qpid/broker/TopicExchange.cpp @@ -337,4 +337,11 @@ TopicExchange::~TopicExchange() {} const std::string TopicExchange::typeName("topic"); +// DataSource interface - used to write persistence data to async store +// TODO: kpvdr: implement +uint64_t TopicExchange::getSize() { + return 0; +} +void TopicExchange::write(char* /*target*/) {} + }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/TopicExchange.h b/cpp/src/qpid/broker/TopicExchange.h index 46871a1c6b..c50ecf1830 100644 --- a/cpp/src/qpid/broker/TopicExchange.h +++ b/cpp/src/qpid/broker/TopicExchange.h @@ -109,6 +109,11 @@ public: QPID_BROKER_EXTERN virtual ~TopicExchange(); virtual bool supportsDynamicBinding() { return true; } + // DataSource interface - used to write persistence data to async store + uint64_t getSize(); + void write(char* target); + + class TopicExchangeTester; friend class TopicExchangeTester; }; diff --git a/cpp/src/qpid/broker/TxBuffer.cpp b/cpp/src/qpid/broker/TxBuffer.cpp index 7663cc525f..dad451776d 100644 --- a/cpp/src/qpid/broker/TxBuffer.cpp +++ b/cpp/src/qpid/broker/TxBuffer.cpp @@ -53,20 +53,22 @@ void TxBuffer::enlist(TxOp::shared_ptr op) ops.push_back(op); } -bool TxBuffer::commitLocal(TransactionalStore* const store) +//bool TxBuffer::commitLocal(TransactionalStore* const store) +bool TxBuffer::commitLocal(AsyncTransactionalStore* const asyncTxnStore) { - if (!store) return false; + if (!asyncTxnStore) return false; try { - std::auto_ptr<TransactionContext> ctxt = store->begin(); - if (prepare(ctxt.get())) { - store->commit(*ctxt); - commit(); - return true; - } else { - store->abort(*ctxt); - rollback(); - return false; - } +// std::auto_ptr<TransactionContext> ctxt = asyncTxnStore->begin(); +// if (prepare(ctxt.get())) { +// asyncTxnStore->commit(*ctxt); +// commit(); +// return true; +// } else { +// asyncTxnStore->abort(*ctxt); +// rollback(); +// return false; +// } + // TODO: kpvdr: add async local transaction commits here } catch (std::exception& e) { QPID_LOG(error, "Commit failed with exception: " << e.what()); } catch (...) { diff --git a/cpp/src/qpid/broker/TxBuffer.h b/cpp/src/qpid/broker/TxBuffer.h index 22e2f06be1..df878e2b96 100644 --- a/cpp/src/qpid/broker/TxBuffer.h +++ b/cpp/src/qpid/broker/TxBuffer.h @@ -59,6 +59,8 @@ */ namespace qpid { namespace broker { + class AsyncTransactionalStore; + class TxBuffer{ typedef std::vector<TxOp::shared_ptr>::iterator op_iterator; std::vector<TxOp::shared_ptr> ops; @@ -107,7 +109,8 @@ namespace qpid { * Helper method for managing the process of server local * commit */ - QPID_BROKER_EXTERN bool commitLocal(TransactionalStore* const store); +// QPID_BROKER_EXTERN bool commitLocal(TransactionalStore* const store); + QPID_BROKER_EXTERN bool commitLocal(AsyncTransactionalStore* const store); }; } } diff --git a/cpp/src/qpid/ha/BrokerReplicator.cpp b/cpp/src/qpid/ha/BrokerReplicator.cpp index fe32753b4e..0a66527e98 100644 --- a/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -620,6 +620,10 @@ bool BrokerReplicator::bind(boost::shared_ptr<Queue>, const string&, const frami bool BrokerReplicator::unbind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; } bool BrokerReplicator::isBound(boost::shared_ptr<Queue>, const string* const, const framing::FieldTable* const) { return false; } +// DataSource interface - used to write persistence data to async store +uint64_t BrokerReplicator::getSize() { return 0; } +void BrokerReplicator::write(char* /*target*/) {} + string BrokerReplicator::getType() const { return QPID_CONFIGURATION_REPLICATOR; } }} // namespace broker diff --git a/cpp/src/qpid/ha/BrokerReplicator.h b/cpp/src/qpid/ha/BrokerReplicator.h index 69653b876a..109d8c638e 100644 --- a/cpp/src/qpid/ha/BrokerReplicator.h +++ b/cpp/src/qpid/ha/BrokerReplicator.h @@ -76,6 +76,10 @@ class BrokerReplicator : public broker::Exchange, void route(broker::Deliverable&); bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const); + // DataSource interface - used to write persistence data to async store + uint64_t getSize(); + void write(char* target); + private: typedef boost::shared_ptr<QueueReplicator> QueueReplicatorPtr; diff --git a/cpp/src/qpid/ha/QueueReplicator.cpp b/cpp/src/qpid/ha/QueueReplicator.cpp index ae53f89404..8aba7555d4 100644 --- a/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/cpp/src/qpid/ha/QueueReplicator.cpp @@ -195,4 +195,8 @@ bool QueueReplicator::unbind(boost::shared_ptr<Queue>, const std::string&, const bool QueueReplicator::isBound(boost::shared_ptr<Queue>, const std::string* const, const FieldTable* const) { return false; } std::string QueueReplicator::getType() const { return TYPE_NAME; } +// DataSource interface - used to write persistence data to async store +uint64_t QueueReplicator::getSize() { return 0; } +void QueueReplicator::write(char* /*target*/) {} + }} // namespace qpid::broker diff --git a/cpp/src/qpid/ha/QueueReplicator.h b/cpp/src/qpid/ha/QueueReplicator.h index f8a68ea38f..a2a158539e 100644 --- a/cpp/src/qpid/ha/QueueReplicator.h +++ b/cpp/src/qpid/ha/QueueReplicator.h @@ -77,6 +77,10 @@ class QueueReplicator : public broker::Exchange, void route(broker::Deliverable&); bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const); + // DataSource interface - used to write persistence data to async store + uint64_t getSize(); + void write(char* target); + private: void initializeBridge(broker::Bridge& bridge, broker::SessionHandler& sessionHandler); void dequeue(framing::SequenceNumber, sys::Mutex::ScopedLock&); diff --git a/cpp/src/qpid/management/ManagementDirectExchange.h b/cpp/src/qpid/management/ManagementDirectExchange.h index 582354d723..481a8c499d 100644 --- a/cpp/src/qpid/management/ManagementDirectExchange.h +++ b/cpp/src/qpid/management/ManagementDirectExchange.h @@ -48,6 +48,10 @@ class ManagementDirectExchange : public virtual DirectExchange void setManagmentAgent(management::ManagementAgent* agent, int qmfVersion); virtual ~ManagementDirectExchange(); + + // DataSource interface - used to write persistence data to async store + uint64_t getSize() { return 0; } + void write(char* /*target*/) {} }; diff --git a/cpp/src/qpid/management/ManagementTopicExchange.h b/cpp/src/qpid/management/ManagementTopicExchange.h index eff01a8552..0d6b6ad50c 100644 --- a/cpp/src/qpid/management/ManagementTopicExchange.h +++ b/cpp/src/qpid/management/ManagementTopicExchange.h @@ -52,6 +52,11 @@ class ManagementTopicExchange : public virtual TopicExchange void setManagmentAgent(management::ManagementAgent* agent, int qmfVersion); virtual ~ManagementTopicExchange(); + + // DataSource interface - used to write persistence data to async store + uint64_t getSize() { return 0; } + void write(char* /*target*/) {} + }; diff --git a/cpp/src/qpid/store/MessageStorePlugin.cpp b/cpp/src/qpid/store/MessageStorePlugin.cpp index c6b0e1a53a..d72200c2ba 100644 --- a/cpp/src/qpid/store/MessageStorePlugin.cpp +++ b/cpp/src/qpid/store/MessageStorePlugin.cpp @@ -101,7 +101,8 @@ MessageStorePlugin::earlyInitialize (qpid::Plugin::Target& target) provider->second->activate(*this); NoopDeleter d; boost::shared_ptr<qpid::broker::MessageStore> sp(this, d); - broker->setStore(sp); +// broker->setStore(sp); + // TODO: kpvdr: Windows store earlyInitialize() target.addFinalizer(boost::bind(&MessageStorePlugin::finalizeMe, this)); } diff --git a/cpp/src/qpid/xml/XmlExchange.cpp b/cpp/src/qpid/xml/XmlExchange.cpp index f88acb04ee..22eeff41c5 100644 --- a/cpp/src/qpid/xml/XmlExchange.cpp +++ b/cpp/src/qpid/xml/XmlExchange.cpp @@ -430,6 +430,11 @@ bool XmlExchange::MatchQueueAndOrigin::operator()(XmlBinding::shared_ptr b) const std::string XmlExchange::typeName("xml"); - + + +// DataSource interface - used to write persistence data to async store +uint64_t XmlExchange::getSize() { return 0; } +void XmlExchange::write(char* /*target*/) {} + } } diff --git a/cpp/src/qpid/xml/XmlExchange.h b/cpp/src/qpid/xml/XmlExchange.h index 7b04781ad5..a80588c7ab 100644 --- a/cpp/src/qpid/xml/XmlExchange.h +++ b/cpp/src/qpid/xml/XmlExchange.h @@ -94,6 +94,10 @@ class XmlExchange : public virtual Exchange { virtual ~XmlExchange(); + // DataSource interface - used to write persistence data to async store + uint64_t getSize(); + void write(char* target); + struct MatchOrigin { const std::string origin; MatchOrigin(const std::string& origin); diff --git a/cpp/src/tests/AsyncCompletion.cpp b/cpp/src/tests/AsyncCompletion.cpp index e32097106f..a864841f07 100644 --- a/cpp/src/tests/AsyncCompletion.cpp +++ b/cpp/src/tests/AsyncCompletion.cpp @@ -16,33 +16,34 @@ * */ - -#include "unit_test.h" -#include "test_tools.h" -#include "BrokerFixture.h" -#include "qpid/broker/NullMessageStore.h" -#include "qpid/sys/BlockingQueue.h" -#include "qpid/client/AsyncSession.h" -#include "qpid/sys/Time.h" -#include "qpid/framing/QueueQueryResult.h" -#include "qpid/client/TypedResult.h" - -using namespace std; -using namespace qpid; -using namespace client; -using namespace framing; - -namespace qpid { namespace broker { -class TransactionContext; -class PersistableQueue; -}} - -using broker::PersistableMessage; -using broker::NullMessageStore; -using broker::TransactionContext; -using broker::PersistableQueue; -using sys::TIME_SEC; -using boost::intrusive_ptr; +// TODO: kpvdr: Rewrite this test in terms of an Null AsyncStore + +//#include "unit_test.h" +//#include "test_tools.h" +//#include "BrokerFixture.h" +//#include "qpid/broker/NullMessageStore.h" +//#include "qpid/sys/BlockingQueue.h" +//#include "qpid/client/AsyncSession.h" +//#include "qpid/sys/Time.h" +//#include "qpid/framing/QueueQueryResult.h" +//#include "qpid/client/TypedResult.h" +// +//using namespace std; +//using namespace qpid; +//using namespace client; +//using namespace framing; +// +//namespace qpid { namespace broker { +//class TransactionContext; +//class PersistableQueue; +//}} +// +//using broker::PersistableMessage; +//using broker::NullMessageStore; +//using broker::TransactionContext; +//using broker::PersistableQueue; +//using sys::TIME_SEC; +//using boost::intrusive_ptr; /** @file Unit tests for async completion. * Using a dummy store, verify that the broker indicates async completion of @@ -52,6 +53,7 @@ using boost::intrusive_ptr; namespace qpid { namespace tests { +/* class AsyncCompletionMessageStore : public NullMessageStore { public: sys::BlockingQueue<boost::intrusive_ptr<PersistableMessage> > enqueued; @@ -72,8 +74,10 @@ QPID_AUTO_TEST_SUITE(AsyncCompletionTestSuite) QPID_AUTO_TEST_CASE(testWaitTillComplete) { SessionFixture fix; AsyncCompletionMessageStore* store = new AsyncCompletionMessageStore; - boost::shared_ptr<qpid::broker::MessageStore> p; - p.reset(store); +// boost::shared_ptr<qpid::broker::MessageStore> p; + boost::shared_ptr<qpid::broker::AsyncStore> p; +// p.reset(store); + // TODO: kpvdr: Rewrite this test to use AsyncStore fix.broker->setStore(p); AsyncSession s = fix.session; @@ -116,5 +120,6 @@ QPID_AUTO_TEST_CASE(testGetResult) { } QPID_AUTO_TEST_SUITE_END() +*/ }} // namespace qpid::tests diff --git a/cpp/src/tests/DtxWorkRecordTest.cpp b/cpp/src/tests/DtxWorkRecordTest.cpp index 9d7666dca4..654342037e 100644 --- a/cpp/src/tests/DtxWorkRecordTest.cpp +++ b/cpp/src/tests/DtxWorkRecordTest.cpp @@ -32,6 +32,7 @@ namespace tests { QPID_AUTO_TEST_SUITE(DtxWorkRecordTestSuite) +/* QPID_AUTO_TEST_CASE(testOnePhaseCommit){ MockTransactionalStore store; store.expectBegin().expectCommit(); @@ -187,6 +188,8 @@ QPID_AUTO_TEST_CASE(testRollback){ opA->check(); opB->check(); } +*/ +// TODO: kpvdr: Rewrite this test (and TxMocks.h) to use Async store QPID_AUTO_TEST_SUITE_END() diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp index 3dfe3863f4..7b7c653029 100644 --- a/cpp/src/tests/QueueTest.cpp +++ b/cpp/src/tests/QueueTest.cpp @@ -29,7 +29,7 @@ #include "qpid/broker/Deliverable.h" #include "qpid/broker/ExchangeRegistry.h" #include "qpid/broker/QueueRegistry.h" -#include "qpid/broker/NullMessageStore.h" +//#include "qpid/broker/NullMessageStore.h" #include "qpid/broker/ExpiryPolicy.h" #include "qpid/framing/DeliveryProperties.h" #include "qpid/framing/FieldTable.h" diff --git a/cpp/src/tests/TxBufferTest.cpp b/cpp/src/tests/TxBufferTest.cpp index 4807026ab7..6c4473a662 100644 --- a/cpp/src/tests/TxBufferTest.cpp +++ b/cpp/src/tests/TxBufferTest.cpp @@ -32,6 +32,7 @@ namespace tests { QPID_AUTO_TEST_SUITE(TxBufferTestSuite) +/* QPID_AUTO_TEST_CASE(testCommitLocal) { MockTransactionalStore store; @@ -175,6 +176,8 @@ QPID_AUTO_TEST_CASE(testBufferIsClearedAfterCommit) opA->check(); opB->check(); } +*/ +// TODO: kpvdr: Rewrite this test (and TxMocks.h) to use Async store QPID_AUTO_TEST_SUITE_END() diff --git a/cpp/src/tests/test_store.cpp b/cpp/src/tests/test_store.cpp index 83f6a5e4b1..e6bc55c033 100644 --- a/cpp/src/tests/test_store.cpp +++ b/cpp/src/tests/test_store.cpp @@ -19,7 +19,7 @@ * */ - +// TODO: kpvdr: Rewrite this test in terms of an Null AsyncStore /**@file * Plug-in message store for tests. * @@ -32,26 +32,26 @@ * - do async completion after a delay. */ -#include "qpid/broker/NullMessageStore.h" -#include "qpid/broker/Broker.h" -#include "qpid/broker/amqp_0_10/MessageTransfer.h" -#include "qpid/framing/AMQFrame.h" -#include "qpid/log/Statement.h" -#include "qpid/Plugin.h" -#include "qpid/Options.h" -#include <boost/cast.hpp> -#include <boost/lexical_cast.hpp> -#include <memory> -#include <fstream> - -using namespace qpid; -using namespace broker; -using namespace std; -using namespace qpid::sys; +//#include "qpid/broker/NullMessageStore.h" +//#include "qpid/broker/Broker.h" +//#include "qpid/broker/amqp_0_10/MessageTransfer.h" +//#include "qpid/framing/AMQFrame.h" +//#include "qpid/log/Statement.h" +//#include "qpid/Plugin.h" +//#include "qpid/Options.h" +//#include <boost/cast.hpp> +//#include <boost/lexical_cast.hpp> +//#include <memory> +//#include <fstream> +// +//using namespace qpid; +//using namespace broker; +//using namespace std; +//using namespace qpid::sys; namespace qpid { namespace tests { - +/* struct TestStoreOptions : public Options { string name; @@ -76,6 +76,7 @@ struct Completer : public Runnable { } }; + class TestStore : public NullMessageStore { public: TestStore(const TestStoreOptions& opts, Broker& broker_) @@ -157,6 +158,7 @@ const string TestStore::EXCEPTION = "exception"; const string TestStore::EXIT_PROCESS = "exit_process"; const string TestStore::ASYNC="async "; + struct TestStorePlugin : public Plugin { TestStoreOptions options; @@ -168,12 +170,13 @@ struct TestStorePlugin : public Plugin { Broker* broker = dynamic_cast<Broker*>(&target); if (!broker) return; boost::shared_ptr<MessageStore> p(new TestStore(options, *broker)); - broker->setStore (p); +// broker->setStore (p); + // TODO: kpvdr: This test will need to be reworked in terms of an AsyncStore. } void initialize(qpid::Plugin::Target&) {} }; static TestStorePlugin pluginInstance; - +*/ }} // namespace qpid::tests |