diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2013-11-04 22:15:14 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2013-11-04 22:15:14 +0000 |
| commit | 1be9d0e97c9c6f59d0b69096b5cd73bf651200a9 (patch) | |
| tree | 89d21109e0c4da085d45585b225f83d1018bad13 /cpp/src/qpid/linearstore/MessageStoreImpl.cpp | |
| parent | a115ed83708fdd5c24f7553a8154373b65eddd42 (diff) | |
| download | qpid-python-1be9d0e97c9c6f59d0b69096b5cd73bf651200a9.tar.gz | |
QPID-4984: WIP. Basic enqueue/dequeue/txns work, still no EFP recycling.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1538790 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/linearstore/MessageStoreImpl.cpp')
| -rw-r--r-- | cpp/src/qpid/linearstore/MessageStoreImpl.cpp | 82 |
1 files changed, 26 insertions, 56 deletions
diff --git a/cpp/src/qpid/linearstore/MessageStoreImpl.cpp b/cpp/src/qpid/linearstore/MessageStoreImpl.cpp index cbc395d61a..100c8925de 100644 --- a/cpp/src/qpid/linearstore/MessageStoreImpl.cpp +++ b/cpp/src/qpid/linearstore/MessageStoreImpl.cpp @@ -291,7 +291,10 @@ void MessageStoreImpl::init() } } while (!isInit); - efpMgr.reset(new qpid::qls_jrnl::EmptyFilePoolManager(getStoreTopLevelDir(), jrnlLog)); + efpMgr.reset(new qpid::qls_jrnl::EmptyFilePoolManager(getStoreTopLevelDir(), + defaultEfpPartitionNumber, + defaultEfpFileSize_kib, + jrnlLog)); efpMgr->findEfpPartitions(); } @@ -331,6 +334,9 @@ void MessageStoreImpl::truncateInit() dbenv->close(0); isInit = false; } + + // TODO: Linearstore: harvest all discareded journal files into the empy file pool(s). + qpid::qls_jrnl::jdir::delete_dir(getBdbBaseDir()); qpid::qls_jrnl::jdir::delete_dir(getJrnlBaseDir()); qpid::qls_jrnl::jdir::delete_dir(getTplBaseDir()); @@ -730,6 +736,7 @@ void MessageStoreImpl::recoverQueues(TxnCtxt& txn, // Check for changes to queue store settings qpid.file_count and qpid.file_size resulting // from recovery of a store that has had its size changed externally by the resize utility. // If so, update the queue store settings so that QMF queries will reflect the new values. + // TODO: Update this for new settings, as qpid.file_count and qpid.file_size no longer apply /* const qpid::framing::FieldTable& storeargs = queue->getSettings().storeSettings; qpid::framing::FieldTable::ValuePtr value; @@ -866,8 +873,6 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/, size_t preambleLength = sizeof(uint32_t)/*header size*/; JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore()); - DataTokenImpl dtok; - size_t readSize = 0; unsigned msg_count = 0; // TODO: This optimization to skip reading if there are no enqueued messages to read @@ -876,19 +881,20 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/, //bool read = jc->get_enq_cnt() > 0; bool read = true; - void* dbuff = NULL; size_t dbuffSize = 0; - void* xidbuff = NULL; size_t xidbuffSize = 0; + void* dbuff = NULL; + size_t dbuffSize = 0; + void* xidbuff = NULL; + size_t xidbuffSize = 0; bool transientFlag = false; bool externalFlag = false; - - dtok.set_wstate(DataTokenImpl::ENQ); + DataTokenImpl dtok; + dtok.set_wstate(DataTokenImpl::NONE); // Read the message from the Journal. try { unsigned aio_sleep_cnt = 0; while (read) { qpid::qls_jrnl::iores res = jc->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtok); - readSize = dtok.dsize(); switch (res) { @@ -909,11 +915,11 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/, // At some future point if delivery attempts are stored, then this call would // become optional depending on that information. msg->setRedelivered(); - // Reset the TTL for the recovered message - msg->computeExpiration(broker->getExpiryPolicy()); + // Reset the TTL for the recovered message + msg->computeExpiration(broker->getExpiryPolicy()); uint32_t contentOffset = headerSize + preambleLength; - uint64_t contentSize = readSize - contentOffset; + uint64_t contentSize = dbuffSize - contentOffset; if (msg->loadContent(contentSize) && !externalFlag) { //now read the content qpid::framing::Buffer contentBuff(data + contentOffset, contentSize); @@ -947,8 +953,8 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/, bool enq = false; bool deq = false; for (qpid::qls_jrnl::tdl_itr j = txnList.begin(); j<txnList.end(); j++) { - if (j->_enq_flag && j->_rid == rid) enq = true; - else if (!j->_enq_flag && j->_drid == rid) deq = true; + if (j->enq_flag_ && j->rid_ == rid) enq = true; + else if (!j->enq_flag_ && j->drid_ == rid) deq = true; } if (enq && !deq && citr->second.commit_flag) { rcnt++; @@ -962,7 +968,7 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/, } dtok.reset(); - dtok.set_wstate(DataTokenImpl::ENQ); + dtok.set_wstate(DataTokenImpl::NONE); if (xidbuff) ::free(xidbuff); @@ -1065,11 +1071,11 @@ void MessageStoreImpl::readTplStore() bool commitFlag = true; for (qpid::qls_jrnl::tdl_itr j = txnList.begin(); j<txnList.end(); j++) { - if (j->_enq_flag) { - rid = j->_rid; + if (j->enq_flag_) { + rid = j->rid_; enqCnt++; } else { - commitFlag = j->_commit_flag; + commitFlag = j->commit_flag_; deqCnt++; } } @@ -1104,10 +1110,9 @@ void MessageStoreImpl::readTplStore() void MessageStoreImpl::recoverTplStore() { QLS_LOG(info, "*** MessageStoreImpl::recoverTplStore()"); -/* - if (qpid::qls_jrnl::jdir::exists(tplStorePtr->jrnl_dir() + tplStorePtr->base_filename() + ".jinf")) { + if (qpid::qls_jrnl::jdir::exists(tplStorePtr->jrnl_dir())) { uint64_t thisHighestRid = 0ULL; - tplStorePtr->recover(tplNumJrnlFiles, false, 0, tplJrnlFsizeSblks, tplWCachePgSizeSblks, tplWCacheNumPages, 0, thisHighestRid, 0); + tplStorePtr->recover(boost::dynamic_pointer_cast<qpid::qls_jrnl::EmptyFilePoolManager>(efpMgr), tplWCacheNumPages, tplWCachePgSizeSblks, 0, thisHighestRid, 0); if (highestRid == 0ULL) highestRid = thisHighestRid; else if (thisHighestRid - highestRid < 0x8000000000000000ULL) // RFC 1982 comparison for unsigned 64-bit @@ -1118,7 +1123,6 @@ void MessageStoreImpl::recoverTplStore() tplStorePtr->recover_complete(); // start journal. } -*/ } void MessageStoreImpl::recoverLockedMappings(txn_list& txns) @@ -1137,12 +1141,10 @@ void MessageStoreImpl::recoverLockedMappings(txn_list& txns) } } -void MessageStoreImpl::collectPreparedXids(std::set<std::string>& /*xids*/) +void MessageStoreImpl::collectPreparedXids(std::set<std::string>& xids) { QLS_LOG(info, "*** MessageStoreImpl::collectPreparedXids()"); -/* if (tplStorePtr->is_ready()) { - tplStorePtr->read_reset(); readTplStore(); } else { recoverTplStore(); @@ -1152,7 +1154,6 @@ void MessageStoreImpl::collectPreparedXids(std::set<std::string>& /*xids*/) if (!i->second.deq_flag && i->second.tpc_flag) xids.insert(i->first); } -*/ } void MessageStoreImpl::stage(const boost::intrusive_ptr<qpid::broker::PersistableMessage>& /*msg*/) @@ -1178,31 +1179,6 @@ void MessageStoreImpl::loadContent(const qpid::broker::PersistableQueue& /*queue uint32_t /*length*/) { throw qpid::qls_jrnl::jexception(qpid::qls_jrnl::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "loadContent"); -/* - checkInit(); - uint64_t messageId (msg->getPersistenceId()); - - if (messageId != 0) { - try { - JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore()); - if (jc && jc->is_enqueued(messageId) ) { - if (!jc->loadMsgContent(messageId, data, length, offset)) { - std::ostringstream oss; - oss << "Queue " << queue.getName() << ": loadContent() failed: Message " << messageId << " is extern"; - THROW_STORE_EXCEPTION(oss.str()); - } - } else { - std::ostringstream oss; - oss << "Queue " << queue.getName() << ": loadContent() failed: Message " << messageId << " not enqueued"; - THROW_STORE_EXCEPTION(oss.str()); - } - } catch (const qpid::qls_jrnl::jexception& e) { - THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": loadContent() failed: " + e.what()); - } - } else { - THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!"); - } -*/ } void MessageStoreImpl::flush(const qpid::broker::PersistableQueue& queue_) @@ -1358,12 +1334,6 @@ void MessageStoreImpl::async_dequeue(qpid::broker::TransactionContext* ctxt_, } } -uint32_t MessageStoreImpl::outstandingQueueAIO(const qpid::broker::PersistableQueue& /*queue_*/) -{ -/* checkInit();*/ - return 0; -} - void MessageStoreImpl::completed(TxnCtxt& txn_, bool commit_) { |
