diff options
author | Kim van der Riet <kpvdr@apache.org> | 2012-06-01 15:30:01 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2012-06-01 15:30:01 +0000 |
commit | ad9bebb1157f009151973cf721fdebdd663d39e3 (patch) | |
tree | 3b8dc0a9fa3de3b88bcbb82572a06cb579fa3002 /cpp/src/qpid/asyncStore | |
parent | 220841d24ff48f27339000e887d5465a53c39013 (diff) | |
download | qpid-python-ad9bebb1157f009151973cf721fdebdd663d39e3.tar.gz |
WIP: Non-transactional message path in place. Transactions not working.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1345240 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/asyncStore')
-rw-r--r-- | cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp | 32 | ||||
-rw-r--r-- | cpp/src/qpid/asyncStore/AsyncStoreImpl.h | 6 | ||||
-rw-r--r-- | cpp/src/qpid/asyncStore/OperationQueue.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/asyncStore/jrnl2/AtomicCounter.h | 85 |
4 files changed, 88 insertions, 41 deletions
diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp index 5a4905fef6..083034acc4 100644 --- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp +++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp @@ -246,9 +246,9 @@ AsyncStoreImpl::submitDestroy(qpid::broker::EventHandle& eventHandle, void AsyncStoreImpl::submitDestroy(qpid::broker::EventHandle& eventHandle, - qpid::broker::TxnHandle& txnHandle, - qpid::broker::ResultCallback resultCb, - qpid::broker::BrokerAsyncContext* brokerCtxt) + qpid::broker::TxnHandle& txnHandle, + qpid::broker::ResultCallback resultCb, + qpid::broker::BrokerAsyncContext* brokerCtxt) { AsyncOperation* op = new AsyncOperation(AsyncOperation::EVENT_DESTROY, dynamic_cast<qpid::broker::IdHandle*>(&eventHandle), @@ -260,18 +260,6 @@ AsyncStoreImpl::submitDestroy(qpid::broker::EventHandle& eventHandle, void AsyncStoreImpl::submitEnqueue(qpid::broker::EnqueueHandle& enqHandle, - qpid::broker::ResultCallback resultCb, - qpid::broker::BrokerAsyncContext* brokerCtxt) -{ - AsyncOperation* op = new AsyncOperation(AsyncOperation::MSG_ENQUEUE, - dynamic_cast<qpid::broker::IdHandle*>(&enqHandle), - resultCb, - brokerCtxt); - m_operations.submit(op); -} - -void -AsyncStoreImpl::submitEnqueue(qpid::broker::EnqueueHandle& enqHandle, qpid::broker::TxnHandle& txnHandle, qpid::broker::ResultCallback resultCb, qpid::broker::BrokerAsyncContext* brokerCtxt) @@ -282,18 +270,8 @@ AsyncStoreImpl::submitEnqueue(qpid::broker::EnqueueHandle& enqHandle, resultCb, brokerCtxt); m_operations.submit(op); -} - -void -AsyncStoreImpl::submitDequeue(qpid::broker::EnqueueHandle& enqHandle, - qpid::broker::ResultCallback resultCb, - qpid::broker::BrokerAsyncContext* brokerCtxt) -{ - AsyncOperation* op = new AsyncOperation(AsyncOperation::MSG_DEQUEUE, - dynamic_cast<qpid::broker::IdHandle*>(&enqHandle), - resultCb, - brokerCtxt); - m_operations.submit(op); +//delete op; +//delete brokerCtxt; } void diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.h b/cpp/src/qpid/asyncStore/AsyncStoreImpl.h index 7e3b3e94da..0298c74dc5 100644 --- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.h +++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.h @@ -111,16 +111,10 @@ public: qpid::broker::BrokerAsyncContext* brokerCtxt); void submitEnqueue(qpid::broker::EnqueueHandle& enqHandle, - qpid::broker::ResultCallback resultCb, - qpid::broker::BrokerAsyncContext* brokerCtxt); - void submitEnqueue(qpid::broker::EnqueueHandle& enqHandle, qpid::broker::TxnHandle& txnHandle, qpid::broker::ResultCallback resultCb, qpid::broker::BrokerAsyncContext* brokerCtxt); void submitDequeue(qpid::broker::EnqueueHandle& enqHandle, - qpid::broker::ResultCallback resultCb, - qpid::broker::BrokerAsyncContext* brokerCtxt); - void submitDequeue(qpid::broker::EnqueueHandle& enqHandle, qpid::broker::TxnHandle& txnHandle, qpid::broker::ResultCallback resultCb, qpid::broker::BrokerAsyncContext* brokerCtxt); diff --git a/cpp/src/qpid/asyncStore/OperationQueue.cpp b/cpp/src/qpid/asyncStore/OperationQueue.cpp index 1e52eb3612..69ddf7645e 100644 --- a/cpp/src/qpid/asyncStore/OperationQueue.cpp +++ b/cpp/src/qpid/asyncStore/OperationQueue.cpp @@ -23,8 +23,6 @@ #include "OperationQueue.h" -#include "qpid/broker/BrokerAsyncContext.h" - namespace qpid { namespace asyncStore { @@ -42,7 +40,7 @@ OperationQueue::~OperationQueue() void OperationQueue::submit(const AsyncOperation* op) { -//std::cout << "***** OperationQueue::submit() op=" << op->getOpStr() << std::endl << std::flush; +//std::cout << "--> OperationQueue::submit() op=" << op->getOpStr() << std::endl << std::flush; m_opQueue.push(op); } @@ -51,7 +49,7 @@ OperationQueue::OpQueue::Batch::const_iterator OperationQueue::handle(const OperationQueue::OpQueue::Batch& e) { for (OpQueue::Batch::const_iterator i = e.begin(); i != e.end(); ++i) { -//std::cout << "##### OperationQueue::handle() Op=" << (*i)->getOpStr() << std::endl << std::flush; +//std::cout << "<-- OperationQueue::handle() Op=" << (*i)->getOpStr() << std::endl << std::flush; if ((*i)->m_resCb) { ((*i)->m_resCb)(new qpid::broker::AsyncResult, (*i)->m_brokerCtxt); } else { diff --git a/cpp/src/qpid/asyncStore/jrnl2/AtomicCounter.h b/cpp/src/qpid/asyncStore/jrnl2/AtomicCounter.h index b13caa5462..207bbc68f2 100644 --- a/cpp/src/qpid/asyncStore/jrnl2/AtomicCounter.h +++ b/cpp/src/qpid/asyncStore/jrnl2/AtomicCounter.h @@ -40,7 +40,7 @@ public: /** * \brief Constructor with an option to set an inital value for the counter. */ - AtomicCounter(T initialValue = T(0)) : + AtomicCounter(const T initialValue = T(0)) : m_cnt(initialValue) {} @@ -58,13 +58,90 @@ public: * first call to next() will return 1. Upon overflow, the counter will be incremented twice so as to avoid * returning the value 0. */ - virtual T next() + T + next() { - // --- START OF CRITICAL SECTION --- ScopedLock l(m_mutex); while (!++m_cnt) ; // Cannot return 0x0 if m_cnt should overflow return m_cnt; - } // --- END OF CRITICAL SECTION --- + } + + void + operator++() + { + ScopedLock l(m_mutex); + ++m_cnt; + } + + void + operator--() + { + ScopedLock l(m_mutex); + --m_cnt; + } + + T + get() const + { + ScopedLock l(m_mutex); + return m_cnt; + } + + bool + operator==(const AtomicCounter<T>& rhs) + { + ScopedLock l(m_mutex); + return m_cnt == rhs.get(); + } + + bool + operator==(const T rhs) + { + ScopedLock l(m_mutex); + return m_cnt == rhs; + } + + bool + operator!=(const AtomicCounter<T>& rhs) + { + ScopedLock l(m_mutex); + return m_cnt != rhs.get(); + } + + bool + operator!=(const T rhs) + { + ScopedLock l(m_mutex); + return m_cnt != rhs; + } + + bool + operator>(const AtomicCounter<T>& rhs) + { + ScopedLock l(m_mutex); + return m_cnt > rhs.get(); + } + + bool + operator>(const T rhs) + { + ScopedLock l(m_mutex); + return m_cnt > rhs; + } + + bool + operator<(const AtomicCounter<T>& rhs) + { + ScopedLock l(m_mutex); + return m_cnt < rhs.get(); + } + + bool + operator<(const T rhs) + { + ScopedLock l(m_mutex); + return m_cnt < rhs; + } protected: T m_cnt; ///< Internal count value |