diff options
Diffstat (limited to 'cpp/src/qpid')
43 files changed, 492 insertions, 361 deletions
diff --git a/cpp/src/qpid/asyncStore/AsyncOperation.cpp b/cpp/src/qpid/asyncStore/AsyncOperation.cpp index bcaac5c548..b8fdb8b140 100644 --- a/cpp/src/qpid/asyncStore/AsyncOperation.cpp +++ b/cpp/src/qpid/asyncStore/AsyncOperation.cpp @@ -38,7 +38,7 @@ AsyncOperation::AsyncOperation() : {} AsyncOperation::AsyncOperation(const opCode op, - const qpid::broker::IdHandle* th, + const AsyncStoreHandle* th, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : m_op(op), m_targetHandle(th), @@ -48,7 +48,7 @@ AsyncOperation::AsyncOperation(const opCode op, {} AsyncOperation::AsyncOperation(const opCode op, - const qpid::broker::IdHandle* th, + const AsyncStoreHandle* th, const qpid::broker::DataSource* const dataSrc, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : m_op(op), @@ -59,7 +59,7 @@ AsyncOperation::AsyncOperation(const opCode op, {} AsyncOperation::AsyncOperation(const opCode op, - const qpid::broker::IdHandle* th, + const AsyncStoreHandle* th, const qpid::broker::TxnHandle* txnHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : m_op(op), @@ -70,7 +70,7 @@ AsyncOperation::AsyncOperation(const opCode op, {} AsyncOperation::AsyncOperation(const opCode op, - const qpid::broker::IdHandle* th, + const AsyncStoreHandle* th, const qpid::broker::DataSource* const dataSrc, const qpid::broker::TxnHandle* txnHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : diff --git a/cpp/src/qpid/asyncStore/AsyncOperation.h b/cpp/src/qpid/asyncStore/AsyncOperation.h index 188750e910..cb73ad639b 100644 --- a/cpp/src/qpid/asyncStore/AsyncOperation.h +++ b/cpp/src/qpid/asyncStore/AsyncOperation.h @@ -25,10 +25,10 @@ #define qpid_asyncStore_AsyncOperation_h_ #include "qpid/broker/AsyncStore.h" -#include "qpid/broker/IdHandle.h" namespace qpid { namespace asyncStore { +class AsyncStoreHandle; class AsyncOperation { public: @@ -49,18 +49,18 @@ public: AsyncOperation(); AsyncOperation(const opCode op, - const qpid::broker::IdHandle* th, + const AsyncStoreHandle* th, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); AsyncOperation(const opCode op, - const qpid::broker::IdHandle* th, + const AsyncStoreHandle* th, const qpid::broker::DataSource* const dataSrc, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); AsyncOperation(const opCode op, - const qpid::broker::IdHandle* th, + const AsyncStoreHandle* th, const qpid::broker::TxnHandle* txnHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); AsyncOperation(const opCode op, - const qpid::broker::IdHandle* th, + const AsyncStoreHandle* th, const qpid::broker::DataSource* const dataSrc, const qpid::broker::TxnHandle* txnHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); @@ -71,7 +71,7 @@ public: private: opCode m_op; - const qpid::broker::IdHandle* m_targetHandle; + const AsyncStoreHandle* m_targetHandle; const qpid::broker::DataSource* const m_dataSrc; const qpid::broker::TxnHandle* m_txnHandle; boost::shared_ptr<qpid::broker::BrokerAsyncContext> const m_brokerCtxt; diff --git a/cpp/src/qpid/broker/IdHandle.h b/cpp/src/qpid/asyncStore/AsyncStoreHandle.h index ddf94a2396..97111d1b84 100644 --- a/cpp/src/qpid/broker/IdHandle.h +++ b/cpp/src/qpid/asyncStore/AsyncStoreHandle.h @@ -18,20 +18,20 @@ */ /** - * \file IdHandle.h + * \file AsyncStoreHandle.h */ -#ifndef qpid_broker_IdHandle_h_ -#define qpid_broker_IdHandle_h_ +#ifndef qpid_asyncStore_AsyncStoreHandle_h_ +#define qpid_asyncStore_AsyncStoreHandle_h_ namespace qpid { -namespace broker { +namespace asyncStore { -class IdHandle { +class AsyncStoreHandle { public: - virtual ~IdHandle(); + virtual ~AsyncStoreHandle() {} }; -}} // namespace qpid::broker +}} // namespace qpid::asyncStore -#endif // qpid_broker_IdHandle_h_ +#endif // qpid_asyncStore_AsyncStoreHandle_h_ diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp index 9135fcc27e..6b5c3ac582 100644 --- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp +++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp @@ -23,7 +23,11 @@ #include "AsyncStoreImpl.h" -#include "AsyncOperation.h" +#include "ConfigHandleImpl.h" +#include "EnqueueHandleImpl.h" +#include "EventHandleImpl.h" +#include "MessageHandleImpl.h" +#include "QueueHandleImpl.h" #include "TxnHandleImpl.h" #include "qpid/broker/ConfigHandle.h" @@ -33,8 +37,6 @@ #include "qpid/broker/QueueHandle.h" #include "qpid/broker/TxnHandle.h" -#include <boost/intrusive_ptr.hpp> - namespace qpid { namespace asyncStore { @@ -88,6 +90,36 @@ AsyncStoreImpl::createTxnHandle(const std::string& xid, return qpid::broker::TxnHandle(new TxnHandleImpl(xid, tb)); } +void +AsyncStoreImpl::submitPrepare(qpid::broker::TxnHandle& txnHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) +{ + boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::TXN_PREPARE, + dynamic_cast<AsyncStoreHandle*>(&txnHandle), + brokerCtxt)); + m_operations.submit(op); +} + +void +AsyncStoreImpl::submitCommit(qpid::broker::TxnHandle& txnHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) +{ + boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::TXN_COMMIT, + dynamic_cast<AsyncStoreHandle*>(&txnHandle), + brokerCtxt)); + m_operations.submit(op); +} + +void +AsyncStoreImpl::submitAbort(qpid::broker::TxnHandle& txnHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) +{ + boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::TXN_ABORT, + dynamic_cast<AsyncStoreHandle*>(&txnHandle), + brokerCtxt)); + m_operations.submit(op); +} + qpid::broker::ConfigHandle AsyncStoreImpl::createConfigHandle() { @@ -98,14 +130,16 @@ qpid::broker::EnqueueHandle AsyncStoreImpl::createEnqueueHandle(qpid::broker::MessageHandle& msgHandle, qpid::broker::QueueHandle& queueHandle) { - return qpid::broker::EnqueueHandle(new EnqueueHandleImpl(msgHandle, queueHandle)); + return qpid::broker::EnqueueHandle(new EnqueueHandleImpl(msgHandle, + queueHandle)); } qpid::broker::EventHandle AsyncStoreImpl::createEventHandle(qpid::broker::QueueHandle& queueHandle, const std::string& key) { - return qpid::broker::EventHandle(new EventHandleImpl(queueHandle, key)); + return qpid::broker::EventHandle(new EventHandleImpl(queueHandle, + key)); } qpid::broker::MessageHandle @@ -123,42 +157,12 @@ AsyncStoreImpl::createQueueHandle(const std::string& name, } void -AsyncStoreImpl::submitPrepare(qpid::broker::TxnHandle& txnHandle, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) -{ - boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::TXN_PREPARE, - dynamic_cast<qpid::broker::IdHandle*>(&txnHandle), - brokerCtxt)); - m_operations.submit(op); -} - -void -AsyncStoreImpl::submitCommit(qpid::broker::TxnHandle& txnHandle, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) -{ - boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::TXN_COMMIT, - dynamic_cast<qpid::broker::IdHandle*>(&txnHandle), - brokerCtxt)); - m_operations.submit(op); -} - -void -AsyncStoreImpl::submitAbort(qpid::broker::TxnHandle& txnHandle, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) -{ - boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::TXN_ABORT, - dynamic_cast<qpid::broker::IdHandle*>(&txnHandle), - brokerCtxt)); - m_operations.submit(op); -} - -void AsyncStoreImpl::submitCreate(qpid::broker::ConfigHandle& cfgHandle, const qpid::broker::DataSource* const dataSrc, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::CONFIG_CREATE, - dynamic_cast<qpid::broker::IdHandle*>(&cfgHandle), + dynamic_cast<AsyncStoreHandle*>(&cfgHandle), dataSrc, brokerCtxt)); m_operations.submit(op); @@ -169,7 +173,7 @@ AsyncStoreImpl::submitDestroy(qpid::broker::ConfigHandle& cfgHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::CONFIG_DESTROY, - dynamic_cast<qpid::broker::IdHandle*>(&cfgHandle), + dynamic_cast<AsyncStoreHandle*>(&cfgHandle), brokerCtxt)); m_operations.submit(op); } @@ -180,7 +184,7 @@ AsyncStoreImpl::submitCreate(qpid::broker::QueueHandle& queueHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::QUEUE_CREATE, - dynamic_cast<qpid::broker::IdHandle*>(&queueHandle), + dynamic_cast<AsyncStoreHandle*>(&queueHandle), dataSrc, brokerCtxt)); m_operations.submit(op); @@ -191,7 +195,7 @@ AsyncStoreImpl::submitDestroy(qpid::broker::QueueHandle& queueHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::QUEUE_DESTROY, - dynamic_cast<qpid::broker::IdHandle*>(&queueHandle), + dynamic_cast<AsyncStoreHandle*>(&queueHandle), brokerCtxt)); m_operations.submit(op); } @@ -201,19 +205,7 @@ AsyncStoreImpl::submitFlush(qpid::broker::QueueHandle& queueHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::QUEUE_FLUSH, - dynamic_cast<qpid::broker::IdHandle*>(&queueHandle), - brokerCtxt)); - m_operations.submit(op); -} - -void -AsyncStoreImpl::submitCreate(qpid::broker::EventHandle& eventHandle, - const qpid::broker::DataSource* const dataSrc, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) -{ - boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::EVENT_CREATE, - dynamic_cast<qpid::broker::IdHandle*>(&eventHandle), - dataSrc, + dynamic_cast<AsyncStoreHandle*>(&queueHandle), brokerCtxt)); m_operations.submit(op); } @@ -225,7 +217,7 @@ AsyncStoreImpl::submitCreate(qpid::broker::EventHandle& eventHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::EVENT_CREATE, - dynamic_cast<qpid::broker::IdHandle*>(&eventHandle), + dynamic_cast<AsyncStoreHandle*>(&eventHandle), dataSrc, &txnHandle, brokerCtxt)); @@ -234,21 +226,11 @@ AsyncStoreImpl::submitCreate(qpid::broker::EventHandle& eventHandle, void AsyncStoreImpl::submitDestroy(qpid::broker::EventHandle& eventHandle, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) -{ - boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::EVENT_DESTROY, - dynamic_cast<qpid::broker::IdHandle*>(&eventHandle), - brokerCtxt)); - m_operations.submit(op); -} - -void -AsyncStoreImpl::submitDestroy(qpid::broker::EventHandle& eventHandle, qpid::broker::TxnHandle& txnHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::EVENT_DESTROY, - dynamic_cast<qpid::broker::IdHandle*>(&eventHandle), + dynamic_cast<AsyncStoreHandle*>(&eventHandle), &txnHandle, brokerCtxt)); m_operations.submit(op); @@ -260,7 +242,7 @@ AsyncStoreImpl::submitEnqueue(qpid::broker::EnqueueHandle& enqHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::MSG_ENQUEUE, - dynamic_cast<qpid::broker::IdHandle*>(&enqHandle), + dynamic_cast<AsyncStoreHandle*>(&enqHandle), &txnHandle, brokerCtxt)); m_operations.submit(op); @@ -272,7 +254,7 @@ AsyncStoreImpl::submitDequeue(qpid::broker::EnqueueHandle& enqHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::MSG_DEQUEUE, - dynamic_cast<qpid::broker::IdHandle*>(&enqHandle), + dynamic_cast<AsyncStoreHandle*>(&enqHandle), &txnHandle, brokerCtxt)); m_operations.submit(op); diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.h b/cpp/src/qpid/asyncStore/AsyncStoreImpl.h index a00771abf5..7dee03dc6d 100644 --- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.h +++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.h @@ -30,17 +30,19 @@ #include "qpid/asyncStore/jrnl2/RecordIdCounter.h" #include "qpid/broker/AsyncStore.h" -#include "qpid/sys/Poller.h" namespace qpid { - namespace broker { class Broker; -} // namespace qpid::broker +} + +namespace sys { +class Poller; +} namespace asyncStore { -class AsyncStoreImpl: public qpid::broker::AsyncStore { +class AsyncStoreImpl : public qpid::broker::AsyncStore { public: AsyncStoreImpl(boost::shared_ptr<qpid::sys::Poller> poller, const AsyncStoreOptions& opts); @@ -52,12 +54,23 @@ public: void initManagement(qpid::broker::Broker* broker); - // --- Factory methods for creating handles --- + // --- Interface from AsyncTransactionalStore --- qpid::broker::TxnHandle createTxnHandle(); qpid::broker::TxnHandle createTxnHandle(qpid::broker::TxnBuffer* tb); qpid::broker::TxnHandle createTxnHandle(const std::string& xid); - qpid::broker::TxnHandle createTxnHandle(const std::string& xid, qpid::broker::TxnBuffer* tb); + qpid::broker::TxnHandle createTxnHandle(const std::string& xid, + qpid::broker::TxnBuffer* tb); + + void submitPrepare(qpid::broker::TxnHandle& txnHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); + void submitCommit(qpid::broker::TxnHandle& txnHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); + void submitAbort(qpid::broker::TxnHandle& txnHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); + + + // --- Interface from AsyncStore --- qpid::broker::ConfigHandle createConfigHandle(); qpid::broker::EnqueueHandle createEnqueueHandle(qpid::broker::MessageHandle& msgHandle, @@ -68,16 +81,6 @@ public: qpid::broker::QueueHandle createQueueHandle(const std::string& name, const qpid::types::Variant::Map& opts); - - // --- Store async interface --- - - void submitPrepare(qpid::broker::TxnHandle& txnHandle, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); - void submitCommit(qpid::broker::TxnHandle& txnHandle, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); - void submitAbort(qpid::broker::TxnHandle& txnHandle, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); - void submitCreate(qpid::broker::ConfigHandle& cfgHandle, const qpid::broker::DataSource* const dataSrc, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); @@ -94,14 +97,9 @@ public: void submitCreate(qpid::broker::EventHandle& eventHandle, const qpid::broker::DataSource* const dataSrc, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); - void submitCreate(qpid::broker::EventHandle& eventHandle, - const qpid::broker::DataSource* const dataSrc, qpid::broker::TxnHandle& txnHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); void submitDestroy(qpid::broker::EventHandle& eventHandle, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); - void submitDestroy(qpid::broker::EventHandle& eventHandle, qpid::broker::TxnHandle& txnHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); diff --git a/cpp/src/qpid/asyncStore/AsyncStoreOptions.h b/cpp/src/qpid/asyncStore/AsyncStoreOptions.h index 2849e3d2c6..a1325839d9 100644 --- a/cpp/src/qpid/asyncStore/AsyncStoreOptions.h +++ b/cpp/src/qpid/asyncStore/AsyncStoreOptions.h @@ -24,16 +24,11 @@ #ifndef qpid_asyncStore_AsyncStoreOptions_h_ #define qpid_asyncStore_AsyncStoreOptions_h_ -#include "qpid/asyncStore/jrnl2/Streamable.h" - #include "qpid/Options.h" #include <string> namespace qpid { -namespace broker { -class Options; -} namespace asyncStore { class AsyncStoreOptions : public qpid::Options @@ -49,7 +44,8 @@ public: std::string m_storeDir; private: - // Static initialization race condition avoidance with static instance of Plugin class (using construct-on-first-use idiom). + // Static initialization race condition avoidance with static instance of Plugin class + // (using construct-on-first-use idiom). static std::string& getDefaultStoreDir(); }; diff --git a/cpp/src/qpid/asyncStore/ConfigHandleImpl.cpp b/cpp/src/qpid/asyncStore/ConfigHandleImpl.cpp index 64e2e848fa..fc47f330a4 100644 --- a/cpp/src/qpid/asyncStore/ConfigHandleImpl.cpp +++ b/cpp/src/qpid/asyncStore/ConfigHandleImpl.cpp @@ -23,8 +23,6 @@ #include "ConfigHandleImpl.h" -#include "qpid/messaging/PrivateImplRef.h" - namespace qpid { namespace asyncStore { diff --git a/cpp/src/qpid/asyncStore/EnqueueHandleImpl.cpp b/cpp/src/qpid/asyncStore/EnqueueHandleImpl.cpp index 0e291def78..be87831520 100644 --- a/cpp/src/qpid/asyncStore/EnqueueHandleImpl.cpp +++ b/cpp/src/qpid/asyncStore/EnqueueHandleImpl.cpp @@ -23,8 +23,6 @@ #include "EnqueueHandleImpl.h" -#include "qpid/messaging/PrivateImplRef.h" - namespace qpid { namespace asyncStore { diff --git a/cpp/src/qpid/asyncStore/EnqueueHandleImpl.h b/cpp/src/qpid/asyncStore/EnqueueHandleImpl.h index f976b6d246..2788366c5c 100644 --- a/cpp/src/qpid/asyncStore/EnqueueHandleImpl.h +++ b/cpp/src/qpid/asyncStore/EnqueueHandleImpl.h @@ -27,7 +27,6 @@ #include "qpid/RefCounted.h" namespace qpid { - namespace broker { class MessageHandle; class QueueHandle; diff --git a/cpp/src/qpid/asyncStore/EventHandleImpl.cpp b/cpp/src/qpid/asyncStore/EventHandleImpl.cpp index 05b98d1e5d..4fd8805912 100644 --- a/cpp/src/qpid/asyncStore/EventHandleImpl.cpp +++ b/cpp/src/qpid/asyncStore/EventHandleImpl.cpp @@ -23,8 +23,6 @@ #include "EventHandleImpl.h" -#include "qpid/messaging/PrivateImplRef.h" - namespace qpid { namespace asyncStore { diff --git a/cpp/src/qpid/asyncStore/EventHandleImpl.h b/cpp/src/qpid/asyncStore/EventHandleImpl.h index 1c6bc52f9f..9a0e694d12 100644 --- a/cpp/src/qpid/asyncStore/EventHandleImpl.h +++ b/cpp/src/qpid/asyncStore/EventHandleImpl.h @@ -27,7 +27,6 @@ #include "qpid/RefCounted.h" namespace qpid { - namespace broker { class QueueHandle; } diff --git a/cpp/src/qpid/asyncStore/MessageHandleImpl.cpp b/cpp/src/qpid/asyncStore/MessageHandleImpl.cpp index cea039221a..a926aa161f 100644 --- a/cpp/src/qpid/asyncStore/MessageHandleImpl.cpp +++ b/cpp/src/qpid/asyncStore/MessageHandleImpl.cpp @@ -23,8 +23,6 @@ #include "MessageHandleImpl.h" -#include "qpid/messaging/PrivateImplRef.h" - namespace qpid { namespace asyncStore { diff --git a/cpp/src/qpid/asyncStore/MessageHandleImpl.h b/cpp/src/qpid/asyncStore/MessageHandleImpl.h index eb80fca2d4..ded9d63375 100644 --- a/cpp/src/qpid/asyncStore/MessageHandleImpl.h +++ b/cpp/src/qpid/asyncStore/MessageHandleImpl.h @@ -27,7 +27,6 @@ #include "qpid/RefCounted.h" namespace qpid { - namespace broker { class DataSource; } diff --git a/cpp/src/qpid/asyncStore/OperationQueue.cpp b/cpp/src/qpid/asyncStore/OperationQueue.cpp index 0b7f58bd6c..dd4e7c1343 100644 --- a/cpp/src/qpid/asyncStore/OperationQueue.cpp +++ b/cpp/src/qpid/asyncStore/OperationQueue.cpp @@ -24,6 +24,8 @@ #include "OperationQueue.h" #include "qpid/broker/AsyncResultHandle.h" +#include "qpid/broker/AsyncResultHandleImpl.h" +#include "qpid/log/Statement.h" namespace qpid { namespace asyncStore { @@ -42,7 +44,6 @@ OperationQueue::~OperationQueue() void OperationQueue::submit(boost::shared_ptr<const AsyncOperation> op) { -//std::cout << "--> OperationQueue::submit() op=" << op->getOpStr() << std::endl << std::flush; m_opQueue.push(op); } @@ -52,7 +53,6 @@ OperationQueue::handle(const OperationQueue::OpQueue::Batch& e) { try { for (OpQueue::Batch::const_iterator i = e.begin(); i != e.end(); ++i) { -//std::cout << "<-- OperationQueue::handle() Op=" << (*i)->getOpStr() << std::endl << std::flush; boost::shared_ptr<qpid::broker::BrokerAsyncContext> bc = (*i)->getBrokerContext(); if (bc) { qpid::broker::AsyncResultQueue* const arq = bc->getAsyncResultQueue(); @@ -64,9 +64,9 @@ OperationQueue::handle(const OperationQueue::OpQueue::Batch& e) } } } catch (const std::exception& e) { - std::cerr << "qpid::asyncStore::OperationQueue: Exception thrown processing async op: " << e.what() << std::endl; + QPID_LOG(error, "qpid::asyncStore::OperationQueue: Exception thrown processing async op: " << e.what()); } catch (...) { - std::cerr << "qpid::asyncStore::OperationQueue: Unknown exception thrown processing async op" << std::endl; + QPID_LOG(error, "qpid::asyncStore::OperationQueue: Unknown exception thrown processing async op"); } return e.end(); } diff --git a/cpp/src/qpid/asyncStore/OperationQueue.h b/cpp/src/qpid/asyncStore/OperationQueue.h index d2bd5d0f26..143ac2ab0c 100644 --- a/cpp/src/qpid/asyncStore/OperationQueue.h +++ b/cpp/src/qpid/asyncStore/OperationQueue.h @@ -26,7 +26,7 @@ #include "AsyncOperation.h" -#include "qpid/broker/AsyncStore.h" +//#include "qpid/broker/AsyncStore.h" #include "qpid/sys/PollableQueue.h" namespace qpid { diff --git a/cpp/src/qpid/asyncStore/Plugin.cpp b/cpp/src/qpid/asyncStore/Plugin.cpp index 4f35e8cd2a..400237df75 100644 --- a/cpp/src/qpid/asyncStore/Plugin.cpp +++ b/cpp/src/qpid/asyncStore/Plugin.cpp @@ -23,6 +23,8 @@ #include "Plugin.h" +#include "AsyncStoreImpl.h" + #include "qpid/broker/Broker.h" namespace qpid { diff --git a/cpp/src/qpid/asyncStore/Plugin.h b/cpp/src/qpid/asyncStore/Plugin.h index 7cf500a122..7d2b67bdad 100644 --- a/cpp/src/qpid/asyncStore/Plugin.h +++ b/cpp/src/qpid/asyncStore/Plugin.h @@ -24,13 +24,15 @@ #ifndef qpid_broker_Plugin_h_ #define qpid_broker_Plugin_h_ -#include "AsyncStoreImpl.h" #include "AsyncStoreOptions.h" #include "qpid/Plugin.h" namespace qpid { class Options; +namespace asyncStore { +class AsyncStoreImpl; +} namespace broker { class Plugin : public qpid::Plugin diff --git a/cpp/src/qpid/asyncStore/QueueHandleImpl.cpp b/cpp/src/qpid/asyncStore/QueueHandleImpl.cpp index 523a31ba7b..3781329b92 100644 --- a/cpp/src/qpid/asyncStore/QueueHandleImpl.cpp +++ b/cpp/src/qpid/asyncStore/QueueHandleImpl.cpp @@ -23,8 +23,6 @@ #include "QueueHandleImpl.h" -#include "qpid/messaging/PrivateImplRef.h" - namespace qpid { namespace asyncStore { diff --git a/cpp/src/qpid/asyncStore/RunState.h b/cpp/src/qpid/asyncStore/RunState.h index dfa331ea1a..832e922a04 100644 --- a/cpp/src/qpid/asyncStore/RunState.h +++ b/cpp/src/qpid/asyncStore/RunState.h @@ -59,7 +59,7 @@ typedef enum { RS_STOPPED } RunState_t; -class RunState: public qpid::asyncStore::jrnl2::State<RunState_t> +class RunState : public qpid::asyncStore::jrnl2::State<RunState_t> { public: RunState(); diff --git a/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp b/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp index c5371f161c..2b343e9517 100644 --- a/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp +++ b/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp @@ -25,7 +25,7 @@ #include "qpid/Exception.h" #include "qpid/broker/TxnBuffer.h" -#include "qpid/messaging/PrivateImplRef.h" +#include "qpid/log/Statement.h" #include <uuid/uuid.h> @@ -117,7 +117,7 @@ TxnHandleImpl::createLocalXid() char uuidStr[37]; // 36-char uuid + trailing '\0' ::uuid_unparse(uuid, uuidStr); m_xid.assign(uuidStr); -//std::cout << "TTT TxnHandleImpl::createLocalXid(): Local XID created: \"" << m_xid << "\"" << std::endl << std::flush; + QPID_LOG(debug, "Local XID created: \"" << m_xid << "\""); } }} // namespace qpid::asyncStore diff --git a/cpp/src/qpid/broker/AsyncResultHandle.cpp b/cpp/src/qpid/broker/AsyncResultHandle.cpp index cdd2231977..d2fa9ae3e0 100644 --- a/cpp/src/qpid/broker/AsyncResultHandle.cpp +++ b/cpp/src/qpid/broker/AsyncResultHandle.cpp @@ -23,21 +23,22 @@ #include "AsyncResultHandle.h" -#include "qpid/messaging/PrivateImplRef.h" +#include "AsyncResultHandleImpl.h" +#include "PrivateImplRef.h" namespace qpid { namespace broker { -typedef qpid::messaging::PrivateImplRef<AsyncResultHandle> PrivateImpl; +typedef PrivateImplRef<AsyncResultHandle> PrivateImpl; AsyncResultHandle::AsyncResultHandle(AsyncResultHandleImpl* p) : - qpid::messaging::Handle<AsyncResultHandleImpl>() + Handle<AsyncResultHandleImpl>() { PrivateImpl::ctor(*this, p); } AsyncResultHandle::AsyncResultHandle(const AsyncResultHandle& r) : - qpid::messaging::Handle<AsyncResultHandleImpl>() + Handle<AsyncResultHandleImpl>() { PrivateImpl::copy(*this, r); } diff --git a/cpp/src/qpid/broker/AsyncResultHandle.h b/cpp/src/qpid/broker/AsyncResultHandle.h index f916bde5d3..cf0fea5a06 100644 --- a/cpp/src/qpid/broker/AsyncResultHandle.h +++ b/cpp/src/qpid/broker/AsyncResultHandle.h @@ -24,14 +24,17 @@ #ifndef qpid_broker_AsyncResultHandle_h_ #define qpid_broker_AsyncResultHandle_h_ -#include "AsyncResultHandleImpl.h" +#include "Handle.h" -#include "qpid/messaging/Handle.h" +#include <boost/shared_ptr.hpp> +#include <string> namespace qpid { namespace broker { +class AsyncResultHandleImpl; +class BrokerAsyncContext; -class AsyncResultHandle : public qpid::messaging::Handle<AsyncResultHandleImpl> +class AsyncResultHandle : public Handle<AsyncResultHandleImpl> { public: AsyncResultHandle(AsyncResultHandleImpl* p = 0); @@ -47,7 +50,7 @@ public: void invokeAsyncResultCallback() const; private: - friend class qpid::messaging::PrivateImplRef<AsyncResultHandle>; + friend class PrivateImplRef<AsyncResultHandle>; }; }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/AsyncResultQueueImpl.cpp b/cpp/src/qpid/broker/AsyncResultQueueImpl.cpp index 62c7ed33d9..ab408be9ca 100644 --- a/cpp/src/qpid/broker/AsyncResultQueueImpl.cpp +++ b/cpp/src/qpid/broker/AsyncResultQueueImpl.cpp @@ -21,9 +21,12 @@ * \file AsyncResultQueueImpl.cpp */ -#include "AsyncResultHandle.h" #include "AsyncResultQueueImpl.h" +#include "AsyncResultHandle.h" + +#include "qpid/log/Statement.h" + namespace qpid { namespace broker { @@ -41,7 +44,6 @@ AsyncResultQueueImpl::~AsyncResultQueueImpl() void AsyncResultQueueImpl::submit(boost::shared_ptr<AsyncResultHandle> arh) { -//std::cout << "==> AsyncResultQueueImpl::submit() errNo=" << arh->getErrNo() << " errMsg=\"" << arh->getErrMsg() << "\"" << std::endl << std::flush; m_resQueue.push(arh); } @@ -51,15 +53,14 @@ AsyncResultQueueImpl::handle(const ResultQueue::Batch& e) { try { for (ResultQueue::Batch::const_iterator i = e.begin(); i != e.end(); ++i) { -//std::cout << "<== AsyncResultQueueImpl::handle() errNo=" << (*i)->getErrNo() << " errMsg=\"" << (*i)->getErrMsg() << "\"" << std::endl << std::flush; if ((*i)->isValid()) { (*i)->invokeAsyncResultCallback(); } } } catch (const std::exception& e) { - std::cerr << "qpid::broker::AsyncResultQueueImpl: Exception thrown processing async result: " << e.what() << std::endl; + QPID_LOG(error, "Exception thrown processing async result: " << e.what()); } catch (...) { - std::cerr << "qpid::broker::AsyncResultQueueImpl: Unknown exception thrown processing async result" << std::endl; + QPID_LOG(error, "Unknown exception thrown processing async result"); } return e.end(); } diff --git a/cpp/src/qpid/broker/AsyncStore.h b/cpp/src/qpid/broker/AsyncStore.h index 2f73ec8f3e..7bb6175862 100644 --- a/cpp/src/qpid/broker/AsyncStore.h +++ b/cpp/src/qpid/broker/AsyncStore.h @@ -20,12 +20,10 @@ #ifndef qpid_broker_AsyncStore_h_ #define qpid_broker_AsyncStore_h_ -// TODO: See if we can replace this with a forward declaration, but current definition of qpid::types::Variant::Map -// does not allow it. Using a local map<std::string, Variant> definition also precludes forward declaration. #include "qpid/types/Variant.h" // qpid::types::Variant::Map #include <boost/shared_ptr.hpp> -#include <stdint.h> +#include <stdint.h> // uint64_t #include <string> namespace qpid { @@ -57,66 +55,91 @@ public: virtual void write(char* target) = 0; }; -// Callback invoked by AsyncResultQueue to pass back async results -typedef void (*AsyncResultCallback)(const AsyncResultHandle* const); +// Opaque async handles used for carrying persistence state. class ConfigHandle; class EnqueueHandle; class EventHandle; class MessageHandle; class QueueHandle; -class TxnBuffer; class TxnHandle; -class AsyncTransactionalStore { +class TxnBuffer; + +class AsyncTransaction { public: - virtual ~AsyncTransactionalStore() {} + virtual ~AsyncTransaction() {} virtual TxnHandle createTxnHandle() = 0; virtual TxnHandle createTxnHandle(TxnBuffer* tb) = 0; virtual TxnHandle createTxnHandle(const std::string& xid) = 0; - virtual TxnHandle createTxnHandle(const std::string& xid, TxnBuffer* tb) = 0; - - // TODO: Remove boost::shared_ptr<> from this interface - virtual void submitPrepare(TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0; // Distributed txns only - virtual void submitCommit(TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0; - virtual void submitAbort(TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0; + virtual TxnHandle createTxnHandle(const std::string& xid, + TxnBuffer* tb) = 0; + + // TODO: Remove boost::shared_ptr<BrokerAsyncContext> from this interface + virtual void submitPrepare(TxnHandle&, + boost::shared_ptr<BrokerAsyncContext>) = 0; // Distributed txns only + virtual void submitCommit(TxnHandle&, + boost::shared_ptr<BrokerAsyncContext>) = 0; + virtual void submitAbort(TxnHandle&, + boost::shared_ptr<BrokerAsyncContext>) = 0; }; // Subclassed by store: -class AsyncStore : public AsyncTransactionalStore { +class AsyncStore : public AsyncTransaction { public: virtual ~AsyncStore() {} // --- Factory methods for creating handles --- virtual ConfigHandle createConfigHandle() = 0; - virtual EnqueueHandle createEnqueueHandle(MessageHandle&, QueueHandle&) = 0; - virtual EventHandle createEventHandle(QueueHandle&, const std::string& key=std::string()) = 0; + virtual EnqueueHandle createEnqueueHandle(MessageHandle&, + QueueHandle&) = 0; + virtual EventHandle createEventHandle(QueueHandle&, + const std::string& key=std::string()) = 0; virtual MessageHandle createMessageHandle(const DataSource* const) = 0; - virtual QueueHandle createQueueHandle(const std::string& name, const qpid::types::Variant::Map& opts) = 0; + virtual QueueHandle createQueueHandle(const std::string& name, + const qpid::types::Variant::Map& opts) = 0; // --- Store async interface --- - // TODO: Remove boost::shared_ptr<> from this interface - virtual void submitCreate(ConfigHandle&, const DataSource* const, boost::shared_ptr<BrokerAsyncContext>) = 0; - virtual void submitDestroy(ConfigHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0; - - virtual void submitCreate(QueueHandle&, const DataSource* const, boost::shared_ptr<BrokerAsyncContext>) = 0; - virtual void submitDestroy(QueueHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0; - virtual void submitFlush(QueueHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0; - - virtual void submitCreate(EventHandle&, const DataSource* const, boost::shared_ptr<BrokerAsyncContext>) = 0; - virtual void submitCreate(EventHandle&, const DataSource* const, TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0; - virtual void submitDestroy(EventHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0; - virtual void submitDestroy(EventHandle&, TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0; - - virtual void submitEnqueue(EnqueueHandle&, TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0; - virtual void submitDequeue(EnqueueHandle&, TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0; + // TODO: Remove boost::shared_ptr<BrokerAsyncContext> from this interface + virtual void submitCreate(ConfigHandle&, + const DataSource* const, + boost::shared_ptr<BrokerAsyncContext>) = 0; + virtual void submitDestroy(ConfigHandle&, + boost::shared_ptr<BrokerAsyncContext>) = 0; + + virtual void submitCreate(QueueHandle&, + const DataSource* const, + boost::shared_ptr<BrokerAsyncContext>) = 0; + virtual void submitDestroy(QueueHandle&, + boost::shared_ptr<BrokerAsyncContext>) = 0; + virtual void submitFlush(QueueHandle&, + boost::shared_ptr<BrokerAsyncContext>) = 0; + + virtual void submitCreate(EventHandle&, + const DataSource* const, + TxnHandle&, + boost::shared_ptr<BrokerAsyncContext>) = 0; + virtual void submitDestroy(EventHandle&, + TxnHandle&, + boost::shared_ptr<BrokerAsyncContext>) = 0; + + virtual void submitEnqueue(EnqueueHandle&, + TxnHandle&, + boost::shared_ptr<BrokerAsyncContext>) = 0; + virtual void submitDequeue(EnqueueHandle&, + TxnHandle&, + boost::shared_ptr<BrokerAsyncContext>) = 0; // Legacy - Restore FTD message, is NOT async! - virtual int loadContent(MessageHandle&, QueueHandle&, char* data, uint64_t offset, const uint64_t length) = 0; + virtual int loadContent(MessageHandle&, + QueueHandle&, + char* data, + uint64_t offset, + const uint64_t length) = 0; }; diff --git a/cpp/src/qpid/broker/ConfigHandle.cpp b/cpp/src/qpid/broker/ConfigHandle.cpp index 13f7e7fa94..0bd65543ae 100644 --- a/cpp/src/qpid/broker/ConfigHandle.cpp +++ b/cpp/src/qpid/broker/ConfigHandle.cpp @@ -23,23 +23,23 @@ #include "ConfigHandle.h" -#include "qpid/messaging/PrivateImplRef.h" +#include "PrivateImplRef.h" + +#include "qpid/asyncStore/ConfigHandleImpl.h" namespace qpid { namespace broker { -typedef qpid::messaging::PrivateImplRef<ConfigHandle> PrivateImpl; +typedef PrivateImplRef<ConfigHandle> PrivateImpl; ConfigHandle::ConfigHandle(qpid::asyncStore::ConfigHandleImpl* p) : - qpid::messaging::Handle<qpid::asyncStore::ConfigHandleImpl>(), - IdHandle() + Handle<qpid::asyncStore::ConfigHandleImpl>() { PrivateImpl::ctor(*this, p); } ConfigHandle::ConfigHandle(const ConfigHandle& r) : - qpid::messaging::Handle<qpid::asyncStore::ConfigHandleImpl>(), - IdHandle() + Handle<qpid::asyncStore::ConfigHandleImpl>() { PrivateImpl::copy(*this, r); } diff --git a/cpp/src/qpid/broker/ConfigHandle.h b/cpp/src/qpid/broker/ConfigHandle.h index 52a9d672d5..6bcb3d8ce0 100644 --- a/cpp/src/qpid/broker/ConfigHandle.h +++ b/cpp/src/qpid/broker/ConfigHandle.h @@ -21,19 +21,21 @@ * \file ConfigHandle.h */ -#ifndef qpid_broker_ConfigHandleImpl_h_ -#define qpid_broker_ConfigHandleImpl_h_ +#ifndef qpid_broker_ConfigHandle_h_ +#define qpid_broker_ConfigHandle_h_ -#include "IdHandle.h" +#include "Handle.h" -#include "qpid/asyncStore/ConfigHandleImpl.h" -#include "qpid/messaging/Handle.h" +#include "qpid/asyncStore/AsyncStoreHandle.h" namespace qpid { +namespace asyncStore { +class ConfigHandleImpl; +} namespace broker { -class ConfigHandle : public qpid::messaging::Handle<qpid::asyncStore::ConfigHandleImpl>, - public IdHandle +class ConfigHandle : public Handle<qpid::asyncStore::ConfigHandleImpl>, + public qpid::asyncStore::AsyncStoreHandle { public: ConfigHandle(qpid::asyncStore::ConfigHandleImpl* p = 0); @@ -45,9 +47,9 @@ public: // <none> private: - friend class qpid::messaging::PrivateImplRef<ConfigHandle>; + friend class PrivateImplRef<ConfigHandle>; }; }} // namespace qpid::broker -#endif // qpid_broker_ConfigHandleImpl_h_ +#endif // qpid_broker_ConfigHandle_h_ diff --git a/cpp/src/qpid/broker/EnqueueHandle.cpp b/cpp/src/qpid/broker/EnqueueHandle.cpp index 3b8e2d5b30..877eb680a6 100644 --- a/cpp/src/qpid/broker/EnqueueHandle.cpp +++ b/cpp/src/qpid/broker/EnqueueHandle.cpp @@ -23,23 +23,23 @@ #include "EnqueueHandle.h" -#include "qpid/messaging/PrivateImplRef.h" +#include "PrivateImplRef.h" + +#include "qpid/asyncStore/EnqueueHandleImpl.h" namespace qpid { namespace broker { -typedef qpid::messaging::PrivateImplRef<EnqueueHandle> PrivateImpl; +typedef PrivateImplRef<EnqueueHandle> PrivateImpl; EnqueueHandle::EnqueueHandle(qpid::asyncStore::EnqueueHandleImpl* p) : - qpid::messaging::Handle<qpid::asyncStore::EnqueueHandleImpl>(), - IdHandle() + Handle<qpid::asyncStore::EnqueueHandleImpl>() { PrivateImpl::ctor(*this, p); } EnqueueHandle::EnqueueHandle(const EnqueueHandle& r) : - qpid::messaging::Handle<qpid::asyncStore::EnqueueHandleImpl>(), - IdHandle() + Handle<qpid::asyncStore::EnqueueHandleImpl>() { PrivateImpl::copy(*this, r); } diff --git a/cpp/src/qpid/broker/EnqueueHandle.h b/cpp/src/qpid/broker/EnqueueHandle.h index cdd07e246b..f869a755b1 100644 --- a/cpp/src/qpid/broker/EnqueueHandle.h +++ b/cpp/src/qpid/broker/EnqueueHandle.h @@ -21,19 +21,21 @@ * \file EnqueueHandle.h */ -#ifndef qpid_broker_EnqueueHandleImpl_h_ -#define qpid_broker_EnqueueHandleImpl_h_ +#ifndef qpid_broker_EnqueueHandle_h_ +#define qpid_broker_EnqueueHandle_h_ -#include "IdHandle.h" +#include "Handle.h" -#include "qpid/asyncStore/EnqueueHandleImpl.h" -#include "qpid/messaging/Handle.h" +#include "qpid/asyncStore/AsyncStoreHandle.h" namespace qpid { +namespace asyncStore { +class EnqueueHandleImpl; +} namespace broker { -class EnqueueHandle : public qpid::messaging::Handle<qpid::asyncStore::EnqueueHandleImpl>, - public IdHandle +class EnqueueHandle : public Handle<qpid::asyncStore::EnqueueHandleImpl>, + public qpid::asyncStore::AsyncStoreHandle { public: EnqueueHandle(qpid::asyncStore::EnqueueHandleImpl* p = 0); @@ -45,9 +47,9 @@ public: // <none> private: - friend class qpid::messaging::PrivateImplRef<EnqueueHandle>; + friend class PrivateImplRef<EnqueueHandle>; }; }} // namespace qpid::broker -#endif // qpid_broker_EnqueueHandleImpl_h_ +#endif // qpid_broker_EnqueueHandle_h_ diff --git a/cpp/src/qpid/broker/EventHandle.cpp b/cpp/src/qpid/broker/EventHandle.cpp index 97d5920837..81f33b59a6 100644 --- a/cpp/src/qpid/broker/EventHandle.cpp +++ b/cpp/src/qpid/broker/EventHandle.cpp @@ -23,23 +23,23 @@ #include "EventHandle.h" -#include "qpid/messaging/PrivateImplRef.h" +#include "PrivateImplRef.h" + +#include "qpid/asyncStore/EventHandleImpl.h" namespace qpid { namespace broker { -typedef qpid::messaging::PrivateImplRef<EventHandle> PrivateImpl; +typedef PrivateImplRef<EventHandle> PrivateImpl; EventHandle::EventHandle(qpid::asyncStore::EventHandleImpl* p) : - qpid::messaging::Handle<qpid::asyncStore::EventHandleImpl>(), - IdHandle() + Handle<qpid::asyncStore::EventHandleImpl>() { PrivateImpl::ctor(*this, p); } EventHandle::EventHandle(const EventHandle& r) : - qpid::messaging::Handle<qpid::asyncStore::EventHandleImpl>(), - IdHandle() + Handle<qpid::asyncStore::EventHandleImpl>() { PrivateImpl::copy(*this, r); } diff --git a/cpp/src/qpid/broker/EventHandle.h b/cpp/src/qpid/broker/EventHandle.h index 20e7773502..31f0e22dbf 100644 --- a/cpp/src/qpid/broker/EventHandle.h +++ b/cpp/src/qpid/broker/EventHandle.h @@ -21,19 +21,23 @@ * \file EventHandle.h */ -#ifndef qpid_broker_EventHandleImpl_h_ -#define qpid_broker_EventHandleImpl_h_ +#ifndef qpid_broker_EventHandle_h_ +#define qpid_broker_EventHandle_h_ -#include "IdHandle.h" +#include "Handle.h" -#include "qpid/asyncStore/EventHandleImpl.h" -#include "qpid/messaging/Handle.h" +#include "qpid/asyncStore/AsyncStoreHandle.h" + +#include <string> namespace qpid { +namespace asyncStore { +class EventHandleImpl; +} namespace broker { -class EventHandle : public qpid::messaging::Handle<qpid::asyncStore::EventHandleImpl>, - public IdHandle +class EventHandle : public Handle<qpid::asyncStore::EventHandleImpl>, + public qpid::asyncStore::AsyncStoreHandle { public: EventHandle(qpid::asyncStore::EventHandleImpl* p = 0); @@ -45,9 +49,9 @@ public: const std::string& getKey() const; private: - friend class qpid::messaging::PrivateImplRef<EventHandle>; + friend class PrivateImplRef<EventHandle>; }; }} // namespace qpid::broker -#endif // qpid_broker_EventHandleImpl_h_ +#endif // qpid_broker_EventHandle_h_ diff --git a/cpp/src/qpid/broker/Handle.h b/cpp/src/qpid/broker/Handle.h new file mode 100644 index 0000000000..397f58f2e7 --- /dev/null +++ b/cpp/src/qpid/broker/Handle.h @@ -0,0 +1,83 @@ +#ifndef QPID_BROKER_HANDLE_H +#define QPID_BROKER_HANDLE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/* + * NOTE: This is a copy of qpid::messaging::Handle (but stripped of its + * messaging-specific Windows decoration macros) + * + * This (together with PrivateImplRef.h) has been placed here so + * as not to introduce unnecessary dependencies on qpid::messaging + * for users of the Handle template in the qpid::broker namespace. + * + * Any fixes made here should also be made to qpid/messaging/Handle.h + * + * TODO: Find the correct Windows decorations for these functions. + * TODO: Find (if possible) a way to eliminate two copies of the same code. + */ + +namespace qpid { +namespace broker { + +template <class> class PrivateImplRef; + +/** \ingroup messaging + * A handle is like a pointer: refers to an underlying implementation object. + * Copying the handle does not copy the object. + * + * Handles can be null, like a 0 pointer. Use isValid(), isNull() or the + * conversion to bool to test for a null handle. + */ +template <class T> class Handle { + public: + + /**@return true if handle is valid, i.e. not null. */ + bool isValid() const { return impl; } + + /**@return true if handle is null. It is an error to call any function on a null handle. */ + bool isNull() const { return !impl; } + + /** Conversion to bool supports idiom if (handle) { handle->... } */ + operator bool() const { return impl; } + + /** Operator ! supports idiom if (!handle) { do_if_handle_is_null(); } */ + bool operator !() const { return !impl; } + + void swap(Handle<T>& h) { T* t = h.impl; h.impl = impl; impl = t; } + + protected: + typedef T Impl; + Handle() :impl() {} + + // Not implemented,subclasses must implement. + Handle(const Handle&); + Handle& operator=(const Handle&); + + Impl* impl; + + friend class PrivateImplRef<T>; +}; + +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_HANDLE_H*/ diff --git a/cpp/src/qpid/broker/IdHandle.cpp b/cpp/src/qpid/broker/IdHandle.cpp deleted file mode 100644 index ebb8f9a3c6..0000000000 --- a/cpp/src/qpid/broker/IdHandle.cpp +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file IdHandle.cpp - */ - -#include "IdHandle.h" - -namespace qpid { -namespace broker { - -IdHandle::~IdHandle() -{} - -}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/MessageHandle.cpp b/cpp/src/qpid/broker/MessageHandle.cpp index 9a7b631e8b..2727e74edc 100644 --- a/cpp/src/qpid/broker/MessageHandle.cpp +++ b/cpp/src/qpid/broker/MessageHandle.cpp @@ -23,22 +23,23 @@ #include "MessageHandle.h" -#include "qpid/messaging/PrivateImplRef.h" +#include "PrivateImplRef.h" + +#include "qpid/asyncStore/MessageHandleImpl.h" namespace qpid { namespace broker { -typedef qpid::messaging::PrivateImplRef<MessageHandle> PrivateImpl; +typedef PrivateImplRef<MessageHandle> PrivateImpl; MessageHandle::MessageHandle(qpid::asyncStore::MessageHandleImpl* p) : - IdHandle() + Handle<qpid::asyncStore::MessageHandleImpl>() { PrivateImpl::ctor(*this, p); } MessageHandle::MessageHandle(const MessageHandle& r) : - qpid::messaging::Handle<qpid::asyncStore::MessageHandleImpl>(), - IdHandle() + Handle<qpid::asyncStore::MessageHandleImpl>() { PrivateImpl::copy(*this, r); } diff --git a/cpp/src/qpid/broker/MessageHandle.h b/cpp/src/qpid/broker/MessageHandle.h index 739c53f7d3..e0a68a8878 100644 --- a/cpp/src/qpid/broker/MessageHandle.h +++ b/cpp/src/qpid/broker/MessageHandle.h @@ -21,19 +21,21 @@ * \file MessageHandle.h */ -#ifndef qpid_broker_MessageHandleImpl_h_ -#define qpid_broker_MessageHandleImpl_h_ +#ifndef qpid_broker_MessageHandle_h_ +#define qpid_broker_MessageHandle_h_ -#include "IdHandle.h" +#include "Handle.h" -#include "qpid/asyncStore/MessageHandleImpl.h" -#include "qpid/messaging/Handle.h" +#include "qpid/asyncStore/AsyncStoreHandle.h" namespace qpid { +namespace asyncStore { +class MessageHandleImpl; +} namespace broker { -class MessageHandle : public qpid::messaging::Handle<qpid::asyncStore::MessageHandleImpl>, - public IdHandle +class MessageHandle : public Handle<qpid::asyncStore::MessageHandleImpl>, + public qpid::asyncStore::AsyncStoreHandle { public: MessageHandle(qpid::asyncStore::MessageHandleImpl* p = 0); @@ -45,9 +47,9 @@ public: // <none> private: - friend class qpid::messaging::PrivateImplRef<MessageHandle>; + friend class PrivateImplRef<MessageHandle>; }; }} // namespace qpid::broker -#endif // qpid_broker_MessageHandleImpl_h_ +#endif // qpid_broker_MessageHandle_h_ diff --git a/cpp/src/qpid/broker/PrivateImplRef.h b/cpp/src/qpid/broker/PrivateImplRef.h new file mode 100644 index 0000000000..5932ab882b --- /dev/null +++ b/cpp/src/qpid/broker/PrivateImplRef.h @@ -0,0 +1,105 @@ +#ifndef QPID_BROKER_PRIVATEIMPLREF_H +#define QPID_BROKER_PRIVATEIMPLREF_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/* + * NOTE: This is a copy of qpid::messaging::PrivateImplRef + * + * This (together with Handle.h) has been placed here so + * as not to introduce unnecessary dependencies on qpid::messaging + * for users of the Handle template in the qpid::broker namespace. + * + * Any fixes made here should also be made to qpid/messaging/PrivateImplRef.h + * + * TODO: Find (if possible) a way to eliminate two copies of the same code. + */ + +#include <boost/intrusive_ptr.hpp> +#include "qpid/RefCounted.h" + +namespace qpid { +namespace broker { + +/** + * Helper class to implement a class with a private, reference counted + * implementation and reference semantics. + * + * Such classes are used in the public API to hide implementation, they + * should. Example of use: + * + * === Foo.h + * + * template <class T> class PrivateImplRef; + * class FooImpl; + * + * Foo : public Handle<FooImpl> { + * public: + * Foo(FooImpl* = 0); + * Foo(const Foo&); + * ~Foo(); + * Foo& operator=(const Foo&); + * + * int fooDo(); // and other Foo functions... + * + * private: + * typedef FooImpl Impl; + * Impl* impl; + * friend class PrivateImplRef<Foo>; + * + * === Foo.cpp + * + * typedef PrivateImplRef<Foo> PI; + * Foo::Foo(FooImpl* p) { PI::ctor(*this, p); } + * Foo::Foo(const Foo& c) : Handle<FooImpl>() { PI::copy(*this, c); } + * Foo::~Foo() { PI::dtor(*this); } + * Foo& Foo::operator=(const Foo& c) { return PI::assign(*this, c); } + * + * int foo::fooDo() { return impl->fooDo(); } + * + */ +template <class T> class PrivateImplRef { + public: + typedef typename T::Impl Impl; + typedef boost::intrusive_ptr<Impl> intrusive_ptr; + + /** Get the implementation pointer from a handle */ + static intrusive_ptr get(const T& t) { return intrusive_ptr(t.impl); } + + /** Set the implementation pointer in a handle */ + static void set(T& t, const intrusive_ptr& p) { + if (t.impl == p) return; + if (t.impl) boost::intrusive_ptr_release(t.impl); + t.impl = p.get(); + if (t.impl) boost::intrusive_ptr_add_ref(t.impl); + } + + // Helper functions to implement the ctor, dtor, copy, assign + static void ctor(T& t, Impl* p) { t.impl = p; if (p) boost::intrusive_ptr_add_ref(p); } + static void copy(T& t, const T& x) { if (&t == &x) return; t.impl = 0; assign(t, x); } + static void dtor(T& t) { if(t.impl) boost::intrusive_ptr_release(t.impl); } + static T& assign(T& t, const T& x) { set(t, get(x)); return t;} +}; + +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_PRIVATEIMPLREF_H*/ diff --git a/cpp/src/qpid/broker/QueueHandle.cpp b/cpp/src/qpid/broker/QueueHandle.cpp index 780eb6395c..5a5678df5a 100644 --- a/cpp/src/qpid/broker/QueueHandle.cpp +++ b/cpp/src/qpid/broker/QueueHandle.cpp @@ -23,22 +23,23 @@ #include "QueueHandle.h" -#include "qpid/messaging/PrivateImplRef.h" +#include "PrivateImplRef.h" + +#include "qpid/asyncStore/QueueHandleImpl.h" namespace qpid { namespace broker { -typedef qpid::messaging::PrivateImplRef<QueueHandle> PrivateImpl; +typedef PrivateImplRef<QueueHandle> PrivateImpl; QueueHandle::QueueHandle(qpid::asyncStore::QueueHandleImpl* p) : - IdHandle() + Handle<qpid::asyncStore::QueueHandleImpl>() { PrivateImpl::ctor(*this, p); } QueueHandle::QueueHandle(const QueueHandle& r) : - qpid::messaging::Handle<qpid::asyncStore::QueueHandleImpl>(), - IdHandle() + Handle<qpid::asyncStore::QueueHandleImpl>() { PrivateImpl::copy(*this, r); } diff --git a/cpp/src/qpid/broker/QueueHandle.h b/cpp/src/qpid/broker/QueueHandle.h index cb366e2880..234c5e15e8 100644 --- a/cpp/src/qpid/broker/QueueHandle.h +++ b/cpp/src/qpid/broker/QueueHandle.h @@ -21,18 +21,23 @@ * \file QueueHandle.h */ -#ifndef qpid_broker_QueueHandleImpl_h_ -#define qpid_broker_QueueHandleImpl_h_ +#ifndef qpid_broker_QueueHandle_h_ +#define qpid_broker_QueueHandle_h_ -#include "IdHandle.h" +#include "Handle.h" -#include "qpid/asyncStore/QueueHandleImpl.h" -#include "qpid/messaging/Handle.h" +#include "qpid/asyncStore/AsyncStoreHandle.h" + +#include <string> namespace qpid { +namespace asyncStore { +class QueueHandleImpl; +} namespace broker { -class QueueHandle : public qpid::messaging::Handle<qpid::asyncStore::QueueHandleImpl>, public IdHandle +class QueueHandle : public Handle<qpid::asyncStore::QueueHandleImpl>, + public qpid::asyncStore::AsyncStoreHandle { public: QueueHandle(qpid::asyncStore::QueueHandleImpl* p = 0); @@ -44,9 +49,9 @@ public: const std::string& getName() const; private: - friend class qpid::messaging::PrivateImplRef<QueueHandle>; + friend class PrivateImplRef<QueueHandle>; }; }} // namespace qpid::broker -#endif // qpid_broker_QueueHandleImpl_h_ +#endif // qpid_broker_QueueHandle_h_ diff --git a/cpp/src/qpid/broker/TxnAsyncContext.cpp b/cpp/src/qpid/broker/TxnAsyncContext.cpp index e8abe99dab..c3d4342993 100644 --- a/cpp/src/qpid/broker/TxnAsyncContext.cpp +++ b/cpp/src/qpid/broker/TxnAsyncContext.cpp @@ -23,8 +23,6 @@ #include "TxnAsyncContext.h" -#include <cassert> - namespace qpid { namespace broker { @@ -38,9 +36,7 @@ TxnAsyncContext::TxnAsyncContext(TxnBuffer* const tb, m_op(op), m_rcb(rcb), m_arq(arq) -{ - assert(m_th.isValid()); -} +{} TxnAsyncContext::~TxnAsyncContext() {} @@ -63,7 +59,7 @@ TxnAsyncContext::getOpStr() const return qpid::asyncStore::AsyncOperation::getOpStr(m_op); } -TxnHandle +TxnHandle& TxnAsyncContext::getTransactionContext() const { return m_th; diff --git a/cpp/src/qpid/broker/TxnAsyncContext.h b/cpp/src/qpid/broker/TxnAsyncContext.h index 9bdd8f7188..810c46429c 100644 --- a/cpp/src/qpid/broker/TxnAsyncContext.h +++ b/cpp/src/qpid/broker/TxnAsyncContext.h @@ -25,15 +25,16 @@ #define qpid_broker_TxnAsyncContext_h_ #include "AsyncStore.h" // qpid::broker::BrokerAsyncContext -#include "TxnHandle.h" #include "qpid/asyncStore/AsyncOperation.h" -#include <boost/shared_ptr.hpp> - namespace qpid { namespace broker { +class TxnHandle; + +typedef void (*AsyncResultCallback)(const AsyncResultHandle* const); + class TxnAsyncContext: public BrokerAsyncContext { public: @@ -46,7 +47,7 @@ public: TxnBuffer* getTxnBuffer() const; qpid::asyncStore::AsyncOperation::opCode getOpCode() const; const char* getOpStr() const; - TxnHandle getTransactionContext() const; + TxnHandle& getTransactionContext() const; // --- Interface BrokerAsyncContext --- AsyncResultQueue* getAsyncResultQueue() const; @@ -54,7 +55,7 @@ public: private: TxnBuffer* const m_tb; - TxnHandle m_th; + TxnHandle& m_th; const qpid::asyncStore::AsyncOperation::opCode m_op; AsyncResultCallback m_rcb; AsyncResultQueue* const m_arq; diff --git a/cpp/src/qpid/broker/TxnBuffer.cpp b/cpp/src/qpid/broker/TxnBuffer.cpp index b975f09448..425d725e9e 100644 --- a/cpp/src/qpid/broker/TxnBuffer.cpp +++ b/cpp/src/qpid/broker/TxnBuffer.cpp @@ -24,13 +24,10 @@ #include "TxnBuffer.h" #include "AsyncResultHandle.h" -#include "AsyncStore.h" #include "TxnAsyncContext.h" #include "TxnOp.h" -#include "qpid/Exception.h" - -#include <boost/shared_ptr.hpp> +#include "qpid/log/Statement.h" namespace qpid { namespace broker { @@ -47,7 +44,6 @@ TxnBuffer::~TxnBuffer() void TxnBuffer::enlist(boost::shared_ptr<TxnOp> op) { -//std::cout << "TTT TxnBuffer::enlist" << std::endl << std::flush; qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_opsMutex); m_ops.push_back(op); } @@ -55,7 +51,6 @@ TxnBuffer::enlist(boost::shared_ptr<TxnOp> op) bool TxnBuffer::prepare(TxnHandle& th) { -//std::cout << "TTT TxnBuffer::prepare" << std::endl << std::flush; qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_opsMutex); for(std::vector<boost::shared_ptr<TxnOp> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) { if (!(*i)->prepare(th)) { @@ -68,7 +63,6 @@ TxnBuffer::prepare(TxnHandle& th) void TxnBuffer::commit() { -//std::cout << "TTT TxnBuffer::commit" << std::endl << std::flush; qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_opsMutex); for(std::vector<boost::shared_ptr<TxnOp> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) { (*i)->commit(); @@ -79,7 +73,6 @@ TxnBuffer::commit() void TxnBuffer::rollback() { -//std::cout << "TTT TxnBuffer::rollback" << std::endl << std::flush; qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_opsMutex); for(std::vector<boost::shared_ptr<TxnOp> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) { (*i)->rollback(); @@ -88,17 +81,16 @@ TxnBuffer::rollback() } bool -TxnBuffer::commitLocal(AsyncTransactionalStore* const store) +TxnBuffer::commitLocal(AsyncTransaction* const store) { -//std::cout << "TTT TxnBuffer::commitLocal" << std::endl << std::flush; if (store) { try { m_store = store; asyncLocalCommit(); } catch (std::exception& e) { - std::cerr << "Commit failed: " << e.what() << std::endl; + QPID_LOG(error, "TxnBuffer::commitLocal: Commit failed: " << e.what()); } catch (...) { - std::cerr << "Commit failed (unknown exception)" << std::endl; + QPID_LOG(error, "TxnBuffer::commitLocal: Commit failed (unknown exception)"); } } return false; @@ -111,11 +103,10 @@ TxnBuffer::handleAsyncResult(const AsyncResultHandle* const arh) if (arh) { boost::shared_ptr<TxnAsyncContext> tac = boost::dynamic_pointer_cast<TxnAsyncContext>(arh->getBrokerAsyncContext()); if (arh->getErrNo()) { - std::cerr << "Transaction xid=\"" << tac->getTransactionContext().getXid() << "\": Operation " << tac->getOpStr() << ": failure " - << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl; + QPID_LOG(error, "TxnBuffer::handleAsyncResult: Transactional operation " << tac->getOpStr() << " failed: err=" << arh->getErrNo() + << " (" << arh->getErrMsg() << ")"); tac->getTxnBuffer()->asyncLocalAbort(); } else { -//std::cout << "TTT TxnBuffer::handleAsyncResult() op=" << tac->getOpStr() << std::endl << std::flush; if (tac->getOpCode() == qpid::asyncStore::AsyncOperation::TXN_ABORT) { tac->getTxnBuffer()->asyncLocalAbort(); } else { @@ -131,13 +122,11 @@ TxnBuffer::asyncLocalCommit() assert(m_store != 0); switch(m_state) { case NONE: -//std::cout << "TTT TxnBuffer::asyncLocalCommit: NONE->PREPARE" << std::endl << std::flush; m_state = PREPARE; m_txnHandle = m_store->createTxnHandle(this); prepare(m_txnHandle); break; case PREPARE: -//std::cout << "TTT TxnBuffer::asyncLocalCommit: PREPARE->COMMIT" << std::endl << std::flush; m_state = COMMIT; { boost::shared_ptr<TxnAsyncContext> tac(new TxnAsyncContext(this, @@ -149,16 +138,12 @@ TxnBuffer::asyncLocalCommit() } break; case COMMIT: -//std::cout << "TTT TxnBuffer:asyncLocalCommit: COMMIT->COMPLETE" << std::endl << std::flush; commit(); m_state = COMPLETE; delete this; // TODO: ugly! Find a better way to handle the life cycle of this class break; -// case COMPLETE: -//std::cout << "TTT TxnBuffer:asyncLocalCommit: COMPLETE" << std::endl << std::flush; - break; + case COMPLETE: default: ; -//std::cout << "TTT TxnBuffer:asyncLocalCommit: Unexpected state " << m_state << std::endl << std::flush; } } @@ -170,7 +155,6 @@ TxnBuffer::asyncLocalAbort() case NONE: case PREPARE: case COMMIT: -//std::cout << "TTT TxnBuffer::asyncRollback: xxx->ROLLBACK" << std::endl << std::flush; m_state = ROLLBACK; { boost::shared_ptr<TxnAsyncContext> tac(new TxnAsyncContext(this, @@ -182,31 +166,11 @@ TxnBuffer::asyncLocalAbort() } break; case ROLLBACK: -//std::cout << "TTT TxnBuffer:asyncRollback: ROLLBACK->COMPLETE" << std::endl << std::flush; rollback(); m_state = COMPLETE; delete this; // TODO: ugly! Find a better way to handle the life cycle of this class default: ; -//std::cout << "TTT TxnBuffer:asyncRollback: Unexpected state " << m_state << std::endl << std::flush; - } -} - -// for debugging -/* -void -TxnBuffer::printState(std::ostream& os) -{ - os << "state="; - switch(m_state) { - case NONE: os << "NONE"; break; - case PREPARE: os << "PREPARE"; break; - case COMMIT: os << "COMMIT"; break; - case ROLLBACK: os << "ROLLBACK"; break; - case COMPLETE: os << "COMPLETE"; break; - default: os << m_state << "(unknown)"; } - os << "; " << m_ops.size() << "; store=" << m_store; } -*/ }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/TxnBuffer.h b/cpp/src/qpid/broker/TxnBuffer.h index ea8b73407f..cd78846b71 100644 --- a/cpp/src/qpid/broker/TxnBuffer.h +++ b/cpp/src/qpid/broker/TxnBuffer.h @@ -26,7 +26,8 @@ #include "TxnHandle.h" -//#include <boost/enable_shared_from_this.hpp> +#include "qpid/sys/Mutex.h" + #include <boost/shared_ptr.hpp> #include <vector> @@ -35,10 +36,10 @@ namespace broker { class AsyncResultHandle; class AsyncResultQueue; -class AsyncTransactionalStore; +class AsyncTransaction; class TxnOp; -class TxnBuffer /*: public boost::enable_shared_from_this<TxnBuffer>*/ { +class TxnBuffer { public: TxnBuffer(AsyncResultQueue& arq); virtual ~TxnBuffer(); @@ -47,21 +48,18 @@ public: bool prepare(TxnHandle& th); void commit(); void rollback(); - bool commitLocal(AsyncTransactionalStore* const store); + bool commitLocal(AsyncTransaction* const store); // --- Async operations --- static void handleAsyncResult(const AsyncResultHandle* const arh); void asyncLocalCommit(); void asyncLocalAbort(); - // --- Debug --- - //void printState(std::ostream& os); - private: std::vector<boost::shared_ptr<TxnOp> > m_ops; qpid::sys::Mutex m_opsMutex; TxnHandle m_txnHandle; - AsyncTransactionalStore* m_store; + AsyncTransaction* m_store; AsyncResultQueue& m_resultQueue; typedef enum {NONE = 0, PREPARE, COMMIT, ROLLBACK, COMPLETE} e_txnState; diff --git a/cpp/src/qpid/broker/TxnHandle.cpp b/cpp/src/qpid/broker/TxnHandle.cpp index 07d46b4235..58cedd586e 100644 --- a/cpp/src/qpid/broker/TxnHandle.cpp +++ b/cpp/src/qpid/broker/TxnHandle.cpp @@ -23,23 +23,23 @@ #include "TxnHandle.h" -#include "qpid/messaging/PrivateImplRef.h" +#include "PrivateImplRef.h" + +#include "qpid/asyncStore/TxnHandleImpl.h" namespace qpid { namespace broker { -typedef qpid::messaging::PrivateImplRef<TxnHandle> PrivateImpl; +typedef PrivateImplRef<TxnHandle> PrivateImpl; TxnHandle::TxnHandle(qpid::asyncStore::TxnHandleImpl* p) : - qpid::messaging::Handle<qpid::asyncStore::TxnHandleImpl>(), - IdHandle() + Handle<qpid::asyncStore::TxnHandleImpl>() { PrivateImpl::ctor(*this, p); } TxnHandle::TxnHandle(const TxnHandle& r) : - qpid::messaging::Handle<qpid::asyncStore::TxnHandleImpl>(), - IdHandle() + Handle<qpid::asyncStore::TxnHandleImpl>() { PrivateImpl::copy(*this, r); } diff --git a/cpp/src/qpid/broker/TxnHandle.h b/cpp/src/qpid/broker/TxnHandle.h index 8bed14e16a..7302490939 100644 --- a/cpp/src/qpid/broker/TxnHandle.h +++ b/cpp/src/qpid/broker/TxnHandle.h @@ -21,19 +21,23 @@ * \file TxnHandle.h */ -#ifndef qpid_broker_TxnHandleImpl_h_ -#define qpid_broker_TxnHandleImpl_h_ +#ifndef qpid_broker_TxnHandle_h_ +#define qpid_broker_TxnHandle_h_ -#include "IdHandle.h" +#include "Handle.h" -#include "qpid/asyncStore/TxnHandleImpl.h" -#include "qpid/messaging/Handle.h" +#include "qpid/asyncStore/AsyncStoreHandle.h" + +#include <string> namespace qpid { +namespace asyncStore { +class TxnHandleImpl; +} namespace broker { -class TxnHandle : public qpid::messaging::Handle<qpid::asyncStore::TxnHandleImpl>, - public IdHandle +class TxnHandle : public Handle<qpid::asyncStore::TxnHandleImpl>, + public qpid::asyncStore::AsyncStoreHandle { public: TxnHandle(qpid::asyncStore::TxnHandleImpl* p = 0); @@ -48,9 +52,9 @@ public: void decrOpCnt(); private: - friend class qpid::messaging::PrivateImplRef<TxnHandle>; + friend class PrivateImplRef<TxnHandle>; }; }} // namespace qpid::broker -#endif // qpid_broker_TxnHandleImpl_h_ +#endif // qpid_broker_TxnHandle_h_ |