summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/asyncStore
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2012-06-01 15:30:01 +0000
committerKim van der Riet <kpvdr@apache.org>2012-06-01 15:30:01 +0000
commitad9bebb1157f009151973cf721fdebdd663d39e3 (patch)
tree3b8dc0a9fa3de3b88bcbb82572a06cb579fa3002 /cpp/src/qpid/asyncStore
parent220841d24ff48f27339000e887d5465a53c39013 (diff)
downloadqpid-python-ad9bebb1157f009151973cf721fdebdd663d39e3.tar.gz
WIP: Non-transactional message path in place. Transactions not working.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1345240 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/asyncStore')
-rw-r--r--cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp32
-rw-r--r--cpp/src/qpid/asyncStore/AsyncStoreImpl.h6
-rw-r--r--cpp/src/qpid/asyncStore/OperationQueue.cpp6
-rw-r--r--cpp/src/qpid/asyncStore/jrnl2/AtomicCounter.h85
4 files changed, 88 insertions, 41 deletions
diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp
index 5a4905fef6..083034acc4 100644
--- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp
+++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp
@@ -246,9 +246,9 @@ AsyncStoreImpl::submitDestroy(qpid::broker::EventHandle& eventHandle,
void
AsyncStoreImpl::submitDestroy(qpid::broker::EventHandle& eventHandle,
- qpid::broker::TxnHandle& txnHandle,
- qpid::broker::ResultCallback resultCb,
- qpid::broker::BrokerAsyncContext* brokerCtxt)
+ qpid::broker::TxnHandle& txnHandle,
+ qpid::broker::ResultCallback resultCb,
+ qpid::broker::BrokerAsyncContext* brokerCtxt)
{
AsyncOperation* op = new AsyncOperation(AsyncOperation::EVENT_DESTROY,
dynamic_cast<qpid::broker::IdHandle*>(&eventHandle),
@@ -260,18 +260,6 @@ AsyncStoreImpl::submitDestroy(qpid::broker::EventHandle& eventHandle,
void
AsyncStoreImpl::submitEnqueue(qpid::broker::EnqueueHandle& enqHandle,
- qpid::broker::ResultCallback resultCb,
- qpid::broker::BrokerAsyncContext* brokerCtxt)
-{
- AsyncOperation* op = new AsyncOperation(AsyncOperation::MSG_ENQUEUE,
- dynamic_cast<qpid::broker::IdHandle*>(&enqHandle),
- resultCb,
- brokerCtxt);
- m_operations.submit(op);
-}
-
-void
-AsyncStoreImpl::submitEnqueue(qpid::broker::EnqueueHandle& enqHandle,
qpid::broker::TxnHandle& txnHandle,
qpid::broker::ResultCallback resultCb,
qpid::broker::BrokerAsyncContext* brokerCtxt)
@@ -282,18 +270,8 @@ AsyncStoreImpl::submitEnqueue(qpid::broker::EnqueueHandle& enqHandle,
resultCb,
brokerCtxt);
m_operations.submit(op);
-}
-
-void
-AsyncStoreImpl::submitDequeue(qpid::broker::EnqueueHandle& enqHandle,
- qpid::broker::ResultCallback resultCb,
- qpid::broker::BrokerAsyncContext* brokerCtxt)
-{
- AsyncOperation* op = new AsyncOperation(AsyncOperation::MSG_DEQUEUE,
- dynamic_cast<qpid::broker::IdHandle*>(&enqHandle),
- resultCb,
- brokerCtxt);
- m_operations.submit(op);
+//delete op;
+//delete brokerCtxt;
}
void
diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.h b/cpp/src/qpid/asyncStore/AsyncStoreImpl.h
index 7e3b3e94da..0298c74dc5 100644
--- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.h
+++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.h
@@ -111,16 +111,10 @@ public:
qpid::broker::BrokerAsyncContext* brokerCtxt);
void submitEnqueue(qpid::broker::EnqueueHandle& enqHandle,
- qpid::broker::ResultCallback resultCb,
- qpid::broker::BrokerAsyncContext* brokerCtxt);
- void submitEnqueue(qpid::broker::EnqueueHandle& enqHandle,
qpid::broker::TxnHandle& txnHandle,
qpid::broker::ResultCallback resultCb,
qpid::broker::BrokerAsyncContext* brokerCtxt);
void submitDequeue(qpid::broker::EnqueueHandle& enqHandle,
- qpid::broker::ResultCallback resultCb,
- qpid::broker::BrokerAsyncContext* brokerCtxt);
- void submitDequeue(qpid::broker::EnqueueHandle& enqHandle,
qpid::broker::TxnHandle& txnHandle,
qpid::broker::ResultCallback resultCb,
qpid::broker::BrokerAsyncContext* brokerCtxt);
diff --git a/cpp/src/qpid/asyncStore/OperationQueue.cpp b/cpp/src/qpid/asyncStore/OperationQueue.cpp
index 1e52eb3612..69ddf7645e 100644
--- a/cpp/src/qpid/asyncStore/OperationQueue.cpp
+++ b/cpp/src/qpid/asyncStore/OperationQueue.cpp
@@ -23,8 +23,6 @@
#include "OperationQueue.h"
-#include "qpid/broker/BrokerAsyncContext.h"
-
namespace qpid {
namespace asyncStore {
@@ -42,7 +40,7 @@ OperationQueue::~OperationQueue()
void
OperationQueue::submit(const AsyncOperation* op)
{
-//std::cout << "***** OperationQueue::submit() op=" << op->getOpStr() << std::endl << std::flush;
+//std::cout << "--> OperationQueue::submit() op=" << op->getOpStr() << std::endl << std::flush;
m_opQueue.push(op);
}
@@ -51,7 +49,7 @@ OperationQueue::OpQueue::Batch::const_iterator
OperationQueue::handle(const OperationQueue::OpQueue::Batch& e)
{
for (OpQueue::Batch::const_iterator i = e.begin(); i != e.end(); ++i) {
-//std::cout << "##### OperationQueue::handle() Op=" << (*i)->getOpStr() << std::endl << std::flush;
+//std::cout << "<-- OperationQueue::handle() Op=" << (*i)->getOpStr() << std::endl << std::flush;
if ((*i)->m_resCb) {
((*i)->m_resCb)(new qpid::broker::AsyncResult, (*i)->m_brokerCtxt);
} else {
diff --git a/cpp/src/qpid/asyncStore/jrnl2/AtomicCounter.h b/cpp/src/qpid/asyncStore/jrnl2/AtomicCounter.h
index b13caa5462..207bbc68f2 100644
--- a/cpp/src/qpid/asyncStore/jrnl2/AtomicCounter.h
+++ b/cpp/src/qpid/asyncStore/jrnl2/AtomicCounter.h
@@ -40,7 +40,7 @@ public:
/**
* \brief Constructor with an option to set an inital value for the counter.
*/
- AtomicCounter(T initialValue = T(0)) :
+ AtomicCounter(const T initialValue = T(0)) :
m_cnt(initialValue)
{}
@@ -58,13 +58,90 @@ public:
* first call to next() will return 1. Upon overflow, the counter will be incremented twice so as to avoid
* returning the value 0.
*/
- virtual T next()
+ T
+ next()
{
- // --- START OF CRITICAL SECTION ---
ScopedLock l(m_mutex);
while (!++m_cnt) ; // Cannot return 0x0 if m_cnt should overflow
return m_cnt;
- } // --- END OF CRITICAL SECTION ---
+ }
+
+ void
+ operator++()
+ {
+ ScopedLock l(m_mutex);
+ ++m_cnt;
+ }
+
+ void
+ operator--()
+ {
+ ScopedLock l(m_mutex);
+ --m_cnt;
+ }
+
+ T
+ get() const
+ {
+ ScopedLock l(m_mutex);
+ return m_cnt;
+ }
+
+ bool
+ operator==(const AtomicCounter<T>& rhs)
+ {
+ ScopedLock l(m_mutex);
+ return m_cnt == rhs.get();
+ }
+
+ bool
+ operator==(const T rhs)
+ {
+ ScopedLock l(m_mutex);
+ return m_cnt == rhs;
+ }
+
+ bool
+ operator!=(const AtomicCounter<T>& rhs)
+ {
+ ScopedLock l(m_mutex);
+ return m_cnt != rhs.get();
+ }
+
+ bool
+ operator!=(const T rhs)
+ {
+ ScopedLock l(m_mutex);
+ return m_cnt != rhs;
+ }
+
+ bool
+ operator>(const AtomicCounter<T>& rhs)
+ {
+ ScopedLock l(m_mutex);
+ return m_cnt > rhs.get();
+ }
+
+ bool
+ operator>(const T rhs)
+ {
+ ScopedLock l(m_mutex);
+ return m_cnt > rhs;
+ }
+
+ bool
+ operator<(const AtomicCounter<T>& rhs)
+ {
+ ScopedLock l(m_mutex);
+ return m_cnt < rhs.get();
+ }
+
+ bool
+ operator<(const T rhs)
+ {
+ ScopedLock l(m_mutex);
+ return m_cnt < rhs;
+ }
protected:
T m_cnt; ///< Internal count value