summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2012-05-11 12:19:07 +0000
committerKim van der Riet <kpvdr@apache.org>2012-05-11 12:19:07 +0000
commita3861c46a7151a250fd06f54a60b9c1fe4bd6a1e (patch)
tree05da4567a96e4e3461f7aae7c76494d8111cd3b4
parent7ed6d96613a02d53a34a5c8e8d524a0b3c19d83d (diff)
downloadqpid-python-a3861c46a7151a250fd06f54a60b9c1fe4bd6a1e.tar.gz
QPID-3858: Added async queue deletion and mechanism to correctly wait for async completion of store deletion before destroying queue objects
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1337126 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/qpid/asyncStore/AsyncStoreOptions.cpp3
-rw-r--r--cpp/src/tests/storePerfTools/asyncPerf/MockPersistableMessage.cpp13
-rw-r--r--cpp/src/tests/storePerfTools/asyncPerf/MockPersistableMessage.h3
-rw-r--r--cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.cpp51
-rw-r--r--cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.h15
-rw-r--r--cpp/src/tests/storePerfTools/asyncPerf/MockTransactionContext.cpp17
-rw-r--r--cpp/src/tests/storePerfTools/asyncPerf/MockTransactionContext.h6
-rw-r--r--cpp/src/tests/storePerfTools/asyncPerf/PerfTest.cpp48
-rw-r--r--cpp/src/tests/storePerfTools/asyncPerf/PerfTest.h11
9 files changed, 97 insertions, 70 deletions
diff --git a/cpp/src/qpid/asyncStore/AsyncStoreOptions.cpp b/cpp/src/qpid/asyncStore/AsyncStoreOptions.cpp
index 939d65ddf9..6dac318d14 100644
--- a/cpp/src/qpid/asyncStore/AsyncStoreOptions.cpp
+++ b/cpp/src/qpid/asyncStore/AsyncStoreOptions.cpp
@@ -58,7 +58,8 @@ AsyncStoreOptions::validate()
{}
//static
-std::string& AsyncStoreOptions::getDefaultStoreDir()
+std::string&
+AsyncStoreOptions::getDefaultStoreDir()
{
static std::string s_defaultStoreDir = "/tmp";
return s_defaultStoreDir;
diff --git a/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableMessage.cpp b/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableMessage.cpp
index 75fc921494..5cc829f4d2 100644
--- a/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableMessage.cpp
+++ b/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableMessage.cpp
@@ -22,6 +22,7 @@
*/
#include "MockPersistableMessage.h"
+#include "MockPersistableQueue.h" // debug statements in enqueueComplete() and dequeueComplete()
#include "qpid/asyncStore/AsyncStoreImpl.h"
@@ -59,7 +60,7 @@ MockPersistableMessage::MessageContext::destroy()
MockPersistableMessage::MockPersistableMessage(const char* msgData,
const uint32_t msgSize,
- AsyncStoreImplPtr store,
+ qpid::asyncStore::AsyncStoreImpl* store,
const bool persistent) :
m_persistenceId(0ULL),
m_msg(msgData, static_cast<size_t>(msgSize)),
@@ -163,16 +164,18 @@ MockPersistableMessage::write(char* target)
// protected
void
-MockPersistableMessage::enqueueComplete(const MessageContext* /*mc*/)
+MockPersistableMessage::enqueueComplete(const MessageContext* mc)
{
-//std::cout << "~~~~~ Message pid=0x" << std::hex << m_persistenceId << std::dec << ": enqueueComplete() on queue \"" << mc->m_q->getName() << "\"" << std::endl;
+//std::cout << "~~~~~ Message pid=0x" << std::hex << mc->m_msg->getPersistenceId() << std::dec << ": enqueueComplete() on queue \"" << mc->m_q->getName() << "\"" << std::endl << std::flush;
+ assert(mc->m_msg.get() == this);
}
// protected
void
-MockPersistableMessage::dequeueComplete(const MessageContext* /*mc*/)
+MockPersistableMessage::dequeueComplete(const MessageContext* mc)
{
-//std::cout << "~~~~~ Message pid=0x" << std::hex << m_persistenceId << std::dec << ": dequeueComplete() on queue \"" << mc->m_q->getName() << "\"" << std::endl;
+//std::cout << "~~~~~ Message pid=0x" << std::hex << mc->m_msg->getPersistenceId() << std::dec << ": dequeueComplete() on queue \"" << mc->m_q->getName() << "\"" << std::endl << std::flush;
+ assert(mc->m_msg.get() == this);
}
}}} // namespace tests::storePerftools::asyncPerf
diff --git a/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableMessage.h b/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableMessage.h
index a139328690..fa9bc8e937 100644
--- a/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableMessage.h
+++ b/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableMessage.h
@@ -44,7 +44,6 @@ namespace asyncPerf {
class MockPersistableMessage;
class MockPersistableQueue;
-typedef boost::shared_ptr<qpid::asyncStore::AsyncStoreImpl> AsyncStoreImplPtr;
typedef boost::shared_ptr<MockPersistableMessage> MockPersistableMessagePtr;
class MockPersistableMessage: public qpid::broker::PersistableMessage, qpid::broker::DataSource
@@ -66,7 +65,7 @@ public:
MockPersistableMessage(const char* msgData,
const uint32_t msgSize,
- AsyncStoreImplPtr store,
+ qpid::asyncStore::AsyncStoreImpl* store,
const bool persistent = true);
virtual ~MockPersistableMessage();
static void handleAsyncResult(const qpid::broker::AsyncResult* res,
diff --git a/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.cpp b/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.cpp
index 3b92af9803..69af020a26 100644
--- a/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.cpp
+++ b/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.cpp
@@ -37,7 +37,7 @@ namespace asyncPerf {
// --- Inner class MockPersistableQueue::QueueContext ---
-MockPersistableQueue::QueueContext::QueueContext(MockPersistableQueue* q,
+MockPersistableQueue::QueueContext::QueueContext(MockPersistableQueuePtr q,
const qpid::asyncStore::AsyncOperation::opCode op) :
m_q(q),
m_op(op)
@@ -62,9 +62,10 @@ MockPersistableQueue::QueueContext::destroy()
MockPersistableQueue::MockPersistableQueue(const std::string& name,
const qpid::framing::FieldTable& /*args*/,
- AsyncStoreImplPtr store,
+ qpid::asyncStore::AsyncStoreImpl* store,
const TestOptions& to,
const char* msgData) :
+ qpid::broker::PersistableQueue(),
m_name(name),
m_store(store),
m_persistenceId(0ULL),
@@ -74,11 +75,6 @@ MockPersistableQueue::MockPersistableQueue(const std::string& name,
{
const qpid::types::Variant::Map qo;
m_queueHandle = m_store->createQueueHandle(m_name, qo);
- qpid::broker::BrokerContext* bc = new QueueContext(this, qpid::asyncStore::AsyncOperation::QUEUE_CREATE);
- m_store->submitCreate(m_queueHandle,
- dynamic_cast<const qpid::broker::DataSource*>(this),
- &handleAsyncResult,
- bc);
}
MockPersistableQueue::~MockPersistableQueue()
@@ -131,6 +127,25 @@ MockPersistableQueue::getHandle()
return m_queueHandle;
}
+// static
+void
+MockPersistableQueue::asyncStoreCreate(MockPersistableQueuePtr& qp)
+{
+ qp->m_store->submitCreate(qp->m_queueHandle,
+ dynamic_cast<const qpid::broker::DataSource*>(qp.get()),
+ &handleAsyncResult,
+ new QueueContext(qp, qpid::asyncStore::AsyncOperation::QUEUE_CREATE));
+}
+
+// static
+void
+MockPersistableQueue::asyncStoreDestroy(MockPersistableQueuePtr& qp)
+{
+ qp->m_store->submitDestroy(qp->m_queueHandle,
+ &handleAsyncResult,
+ new QueueContext(qp, qpid::asyncStore::AsyncOperation::QUEUE_DESTROY));
+}
+
void*
MockPersistableQueue::runEnqueues()
{
@@ -145,7 +160,9 @@ MockPersistableQueue::runEnqueues()
MockPersistableMessagePtr msg(new MockPersistableMessage(m_msgData, m_perfTestOpts.m_msgSize, m_store, true));
msg->setPersistenceId(m_store->getNextRid());
qpid::broker::EnqueueHandle enqHandle = m_store->createEnqueueHandle(msg->getHandle(), m_queueHandle);
- MockPersistableMessage::MessageContext* msgCtxt = new MockPersistableMessage::MessageContext(msg, qpid::asyncStore::AsyncOperation::MSG_ENQUEUE, this);
+ MockPersistableMessage::MessageContext* msgCtxt = new MockPersistableMessage::MessageContext(msg,
+ qpid::asyncStore::AsyncOperation::MSG_ENQUEUE,
+ this);
if (useTxn) {
m_store->submitEnqueue(enqHandle,
txn->getHandle(),
@@ -158,7 +175,6 @@ MockPersistableQueue::runEnqueues()
}
QueuedMessagePtr qm(new QueuedMessage(msg, enqHandle, txn));
push(qm);
-//std::cout << "**** push 0x" << std::hex << msg->getPersistenceId() << std::dec << std::endl;
if (useTxn && ++txnCnt >= m_perfTestOpts.m_enqTxnBlockSize) {
txn->commit();
txnCnt = 0;
@@ -207,8 +223,6 @@ MockPersistableQueue::runDequeues()
txn->commit();
txnCnt = 0;
}
- } else {
-// ::usleep(100); // 0.1 ms TODO: Use a condition variable instead of sleeping/spinning
}
}
if (txnCnt > 0) {
@@ -290,23 +304,26 @@ MockPersistableQueue::write(char* target)
// protected
void
-MockPersistableQueue::createComplete(const QueueContext* /*qc*/)
+MockPersistableQueue::createComplete(const QueueContext* qc)
{
-//std::cout << "~~~~~ Queue name=\"" << m_name << "\": createComplete()" << std::endl;
+//std::cout << "~~~~~ Queue name=\"" << qc->m_q->getName() << "\": createComplete()" << std::endl << std::flush;
+ assert(qc->m_q.get() == this);
}
// protected
void
-MockPersistableQueue::flushComplete(const QueueContext* /*qc*/)
+MockPersistableQueue::flushComplete(const QueueContext* qc)
{
-//std::cout << "~~~~~ Queue name=\"" << m_name << "\": flushComplete()" << std::endl;
+//std::cout << "~~~~~ Queue name=\"" << qc->m_q->getName() << "\": flushComplete()" << std::endl << std::flush;
+ assert(qc->m_q.get() == this);
}
// protected
void
-MockPersistableQueue::destroyComplete(const QueueContext* /*qc*/)
+MockPersistableQueue::destroyComplete(const QueueContext* qc)
{
-//std::cout << "~~~~~ Queue name=\"" << m_name << "\": destroyComplete()" << std::endl;
+//std::cout << "~~~~~ Queue name=\"" << qc->m_q->getName() << "\": destroyComplete()" << std::endl << std::flush;
+ assert(qc->m_q.get() == this);
}
// protected
diff --git a/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.h b/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.h
index a77f41743c..c3b461477c 100644
--- a/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.h
+++ b/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.h
@@ -47,30 +47,31 @@ namespace tests {
namespace storePerftools {
namespace asyncPerf {
+class MockPersistableQueue;
class QueuedMessage;
class TestOptions;
-typedef boost::shared_ptr<qpid::asyncStore::AsyncStoreImpl> AsyncStoreImplPtr;
+typedef boost::shared_ptr<MockPersistableQueue> MockPersistableQueuePtr;
typedef boost::shared_ptr<QueuedMessage> QueuedMessagePtr;
-class MockPersistableQueue : public qpid::broker::PersistableQueue, qpid::broker::DataSource
+class MockPersistableQueue : public qpid::broker::PersistableQueue, public qpid::broker::DataSource
{
public:
class QueueContext : public qpid::broker::BrokerContext
{
public:
- QueueContext(MockPersistableQueue* q,
+ QueueContext(MockPersistableQueuePtr q,
const qpid::asyncStore::AsyncOperation::opCode op);
virtual ~QueueContext();
const char* getOp() const;
void destroy();
- MockPersistableQueue* m_q;
+ MockPersistableQueuePtr m_q;
const qpid::asyncStore::AsyncOperation::opCode m_op;
};
MockPersistableQueue(const std::string& name,
const qpid::framing::FieldTable& args,
- AsyncStoreImplPtr store,
+ qpid::asyncStore::AsyncStoreImpl* store,
const TestOptions& perfTestParams,
const char* msgData);
virtual ~MockPersistableQueue();
@@ -79,6 +80,8 @@ public:
static void handleAsyncResult(const qpid::broker::AsyncResult* res,
qpid::broker::BrokerContext* bc);
qpid::broker::QueueHandle& getHandle();
+ static void asyncStoreCreate(MockPersistableQueuePtr& qp);
+ static void asyncStoreDestroy(MockPersistableQueuePtr& qp);
// --- Performance test thread entry points ---
void* runEnqueues();
@@ -103,7 +106,7 @@ public:
protected:
const std::string m_name;
- AsyncStoreImplPtr m_store;
+ qpid::asyncStore::AsyncStoreImpl* m_store;
mutable uint64_t m_persistenceId;
std::string m_persistableData;
qpid::broker::QueueHandle m_queueHandle;
diff --git a/cpp/src/tests/storePerfTools/asyncPerf/MockTransactionContext.cpp b/cpp/src/tests/storePerfTools/asyncPerf/MockTransactionContext.cpp
index e564876daa..10be34c6f5 100644
--- a/cpp/src/tests/storePerfTools/asyncPerf/MockTransactionContext.cpp
+++ b/cpp/src/tests/storePerfTools/asyncPerf/MockTransactionContext.cpp
@@ -57,7 +57,7 @@ MockTransactionContext::TransactionContext::destroy()
// --- Class MockTransactionContext ---
-MockTransactionContext::MockTransactionContext(AsyncStoreImplPtr store,
+MockTransactionContext::MockTransactionContext(qpid::asyncStore::AsyncStoreImpl* store,
const std::string& xid) :
m_store(store),
m_txnHandle(store->createTxnHandle(xid)),
@@ -190,30 +190,33 @@ MockTransactionContext::localPrepare()
// protected
void
-MockTransactionContext::prepareComplete(const TransactionContext* /*tc*/)
+MockTransactionContext::prepareComplete(const TransactionContext* tc)
{
qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_enqueuedMsgsMutex);
while (!m_enqueuedMsgs.empty()) {
m_enqueuedMsgs.front()->clearTransaction();
m_enqueuedMsgs.pop_front();
}
-//std::cout << "~~~~~ Transaction xid=\"" << getXid() << "\": prepareComplete()" << std::endl;
+//std::cout << "~~~~~ Transaction xid=\"" << tc->m_tc->getXid() << "\": prepareComplete()" << std::endl << std::flush;
+ assert(tc->m_tc == this);
}
// protected
void
-MockTransactionContext::abortComplete(const TransactionContext* /*tc*/)
+MockTransactionContext::abortComplete(const TransactionContext* tc)
{
-//std::cout << "~~~~~ Transaction xid=\"" << getXid() << "\": abortComplete()" << std::endl;
+//std::cout << "~~~~~ Transaction xid=\"" << tc->m_tc->getXid() << "\": abortComplete()" << std::endl << std::flush;
+ assert(tc->m_tc == this);
}
// protected
void
-MockTransactionContext::commitComplete(const TransactionContext* /*tc*/)
+MockTransactionContext::commitComplete(const TransactionContext* tc)
{
-//std::cout << "~~~~~ Transaction xid=\"" << getXid() << "\": commitComplete()" << std::endl;
+//std::cout << "~~~~~ Transaction xid=\"" << tc->m_tc->getXid() << "\": commitComplete()" << std::endl << std::flush;
+ assert(tc->m_tc == this);
}
}}} // namespace tests::storePerftools::asyncPerf
diff --git a/cpp/src/tests/storePerfTools/asyncPerf/MockTransactionContext.h b/cpp/src/tests/storePerfTools/asyncPerf/MockTransactionContext.h
index 35de7374c5..3c467a7b5d 100644
--- a/cpp/src/tests/storePerfTools/asyncPerf/MockTransactionContext.h
+++ b/cpp/src/tests/storePerfTools/asyncPerf/MockTransactionContext.h
@@ -45,8 +45,6 @@ namespace asyncPerf {
class QueuedMessage;
-typedef boost::shared_ptr<qpid::asyncStore::AsyncStoreImpl> AsyncStoreImplPtr;
-
class MockTransactionContext : public qpid::broker::TransactionContext
{
public:
@@ -65,7 +63,7 @@ public:
const qpid::asyncStore::AsyncOperation::opCode m_op;
};
- MockTransactionContext(AsyncStoreImplPtr store,
+ MockTransactionContext(qpid::asyncStore::AsyncStoreImpl* store,
const std::string& xid = std::string());
virtual ~MockTransactionContext();
static void handleAsyncResult(const qpid::broker::AsyncResult* res,
@@ -81,7 +79,7 @@ public:
void commit();
protected:
- AsyncStoreImplPtr m_store;
+ qpid::asyncStore::AsyncStoreImpl* m_store;
qpid::broker::TxnHandle m_txnHandle;
bool m_prepared;
std::deque<QueuedMessage*> m_enqueuedMsgs;
diff --git a/cpp/src/tests/storePerfTools/asyncPerf/PerfTest.cpp b/cpp/src/tests/storePerfTools/asyncPerf/PerfTest.cpp
index fe66604774..983f088616 100644
--- a/cpp/src/tests/storePerfTools/asyncPerf/PerfTest.cpp
+++ b/cpp/src/tests/storePerfTools/asyncPerf/PerfTest.cpp
@@ -44,7 +44,8 @@ PerfTest::PerfTest(const TestOptions& to,
m_testResult(to),
m_msgData(new char[to.m_msgSize]),
m_poller(new qpid::sys::Poller),
- m_pollingThread(m_poller.get())
+ m_pollingThread(m_poller.get()),
+ m_store(0)
{
std::memset((void*)m_msgData, 0, (size_t)to.m_msgSize);
}
@@ -53,33 +54,38 @@ PerfTest::~PerfTest()
{
m_poller->shutdown();
m_pollingThread.join();
+
+ m_queueList.clear();
+
+ if (m_store) delete m_store;
delete[] m_msgData;
}
-AsyncStoreImplPtr
+void
PerfTest::prepareStore()
{
- AsyncStoreImplPtr store(new qpid::asyncStore::AsyncStoreImpl(m_poller, m_storeOpts));
- store->initialize();
- return store;
+ m_store = new qpid::asyncStore::AsyncStoreImpl(m_poller, m_storeOpts);
+ m_store->initialize();
}
void
-PerfTest::prepareQueues(std::deque<MockPersistableQueuePtr>& jrnlList,
- AsyncStoreImplPtr store)
+PerfTest::prepareQueues()
{
for (uint16_t i = 0; i < m_testOpts.m_numQueues; ++i) {
std::ostringstream qname;
qname << "queue_" << std::setw(4) << std::setfill('0') << i;
- MockPersistableQueuePtr mpq(new MockPersistableQueue(qname.str(), m_queueArgs, store, m_testOpts, m_msgData));
- jrnlList.push_back(mpq);
+ MockPersistableQueuePtr mpq(new MockPersistableQueue(qname.str(), m_queueArgs, m_store, m_testOpts, m_msgData));
+ mpq->asyncStoreCreate(mpq);
+ m_queueList.push_back(mpq);
}
}
void
-PerfTest::destroyQueues(std::deque<MockPersistableQueuePtr>& jrnlList)
+PerfTest::destroyQueues()
{
- jrnlList.clear();
+ for (std::deque<MockPersistableQueuePtr>::iterator i=m_queueList.begin(); i!=m_queueList.end(); ++i) {
+ (*i)->asyncStoreDestroy(*i);
+ }
}
void
@@ -87,10 +93,8 @@ PerfTest::run()
{
typedef boost::shared_ptr<tests::storePerftools::common::Thread> ThreadPtr; // TODO - replace with qpid threads
- AsyncStoreImplPtr store = prepareStore();
-
- std::deque<MockPersistableQueuePtr> queueList;
- prepareQueues(queueList, store);
+ prepareStore();
+ prepareQueues();
std::deque<ThreadPtr> threads;
{ // --- Start of timed section ---
@@ -98,13 +102,13 @@ PerfTest::run()
for (uint16_t q = 0; q < m_testOpts.m_numQueues; q++) {
for (uint16_t t = 0; t < m_testOpts.m_numEnqThreadsPerQueue; t++) { // TODO - replace with qpid threads
- ThreadPtr tp(new tests::storePerftools::common::Thread(queueList[q]->startEnqueues,
- reinterpret_cast<void*>(queueList[q].get())));
+ ThreadPtr tp(new tests::storePerftools::common::Thread(m_queueList[q]->startEnqueues,
+ reinterpret_cast<void*>(m_queueList[q].get())));
threads.push_back(tp);
}
for (uint16_t dt = 0; dt < m_testOpts.m_numDeqThreadsPerQueue; ++dt) { // TODO - replace with qpid threads
- ThreadPtr tp(new tests::storePerftools::common::Thread(queueList[q]->startDequeues,
- reinterpret_cast<void*>(queueList[q].get())));
+ ThreadPtr tp(new tests::storePerftools::common::Thread(m_queueList[q]->startDequeues,
+ reinterpret_cast<void*>(m_queueList[q].get())));
threads.push_back(tp);
}
}
@@ -113,10 +117,8 @@ PerfTest::run()
threads.pop_front();
}
} // --- End of timed section ---
-
- destroyQueues(queueList);
-// DEBUG MEASURE - REMOVE WHEN FIXED
-//::sleep(2);
+ // TODO: Add test param to allow queues to be destroyed or left when test ends
+ destroyQueues();
}
void
diff --git a/cpp/src/tests/storePerfTools/asyncPerf/PerfTest.h b/cpp/src/tests/storePerfTools/asyncPerf/PerfTest.h
index 1eb11a51fa..544cd6b3ae 100644
--- a/cpp/src/tests/storePerfTools/asyncPerf/PerfTest.h
+++ b/cpp/src/tests/storePerfTools/asyncPerf/PerfTest.h
@@ -48,7 +48,7 @@ namespace asyncPerf {
class MockPersistableQueue;
class TestOptions;
-typedef boost::shared_ptr<qpid::asyncStore::AsyncStoreImpl> AsyncStoreImplPtr;
+//typedef boost::shared_ptr<qpid::asyncStore::AsyncStoreImpl> AsyncStoreImplPtr;
typedef boost::shared_ptr<MockPersistableQueue> MockPersistableQueuePtr;
class PerfTest : public tests::storePerftools::common::Streamable
@@ -68,11 +68,12 @@ protected:
const char* m_msgData;
boost::shared_ptr<qpid::sys::Poller> m_poller;
qpid::sys::Thread m_pollingThread;
+ qpid::asyncStore::AsyncStoreImpl* m_store;
+ std::deque<MockPersistableQueuePtr> m_queueList;
- AsyncStoreImplPtr prepareStore();
- void prepareQueues(std::deque<MockPersistableQueuePtr>& jrnlList,
- AsyncStoreImplPtr store);
- void destroyQueues(std::deque<MockPersistableQueuePtr>& jrnlList);
+ void prepareStore();
+ void prepareQueues();
+ void destroyQueues();
};