/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ /** * \file MockPersistableQueue.cpp */ #include "MockPersistableQueue.h" #include "MockPersistableMessage.h" #include "MockTransactionContext.h" #include "QueuedMessage.h" #include "TestOptions.h" #include "qpid/asyncStore/AsyncStoreImpl.h" #include "qpid/broker/EnqueueHandle.h" namespace tests { namespace storePerftools { namespace asyncPerf { // --- Inner class MockPersistableQueue::QueueContext --- MockPersistableQueue::QueueContext::QueueContext(MockPersistableQueue* q, const qpid::asyncStore::AsyncOperation::opCode op) : m_q(q), m_op(op) {} MockPersistableQueue::QueueContext::~QueueContext() {} const char* MockPersistableQueue::QueueContext::getOp() const { return qpid::asyncStore::AsyncOperation::getOpStr(m_op); } void MockPersistableQueue::QueueContext::destroy() { delete this; } // --- Class MockPersistableQueue --- MockPersistableQueue::MockPersistableQueue(const std::string& name, const qpid::framing::FieldTable& /*args*/, AsyncStoreImplPtr store, const TestOptions& to, const char* msgData) : m_name(name), m_store(store), m_persistenceId(0ULL), m_persistableData(m_name), // TODO: Currently queue durable data consists only of the queue name. Update this. m_perfTestOpts(to), m_msgData(msgData) { const qpid::types::Variant::Map qo; m_queueHandle = m_store->createQueueHandle(m_name, qo); qpid::broker::BrokerContext* bc = new QueueContext(this, qpid::asyncStore::AsyncOperation::QUEUE_CREATE); m_store->submitCreate(m_queueHandle, dynamic_cast(this), &handleAsyncResult, bc); } MockPersistableQueue::~MockPersistableQueue() { // m_store->flush(*this); // TODO: Make destroying the store a test parameter // m_store->destroy(*this); // m_store = 0; } // static void MockPersistableQueue::handleAsyncResult(const qpid::broker::AsyncResult* res, qpid::broker::BrokerContext* bc) { if (bc && res) { QueueContext* qc = dynamic_cast(bc); if (qc->m_q) { if (res->errNo) { // TODO: Handle async failure here std::cerr << "Queue name=\"" << qc->m_q->m_name << "\": Operation " << qc->getOp() << ": failure " << res->errNo << " (" << res->errMsg << ")" << std::endl; } else { // Handle async success here switch(qc->m_op) { case qpid::asyncStore::AsyncOperation::QUEUE_CREATE: qc->m_q->createComplete(qc); break; case qpid::asyncStore::AsyncOperation::QUEUE_FLUSH: qc->m_q->flushComplete(qc); break; case qpid::asyncStore::AsyncOperation::QUEUE_DESTROY: qc->m_q->destroyComplete(qc); break; default: std::ostringstream oss; oss << "tests::storePerftools::asyncPerf::MockPersistableQueue::handleAsyncResult(): Unknown async queue operation: " << qc->m_op; throw qpid::Exception(oss.str()); }; } } } if (bc) delete bc; if (res) delete res; } qpid::broker::QueueHandle& MockPersistableQueue::getHandle() { return m_queueHandle; } void* MockPersistableQueue::runEnqueues() { uint32_t numMsgs = 0; uint16_t txnCnt = 0; const bool useTxn = m_perfTestOpts.m_enqTxnBlockSize > 0; MockTransactionContextPtr txn; while (numMsgs < m_perfTestOpts.m_numMsgs) { if (useTxn && txnCnt == 0) { txn.reset(new MockTransactionContext(m_store)); // equivalent to begin() } MockPersistableMessagePtr msg(new MockPersistableMessage(m_msgData, m_perfTestOpts.m_msgSize, m_store, true)); msg->setPersistenceId(m_store->getNextRid()); qpid::broker::EnqueueHandle enqHandle = m_store->createEnqueueHandle(msg->getHandle(), m_queueHandle); MockPersistableMessage::MessageContext* msgCtxt = new MockPersistableMessage::MessageContext(msg, qpid::asyncStore::AsyncOperation::MSG_ENQUEUE, this); if (useTxn) { m_store->submitEnqueue(enqHandle, txn->getHandle(), &MockPersistableMessage::handleAsyncResult, dynamic_cast(msgCtxt)); } else { m_store->submitEnqueue(enqHandle, &MockPersistableMessage::handleAsyncResult, dynamic_cast(msgCtxt)); } QueuedMessagePtr qm(new QueuedMessage(msg, enqHandle, txn)); push(qm); //std::cout << "**** push 0x" << std::hex << msg->getPersistenceId() << std::dec << std::endl; if (useTxn && ++txnCnt >= m_perfTestOpts.m_enqTxnBlockSize) { txn->commit(); txnCnt = 0; } ++numMsgs; } if (txnCnt > 0) { txn->commit(); txnCnt = 0; } return 0; } void* MockPersistableQueue::runDequeues() { uint32_t numMsgs = 0; const uint32_t numMsgsToDequeue = m_perfTestOpts.m_numMsgs * m_perfTestOpts.m_numEnqThreadsPerQueue / m_perfTestOpts.m_numDeqThreadsPerQueue; uint16_t txnCnt = 0; const bool useTxn = m_perfTestOpts.m_deqTxnBlockSize > 0; MockTransactionContextPtr txn; QueuedMessagePtr qm; while (numMsgs < numMsgsToDequeue) { if (useTxn && txnCnt == 0) { txn.reset(new MockTransactionContext(m_store)); // equivalent to begin() } pop(qm); if (qm.get()) { qpid::broker::EnqueueHandle enqHandle = qm->getEnqueueHandle(); qpid::broker::BrokerContext* bc = new MockPersistableMessage::MessageContext(qm->getMessage(), qpid::asyncStore::AsyncOperation::MSG_DEQUEUE, this); if (useTxn) { m_store->submitDequeue(enqHandle, txn->getHandle(), &MockPersistableMessage::handleAsyncResult, bc); } else { m_store->submitDequeue(enqHandle, &MockPersistableMessage::handleAsyncResult, bc); } ++numMsgs; qm.reset(static_cast(0)); if (useTxn && ++txnCnt >= m_perfTestOpts.m_deqTxnBlockSize) { txn->commit(); txnCnt = 0; } } else { // ::usleep(100); // 0.1 ms TODO: Use a condition variable instead of sleeping/spinning } } if (txnCnt > 0) { txn->commit(); txnCnt = 0; } return 0; } //static void* MockPersistableQueue::startEnqueues(void* ptr) { return reinterpret_cast(ptr)->runEnqueues(); } //static void* MockPersistableQueue::startDequeues(void* ptr) { return reinterpret_cast(ptr)->runDequeues(); } void MockPersistableQueue::encode(qpid::framing::Buffer& buffer) const { buffer.putShortString(m_name); } uint32_t MockPersistableQueue::encodedSize() const { return m_name.size() + 1; } uint64_t MockPersistableQueue::getPersistenceId() const { return m_persistenceId; } void MockPersistableQueue::setPersistenceId(uint64_t persistenceId) const { m_persistenceId = persistenceId; } void MockPersistableQueue::flush() { //if(m_store) m_store->flush(*this); } const std::string& MockPersistableQueue::getName() const { return m_name; } void MockPersistableQueue::setExternalQueueStore(qpid::broker::ExternalQueueStore* inst) { if (externalQueueStore != inst && externalQueueStore) delete externalQueueStore; externalQueueStore = inst; } uint64_t MockPersistableQueue::getSize() { return m_persistableData.size(); } void MockPersistableQueue::write(char* target) { ::memcpy(target, m_persistableData.data(), m_persistableData.size()); } // protected void MockPersistableQueue::createComplete(const QueueContext* /*qc*/) { //std::cout << "~~~~~ Queue name=\"" << m_name << "\": createComplete()" << std::endl; } // protected void MockPersistableQueue::flushComplete(const QueueContext* /*qc*/) { //std::cout << "~~~~~ Queue name=\"" << m_name << "\": flushComplete()" << std::endl; } // protected void MockPersistableQueue::destroyComplete(const QueueContext* /*qc*/) { //std::cout << "~~~~~ Queue name=\"" << m_name << "\": destroyComplete()" << std::endl; } // protected void MockPersistableQueue::push(QueuedMessagePtr& qm) { qpid::sys::ScopedLock l(m_enqueuedMsgsMutex); m_enqueuedMsgs.push_back(qm); m_dequeueCondition.notify(); } // protected void MockPersistableQueue::pop(QueuedMessagePtr& qm) { qpid::sys::ScopedLock l(m_enqueuedMsgsMutex); if (m_enqueuedMsgs.empty()) { m_dequeueCondition.wait(m_enqueuedMsgsMutex); } qm = m_enqueuedMsgs.front(); if (qm->isTransactional()) { // The next msg is still in an open transaction, skip and find next non-open-txn msg MsgEnqListItr i = m_enqueuedMsgs.begin(); while (++i != m_enqueuedMsgs.end()) { if (!(*i)->isTransactional()) { qm = *i; m_enqueuedMsgs.erase(i); } } } else { // The next msg is not in an open txn m_enqueuedMsgs.pop_front(); } } }}} // namespace tests::storePerftools::asyncPerf