summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2012-06-07 12:42:37 +0000
committerKim van der Riet <kpvdr@apache.org>2012-06-07 12:42:37 +0000
commit22d453646b4815752134ad62e0b27841a103afb2 (patch)
tree152b6447a5c097b9617c10b7309775fc7987f996
parent45d67efe63abecddf5ca7a68c45f308664bd1466 (diff)
downloadqpid-python-22d453646b4815752134ad62e0b27841a103afb2.tar.gz
QPID-3858: WIP - added AsyncResultQueue for async result return path
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1347588 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/CMakeLists.txt5
-rw-r--r--cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp5
-rw-r--r--cpp/src/qpid/asyncStore/AsyncStoreImpl.h3
-rw-r--r--cpp/src/qpid/asyncStore/OperationQueue.cpp24
-rw-r--r--cpp/src/qpid/asyncStore/OperationQueue.h4
-rw-r--r--cpp/src/qpid/asyncStore/Plugin.cpp2
-rw-r--r--cpp/src/qpid/broker/AsyncResultHandle.cpp74
-rw-r--r--cpp/src/qpid/broker/AsyncResultHandle.h56
-rw-r--r--cpp/src/qpid/broker/AsyncResultHandleImpl.cpp68
-rw-r--r--cpp/src/qpid/broker/AsyncResultHandleImpl.h54
-rw-r--r--cpp/src/qpid/broker/AsyncResultQueue.cpp62
-rw-r--r--cpp/src/qpid/broker/AsyncResultQueue.h51
-rw-r--r--cpp/src/qpid/broker/AsyncStore.cpp2
-rw-r--r--cpp/src/qpid/broker/AsyncStore.h55
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MockPersistableMessage.cpp2
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.cpp17
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.h12
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.cpp2
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.h4
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp6
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/PerfTest.h2
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp3
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/TestOptions.cpp2
23 files changed, 468 insertions, 47 deletions
diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt
index 7ca1a145c6..b5ac6af825 100644
--- a/cpp/src/CMakeLists.txt
+++ b/cpp/src/CMakeLists.txt
@@ -1076,6 +1076,9 @@ set (qpidbroker_SOURCES
${qpidbroker_platform_SOURCES}
qpid/amqp_0_10/Connection.h
qpid/amqp_0_10/Connection.cpp
+ qpid/broker/AsyncResultHandle.cpp
+ qpid/broker/AsyncResultHandleImpl.cpp
+ qpid/broker/AsyncResultQueue.cpp
qpid/broker/AsyncStore.cpp
qpid/broker/Broker.cpp
qpid/broker/Credit.cpp
@@ -1498,6 +1501,8 @@ set (asyncStore_SOURCES
qpid/asyncStore/QueueHandleImpl.cpp
qpid/asyncStore/RunState.cpp
qpid/asyncStore/TxnHandleImpl.cpp
+ qpid/broker/AsyncResultHandle.cpp
+ qpid/broker/AsyncResultHandleImpl.cpp
qpid/broker/ConfigHandle.cpp
qpid/broker/EnqueueHandle.cpp
qpid/broker/EventHandle.cpp
diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp
index 083034acc4..6283d07ee9 100644
--- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp
+++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp
@@ -38,11 +38,12 @@ namespace qpid {
namespace asyncStore {
AsyncStoreImpl::AsyncStoreImpl(boost::shared_ptr<qpid::sys::Poller> poller,
- const AsyncStoreOptions& opts) :
+ const AsyncStoreOptions& opts,
+ qpid::broker::AsyncResultQueue* resultQueue) :
m_poller(poller),
m_opts(opts),
m_runState(),
- m_operations(m_poller)
+ m_operations(m_poller, resultQueue)
{}
AsyncStoreImpl::~AsyncStoreImpl()
diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.h b/cpp/src/qpid/asyncStore/AsyncStoreImpl.h
index 0298c74dc5..717723eda3 100644
--- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.h
+++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.h
@@ -43,7 +43,8 @@ namespace asyncStore {
class AsyncStoreImpl: public qpid::broker::AsyncStore {
public:
AsyncStoreImpl(boost::shared_ptr<qpid::sys::Poller> poller,
- const AsyncStoreOptions& opts);
+ const AsyncStoreOptions& opts,
+ qpid::broker::AsyncResultQueue* resultQueue);
virtual ~AsyncStoreImpl();
void initialize();
uint64_t getNextRid();
diff --git a/cpp/src/qpid/asyncStore/OperationQueue.cpp b/cpp/src/qpid/asyncStore/OperationQueue.cpp
index 69ddf7645e..f13114f41e 100644
--- a/cpp/src/qpid/asyncStore/OperationQueue.cpp
+++ b/cpp/src/qpid/asyncStore/OperationQueue.cpp
@@ -23,11 +23,15 @@
#include "OperationQueue.h"
+#include "qpid/broker/AsyncResultHandle.h"
+
namespace qpid {
namespace asyncStore {
-OperationQueue::OperationQueue(const boost::shared_ptr<qpid::sys::Poller>& poller) :
- m_opQueue(boost::bind(&OperationQueue::handle, this, _1), poller)
+OperationQueue::OperationQueue(const boost::shared_ptr<qpid::sys::Poller>& poller,
+ qpid::broker::AsyncResultQueue* resultQueue) :
+ m_opQueue(boost::bind(&OperationQueue::handle, this, _1), poller),
+ m_resultQueue(resultQueue)
{
m_opQueue.start();
}
@@ -40,7 +44,7 @@ OperationQueue::~OperationQueue()
void
OperationQueue::submit(const AsyncOperation* op)
{
-//std::cout << "--> OperationQueue::submit() op=" << op->getOpStr() << std::endl << std::flush;
+std::cout << "--> OperationQueue::submit() op=" << op->getOpStr() << std::endl << std::flush;
m_opQueue.push(op);
}
@@ -49,11 +53,17 @@ OperationQueue::OpQueue::Batch::const_iterator
OperationQueue::handle(const OperationQueue::OpQueue::Batch& e)
{
for (OpQueue::Batch::const_iterator i = e.begin(); i != e.end(); ++i) {
-//std::cout << "<-- OperationQueue::handle() Op=" << (*i)->getOpStr() << std::endl << std::flush;
- if ((*i)->m_resCb) {
- ((*i)->m_resCb)(new qpid::broker::AsyncResult, (*i)->m_brokerCtxt);
+std::cout << "<-- OperationQueue::handle() Op=" << (*i)->getOpStr() << std::endl << std::flush;
+ qpid::broker::BrokerAsyncContext* bc = (*i)->m_brokerCtxt;
+ qpid::broker::ResultCallback rcb = (*i)->m_resCb;
+ if (rcb) {
+// ((*i)->m_resCb)(new qpid::broker::AsyncResult, (*i)->m_brokerCtxt);
+// rcb(new qpid::broker::AsyncResultHandle(new qpid::broker::AsyncResultHandleImpl(bc)));
+ if (m_resultQueue) {
+ (m_resultQueue->*rcb)(new qpid::broker::AsyncResultHandle(new qpid::broker::AsyncResultHandleImpl(bc)));
+ }
} else {
- delete (*i)->m_brokerCtxt;
+ delete bc;
}
delete (*i);
}
diff --git a/cpp/src/qpid/asyncStore/OperationQueue.h b/cpp/src/qpid/asyncStore/OperationQueue.h
index 8a79684262..eba7c829a3 100644
--- a/cpp/src/qpid/asyncStore/OperationQueue.h
+++ b/cpp/src/qpid/asyncStore/OperationQueue.h
@@ -35,13 +35,15 @@ namespace asyncStore {
class OperationQueue
{
public:
- OperationQueue(const boost::shared_ptr<qpid::sys::Poller>& poller);
+ OperationQueue(const boost::shared_ptr<qpid::sys::Poller>& poller,
+ qpid::broker::AsyncResultQueue* resultQueue = 0);
virtual ~OperationQueue();
void submit(const AsyncOperation* op);
protected:
typedef qpid::sys::PollableQueue<const AsyncOperation*> OpQueue;
OpQueue m_opQueue;
+ qpid::broker::AsyncResultQueue* m_resultQueue;
OpQueue::Batch::const_iterator handle(const OpQueue::Batch& e);
};
diff --git a/cpp/src/qpid/asyncStore/Plugin.cpp b/cpp/src/qpid/asyncStore/Plugin.cpp
index 4f35e8cd2a..0441e9c082 100644
--- a/cpp/src/qpid/asyncStore/Plugin.cpp
+++ b/cpp/src/qpid/asyncStore/Plugin.cpp
@@ -41,7 +41,7 @@ Plugin::earlyInitialize(Target& target)
m_options.m_storeDir = dataDir.getPath ();
}
- m_store.reset(new qpid::asyncStore::AsyncStoreImpl(broker->getPoller(), m_options));
+ m_store.reset(new qpid::asyncStore::AsyncStoreImpl(broker->getPoller(), m_options, 0)); // TODO: last arg: point to broker instance of AsyncResultQueue
boost::shared_ptr<qpid::broker::AsyncStore> brokerAsyncStore(m_store);
broker->setAsyncStore(brokerAsyncStore);
boost::function<void()> fn = boost::bind(&Plugin::finalize, this);
diff --git a/cpp/src/qpid/broker/AsyncResultHandle.cpp b/cpp/src/qpid/broker/AsyncResultHandle.cpp
new file mode 100644
index 0000000000..26e46fee1c
--- /dev/null
+++ b/cpp/src/qpid/broker/AsyncResultHandle.cpp
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file AsyncResultHandle.cpp
+ */
+
+#include "AsyncResultHandle.h"
+
+#include "qpid/messaging/PrivateImplRef.h"
+
+namespace qpid {
+namespace broker {
+
+typedef qpid::messaging::PrivateImplRef<AsyncResultHandle> PrivateImpl;
+
+AsyncResultHandle::AsyncResultHandle(AsyncResultHandleImpl* p) :
+ qpid::messaging::Handle<AsyncResultHandleImpl>()
+{
+ PrivateImpl::ctor(*this, p);
+}
+
+AsyncResultHandle::AsyncResultHandle(const AsyncResultHandle& r) :
+ qpid::messaging::Handle<AsyncResultHandleImpl>()
+{
+ PrivateImpl::copy(*this, r);
+}
+
+AsyncResultHandle::~AsyncResultHandle()
+{
+ PrivateImpl::dtor(*this);
+}
+
+AsyncResultHandle&
+AsyncResultHandle::operator=(const AsyncResultHandle& r)
+{
+ return PrivateImpl::assign(*this, r);
+}
+
+int
+AsyncResultHandle::getErrNo() const
+{
+ return impl->getErrNo();
+}
+
+std::string
+AsyncResultHandle::getErrMsg() const
+{
+ return impl->getErrMsg();
+}
+
+const BrokerAsyncContext*
+AsyncResultHandle::getBrokerAsyncContext() const
+{
+ return impl->getBrokerAsyncContext();
+}
+
+}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/AsyncResultHandle.h b/cpp/src/qpid/broker/AsyncResultHandle.h
new file mode 100644
index 0000000000..6f6290bfcb
--- /dev/null
+++ b/cpp/src/qpid/broker/AsyncResultHandle.h
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file AsyncResultHandle.h
+ */
+
+#ifndef qpid_broker_AsyncResultHandle_h_
+#define qpid_broker_AsyncResultHandle_h_
+
+#include "AsyncResultHandleImpl.h"
+
+#include "qpid/messaging/Handle.h"
+
+namespace qpid {
+namespace broker {
+
+class AsyncResultHandle : public qpid::messaging::Handle<AsyncResultHandleImpl>
+{
+public:
+ AsyncResultHandle(AsyncResultHandleImpl* p = 0);
+ AsyncResultHandle(const AsyncResultHandle& r);
+ virtual ~AsyncResultHandle();
+ AsyncResultHandle& operator=(const AsyncResultHandle& r);
+
+ // AsyncResultHandleImpl methods
+
+ int getErrNo() const;
+ std::string getErrMsg() const;
+ const BrokerAsyncContext* getBrokerAsyncContext() const;
+
+private:
+ typedef qpid::broker::AsyncResultHandleImpl Impl;
+ Impl* impl;
+ friend class qpid::messaging::PrivateImplRef<AsyncResultHandle>;
+};
+
+}} // namespace qpid::broker
+
+#endif // qpid_broker_AsyncResultHandle_h_
diff --git a/cpp/src/qpid/broker/AsyncResultHandleImpl.cpp b/cpp/src/qpid/broker/AsyncResultHandleImpl.cpp
new file mode 100644
index 0000000000..36d45e7b0a
--- /dev/null
+++ b/cpp/src/qpid/broker/AsyncResultHandleImpl.cpp
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file AsyncResultHandleImpl.cpp
+ */
+
+#include "AsyncResultHandleImpl.h"
+
+namespace qpid {
+namespace broker {
+
+AsyncResultHandleImpl::AsyncResultHandleImpl() :
+ m_errNo(0),
+ m_errMsg(),
+ m_bc(0)
+{}
+
+AsyncResultHandleImpl::AsyncResultHandleImpl(const BrokerAsyncContext* bc) :
+ m_errNo(0),
+ m_errMsg(),
+ m_bc(bc)
+{}
+
+AsyncResultHandleImpl::AsyncResultHandleImpl(const int errNo, const std::string& errMsg, const BrokerAsyncContext* bc) :
+ m_errNo(errNo),
+ m_errMsg(errMsg),
+ m_bc(bc)
+{}
+
+AsyncResultHandleImpl::~AsyncResultHandleImpl()
+{}
+
+int
+AsyncResultHandleImpl::getErrNo() const
+{
+ return m_errNo;
+}
+
+std::string
+AsyncResultHandleImpl::getErrMsg() const
+{
+ return m_errMsg;
+}
+
+const BrokerAsyncContext*
+AsyncResultHandleImpl::getBrokerAsyncContext() const
+{
+ return m_bc;
+}
+
+}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/AsyncResultHandleImpl.h b/cpp/src/qpid/broker/AsyncResultHandleImpl.h
new file mode 100644
index 0000000000..e1bd1fa0e9
--- /dev/null
+++ b/cpp/src/qpid/broker/AsyncResultHandleImpl.h
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file AsyncResultHandleImpl.h
+ */
+
+#ifndef qpid_broker_AsyncResultHandleImpl_h_
+#define qpid_broker_AsyncResultHandleImpl_h_
+
+#include "AsyncStore.h"
+
+#include "qpid/RefCounted.h"
+
+namespace qpid {
+namespace broker {
+
+class AsyncResultHandleImpl : public virtual qpid::RefCounted
+{
+public:
+ AsyncResultHandleImpl();
+ AsyncResultHandleImpl(const BrokerAsyncContext* bc);
+ AsyncResultHandleImpl(const int errNo, const std::string& errMsg, const BrokerAsyncContext* bc);
+ virtual ~AsyncResultHandleImpl();
+
+ int getErrNo() const;
+ std::string getErrMsg() const;
+ const BrokerAsyncContext* getBrokerAsyncContext() const;
+
+private:
+ const int m_errNo;
+ const std::string m_errMsg;
+ const BrokerAsyncContext* m_bc;
+};
+
+}} // namespace qpid::broker
+
+#endif // qpid_broker_AsyncResultHandleImpl_h_
diff --git a/cpp/src/qpid/broker/AsyncResultQueue.cpp b/cpp/src/qpid/broker/AsyncResultQueue.cpp
new file mode 100644
index 0000000000..1094a582f4
--- /dev/null
+++ b/cpp/src/qpid/broker/AsyncResultQueue.cpp
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file AsyncResultQueue.cpp
+ */
+
+#include "AsyncResultQueue.h"
+
+namespace qpid {
+namespace broker {
+
+AsyncResultQueue::AsyncResultQueue(const boost::shared_ptr<qpid::sys::Poller>& poller) :
+ m_resQueue(boost::bind(&AsyncResultQueue::handle, this, _1), poller)
+{
+ m_resQueue.start();
+}
+
+AsyncResultQueue::~AsyncResultQueue()
+{
+ m_resQueue.stop();
+}
+
+void
+AsyncResultQueue::submit(AsyncResultHandle* res)
+{
+ m_resQueue.push(res);
+}
+
+//static
+/*
+void
+AsyncResultQueue::submit(AsyncResultQueue* arq, AsyncResultHandle* rh)
+{
+ arq->submit(rh);
+}
+*/
+
+// protected
+AsyncResultQueue::ResultQueue::Batch::const_iterator
+AsyncResultQueue::handle(const ResultQueue::Batch& e)
+{
+ return e.end();
+}
+
+}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/AsyncResultQueue.h b/cpp/src/qpid/broker/AsyncResultQueue.h
new file mode 100644
index 0000000000..8881f25bac
--- /dev/null
+++ b/cpp/src/qpid/broker/AsyncResultQueue.h
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file AsyncResultQueue.h
+ */
+
+#ifndef qpid_broker_AsyncResultQueue_h_
+#define qpid_broker_AsyncResultQueue_h_
+
+#include "qpid/sys/PollableQueue.h"
+
+namespace qpid {
+namespace broker {
+
+class AsyncResultHandle;
+
+class AsyncResultQueue
+{
+public:
+ AsyncResultQueue(const boost::shared_ptr<qpid::sys::Poller>& poller);
+ virtual ~AsyncResultQueue();
+ void submit(AsyncResultHandle* rh);
+// static void submit(AsyncResultQueue* arq, AsyncResultHandle* rh);
+
+protected:
+ typedef qpid::sys::PollableQueue<const AsyncResultHandle*> ResultQueue;
+ ResultQueue m_resQueue;
+
+ ResultQueue::Batch::const_iterator handle(const ResultQueue::Batch& e);
+};
+
+}} // namespace qpid::broker
+
+#endif // qpid_broker_AsyncResultQueue_h_
diff --git a/cpp/src/qpid/broker/AsyncStore.cpp b/cpp/src/qpid/broker/AsyncStore.cpp
index 649049bf41..d37b034648 100644
--- a/cpp/src/qpid/broker/AsyncStore.cpp
+++ b/cpp/src/qpid/broker/AsyncStore.cpp
@@ -34,6 +34,7 @@ AsyncStore::AsyncStore()
AsyncStore::~AsyncStore()
{}
+/*
AsyncResult::AsyncResult() :
errNo(0),
errMsg()
@@ -50,5 +51,6 @@ AsyncResult::destroy()
{
delete this;
}
+*/
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/AsyncStore.h b/cpp/src/qpid/broker/AsyncStore.h
index eb47d62cf0..c57bdaa552 100644
--- a/cpp/src/qpid/broker/AsyncStore.h
+++ b/cpp/src/qpid/broker/AsyncStore.h
@@ -37,7 +37,36 @@ public:
virtual ~BrokerAsyncContext();
};
-// Subclassed by broker:
+// Callback definition:
+//struct AsyncResult {
+// int errNo; // 0 implies no error
+// std::string errMsg;
+// AsyncResult();
+// AsyncResult(const int errNo,
+// const std::string& errMsg);
+// void destroy();
+//};
+//typedef void (*ResultCallback)(const AsyncResult*, BrokerAsyncContext*);
+
+class AsyncResultHandle;
+class AsyncResultQueue; // Implements the result callback function
+
+// Singleton class in broker which contains return pollable queue. Use submitAsyncResult() to add reulsts to queue.
+class AsyncResultHandler {
+public:
+ virtual ~AsyncResultHandler();
+
+ // Factory method to create result handle
+
+ virtual AsyncResultHandle createAsyncResultHandle(const int errNo, const std::string& errMsg, BrokerAsyncContext*) = 0;
+
+ // Async return interface
+
+ virtual void submitAsyncResult(AsyncResultHandle&) = 0;
+};
+typedef void (qpid::broker::AsyncResultQueue::*ResultCallback)(AsyncResultHandle*);
+//typedef void (qpid::broker::AsyncResultQueue::*ResultCallback)(AsyncResultQueue*, AsyncResultHandle*);
+
class DataSource {
public:
virtual ~DataSource();
@@ -45,25 +74,13 @@ public:
virtual void write(char* target) = 0;
};
-// Defined by store, all implement qpid::messaging::Handle-type template to hide ref counting:
class ConfigHandle;
-class QueueHandle;
-class TxnHandle;
+class EnqueueHandle;
class EventHandle;
class MessageHandle;
-class EnqueueHandle;
+class QueueHandle;
+class TxnHandle;
-// Callback definition:
-struct AsyncResult
-{
- int errNo; // 0 implies no error
- std::string errMsg;
- AsyncResult();
- AsyncResult(const int errNo,
- const std::string& errMsg);
- void destroy();
-};
-typedef void (*ResultCallback)(const AsyncResult*, BrokerAsyncContext*);
// Subclassed by store:
class AsyncStore {
@@ -73,12 +90,12 @@ public:
// Factory methods for creating handles
- virtual TxnHandle createTxnHandle(const std::string& xid=std::string()) = 0;
virtual ConfigHandle createConfigHandle() = 0;
- virtual QueueHandle createQueueHandle(const std::string& name, const qpid::types::Variant::Map& opts) = 0;
+ virtual EnqueueHandle createEnqueueHandle(MessageHandle&, QueueHandle&) = 0;
virtual EventHandle createEventHandle(QueueHandle&, const std::string& key=std::string()) = 0;
virtual MessageHandle createMessageHandle(const DataSource*) = 0;
- virtual EnqueueHandle createEnqueueHandle(MessageHandle&, QueueHandle&) = 0;
+ virtual QueueHandle createQueueHandle(const std::string& name, const qpid::types::Variant::Map& opts) = 0;
+ virtual TxnHandle createTxnHandle(const std::string& xid=std::string()) = 0;
// Store async interface
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MockPersistableMessage.cpp b/cpp/src/tests/storePerftools/asyncPerf/MockPersistableMessage.cpp
index e7cab4d621..fc04bc746e 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MockPersistableMessage.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/MockPersistableMessage.cpp
@@ -34,7 +34,7 @@ MockPersistableMessage::MockPersistableMessage(const char* msgData,
qpid::asyncStore::AsyncStoreImpl* store) :
m_persistenceId(0ULL),
m_msg(msgData, static_cast<size_t>(msgSize)),
- m_msgHandle(store ? store->createMessageHandle(this) : store->createMessageHandle(0))
+ m_msgHandle(store ? store->createMessageHandle(this) : qpid::broker::MessageHandle(0))
{}
MockPersistableMessage::~MockPersistableMessage()
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.cpp b/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.cpp
index 009f54a157..49d656aee4 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.cpp
@@ -30,6 +30,7 @@
#include "QueuedMessage.h"
#include "qpid/asyncStore/AsyncStoreImpl.h"
+#include "qpid/broker/AsyncResultQueue.h"
namespace tests {
namespace storePerftools {
@@ -37,10 +38,12 @@ namespace asyncPerf {
MockPersistableQueue::MockPersistableQueue(const std::string& name,
const qpid::framing::FieldTable& /*args*/,
- qpid::asyncStore::AsyncStoreImpl* store) :
+ qpid::asyncStore::AsyncStoreImpl* store,
+ qpid::broker::AsyncResultQueue& resultQueue) :
qpid::broker::PersistableQueue(),
m_name(name),
m_store(store),
+ m_resultQueue(resultQueue),
m_asyncOpCounter(0UL),
m_persistenceId(0ULL),
m_persistableData(m_name), // TODO: Currently queue durable data consists only of the queue name. Update this.
@@ -64,6 +67,7 @@ MockPersistableQueue::~MockPersistableQueue()
}
// static
+/*
void
MockPersistableQueue::handleAsyncResult(const qpid::broker::AsyncResult* res,
qpid::broker::BrokerAsyncContext* bc)
@@ -102,6 +106,7 @@ MockPersistableQueue::handleAsyncResult(const qpid::broker::AsyncResult* res,
if (bc) delete bc;
if (res) delete res;
}
+*/
const qpid::broker::QueueHandle&
MockPersistableQueue::getHandle() const
@@ -124,10 +129,12 @@ MockPersistableQueue::getStore()
void
MockPersistableQueue::asyncCreate()
{
+ qpid::broker::ResultCallback rcb = &qpid::broker::AsyncResultQueue::submit;
if (m_store) {
m_store->submitCreate(m_queueHandle,
this,
- &handleAsyncResult,
+ rcb,
+// &qpid::broker::AsyncResultQueue::submit/*&m_resultQueue.submit*/,
new QueueAsyncContext(shared_from_this(),
qpid::asyncStore::AsyncOperation::QUEUE_CREATE));
++m_asyncOpCounter;
@@ -141,7 +148,7 @@ MockPersistableQueue::asyncDestroy(const bool deleteQueue)
if (m_store) {
if (deleteQueue) {
m_store->submitDestroy(m_queueHandle,
- &handleAsyncResult,
+ &qpid::broker::AsyncResultQueue::submit/*&m_resultQueue.submit*/,
new QueueAsyncContext(shared_from_this(),
qpid::asyncStore::AsyncOperation::QUEUE_DESTROY));
++m_asyncOpCounter;
@@ -329,7 +336,7 @@ MockPersistableQueue::asyncEnqueue(MockTransactionContext* txn,
//std::cout << "QQQ Queue=\"" << m_name << "\": asyncEnqueue() rid=0x" << std::hex << qm.payload()->getPersistenceId() << std::dec << std::endl << std::flush;
m_store->submitEnqueue(/*enqHandle*/qm.enqHandle(),
txn->getHandle(),
- &handleAsyncResult,
+ &qpid::broker::AsyncResultQueue::submit/*&m_resultQueue.submit*/,
new QueueAsyncContext(shared_from_this(),
qm.payload(),
qpid::asyncStore::AsyncOperation::MSG_ENQUEUE));
@@ -346,7 +353,7 @@ MockPersistableQueue::asyncDequeue(MockTransactionContext* txn,
qpid::broker::EnqueueHandle enqHandle = qm.enqHandle();
m_store->submitDequeue(enqHandle,
txn->getHandle(),
- &handleAsyncResult,
+ &qpid::broker::AsyncResultQueue::submit/*&m_resultQueue.submit*/,
new QueueAsyncContext(shared_from_this(),
qm.payload(),
qpid::asyncStore::AsyncOperation::MSG_DEQUEUE));
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.h b/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.h
index ff6db93542..e62aeec420 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.h
@@ -38,7 +38,9 @@ namespace qpid {
namespace asyncStore {
class AsyncStoreImpl;
}
-
+namespace broker {
+class AsyncResultQueue;
+}
namespace framing {
class FieldTable;
}}
@@ -61,11 +63,12 @@ class MockPersistableQueue : public boost::enable_shared_from_this<MockPersistab
public:
MockPersistableQueue(const std::string& name,
const qpid::framing::FieldTable& args,
- qpid::asyncStore::AsyncStoreImpl* store);
+ qpid::asyncStore::AsyncStoreImpl* store,
+ qpid::broker::AsyncResultQueue& rq);
virtual ~MockPersistableQueue();
- static void handleAsyncResult(const qpid::broker::AsyncResult* res,
- qpid::broker::BrokerAsyncContext* bc);
+// static void handleAsyncResult(const qpid::broker::AsyncResult* res,
+// qpid::broker::BrokerAsyncContext* bc);
const qpid::broker::QueueHandle& getHandle() const;
qpid::broker::QueueHandle& getHandle();
qpid::asyncStore::AsyncStoreImpl* getStore();
@@ -99,6 +102,7 @@ public:
protected:
const std::string m_name;
qpid::asyncStore::AsyncStoreImpl* m_store;
+ qpid::broker::AsyncResultQueue& m_resultQueue;
AsyncOpCounter m_asyncOpCounter;
mutable uint64_t m_persistenceId;
std::string m_persistableData;
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.cpp b/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.cpp
index c444f596e5..0ac0c7732f 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.cpp
@@ -64,6 +64,7 @@ MockTransactionContext::~MockTransactionContext()
{}
// static
+/*
void
MockTransactionContext::handleAsyncResult(const qpid::broker::AsyncResult* res,
qpid::broker::BrokerAsyncContext* bc)
@@ -96,6 +97,7 @@ MockTransactionContext::handleAsyncResult(const qpid::broker::AsyncResult* res,
if (bc) delete bc;
if (res) delete res;
}
+*/
const qpid::broker::TxnHandle&
MockTransactionContext::getHandle() const
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.h b/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.h
index 3f70b0bfda..d727caede6 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.h
@@ -53,8 +53,8 @@ public:
MockTransactionContext(qpid::asyncStore::AsyncStoreImpl* store,
const std::string& xid = std::string());
virtual ~MockTransactionContext();
- static void handleAsyncResult(const qpid::broker::AsyncResult* res,
- qpid::broker::BrokerAsyncContext* bc);
+// static void handleAsyncResult(const qpid::broker::AsyncResult* res,
+// qpid::broker::BrokerAsyncContext* bc);
const qpid::broker::TxnHandle& getHandle() const;
qpid::broker::TxnHandle& getHandle();
diff --git a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp
index 184a899570..66e0bb3dbf 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp
@@ -32,6 +32,7 @@
#include "tests/storePerftools/common/Thread.h"
#include "qpid/asyncStore/AsyncStoreImpl.h"
+#include "qpid/broker/AsyncResultQueue.h"
#include "qpid/sys/Poller.h"
#include <iomanip>
@@ -48,6 +49,7 @@ PerfTest::PerfTest(const TestOptions& to,
m_msgData(new char[to.m_msgSize]),
m_poller(new qpid::sys::Poller),
m_pollingThread(m_poller.get()),
+ m_resultQueue(m_poller),
m_store(0)
{
std::memset((void*)m_msgData, 0, (size_t)to.m_msgSize);
@@ -68,7 +70,7 @@ PerfTest::~PerfTest()
void
PerfTest::prepareStore()
{
- m_store = new qpid::asyncStore::AsyncStoreImpl(m_poller, m_storeOpts);
+ m_store = new qpid::asyncStore::AsyncStoreImpl(m_poller, m_storeOpts, &m_resultQueue);
m_store->initialize();
}
@@ -86,7 +88,7 @@ PerfTest::prepareQueues()
for (uint16_t i = 0; i < m_testOpts.m_numQueues; ++i) {
std::ostringstream qname;
qname << "queue_" << std::setw(4) << std::setfill('0') << i;
- boost::shared_ptr<MockPersistableQueue> mpq(new MockPersistableQueue(qname.str(), m_queueArgs, m_store));
+ boost::shared_ptr<MockPersistableQueue> mpq(new MockPersistableQueue(qname.str(), m_queueArgs, m_store, m_resultQueue));
mpq->asyncCreate();
m_queueList.push_back(mpq);
}
diff --git a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h
index 3bd3f6bd32..46455e4af0 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h
@@ -28,6 +28,7 @@
#include "tests/storePerftools/common/Streamable.h"
+#include "qpid/broker/AsyncResultQueue.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/sys/Thread.h"
@@ -69,6 +70,7 @@ protected:
const char* m_msgData;
boost::shared_ptr<qpid::sys::Poller> m_poller;
qpid::sys::Thread m_pollingThread;
+ qpid::broker::AsyncResultQueue m_resultQueue;
qpid::asyncStore::AsyncStoreImpl* m_store;
std::deque<boost::shared_ptr<MockPersistableQueue> > m_queueList;
std::deque<boost::shared_ptr<MessageProducer> > m_producers;
diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp
index 7903d6551a..802279bbf9 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp
@@ -27,6 +27,7 @@
#include "MockPersistableQueue.h"
#include "qpid/asyncStore/AsyncStoreImpl.h"
+//#include "qpid/broker/EnqueueHandle.h"
namespace tests {
namespace storePerftools {
@@ -40,7 +41,7 @@ QueuedMessage::QueuedMessage(MockPersistableQueue* q,
boost::shared_ptr<MockPersistableMessage> msg) :
m_queue(q),
m_msg(msg),
- m_enqHandle(q->getStore()->createEnqueueHandle(msg->getHandle(), q->getHandle()))
+ m_enqHandle(q->getStore() ? q->getStore()->createEnqueueHandle(msg->getHandle(), q->getHandle()) : qpid::broker::EnqueueHandle(0))
{}
QueuedMessage::QueuedMessage(const QueuedMessage& qm) :
diff --git a/cpp/src/tests/storePerftools/asyncPerf/TestOptions.cpp b/cpp/src/tests/storePerftools/asyncPerf/TestOptions.cpp
index 2f4461e8b5..dccfc4fcbf 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/TestOptions.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/TestOptions.cpp
@@ -86,7 +86,7 @@ TestOptions::doAddOptions()
("durable", qpid::optValue(m_durable),
"Queues and messages are durable")
("destroy-queues", qpid::optValue(m_destroyQueuesOnCompletion),
- "Destroy queue recoreds persistent store on test completion")
+ "Destroy queues in persistent store on test completion")
;
}