diff options
author | Kim van der Riet <kpvdr@apache.org> | 2012-06-07 12:42:37 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2012-06-07 12:42:37 +0000 |
commit | 22d453646b4815752134ad62e0b27841a103afb2 (patch) | |
tree | 152b6447a5c097b9617c10b7309775fc7987f996 | |
parent | 45d67efe63abecddf5ca7a68c45f308664bd1466 (diff) | |
download | qpid-python-22d453646b4815752134ad62e0b27841a103afb2.tar.gz |
QPID-3858: WIP - added AsyncResultQueue for async result return path
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1347588 13f79535-47bb-0310-9956-ffa450edef68
23 files changed, 468 insertions, 47 deletions
diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt index 7ca1a145c6..b5ac6af825 100644 --- a/cpp/src/CMakeLists.txt +++ b/cpp/src/CMakeLists.txt @@ -1076,6 +1076,9 @@ set (qpidbroker_SOURCES ${qpidbroker_platform_SOURCES} qpid/amqp_0_10/Connection.h qpid/amqp_0_10/Connection.cpp + qpid/broker/AsyncResultHandle.cpp + qpid/broker/AsyncResultHandleImpl.cpp + qpid/broker/AsyncResultQueue.cpp qpid/broker/AsyncStore.cpp qpid/broker/Broker.cpp qpid/broker/Credit.cpp @@ -1498,6 +1501,8 @@ set (asyncStore_SOURCES qpid/asyncStore/QueueHandleImpl.cpp qpid/asyncStore/RunState.cpp qpid/asyncStore/TxnHandleImpl.cpp + qpid/broker/AsyncResultHandle.cpp + qpid/broker/AsyncResultHandleImpl.cpp qpid/broker/ConfigHandle.cpp qpid/broker/EnqueueHandle.cpp qpid/broker/EventHandle.cpp diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp index 083034acc4..6283d07ee9 100644 --- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp +++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp @@ -38,11 +38,12 @@ namespace qpid { namespace asyncStore { AsyncStoreImpl::AsyncStoreImpl(boost::shared_ptr<qpid::sys::Poller> poller, - const AsyncStoreOptions& opts) : + const AsyncStoreOptions& opts, + qpid::broker::AsyncResultQueue* resultQueue) : m_poller(poller), m_opts(opts), m_runState(), - m_operations(m_poller) + m_operations(m_poller, resultQueue) {} AsyncStoreImpl::~AsyncStoreImpl() diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.h b/cpp/src/qpid/asyncStore/AsyncStoreImpl.h index 0298c74dc5..717723eda3 100644 --- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.h +++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.h @@ -43,7 +43,8 @@ namespace asyncStore { class AsyncStoreImpl: public qpid::broker::AsyncStore { public: AsyncStoreImpl(boost::shared_ptr<qpid::sys::Poller> poller, - const AsyncStoreOptions& opts); + const AsyncStoreOptions& opts, + qpid::broker::AsyncResultQueue* resultQueue); virtual ~AsyncStoreImpl(); void initialize(); uint64_t getNextRid(); diff --git a/cpp/src/qpid/asyncStore/OperationQueue.cpp b/cpp/src/qpid/asyncStore/OperationQueue.cpp index 69ddf7645e..f13114f41e 100644 --- a/cpp/src/qpid/asyncStore/OperationQueue.cpp +++ b/cpp/src/qpid/asyncStore/OperationQueue.cpp @@ -23,11 +23,15 @@ #include "OperationQueue.h" +#include "qpid/broker/AsyncResultHandle.h" + namespace qpid { namespace asyncStore { -OperationQueue::OperationQueue(const boost::shared_ptr<qpid::sys::Poller>& poller) : - m_opQueue(boost::bind(&OperationQueue::handle, this, _1), poller) +OperationQueue::OperationQueue(const boost::shared_ptr<qpid::sys::Poller>& poller, + qpid::broker::AsyncResultQueue* resultQueue) : + m_opQueue(boost::bind(&OperationQueue::handle, this, _1), poller), + m_resultQueue(resultQueue) { m_opQueue.start(); } @@ -40,7 +44,7 @@ OperationQueue::~OperationQueue() void OperationQueue::submit(const AsyncOperation* op) { -//std::cout << "--> OperationQueue::submit() op=" << op->getOpStr() << std::endl << std::flush; +std::cout << "--> OperationQueue::submit() op=" << op->getOpStr() << std::endl << std::flush; m_opQueue.push(op); } @@ -49,11 +53,17 @@ OperationQueue::OpQueue::Batch::const_iterator OperationQueue::handle(const OperationQueue::OpQueue::Batch& e) { for (OpQueue::Batch::const_iterator i = e.begin(); i != e.end(); ++i) { -//std::cout << "<-- OperationQueue::handle() Op=" << (*i)->getOpStr() << std::endl << std::flush; - if ((*i)->m_resCb) { - ((*i)->m_resCb)(new qpid::broker::AsyncResult, (*i)->m_brokerCtxt); +std::cout << "<-- OperationQueue::handle() Op=" << (*i)->getOpStr() << std::endl << std::flush; + qpid::broker::BrokerAsyncContext* bc = (*i)->m_brokerCtxt; + qpid::broker::ResultCallback rcb = (*i)->m_resCb; + if (rcb) { +// ((*i)->m_resCb)(new qpid::broker::AsyncResult, (*i)->m_brokerCtxt); +// rcb(new qpid::broker::AsyncResultHandle(new qpid::broker::AsyncResultHandleImpl(bc))); + if (m_resultQueue) { + (m_resultQueue->*rcb)(new qpid::broker::AsyncResultHandle(new qpid::broker::AsyncResultHandleImpl(bc))); + } } else { - delete (*i)->m_brokerCtxt; + delete bc; } delete (*i); } diff --git a/cpp/src/qpid/asyncStore/OperationQueue.h b/cpp/src/qpid/asyncStore/OperationQueue.h index 8a79684262..eba7c829a3 100644 --- a/cpp/src/qpid/asyncStore/OperationQueue.h +++ b/cpp/src/qpid/asyncStore/OperationQueue.h @@ -35,13 +35,15 @@ namespace asyncStore { class OperationQueue { public: - OperationQueue(const boost::shared_ptr<qpid::sys::Poller>& poller); + OperationQueue(const boost::shared_ptr<qpid::sys::Poller>& poller, + qpid::broker::AsyncResultQueue* resultQueue = 0); virtual ~OperationQueue(); void submit(const AsyncOperation* op); protected: typedef qpid::sys::PollableQueue<const AsyncOperation*> OpQueue; OpQueue m_opQueue; + qpid::broker::AsyncResultQueue* m_resultQueue; OpQueue::Batch::const_iterator handle(const OpQueue::Batch& e); }; diff --git a/cpp/src/qpid/asyncStore/Plugin.cpp b/cpp/src/qpid/asyncStore/Plugin.cpp index 4f35e8cd2a..0441e9c082 100644 --- a/cpp/src/qpid/asyncStore/Plugin.cpp +++ b/cpp/src/qpid/asyncStore/Plugin.cpp @@ -41,7 +41,7 @@ Plugin::earlyInitialize(Target& target) m_options.m_storeDir = dataDir.getPath (); } - m_store.reset(new qpid::asyncStore::AsyncStoreImpl(broker->getPoller(), m_options)); + m_store.reset(new qpid::asyncStore::AsyncStoreImpl(broker->getPoller(), m_options, 0)); // TODO: last arg: point to broker instance of AsyncResultQueue boost::shared_ptr<qpid::broker::AsyncStore> brokerAsyncStore(m_store); broker->setAsyncStore(brokerAsyncStore); boost::function<void()> fn = boost::bind(&Plugin::finalize, this); diff --git a/cpp/src/qpid/broker/AsyncResultHandle.cpp b/cpp/src/qpid/broker/AsyncResultHandle.cpp new file mode 100644 index 0000000000..26e46fee1c --- /dev/null +++ b/cpp/src/qpid/broker/AsyncResultHandle.cpp @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file AsyncResultHandle.cpp + */ + +#include "AsyncResultHandle.h" + +#include "qpid/messaging/PrivateImplRef.h" + +namespace qpid { +namespace broker { + +typedef qpid::messaging::PrivateImplRef<AsyncResultHandle> PrivateImpl; + +AsyncResultHandle::AsyncResultHandle(AsyncResultHandleImpl* p) : + qpid::messaging::Handle<AsyncResultHandleImpl>() +{ + PrivateImpl::ctor(*this, p); +} + +AsyncResultHandle::AsyncResultHandle(const AsyncResultHandle& r) : + qpid::messaging::Handle<AsyncResultHandleImpl>() +{ + PrivateImpl::copy(*this, r); +} + +AsyncResultHandle::~AsyncResultHandle() +{ + PrivateImpl::dtor(*this); +} + +AsyncResultHandle& +AsyncResultHandle::operator=(const AsyncResultHandle& r) +{ + return PrivateImpl::assign(*this, r); +} + +int +AsyncResultHandle::getErrNo() const +{ + return impl->getErrNo(); +} + +std::string +AsyncResultHandle::getErrMsg() const +{ + return impl->getErrMsg(); +} + +const BrokerAsyncContext* +AsyncResultHandle::getBrokerAsyncContext() const +{ + return impl->getBrokerAsyncContext(); +} + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/AsyncResultHandle.h b/cpp/src/qpid/broker/AsyncResultHandle.h new file mode 100644 index 0000000000..6f6290bfcb --- /dev/null +++ b/cpp/src/qpid/broker/AsyncResultHandle.h @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file AsyncResultHandle.h + */ + +#ifndef qpid_broker_AsyncResultHandle_h_ +#define qpid_broker_AsyncResultHandle_h_ + +#include "AsyncResultHandleImpl.h" + +#include "qpid/messaging/Handle.h" + +namespace qpid { +namespace broker { + +class AsyncResultHandle : public qpid::messaging::Handle<AsyncResultHandleImpl> +{ +public: + AsyncResultHandle(AsyncResultHandleImpl* p = 0); + AsyncResultHandle(const AsyncResultHandle& r); + virtual ~AsyncResultHandle(); + AsyncResultHandle& operator=(const AsyncResultHandle& r); + + // AsyncResultHandleImpl methods + + int getErrNo() const; + std::string getErrMsg() const; + const BrokerAsyncContext* getBrokerAsyncContext() const; + +private: + typedef qpid::broker::AsyncResultHandleImpl Impl; + Impl* impl; + friend class qpid::messaging::PrivateImplRef<AsyncResultHandle>; +}; + +}} // namespace qpid::broker + +#endif // qpid_broker_AsyncResultHandle_h_ diff --git a/cpp/src/qpid/broker/AsyncResultHandleImpl.cpp b/cpp/src/qpid/broker/AsyncResultHandleImpl.cpp new file mode 100644 index 0000000000..36d45e7b0a --- /dev/null +++ b/cpp/src/qpid/broker/AsyncResultHandleImpl.cpp @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file AsyncResultHandleImpl.cpp + */ + +#include "AsyncResultHandleImpl.h" + +namespace qpid { +namespace broker { + +AsyncResultHandleImpl::AsyncResultHandleImpl() : + m_errNo(0), + m_errMsg(), + m_bc(0) +{} + +AsyncResultHandleImpl::AsyncResultHandleImpl(const BrokerAsyncContext* bc) : + m_errNo(0), + m_errMsg(), + m_bc(bc) +{} + +AsyncResultHandleImpl::AsyncResultHandleImpl(const int errNo, const std::string& errMsg, const BrokerAsyncContext* bc) : + m_errNo(errNo), + m_errMsg(errMsg), + m_bc(bc) +{} + +AsyncResultHandleImpl::~AsyncResultHandleImpl() +{} + +int +AsyncResultHandleImpl::getErrNo() const +{ + return m_errNo; +} + +std::string +AsyncResultHandleImpl::getErrMsg() const +{ + return m_errMsg; +} + +const BrokerAsyncContext* +AsyncResultHandleImpl::getBrokerAsyncContext() const +{ + return m_bc; +} + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/AsyncResultHandleImpl.h b/cpp/src/qpid/broker/AsyncResultHandleImpl.h new file mode 100644 index 0000000000..e1bd1fa0e9 --- /dev/null +++ b/cpp/src/qpid/broker/AsyncResultHandleImpl.h @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file AsyncResultHandleImpl.h + */ + +#ifndef qpid_broker_AsyncResultHandleImpl_h_ +#define qpid_broker_AsyncResultHandleImpl_h_ + +#include "AsyncStore.h" + +#include "qpid/RefCounted.h" + +namespace qpid { +namespace broker { + +class AsyncResultHandleImpl : public virtual qpid::RefCounted +{ +public: + AsyncResultHandleImpl(); + AsyncResultHandleImpl(const BrokerAsyncContext* bc); + AsyncResultHandleImpl(const int errNo, const std::string& errMsg, const BrokerAsyncContext* bc); + virtual ~AsyncResultHandleImpl(); + + int getErrNo() const; + std::string getErrMsg() const; + const BrokerAsyncContext* getBrokerAsyncContext() const; + +private: + const int m_errNo; + const std::string m_errMsg; + const BrokerAsyncContext* m_bc; +}; + +}} // namespace qpid::broker + +#endif // qpid_broker_AsyncResultHandleImpl_h_ diff --git a/cpp/src/qpid/broker/AsyncResultQueue.cpp b/cpp/src/qpid/broker/AsyncResultQueue.cpp new file mode 100644 index 0000000000..1094a582f4 --- /dev/null +++ b/cpp/src/qpid/broker/AsyncResultQueue.cpp @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file AsyncResultQueue.cpp + */ + +#include "AsyncResultQueue.h" + +namespace qpid { +namespace broker { + +AsyncResultQueue::AsyncResultQueue(const boost::shared_ptr<qpid::sys::Poller>& poller) : + m_resQueue(boost::bind(&AsyncResultQueue::handle, this, _1), poller) +{ + m_resQueue.start(); +} + +AsyncResultQueue::~AsyncResultQueue() +{ + m_resQueue.stop(); +} + +void +AsyncResultQueue::submit(AsyncResultHandle* res) +{ + m_resQueue.push(res); +} + +//static +/* +void +AsyncResultQueue::submit(AsyncResultQueue* arq, AsyncResultHandle* rh) +{ + arq->submit(rh); +} +*/ + +// protected +AsyncResultQueue::ResultQueue::Batch::const_iterator +AsyncResultQueue::handle(const ResultQueue::Batch& e) +{ + return e.end(); +} + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/AsyncResultQueue.h b/cpp/src/qpid/broker/AsyncResultQueue.h new file mode 100644 index 0000000000..8881f25bac --- /dev/null +++ b/cpp/src/qpid/broker/AsyncResultQueue.h @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file AsyncResultQueue.h + */ + +#ifndef qpid_broker_AsyncResultQueue_h_ +#define qpid_broker_AsyncResultQueue_h_ + +#include "qpid/sys/PollableQueue.h" + +namespace qpid { +namespace broker { + +class AsyncResultHandle; + +class AsyncResultQueue +{ +public: + AsyncResultQueue(const boost::shared_ptr<qpid::sys::Poller>& poller); + virtual ~AsyncResultQueue(); + void submit(AsyncResultHandle* rh); +// static void submit(AsyncResultQueue* arq, AsyncResultHandle* rh); + +protected: + typedef qpid::sys::PollableQueue<const AsyncResultHandle*> ResultQueue; + ResultQueue m_resQueue; + + ResultQueue::Batch::const_iterator handle(const ResultQueue::Batch& e); +}; + +}} // namespace qpid::broker + +#endif // qpid_broker_AsyncResultQueue_h_ diff --git a/cpp/src/qpid/broker/AsyncStore.cpp b/cpp/src/qpid/broker/AsyncStore.cpp index 649049bf41..d37b034648 100644 --- a/cpp/src/qpid/broker/AsyncStore.cpp +++ b/cpp/src/qpid/broker/AsyncStore.cpp @@ -34,6 +34,7 @@ AsyncStore::AsyncStore() AsyncStore::~AsyncStore() {} +/* AsyncResult::AsyncResult() : errNo(0), errMsg() @@ -50,5 +51,6 @@ AsyncResult::destroy() { delete this; } +*/ }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/AsyncStore.h b/cpp/src/qpid/broker/AsyncStore.h index eb47d62cf0..c57bdaa552 100644 --- a/cpp/src/qpid/broker/AsyncStore.h +++ b/cpp/src/qpid/broker/AsyncStore.h @@ -37,7 +37,36 @@ public: virtual ~BrokerAsyncContext(); }; -// Subclassed by broker: +// Callback definition: +//struct AsyncResult { +// int errNo; // 0 implies no error +// std::string errMsg; +// AsyncResult(); +// AsyncResult(const int errNo, +// const std::string& errMsg); +// void destroy(); +//}; +//typedef void (*ResultCallback)(const AsyncResult*, BrokerAsyncContext*); + +class AsyncResultHandle; +class AsyncResultQueue; // Implements the result callback function + +// Singleton class in broker which contains return pollable queue. Use submitAsyncResult() to add reulsts to queue. +class AsyncResultHandler { +public: + virtual ~AsyncResultHandler(); + + // Factory method to create result handle + + virtual AsyncResultHandle createAsyncResultHandle(const int errNo, const std::string& errMsg, BrokerAsyncContext*) = 0; + + // Async return interface + + virtual void submitAsyncResult(AsyncResultHandle&) = 0; +}; +typedef void (qpid::broker::AsyncResultQueue::*ResultCallback)(AsyncResultHandle*); +//typedef void (qpid::broker::AsyncResultQueue::*ResultCallback)(AsyncResultQueue*, AsyncResultHandle*); + class DataSource { public: virtual ~DataSource(); @@ -45,25 +74,13 @@ public: virtual void write(char* target) = 0; }; -// Defined by store, all implement qpid::messaging::Handle-type template to hide ref counting: class ConfigHandle; -class QueueHandle; -class TxnHandle; +class EnqueueHandle; class EventHandle; class MessageHandle; -class EnqueueHandle; +class QueueHandle; +class TxnHandle; -// Callback definition: -struct AsyncResult -{ - int errNo; // 0 implies no error - std::string errMsg; - AsyncResult(); - AsyncResult(const int errNo, - const std::string& errMsg); - void destroy(); -}; -typedef void (*ResultCallback)(const AsyncResult*, BrokerAsyncContext*); // Subclassed by store: class AsyncStore { @@ -73,12 +90,12 @@ public: // Factory methods for creating handles - virtual TxnHandle createTxnHandle(const std::string& xid=std::string()) = 0; virtual ConfigHandle createConfigHandle() = 0; - virtual QueueHandle createQueueHandle(const std::string& name, const qpid::types::Variant::Map& opts) = 0; + virtual EnqueueHandle createEnqueueHandle(MessageHandle&, QueueHandle&) = 0; virtual EventHandle createEventHandle(QueueHandle&, const std::string& key=std::string()) = 0; virtual MessageHandle createMessageHandle(const DataSource*) = 0; - virtual EnqueueHandle createEnqueueHandle(MessageHandle&, QueueHandle&) = 0; + virtual QueueHandle createQueueHandle(const std::string& name, const qpid::types::Variant::Map& opts) = 0; + virtual TxnHandle createTxnHandle(const std::string& xid=std::string()) = 0; // Store async interface diff --git a/cpp/src/tests/storePerftools/asyncPerf/MockPersistableMessage.cpp b/cpp/src/tests/storePerftools/asyncPerf/MockPersistableMessage.cpp index e7cab4d621..fc04bc746e 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MockPersistableMessage.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/MockPersistableMessage.cpp @@ -34,7 +34,7 @@ MockPersistableMessage::MockPersistableMessage(const char* msgData, qpid::asyncStore::AsyncStoreImpl* store) : m_persistenceId(0ULL), m_msg(msgData, static_cast<size_t>(msgSize)), - m_msgHandle(store ? store->createMessageHandle(this) : store->createMessageHandle(0)) + m_msgHandle(store ? store->createMessageHandle(this) : qpid::broker::MessageHandle(0)) {} MockPersistableMessage::~MockPersistableMessage() diff --git a/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.cpp b/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.cpp index 009f54a157..49d656aee4 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.cpp @@ -30,6 +30,7 @@ #include "QueuedMessage.h" #include "qpid/asyncStore/AsyncStoreImpl.h" +#include "qpid/broker/AsyncResultQueue.h" namespace tests { namespace storePerftools { @@ -37,10 +38,12 @@ namespace asyncPerf { MockPersistableQueue::MockPersistableQueue(const std::string& name, const qpid::framing::FieldTable& /*args*/, - qpid::asyncStore::AsyncStoreImpl* store) : + qpid::asyncStore::AsyncStoreImpl* store, + qpid::broker::AsyncResultQueue& resultQueue) : qpid::broker::PersistableQueue(), m_name(name), m_store(store), + m_resultQueue(resultQueue), m_asyncOpCounter(0UL), m_persistenceId(0ULL), m_persistableData(m_name), // TODO: Currently queue durable data consists only of the queue name. Update this. @@ -64,6 +67,7 @@ MockPersistableQueue::~MockPersistableQueue() } // static +/* void MockPersistableQueue::handleAsyncResult(const qpid::broker::AsyncResult* res, qpid::broker::BrokerAsyncContext* bc) @@ -102,6 +106,7 @@ MockPersistableQueue::handleAsyncResult(const qpid::broker::AsyncResult* res, if (bc) delete bc; if (res) delete res; } +*/ const qpid::broker::QueueHandle& MockPersistableQueue::getHandle() const @@ -124,10 +129,12 @@ MockPersistableQueue::getStore() void MockPersistableQueue::asyncCreate() { + qpid::broker::ResultCallback rcb = &qpid::broker::AsyncResultQueue::submit; if (m_store) { m_store->submitCreate(m_queueHandle, this, - &handleAsyncResult, + rcb, +// &qpid::broker::AsyncResultQueue::submit/*&m_resultQueue.submit*/, new QueueAsyncContext(shared_from_this(), qpid::asyncStore::AsyncOperation::QUEUE_CREATE)); ++m_asyncOpCounter; @@ -141,7 +148,7 @@ MockPersistableQueue::asyncDestroy(const bool deleteQueue) if (m_store) { if (deleteQueue) { m_store->submitDestroy(m_queueHandle, - &handleAsyncResult, + &qpid::broker::AsyncResultQueue::submit/*&m_resultQueue.submit*/, new QueueAsyncContext(shared_from_this(), qpid::asyncStore::AsyncOperation::QUEUE_DESTROY)); ++m_asyncOpCounter; @@ -329,7 +336,7 @@ MockPersistableQueue::asyncEnqueue(MockTransactionContext* txn, //std::cout << "QQQ Queue=\"" << m_name << "\": asyncEnqueue() rid=0x" << std::hex << qm.payload()->getPersistenceId() << std::dec << std::endl << std::flush; m_store->submitEnqueue(/*enqHandle*/qm.enqHandle(), txn->getHandle(), - &handleAsyncResult, + &qpid::broker::AsyncResultQueue::submit/*&m_resultQueue.submit*/, new QueueAsyncContext(shared_from_this(), qm.payload(), qpid::asyncStore::AsyncOperation::MSG_ENQUEUE)); @@ -346,7 +353,7 @@ MockPersistableQueue::asyncDequeue(MockTransactionContext* txn, qpid::broker::EnqueueHandle enqHandle = qm.enqHandle(); m_store->submitDequeue(enqHandle, txn->getHandle(), - &handleAsyncResult, + &qpid::broker::AsyncResultQueue::submit/*&m_resultQueue.submit*/, new QueueAsyncContext(shared_from_this(), qm.payload(), qpid::asyncStore::AsyncOperation::MSG_DEQUEUE)); diff --git a/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.h b/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.h index ff6db93542..e62aeec420 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.h +++ b/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.h @@ -38,7 +38,9 @@ namespace qpid { namespace asyncStore { class AsyncStoreImpl; } - +namespace broker { +class AsyncResultQueue; +} namespace framing { class FieldTable; }} @@ -61,11 +63,12 @@ class MockPersistableQueue : public boost::enable_shared_from_this<MockPersistab public: MockPersistableQueue(const std::string& name, const qpid::framing::FieldTable& args, - qpid::asyncStore::AsyncStoreImpl* store); + qpid::asyncStore::AsyncStoreImpl* store, + qpid::broker::AsyncResultQueue& rq); virtual ~MockPersistableQueue(); - static void handleAsyncResult(const qpid::broker::AsyncResult* res, - qpid::broker::BrokerAsyncContext* bc); +// static void handleAsyncResult(const qpid::broker::AsyncResult* res, +// qpid::broker::BrokerAsyncContext* bc); const qpid::broker::QueueHandle& getHandle() const; qpid::broker::QueueHandle& getHandle(); qpid::asyncStore::AsyncStoreImpl* getStore(); @@ -99,6 +102,7 @@ public: protected: const std::string m_name; qpid::asyncStore::AsyncStoreImpl* m_store; + qpid::broker::AsyncResultQueue& m_resultQueue; AsyncOpCounter m_asyncOpCounter; mutable uint64_t m_persistenceId; std::string m_persistableData; diff --git a/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.cpp b/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.cpp index c444f596e5..0ac0c7732f 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.cpp @@ -64,6 +64,7 @@ MockTransactionContext::~MockTransactionContext() {} // static +/* void MockTransactionContext::handleAsyncResult(const qpid::broker::AsyncResult* res, qpid::broker::BrokerAsyncContext* bc) @@ -96,6 +97,7 @@ MockTransactionContext::handleAsyncResult(const qpid::broker::AsyncResult* res, if (bc) delete bc; if (res) delete res; } +*/ const qpid::broker::TxnHandle& MockTransactionContext::getHandle() const diff --git a/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.h b/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.h index 3f70b0bfda..d727caede6 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.h +++ b/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.h @@ -53,8 +53,8 @@ public: MockTransactionContext(qpid::asyncStore::AsyncStoreImpl* store, const std::string& xid = std::string()); virtual ~MockTransactionContext(); - static void handleAsyncResult(const qpid::broker::AsyncResult* res, - qpid::broker::BrokerAsyncContext* bc); +// static void handleAsyncResult(const qpid::broker::AsyncResult* res, +// qpid::broker::BrokerAsyncContext* bc); const qpid::broker::TxnHandle& getHandle() const; qpid::broker::TxnHandle& getHandle(); diff --git a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp index 184a899570..66e0bb3dbf 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp @@ -32,6 +32,7 @@ #include "tests/storePerftools/common/Thread.h" #include "qpid/asyncStore/AsyncStoreImpl.h" +#include "qpid/broker/AsyncResultQueue.h" #include "qpid/sys/Poller.h" #include <iomanip> @@ -48,6 +49,7 @@ PerfTest::PerfTest(const TestOptions& to, m_msgData(new char[to.m_msgSize]), m_poller(new qpid::sys::Poller), m_pollingThread(m_poller.get()), + m_resultQueue(m_poller), m_store(0) { std::memset((void*)m_msgData, 0, (size_t)to.m_msgSize); @@ -68,7 +70,7 @@ PerfTest::~PerfTest() void PerfTest::prepareStore() { - m_store = new qpid::asyncStore::AsyncStoreImpl(m_poller, m_storeOpts); + m_store = new qpid::asyncStore::AsyncStoreImpl(m_poller, m_storeOpts, &m_resultQueue); m_store->initialize(); } @@ -86,7 +88,7 @@ PerfTest::prepareQueues() for (uint16_t i = 0; i < m_testOpts.m_numQueues; ++i) { std::ostringstream qname; qname << "queue_" << std::setw(4) << std::setfill('0') << i; - boost::shared_ptr<MockPersistableQueue> mpq(new MockPersistableQueue(qname.str(), m_queueArgs, m_store)); + boost::shared_ptr<MockPersistableQueue> mpq(new MockPersistableQueue(qname.str(), m_queueArgs, m_store, m_resultQueue)); mpq->asyncCreate(); m_queueList.push_back(mpq); } diff --git a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h index 3bd3f6bd32..46455e4af0 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h +++ b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h @@ -28,6 +28,7 @@ #include "tests/storePerftools/common/Streamable.h" +#include "qpid/broker/AsyncResultQueue.h" #include "qpid/framing/FieldTable.h" #include "qpid/sys/Thread.h" @@ -69,6 +70,7 @@ protected: const char* m_msgData; boost::shared_ptr<qpid::sys::Poller> m_poller; qpid::sys::Thread m_pollingThread; + qpid::broker::AsyncResultQueue m_resultQueue; qpid::asyncStore::AsyncStoreImpl* m_store; std::deque<boost::shared_ptr<MockPersistableQueue> > m_queueList; std::deque<boost::shared_ptr<MessageProducer> > m_producers; diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp index 7903d6551a..802279bbf9 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp @@ -27,6 +27,7 @@ #include "MockPersistableQueue.h" #include "qpid/asyncStore/AsyncStoreImpl.h" +//#include "qpid/broker/EnqueueHandle.h" namespace tests { namespace storePerftools { @@ -40,7 +41,7 @@ QueuedMessage::QueuedMessage(MockPersistableQueue* q, boost::shared_ptr<MockPersistableMessage> msg) : m_queue(q), m_msg(msg), - m_enqHandle(q->getStore()->createEnqueueHandle(msg->getHandle(), q->getHandle())) + m_enqHandle(q->getStore() ? q->getStore()->createEnqueueHandle(msg->getHandle(), q->getHandle()) : qpid::broker::EnqueueHandle(0)) {} QueuedMessage::QueuedMessage(const QueuedMessage& qm) : diff --git a/cpp/src/tests/storePerftools/asyncPerf/TestOptions.cpp b/cpp/src/tests/storePerftools/asyncPerf/TestOptions.cpp index 2f4461e8b5..dccfc4fcbf 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/TestOptions.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/TestOptions.cpp @@ -86,7 +86,7 @@ TestOptions::doAddOptions() ("durable", qpid::optValue(m_durable), "Queues and messages are durable") ("destroy-queues", qpid::optValue(m_destroyQueuesOnCompletion), - "Destroy queue recoreds persistent store on test completion") + "Destroy queues in persistent store on test completion") ; } |