diff options
author | Kim van der Riet <kpvdr@apache.org> | 2012-05-09 14:19:28 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2012-05-09 14:19:28 +0000 |
commit | 2e3690efe94e6161d129ebce2f8c22cb25819ec1 (patch) | |
tree | 8fc2b2aa701127d7ee827319b809aeaf543f2a59 /cpp/src/tests/storePerfTools/jrnlPerf/Journal.cpp | |
parent | 633c33f224f3196f3f9bd80bd2e418d8143fea06 (diff) | |
download | qpid-python-2e3690efe94e6161d129ebce2f8c22cb25819ec1.tar.gz |
QPID-3858: Initial checkin of async interface implementation and test harness.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1336220 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/storePerfTools/jrnlPerf/Journal.cpp')
-rw-r--r-- | cpp/src/tests/storePerfTools/jrnlPerf/Journal.cpp | 218 |
1 files changed, 218 insertions, 0 deletions
diff --git a/cpp/src/tests/storePerfTools/jrnlPerf/Journal.cpp b/cpp/src/tests/storePerfTools/jrnlPerf/Journal.cpp new file mode 100644 index 0000000000..6efdc06fc8 --- /dev/null +++ b/cpp/src/tests/storePerfTools/jrnlPerf/Journal.cpp @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file Journal.cpp + */ + +#include "Journal.h" + +#ifdef JOURNAL2 +# include "qpid/asyncStore/jrnl2/DataToken.h" + +# define X_JRNL_FN_DEQUEUE(dtok) dequeue(dtok, 0, 0) +# define X_JRNL_FN_ENQUEUE(dtok, msgData, msgSize) enqueue(dtok, msgData, msgSize, 0, 0, false) +# define X_JRNL_FN_FLUSH(jrnlPtr) { jrnlPtr->flush(); jrnlPtr->sync(); } +# define X_JRNL_FN_GETDTOKSTATUS(dtok) dtok +# define X_JRNL_FN_GETEVENTS(timeout) processCompletedAioWriteEvents(timeout) +# define X_JRNL_FN_GETIOSTR(iores) qpid::asyncStore::jrnl2::g_ioResAsString(iores) +# define X_JRNL_IO_OP_RES qpid::asyncStore::jrnl2::jrnlOpRes +# define X_JRNL_IO_OP_RES_BUSY qpid::asyncStore::jrnl2::RHM_IORES_BUSY +# define X_JRNL_IO_OP_RES_ENQCAPTHRESH qpid::asyncStore::jrnl2::RHM_IORES_ENQCAPTHRESH +# define X_JRNL_IO_OP_RES_SUCCESS 0 +# define X_SCOPED_LOCK qpid::asyncStore::jrnl2::ScopedLock +#else +# include "jrnl/jcntl.hpp" +# include "jrnl/data_tok.hpp" + +# define X_JRNL_FN_DEQUEUE(dtok) dequeue_data_record(dtok) +# define X_JRNL_FN_ENQUEUE(dtok, msgData, msgSize) enqueue_data_record(msgData, msgSize, msgSize, dtok); +# define X_JRNL_FN_FLUSH(jrnlPtr) jrnlPtr->flush(true) +# define X_JRNL_FN_GETDTOKSTATUS(dtok) dtok->status_str() +# define X_JRNL_FN_GETEVENTS(timeout) get_wr_events(timeout) +# define X_JRNL_FN_GETIOSTR(iores) mrg::journal::iores_str(iores) +# define X_JRNL_IO_OP_RES mrg::journal::iores +# define X_JRNL_IO_OP_RES_BUSY mrg::journal::RHM_IORES_BUSY +# define X_JRNL_IO_OP_RES_ENQCAPTHRESH mrg::journal::RHM_IORES_ENQCAPTHRESH +# define X_JRNL_IO_OP_RES_SUCCESS mrg::journal::RHM_IORES_SUCCESS +# define X_SCOPED_LOCK mrg::journal::slock +#endif + +#include <iostream> + +namespace tests { +namespace storePerftools { +namespace jrnlPerf { + +Journal::Journal(const uint32_t numMsgs, + const uint32_t msgSize, + const char* msgData, + X_ASYNC_JOURNAL* const jrnlPtr) : + m_numMsgs(numMsgs), + m_msgSize(msgSize), + m_msgData(msgData), + m_jrnlPtr(jrnlPtr) +{} + +Journal::~Journal() +{ + delete m_jrnlPtr; +} + + +// *** MUST BE THREAD-SAFE **** +// This method will be called by multiple threads simultaneously +// Enqueue thread entry point +void* +Journal::runEnqueues() +{ + bool misfireFlag = false; + uint32_t i = 0; + while (i < m_numMsgs) { + X_DATA_TOKEN* mtokPtr = new X_DATA_TOKEN(); + X_JRNL_IO_OP_RES jrnlIoRes = m_jrnlPtr->X_JRNL_FN_ENQUEUE(mtokPtr, m_msgData, m_msgSize); + switch (jrnlIoRes) { + case X_JRNL_IO_OP_RES_SUCCESS: + i++; + misfireFlag = false; + break; + case X_JRNL_IO_OP_RES_BUSY: + if (!misfireFlag) { + std::cout << "-" << std::flush; + } + delete mtokPtr; + misfireFlag = true; + break; + case X_JRNL_IO_OP_RES_ENQCAPTHRESH: + if (!misfireFlag) { + std::cout << "*" << std::flush; + } + delete mtokPtr; + misfireFlag = true; + ::usleep(10); + break; + default: + delete mtokPtr; + std::cerr << "enqueue_data_record FAILED with " << X_JRNL_FN_GETIOSTR(jrnlIoRes) << std::endl; + } + } + /// \todo handle these results + X_JRNL_FN_FLUSH(m_jrnlPtr); + return NULL; +} + + +// *** MUST BE THREAD-SAFE **** +// This method will be called by multiple threads simultaneously +// Dequeue thread entry point +void* +Journal::runDequeues() +{ + uint32_t i = 0; + X_JRNL_IO_OP_RES jrnlIoRes; + while (i < m_numMsgs) { + X_DATA_TOKEN* mtokPtr = 0; + while (!mtokPtr) { + bool procAioEventsFlag; + { // --- START OF CRITICAL SECTION --- + X_SCOPED_LOCK l(m_unprocCallbacksMutex); + procAioEventsFlag = m_unprocCallbacks.empty(); + if (!procAioEventsFlag) { + mtokPtr = m_unprocCallbacks.back(); + m_unprocCallbacks.pop_back(); + } + } // --- END OF CRITICAL SECTION --- + if (procAioEventsFlag) { + m_jrnlPtr->X_JRNL_FN_GETEVENTS(0); + ::usleep(1); + } + } + bool done = false; + while (!done) { + jrnlIoRes = m_jrnlPtr->X_JRNL_FN_DEQUEUE(mtokPtr); + switch (jrnlIoRes) { + case X_JRNL_IO_OP_RES_SUCCESS: + i ++; + done = true; + break; + case X_JRNL_IO_OP_RES_BUSY: + //::usleep(10); + break; + default: + std::cerr << "dequeue_data_record FAILED with " << X_JRNL_FN_GETIOSTR(jrnlIoRes) << ": " + << X_JRNL_FN_GETDTOKSTATUS(mtokPtr) << std::endl; + delete mtokPtr; + done = true; + } + } + m_jrnlPtr->X_JRNL_FN_GETEVENTS(0); + } + /// \todo handle these results + X_JRNL_FN_FLUSH(m_jrnlPtr); + return NULL; +} + +//static +void* +Journal::startEnqueues(void* ptr) +{ + return reinterpret_cast<Journal*>(ptr)->runEnqueues(); +} + +//static +void* +Journal:: startDequeues(void* ptr) +{ + return reinterpret_cast<Journal*>(ptr)->runDequeues(); +} + +// *** MUST BE THREAD-SAFE **** +// This method will be called by multiple threads simultaneously +void +Journal::X_AIO_WR_CALLBACK(std::vector<X_DATA_TOKEN*>& msgTokenList) +{ + X_DATA_TOKEN* mtokPtr; + while (msgTokenList.size()) { + mtokPtr = msgTokenList.back(); + msgTokenList.pop_back(); +#ifdef JOURNAL2 + switch (mtokPtr->getDataOpState().get()) { + case qpid::asyncStore::jrnl2::OP_ENQUEUE: +#else + switch (mtokPtr->wstate()) { + case X_DATA_TOKEN::ENQ: +#endif + { // --- START OF CRITICAL SECTION --- + X_SCOPED_LOCK l(m_unprocCallbacksMutex); + m_unprocCallbacks.push_back(mtokPtr); + } // --- END OF CRITICAL SECTION --- + break; + default: + delete mtokPtr; + } + } +} + +// *** MUST BE THREAD-SAFE **** +// This method will be called by multiple threads simultaneously +void +Journal::X_AIO_RD_CALLBACK(std::vector<uint16_t>& /*buffPageCtrlBlkIndexList*/) +{} + +}}} // namespace tests::storePerftools::jrnlPerf |