diff options
author | Kim van der Riet <kpvdr@apache.org> | 2013-11-04 22:15:14 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2013-11-04 22:15:14 +0000 |
commit | f9808a83e93e1972c615c13edc344b675986420a (patch) | |
tree | fa00b1e26e8895aef2b7e4491f0ab9de074d0c81 | |
parent | 5c1efa08b20d4bca2e8e5b81c7f1e78cdcc87c8a (diff) | |
download | qpid-python-f9808a83e93e1972c615c13edc344b675986420a.tar.gz |
QPID-4984: WIP. Basic enqueue/dequeue/txns work, still no EFP recycling.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1538790 13f79535-47bb-0310-9956-ffa450edef68
28 files changed, 653 insertions, 924 deletions
diff --git a/qpid/cpp/src/qpid/linearstore/ISSUES b/qpid/cpp/src/qpid/linearstore/ISSUES new file mode 100644 index 0000000000..0d15744694 --- /dev/null +++ b/qpid/cpp/src/qpid/linearstore/ISSUES @@ -0,0 +1,36 @@ +LinearStore issues: + +Store: +------ + +1. Overwrite identity: When recovering a previously used file, if the write boundary coincides with old record start, + no way of discriminating old from new at boundary (used to use OWI). + +2. Recycling files while in use not working, however, files are recovered to EFP during recovery. Must solve #1 first. + +3. Checksum not implemented in record tail, not checked during read. + +4. Rework qpid management parameters and controls. + +Tests +----- + +* No existing tests for linearstore: +** Basic broker-level tests for txn and non-txn recovery +** Store-level tests which check write boundary conditions +** Unit tests +** Basic performance tests + +Tools +----- + +* Store analysis and status +* Recovery/reading of message content + +Code tidy-up +------------ + +* Remove old comments +* Use c++ cast templates instead of (xxx)y +* Member names: xxx_ +* Add docs to classes diff --git a/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp b/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp index 412a4922ca..4f904665ee 100644 --- a/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp +++ b/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp @@ -63,14 +63,8 @@ JournalImpl::JournalImpl(qpid::sys::Timer& timer_, timer(timer_), _journalLogRef(journalLogRef), getEventsTimerSetFlag(false), -// lastReadRid(0), writeActivityFlag(false), flushTriggeredFlag(true), -// _xidp(0), -// _datap(0), -// _dlen(0), -// _dtok(), -// _external(false), deleteCallback(onDelete) { getEventsFireEventsPtr = new GetEventsFireEvent(this, getEventsTimeout); @@ -97,7 +91,6 @@ JournalImpl::~JournalImpl() } getEventsFireEventsPtr->cancel(); inactivityFireEventPtr->cancel(); -// free_read_buffers(); if (_mgmtObject.get() != 0) { _mgmtObject->resourceDestroy(); @@ -149,7 +142,7 @@ JournalImpl::initialize(qpid::qls_jrnl::EmptyFilePool* efpp_, // oss << " wcache_pgsize_sblks=" << wcache_pgsize_sblks; // oss << " wcache_num_pages=" << wcache_num_pages; // QLS_LOG2(debug, _jid, oss.str()); - jcntl::initialize(/*num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,*/ efpp_, wcache_num_pages, wcache_pgsize_sblks, cbp); + jcntl::initialize(efpp_, wcache_num_pages, wcache_pgsize_sblks, cbp); // QLS_LOG2(debug, _jid, "Initialization complete"); // TODO: replace for linearstore: _lpmgr /* @@ -183,7 +176,6 @@ JournalImpl::recover(/*const uint16_t num_jfiles, uint64_t queue_id) { std::ostringstream oss1; -// oss1 << "Recover; num_jfiles=" << num_jfiles << " jfsize_sblks=" << jfsize_sblks; oss1 << "Recover;"; oss1 << " queue_id = 0x" << std::hex << queue_id << std::dec; oss1 << " wcache_pgsize_sblks=" << wcache_pgsize_sblks; @@ -210,11 +202,9 @@ JournalImpl::recover(/*const uint16_t num_jfiles, prep_xid_list.push_back(i->xid); } - jcntl::recover(/*num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,*/efpm.get(), wcache_num_pages, wcache_pgsize_sblks, - cbp, &prep_xid_list, highest_rid); + jcntl::recover(efpm.get(), wcache_num_pages, wcache_pgsize_sblks, cbp, &prep_xid_list, highest_rid); } else { - jcntl::recover(/*num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,*/efpm.get(), wcache_num_pages, wcache_pgsize_sblks, - cbp, 0, highest_rid); + jcntl::recover(efpm.get(), wcache_num_pages, wcache_pgsize_sblks, cbp, 0, highest_rid); } // Populate PreparedTransaction lists from _tmap @@ -223,10 +213,10 @@ JournalImpl::recover(/*const uint16_t num_jfiles, for (PreparedTransaction::list::iterator i = prep_tx_list_ptr->begin(); i != prep_tx_list_ptr->end(); i++) { txn_data_list tdl = _tmap.get_tdata_list(i->xid); // tdl will be empty if xid not found for (tdl_itr tdl_itr = tdl.begin(); tdl_itr < tdl.end(); tdl_itr++) { - if (tdl_itr->_enq_flag) { // enqueue op - i->enqueues->add(queue_id, tdl_itr->_rid); + if (tdl_itr->enq_flag_) { // enqueue op + i->enqueues->add(queue_id, tdl_itr->rid_); } else { // dequeue op - i->dequeues->add(queue_id, tdl_itr->_drid); + i->dequeues->add(queue_id, tdl_itr->drid_); } } } @@ -260,102 +250,6 @@ JournalImpl::recover_complete() */ } -//#define MAX_AIO_SLEEPS 1000000 // tot: ~10 sec -//#define AIO_SLEEP_TIME_US 10 // 0.01 ms -// Return true if content is recovered from store; false if content is external and must be recovered from an external store. -// Throw exception for all errors. -/* -bool -JournalImpl::loadMsgContent(uint64_t rid, std::string& data, size_t length, size_t offset) -{ - qpid::sys::Mutex::ScopedLock sl(_read_lock); - if (_dtok.rid() != rid) - { - // Free any previous msg - free_read_buffers(); - - // Last read encountered out-of-order rids, check if this rid is in that list - bool oooFlag = false; - for (std::vector<uint64_t>::const_iterator i=oooRidList.begin(); i!=oooRidList.end() && !oooFlag; i++) { - if (*i == rid) { - oooFlag = true; - } - } - - // TODO: This is a brutal approach - very inefficient and slow. Rather introduce a system of remembering - // jumpover points and allow the read to jump back to the first known jumpover point - but this needs - // a mechanism in rrfc to accomplish it. Also helpful is a struct containing a journal address - a - // combination of lid/offset. - // NOTE: The second part of the if stmt (rid < lastReadRid) is required to handle browsing. - if (oooFlag || rid < lastReadRid) { - _rmgr.invalidate(); - oooRidList.clear(); - } - _dlen = 0; - _dtok.reset(); - _dtok.set_wstate(DataTokenImpl::ENQ); - _dtok.set_rid(0); - _external = false; - size_t xlen = 0; - bool transient = false; - bool done = false; - bool rid_found = false; - while (!done) { - iores res = read_data_record(&_datap, _dlen, &_xidp, xlen, transient, _external, &_dtok); - switch (res) { - case qpid::qls_jrnl::RHM_IORES_SUCCESS: - if (_dtok.rid() != rid) { - // Check if this is an out-of-order rid that may impact next read - if (_dtok.rid() > rid) - oooRidList.push_back(_dtok.rid()); - free_read_buffers(); - // Reset data token for next read - _dlen = 0; - _dtok.reset(); - _dtok.set_wstate(DataTokenImpl::ENQ); - _dtok.set_rid(0); - } else { - rid_found = _dtok.rid() == rid; - lastReadRid = rid; - done = true; - } - break; - case qpid::qls_jrnl::RHM_IORES_PAGE_AIOWAIT: - if (get_wr_events(&_aio_cmpl_timeout) == qpid::qls_jrnl::jerrno::AIO_TIMEOUT) { - std::stringstream ss; - ss << "read_data_record() returned " << qpid::qls_jrnl::iores_str(res); - ss << "; timed out waiting for page to be processed."; - throw jexception(qpid::qls_jrnl::jerrno::JERR__TIMEOUT, ss.str().c_str(), "JournalImpl", - "loadMsgContent"); - } - break; - default: - std::stringstream ss; - ss << "read_data_record() returned " << qpid::qls_jrnl::iores_str(res); - throw jexception(qpid::qls_jrnl::jerrno::JERR__UNEXPRESPONSE, ss.str().c_str(), "JournalImpl", - "loadMsgContent"); - } - } - if (!rid_found) { - std::stringstream ss; - ss << "read_data_record() was unable to find rid 0x" << std::hex << rid << std::dec; - ss << " (" << rid << "); last rid found was 0x" << std::hex << _dtok.rid() << std::dec; - ss << " (" << _dtok.rid() << ")"; - throw jexception(qpid::qls_jrnl::jerrno::JERR__RECNFOUND, ss.str().c_str(), "JournalImpl", "loadMsgContent"); - } - } - - if (_external) return false; - - uint32_t hdr_offs = qpid::framing::Buffer(static_cast<char*>(_datap), sizeof(uint32_t)).getLong() + sizeof(uint32_t); - if (hdr_offs + offset + length > _dlen) { - data.append((const char*)_datap + hdr_offs + offset, _dlen - hdr_offs - offset); - } else { - data.append((const char*)_datap + hdr_offs + offset, length); - } - return true; -} -*/ void JournalImpl::enqueue_data_record(const void* const data_buff, const size_t tot_data_len, @@ -498,29 +392,6 @@ JournalImpl::flush(const bool block_till_aio_cmpl) return res; } -/* -void -JournalImpl::log(qpid::qls_jrnl::log_level ll, const std::string& log_stmt) const -{ - log(ll, log_stmt.c_str()); -} - -void -JournalImpl::log(qpid::qls_jrnl::log_level ll, const char* const log_stmt) const -{ - switch (ll) - { - case LOG_TRACE: QPID_LOG(trace, "QLS Journal \"" << _jid << "\": " << log_stmt); break; - case LOG_DEBUG: QPID_LOG(debug, "QLS Journal \"" << _jid << "\": " << log_stmt); break; - case LOG_INFO: QPID_LOG(info, "QLS Journal \"" << _jid << "\": " << log_stmt); break; - case LOG_NOTICE: QPID_LOG(notice, "QLS Journal \"" << _jid << "\": " << log_stmt); break; - case LOG_WARN: QPID_LOG(warning, "QLS Journal \"" << _jid << "\": " << log_stmt); break; - case LOG_ERROR: QPID_LOG(error, "QLS Journal \"" << _jid << "\": " << log_stmt); break; - case LOG_CRITICAL: QPID_LOG(critical, "QLS Journal \"" << _jid << "\": " << log_stmt); break; - } -} -*/ - void JournalImpl::getEventsFire() { @@ -581,21 +452,6 @@ void JournalImpl::rd_aio_cb(std::vector<uint16_t>& /*pil*/) {} -/* -void -JournalImpl::free_read_buffers() -{ - if (_xidp) { - ::free(_xidp); - _xidp = 0; - _datap = 0; - } else if (_datap) { - ::free(_datap); - _datap = 0; - } -} -*/ - void JournalImpl::createStore() { @@ -609,17 +465,6 @@ JournalImpl::handleIoResult(const iores r) { case qpid::qls_jrnl::RHM_IORES_SUCCESS: return; -/* - case qpid::qls_jrnl::RHM_IORES_FULL: - { - std::ostringstream oss; - oss << "Journal full on queue \"" << _jid << "\"."; - QLS_LOG2(critical, _jid, "Journal full."); - if (_agent != 0) - _agent->raiseEvent(qmf::org::apache::qpid::linearstore::EventFull(_jid, "Journal full"), qpid::management::ManagementAgent::SEV_ERROR); - THROW_STORE_FULL_EXCEPTION(oss.str()); - } -*/ default: { std::ostringstream oss; diff --git a/qpid/cpp/src/qpid/linearstore/JournalImpl.h b/qpid/cpp/src/qpid/linearstore/JournalImpl.h index fd2433803c..ba62b1d64a 100644 --- a/qpid/cpp/src/qpid/linearstore/JournalImpl.h +++ b/qpid/cpp/src/qpid/linearstore/JournalImpl.h @@ -140,11 +140,6 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public qpid::qls_jr void recover_complete(); - // Temporary fn to read and save last msg read from journal so it can be assigned - // in chunks. To be replaced when coding to do this direct from the journal is ready. - // Returns true if the record is extern, false if local. -// bool loadMsgContent(uint64_t rid, std::string& data, size_t length, size_t offset = 0); - // Overrides for write inactivity timer void enqueue_data_record(const void* const data_buff, const size_t tot_data_len, const size_t this_data_len, qpid::qls_jrnl::data_tok* dtokp, @@ -191,7 +186,6 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public qpid::qls_jr void resetDeleteCallback() { deleteCallback = DeleteCallback(); } protected: -// void free_read_buffers(); void createStore(); inline void setGetEventTimer() @@ -227,15 +221,12 @@ class TplJournalImpl : public JournalImpl virtual ~TplJournalImpl() {} -/* // Special version of read_data_record that ignores transactions - needed when reading the TPL inline qpid::qls_jrnl::iores read_data_record(void** const datapp, std::size_t& dsize, void** const xidpp, std::size_t& xidsize, bool& transient, bool& external, qpid::qls_jrnl::data_tok* const dtokp) { return JournalImpl::read_data_record(datapp, dsize, xidpp, xidsize, transient, external, dtokp, true); } - inline void read_reset() { _rmgr.invalidate(); } -*/ }; // class TplJournalImpl } // namespace msgstore diff --git a/qpid/cpp/src/qpid/linearstore/JournalLogImpl.h b/qpid/cpp/src/qpid/linearstore/JournalLogImpl.h index a43af23ac6..43c2d7dc9c 100644 --- a/qpid/cpp/src/qpid/linearstore/JournalLogImpl.h +++ b/qpid/cpp/src/qpid/linearstore/JournalLogImpl.h @@ -26,7 +26,7 @@ #include "qpid/log/Statement.h" #define QLS_LOG(level, msg) QPID_LOG(level, "Linear Store: " << msg) -#define QLS_LOG2(level, queue, msg) QPID_LOG(level, "Linear Store: Journal \'" << queue << "\":" << msg) +#define QLS_LOG2(level, queue, msg) QPID_LOG(level, "Linear Store: Journal \"" << queue << "\":" << msg) namespace qpid { namespace linearstore { diff --git a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp index cbc395d61a..100c8925de 100644 --- a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp +++ b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp @@ -291,7 +291,10 @@ void MessageStoreImpl::init() } } while (!isInit); - efpMgr.reset(new qpid::qls_jrnl::EmptyFilePoolManager(getStoreTopLevelDir(), jrnlLog)); + efpMgr.reset(new qpid::qls_jrnl::EmptyFilePoolManager(getStoreTopLevelDir(), + defaultEfpPartitionNumber, + defaultEfpFileSize_kib, + jrnlLog)); efpMgr->findEfpPartitions(); } @@ -331,6 +334,9 @@ void MessageStoreImpl::truncateInit() dbenv->close(0); isInit = false; } + + // TODO: Linearstore: harvest all discareded journal files into the empy file pool(s). + qpid::qls_jrnl::jdir::delete_dir(getBdbBaseDir()); qpid::qls_jrnl::jdir::delete_dir(getJrnlBaseDir()); qpid::qls_jrnl::jdir::delete_dir(getTplBaseDir()); @@ -730,6 +736,7 @@ void MessageStoreImpl::recoverQueues(TxnCtxt& txn, // Check for changes to queue store settings qpid.file_count and qpid.file_size resulting // from recovery of a store that has had its size changed externally by the resize utility. // If so, update the queue store settings so that QMF queries will reflect the new values. + // TODO: Update this for new settings, as qpid.file_count and qpid.file_size no longer apply /* const qpid::framing::FieldTable& storeargs = queue->getSettings().storeSettings; qpid::framing::FieldTable::ValuePtr value; @@ -866,8 +873,6 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/, size_t preambleLength = sizeof(uint32_t)/*header size*/; JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore()); - DataTokenImpl dtok; - size_t readSize = 0; unsigned msg_count = 0; // TODO: This optimization to skip reading if there are no enqueued messages to read @@ -876,19 +881,20 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/, //bool read = jc->get_enq_cnt() > 0; bool read = true; - void* dbuff = NULL; size_t dbuffSize = 0; - void* xidbuff = NULL; size_t xidbuffSize = 0; + void* dbuff = NULL; + size_t dbuffSize = 0; + void* xidbuff = NULL; + size_t xidbuffSize = 0; bool transientFlag = false; bool externalFlag = false; - - dtok.set_wstate(DataTokenImpl::ENQ); + DataTokenImpl dtok; + dtok.set_wstate(DataTokenImpl::NONE); // Read the message from the Journal. try { unsigned aio_sleep_cnt = 0; while (read) { qpid::qls_jrnl::iores res = jc->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtok); - readSize = dtok.dsize(); switch (res) { @@ -909,11 +915,11 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/, // At some future point if delivery attempts are stored, then this call would // become optional depending on that information. msg->setRedelivered(); - // Reset the TTL for the recovered message - msg->computeExpiration(broker->getExpiryPolicy()); + // Reset the TTL for the recovered message + msg->computeExpiration(broker->getExpiryPolicy()); uint32_t contentOffset = headerSize + preambleLength; - uint64_t contentSize = readSize - contentOffset; + uint64_t contentSize = dbuffSize - contentOffset; if (msg->loadContent(contentSize) && !externalFlag) { //now read the content qpid::framing::Buffer contentBuff(data + contentOffset, contentSize); @@ -947,8 +953,8 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/, bool enq = false; bool deq = false; for (qpid::qls_jrnl::tdl_itr j = txnList.begin(); j<txnList.end(); j++) { - if (j->_enq_flag && j->_rid == rid) enq = true; - else if (!j->_enq_flag && j->_drid == rid) deq = true; + if (j->enq_flag_ && j->rid_ == rid) enq = true; + else if (!j->enq_flag_ && j->drid_ == rid) deq = true; } if (enq && !deq && citr->second.commit_flag) { rcnt++; @@ -962,7 +968,7 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/, } dtok.reset(); - dtok.set_wstate(DataTokenImpl::ENQ); + dtok.set_wstate(DataTokenImpl::NONE); if (xidbuff) ::free(xidbuff); @@ -1065,11 +1071,11 @@ void MessageStoreImpl::readTplStore() bool commitFlag = true; for (qpid::qls_jrnl::tdl_itr j = txnList.begin(); j<txnList.end(); j++) { - if (j->_enq_flag) { - rid = j->_rid; + if (j->enq_flag_) { + rid = j->rid_; enqCnt++; } else { - commitFlag = j->_commit_flag; + commitFlag = j->commit_flag_; deqCnt++; } } @@ -1104,10 +1110,9 @@ void MessageStoreImpl::readTplStore() void MessageStoreImpl::recoverTplStore() { QLS_LOG(info, "*** MessageStoreImpl::recoverTplStore()"); -/* - if (qpid::qls_jrnl::jdir::exists(tplStorePtr->jrnl_dir() + tplStorePtr->base_filename() + ".jinf")) { + if (qpid::qls_jrnl::jdir::exists(tplStorePtr->jrnl_dir())) { uint64_t thisHighestRid = 0ULL; - tplStorePtr->recover(tplNumJrnlFiles, false, 0, tplJrnlFsizeSblks, tplWCachePgSizeSblks, tplWCacheNumPages, 0, thisHighestRid, 0); + tplStorePtr->recover(boost::dynamic_pointer_cast<qpid::qls_jrnl::EmptyFilePoolManager>(efpMgr), tplWCacheNumPages, tplWCachePgSizeSblks, 0, thisHighestRid, 0); if (highestRid == 0ULL) highestRid = thisHighestRid; else if (thisHighestRid - highestRid < 0x8000000000000000ULL) // RFC 1982 comparison for unsigned 64-bit @@ -1118,7 +1123,6 @@ void MessageStoreImpl::recoverTplStore() tplStorePtr->recover_complete(); // start journal. } -*/ } void MessageStoreImpl::recoverLockedMappings(txn_list& txns) @@ -1137,12 +1141,10 @@ void MessageStoreImpl::recoverLockedMappings(txn_list& txns) } } -void MessageStoreImpl::collectPreparedXids(std::set<std::string>& /*xids*/) +void MessageStoreImpl::collectPreparedXids(std::set<std::string>& xids) { QLS_LOG(info, "*** MessageStoreImpl::collectPreparedXids()"); -/* if (tplStorePtr->is_ready()) { - tplStorePtr->read_reset(); readTplStore(); } else { recoverTplStore(); @@ -1152,7 +1154,6 @@ void MessageStoreImpl::collectPreparedXids(std::set<std::string>& /*xids*/) if (!i->second.deq_flag && i->second.tpc_flag) xids.insert(i->first); } -*/ } void MessageStoreImpl::stage(const boost::intrusive_ptr<qpid::broker::PersistableMessage>& /*msg*/) @@ -1178,31 +1179,6 @@ void MessageStoreImpl::loadContent(const qpid::broker::PersistableQueue& /*queue uint32_t /*length*/) { throw qpid::qls_jrnl::jexception(qpid::qls_jrnl::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "loadContent"); -/* - checkInit(); - uint64_t messageId (msg->getPersistenceId()); - - if (messageId != 0) { - try { - JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore()); - if (jc && jc->is_enqueued(messageId) ) { - if (!jc->loadMsgContent(messageId, data, length, offset)) { - std::ostringstream oss; - oss << "Queue " << queue.getName() << ": loadContent() failed: Message " << messageId << " is extern"; - THROW_STORE_EXCEPTION(oss.str()); - } - } else { - std::ostringstream oss; - oss << "Queue " << queue.getName() << ": loadContent() failed: Message " << messageId << " not enqueued"; - THROW_STORE_EXCEPTION(oss.str()); - } - } catch (const qpid::qls_jrnl::jexception& e) { - THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": loadContent() failed: " + e.what()); - } - } else { - THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!"); - } -*/ } void MessageStoreImpl::flush(const qpid::broker::PersistableQueue& queue_) @@ -1358,12 +1334,6 @@ void MessageStoreImpl::async_dequeue(qpid::broker::TransactionContext* ctxt_, } } -uint32_t MessageStoreImpl::outstandingQueueAIO(const qpid::broker::PersistableQueue& /*queue_*/) -{ -/* checkInit();*/ - return 0; -} - void MessageStoreImpl::completed(TxnCtxt& txn_, bool commit_) { diff --git a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h index d1ce7a2b8a..a136f0ba80 100644 --- a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h +++ b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h @@ -331,7 +331,7 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem void flush(const qpid::broker::PersistableQueue& queue); - uint32_t outstandingQueueAIO(const qpid::broker::PersistableQueue& queue); + inline uint32_t outstandingQueueAIO(const qpid::broker::PersistableQueue& /*queue*/) { return 0; }; // TODO: Deprecate this call void collectPreparedXids(std::set<std::string>& xids); diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/AtomicCounter.h b/qpid/cpp/src/qpid/linearstore/jrnl/AtomicCounter.h index 2f9d406b7c..82d38e716e 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/AtomicCounter.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/AtomicCounter.h @@ -23,6 +23,7 @@ #define QPID_LINEARSTORE_ATOMICCOUNTER_H_ #include "qpid/linearstore/jrnl/slock.h" +#include <string> namespace qpid { namespace qls_jrnl { @@ -31,11 +32,12 @@ template <class T> class AtomicCounter { private: + std::string id_; T count_; mutable smutex countMutex; public: - AtomicCounter(const T& initValue = T(0)) : count_(initValue) {} + AtomicCounter(const std::string& id, const T& initValue) : id_(id), count_(initValue) {} virtual ~AtomicCounter() {} @@ -44,6 +46,11 @@ public: return count_; } + void set(const T v) { + slock l(countMutex); + count_ = v; + } + T increment() { slock l(countMutex); return ++count_; @@ -57,7 +64,7 @@ public: T addLimit(const T& a, const T& limit, const uint32_t jerr) { slock l(countMutex); - if (count_ + a > limit) throw jexception(jerr, "AtomicCounter", "addLimit"); + if (count_ + a > limit) throw jexception(jerr, id_, "AtomicCounter", "addLimit"); count_ += a; return count_; } @@ -70,7 +77,7 @@ public: T decrementLimit(const T& limit = T(0), const uint32_t jerr = jerrno::JERR__UNDERFLOW) { slock l(countMutex); if (count_ < limit + 1) { - throw jexception(jerr, "AtomicCounter", "decrementLimit"); + throw jexception(jerr, id_, "AtomicCounter", "decrementLimit"); } return --count_; } @@ -83,7 +90,7 @@ public: T subtractLimit(const T& s, const T& limit = T(0), const uint32_t jerr = jerrno::JERR__UNDERFLOW) { slock l(countMutex); - if (count_ < limit + s) throw jexception(jerr, "AtomicCounter", "subtractLimit"); + if (count_ < limit + s) throw jexception(jerr, id_, "AtomicCounter", "subtractLimit"); count_ -= s; return count_; } diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.cpp index 613da45633..015c2cbafe 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.cpp +++ b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.cpp @@ -35,6 +35,8 @@ #include <uuid/uuid.h> #include <vector> +//#include <iostream> // DEBUG + namespace qpid { namespace qls_jrnl { @@ -42,7 +44,7 @@ EmptyFilePool::EmptyFilePool(const std::string& efpDirectory, const EmptyFilePoolPartition* partitionPtr, JournalLog& journalLogRef) : efpDirectory_(efpDirectory), - efpDataSize_kib_(fileSizeKbFromDirName(efpDirectory, partitionPtr->getPartitionNumber())), + efpDataSize_kib_(dataSizeFromDirName_kib(efpDirectory, partitionPtr->getPartitionNumber())), partitionPtr_(partitionPtr), journalLogRef_(journalLogRef) {} @@ -134,6 +136,39 @@ void EmptyFilePool::returnEmptyFile(const std::string& fqSrcFile) { pushEmptyFile(emptyFileName); } +//static +std::string EmptyFilePool::dirNameFromDataSize(const efpDataSize_kib_t efpDataSize_kib) { + std::ostringstream oss; + oss << efpDataSize_kib << "k"; + return oss.str(); +} + + +// static +efpDataSize_kib_t EmptyFilePool::dataSizeFromDirName_kib(const std::string& dirName, + const efpPartitionNumber_t partitionNumber) { + // Check for dirName format 'NNNk', where NNN is a number, convert NNN into an integer. NNN cannot be 0. + std::string n(dirName.substr(dirName.rfind('/')+1)); + bool valid = true; + for (uint16_t charNum = 0; charNum < n.length(); ++charNum) { + if (charNum < n.length()-1) { + if (!::isdigit((int)n[charNum])) { + valid = false; + break; + } + } else { + valid = n[charNum] == 'k'; + } + } + efpDataSize_kib_t s = ::atol(n.c_str()); + if (!valid || s == 0 || s % QLS_SBLK_SIZE_KIB != 0) { + std::ostringstream oss; + oss << "Partition: " << partitionNumber << "; EFP directory: \'" << n << "\'"; + throw jexception(jerrno::JERR_EFP_BADEFPDIRNAME, oss.str(), "EmptyFilePool", "fileSizeKbFromDirName"); + } + return s; +} + // --- protected functions --- void EmptyFilePool::createEmptyFile() { @@ -148,7 +183,7 @@ void EmptyFilePool::createEmptyFile() { ofs.put('\0'); ofs.close(); pushEmptyFile(efpfn); -//std::cout << "WARNING: EFP " << efpDirectory << " is empty - created new journal file " << efpfn.substr(efpfn.rfind('/') + 1) << " on the fly" << std::endl; +//std::cout << "WARNING: EFP " << efpDirectory << " is empty - created new journal file " << efpfn.substr(efpfn.rfind('/') + 1) << " on the fly" << std::endl; // DEBUG } else { //std::cerr << "ERROR: Unable to open file \"" << efpfn << "\"" << std::endl; // DEBUG } @@ -196,7 +231,8 @@ void EmptyFilePool::resetEmptyFileHeader(const std::string& fqFileName) { std::streampos bytesRead = fs.tellg(); if (std::streamoff(bytesRead) == buffsize) { ::file_hdr_reset((::file_hdr_t*)buff); - ::memset(buff + sizeof(::file_hdr_t), 0, MAX_FILE_HDR_LEN - sizeof(::file_hdr_t)); // set rest of buffer to 0 + // set rest of buffer to 0 + ::memset(buff + sizeof(::file_hdr_t), 0, MAX_FILE_HDR_LEN - sizeof(::file_hdr_t)); fs.seekp(0, std::fstream::beg); fs.write(buff, buffsize); std::streampos bytesWritten = fs.tellp(); @@ -224,7 +260,8 @@ bool EmptyFilePool::validateEmptyFile(const std::string& emptyFileName) const { // Size matches pool efpDataSize_kib_t expectedSize = (QLS_SBLK_SIZE_KIB + efpDataSize_kib_) * 1024; if ((efpDataSize_kib_t)s.st_size != expectedSize) { - oss << "ERROR: File " << emptyFileName << ": Incorrect size: Expected=" << expectedSize << "; actual=" << s.st_size; + oss << "ERROR: File " << emptyFileName << ": Incorrect size: Expected=" << expectedSize + << "; actual=" << s.st_size; journalLogRef_.log(JournalLog::LOG_ERROR, oss.str()); return false; } @@ -241,18 +278,19 @@ bool EmptyFilePool::validateEmptyFile(const std::string& emptyFileName) const { fs.read((char*)buff, buffsize); std::streampos bytesRead = fs.tellg(); if (std::streamoff(bytesRead) != buffsize) { - oss << "ERROR: Unable to read file header of file \"" << emptyFileName << "\": tried to read " << buffsize << " bytes; read " << bytesRead << " bytes"; + oss << "ERROR: Unable to read file header of file \"" << emptyFileName << "\": tried to read " + << buffsize << " bytes; read " << bytesRead << " bytes"; journalLogRef_.log(JournalLog::LOG_ERROR, oss.str()); fs.close(); return false; } // Check file header - ::file_hdr_t* header(reinterpret_cast< ::file_hdr_t* >(buff)); - const bool jrnlMagicError = header->_rhdr._magic != QLS_FILE_MAGIC; - const bool jrnlVersionError = header->_rhdr._version != QLS_JRNL_VERSION; - const bool jrnlPartitionError = header->_efp_partition != partitionPtr_->getPartitionNumber(); - const bool jrnlFileSizeError = header->_file_size_kib != efpDataSize_kib_; + ::file_hdr_t* fhp = (::file_hdr_t*)buff; + const bool jrnlMagicError = fhp->_rhdr._magic != QLS_FILE_MAGIC; + const bool jrnlVersionError = fhp->_rhdr._version != QLS_JRNL_VERSION; + const bool jrnlPartitionError = fhp->_efp_partition != partitionPtr_->getPartitionNumber(); + const bool jrnlFileSizeError = fhp->_data_size_kib != efpDataSize_kib_; if (jrnlMagicError || jrnlVersionError || jrnlPartitionError || jrnlFileSizeError) { oss << "ERROR: File " << emptyFileName << ": Invalid file header - mismatched header fields: " << @@ -266,14 +304,15 @@ bool EmptyFilePool::validateEmptyFile(const std::string& emptyFileName) const { } // Check file header is reset - if (!::is_file_hdr_reset(header)) { - ::file_hdr_reset(header); + if (!::is_file_hdr_reset(fhp)) { + ::file_hdr_reset(fhp); ::memset(buff + sizeof(::file_hdr_t), 0, MAX_FILE_HDR_LEN - sizeof(::file_hdr_t)); // set rest of buffer to 0 fs.seekp(0, std::fstream::beg); fs.write(buff, buffsize); std::streampos bytesWritten = fs.tellp(); if (std::streamoff(bytesWritten) != buffsize) { - oss << "ERROR: Unable to write file header of file \"" << emptyFileName << "\": tried to write " << buffsize << " bytes; wrote " << bytesWritten << " bytes"; + oss << "ERROR: Unable to write file header of file \"" << emptyFileName << "\": tried to write " + << buffsize << " bytes; wrote " << bytesWritten << " bytes"; journalLogRef_.log(JournalLog::LOG_ERROR, oss.str()); fs.close(); return false; @@ -288,31 +327,6 @@ bool EmptyFilePool::validateEmptyFile(const std::string& emptyFileName) const { } // static -efpDataSize_kib_t EmptyFilePool::fileSizeKbFromDirName(const std::string& dirName, - const efpPartitionNumber_t partitionNumber) { - // Check for dirName format 'NNNk', where NNN is a number, convert NNN into an integer. NNN cannot be 0. - std::string n(dirName.substr(dirName.rfind('/')+1)); - bool valid = true; - for (uint16_t charNum = 0; charNum < n.length(); ++charNum) { - if (charNum < n.length()-1) { - if (!::isdigit((int)n[charNum])) { - valid = false; - break; - } - } else { - valid = n[charNum] == 'k'; - } - } - efpDataSize_kib_t s = ::atol(n.c_str()); - if (!valid || s == 0 || s % QLS_SBLK_SIZE_KIB != 0) { - std::ostringstream oss; - oss << "Partition: " << partitionNumber << "; EFP directory: \'" << n << "\'"; - throw jexception(jerrno::JERR_EFP_BADEFPDIRNAME, oss.str(), "EmptyFilePool", "fileSizeKbFromDirName"); - } - return s; -} - -// static int EmptyFilePool::moveEmptyFile(const std::string& from, const std::string& to) { if (::rename(from.c_str(), to.c_str())) { diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.h b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.h index e4eeeaa042..cb6887a095 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.h @@ -76,6 +76,10 @@ public: std::string takeEmptyFile(const std::string& destDirectory); void returnEmptyFile(const std::string& srcFile); + static std::string dirNameFromDataSize(const efpDataSize_kib_t efpDataSize_kib); + static efpDataSize_kib_t dataSizeFromDirName_kib(const std::string& dirName, + const efpPartitionNumber_t partitionNumber); + protected: void createEmptyFile(); std::string getEfpFileName(); @@ -84,8 +88,6 @@ protected: void resetEmptyFileHeader(const std::string& fqFileName); bool validateEmptyFile(const std::string& emptyFileName) const; - static efpDataSize_kib_t fileSizeKbFromDirName(const std::string& dirName, - const efpPartitionNumber_t partitionNumber); static int moveEmptyFile(const std::string& fromFqPath, const std::string& toFqPath); }; diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.cpp index a9b3b12b81..3c2d7d69a2 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.cpp +++ b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.cpp @@ -24,15 +24,22 @@ #include <dirent.h> #include "qpid/linearstore/jrnl/EmptyFilePoolPartition.h" #include "qpid/linearstore/jrnl/jdir.h" +#include "qpid/linearstore/jrnl/JournalLog.h" #include "qpid/linearstore/jrnl/slock.h" #include <vector> +//#include <iostream> // DEBUG + namespace qpid { namespace qls_jrnl { EmptyFilePoolManager::EmptyFilePoolManager(const std::string& qlsStorePath, + const efpPartitionNumber_t defaultPartitionNumber, + const efpDataSize_kib_t defaultEfpDataSize_kib, JournalLog& journalLogRef) : qlsStorePath_(qlsStorePath), + defaultPartitionNumber_(defaultPartitionNumber), + defaultEfpDataSize_kib_(defaultEfpDataSize_kib), journalLogRef_(journalLogRef) {} @@ -45,53 +52,70 @@ EmptyFilePoolManager::~EmptyFilePoolManager() { } void EmptyFilePoolManager::findEfpPartitions() { - //std::cout << "*** Reading " << qlsStorePath << std::endl; // DEBUG +//std::cout << "*** Reading " << qlsStorePath_ << std::endl; // DEBUG + bool foundPartition = false; std::vector<std::string> dirList; - jdir::read_dir(qlsStorePath_, dirList, true, false, true, false); - for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) { - if ((*i)[0] == 'p' && i->length() == 4) { // Filter: look only at names pNNN - efpPartitionNumber_t pn = ::atoi(i->c_str() + 1); - std::string fullDirPath(qlsStorePath_ + "/" + (*i)); - EmptyFilePoolPartition* efppp = 0; - try { - efppp = new EmptyFilePoolPartition(pn, fullDirPath, journalLogRef_); - { - slock l(partitionMapMutex_); - partitionMap_[pn] = efppp; - } - } catch (const std::exception& e) { - if (efppp != 0) { - delete efppp; - efppp = 0; + while (!foundPartition) { + jdir::read_dir(qlsStorePath_, dirList, true, false, true, false); + for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) { + efpPartitionNumber_t pn = EmptyFilePoolPartition::getPartitionNumber(*i); + if (pn > 0) { // valid partition name found + std::string fullDirPath(qlsStorePath_ + "/" + (*i)); + EmptyFilePoolPartition* efppp = 0; + try { + efppp = new EmptyFilePoolPartition(pn, fullDirPath, journalLogRef_); + { + slock l(partitionMapMutex_); + partitionMap_[pn] = efppp; + } + } catch (const std::exception& e) { + if (efppp != 0) { + delete efppp; + efppp = 0; + } +//std::cerr << "Unable to initialize partition " << pn << " (\'" << fullDirPath << "\'): " << e.what() << std::endl; // DEBUG } - //std::cerr << "Unable to initialize partition " << pn << " (\'" << fullDirPath << "\'): " << e.what() << std::endl; + if (efppp != 0) + efppp->findEmptyFilePools(); + foundPartition = true; } - if (efppp != 0) - efppp->findEmptyFilePools(); + } + + // If no partition was found, create an empty default partition with a warning. + if (!foundPartition) { + journalLogRef_.log(JournalLog::LOG_WARN, "No EFP partition found, creating an empty partition."); + std::ostringstream oss; + oss << qlsStorePath_ << "/" << EmptyFilePoolPartition::getPartionDirectoryName(defaultPartitionNumber_) + << "/" << EmptyFilePoolPartition::s_efpTopLevelDir_ << "/" << EmptyFilePool::dirNameFromDataSize(defaultEfpDataSize_kib_); + jdir::create_dir(oss.str()); } } - // TODO: Log results -/* - QLS_LOG(info, "EFP Manager initialization complete"); + + journalLogRef_.log(JournalLog::LOG_NOTICE, "EFP Manager initialization complete"); std::vector<qpid::qls_jrnl::EmptyFilePoolPartition*> partitionList; std::vector<qpid::qls_jrnl::EmptyFilePool*> filePoolList; getEfpPartitions(partitionList); if (partitionList.size() == 0) { - QLS_LOG(error, "NO EFP PARTITIONS FOUND! No queue creation is possible.") + journalLogRef_.log(JournalLog::LOG_WARN, "NO EFP PARTITIONS FOUND! No queue creation is possible."); } else { - QLS_LOG(info, "> EFP Partitions found: " << partitionList.size()); + std::stringstream oss; + oss << "> EFP Partitions found: " << partitionList.size(); + journalLogRef_.log(JournalLog::LOG_INFO, oss.str()); for (std::vector<qpid::qls_jrnl::EmptyFilePoolPartition*>::const_iterator i=partitionList.begin(); i!= partitionList.end(); ++i) { filePoolList.clear(); (*i)->getEmptyFilePools(filePoolList); - QLS_LOG(info, " * Partition " << (*i)->partitionNumber() << " containing " << filePoolList.size() << " pool" << - (filePoolList.size()>1 ? "s" : "") << " at \'" << (*i)->partitionDirectory() << "\'"); + std::stringstream oss; + oss << " * Partition " << (*i)->getPartitionNumber() << " containing " << filePoolList.size() + << " pool" << (filePoolList.size()>1 ? "s" : "") << " at \'" << (*i)->getPartitionDirectory() << "\'"; + journalLogRef_.log(JournalLog::LOG_INFO, oss.str()); for (std::vector<qpid::qls_jrnl::EmptyFilePool*>::const_iterator j=filePoolList.begin(); j!=filePoolList.end(); ++j) { - QLS_LOG(info, " - EFP \'" << (*j)->dataSize_kib() << "k\' containing " << (*j)->numEmptyFiles() << - " files of size " << (*j)->dataSize_kib() << " KiB totaling " << (*j)->cumFileSize_kib() << " KiB"); + std::ostringstream oss; + oss << " - EFP \'" << (*j)->dataSize_kib() << "k\' containing " << (*j)->numEmptyFiles() << + " files of size " << (*j)->dataSize_kib() << " KiB totaling " << (*j)->cumFileSize_kib() << " KiB"; + journalLogRef_.log(JournalLog::LOG_INFO, oss.str()); } } } -*/ } void EmptyFilePoolManager::getEfpFileSizes(std::vector<efpDataSize_kib_t>& efpFileSizeList, @@ -155,7 +179,7 @@ void EmptyFilePoolManager::getEfpPartitions(std::vector<EmptyFilePoolPartition*> } EmptyFilePool* EmptyFilePoolManager::getEmptyFilePool(const efpIdentity_t efpIdentity) { - return getEmptyFilePool(efpIdentity.first, efpIdentity.second); + return getEmptyFilePool(efpIdentity.pn_, efpIdentity.ds_); } EmptyFilePool* EmptyFilePoolManager::getEmptyFilePool(const efpPartitionNumber_t partitionNumber, diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.h b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.h index eab3783d90..d5fab82800 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.h @@ -36,13 +36,17 @@ protected: typedef partitionMap_t::iterator partitionMapItr_t; typedef partitionMap_t::const_iterator partitionMapConstItr_t; - std::string qlsStorePath_; + const std::string qlsStorePath_; + const efpPartitionNumber_t defaultPartitionNumber_; + const efpDataSize_kib_t defaultEfpDataSize_kib_; JournalLog& journalLogRef_; partitionMap_t partitionMap_; smutex partitionMapMutex_; public: EmptyFilePoolManager(const std::string& qlsStorePath_, + const efpPartitionNumber_t defaultPartitionNumber, + const efpDataSize_kib_t defaultEfpDataSize_kib, JournalLog& journalLogRef_); virtual ~EmptyFilePoolManager(); diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp index 4d8b697b1d..d38db60975 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp +++ b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp @@ -22,11 +22,14 @@ #include "qpid/linearstore/jrnl/EmptyFilePoolPartition.h" #include <dirent.h> +#include <iomanip> #include "qpid/linearstore/jrnl/jdir.h" #include "qpid/linearstore/jrnl/jerrno.h" #include "qpid/linearstore/jrnl/jexception.h" #include "qpid/linearstore/jrnl/slock.h" +//#include <iostream> // DEBUG + namespace qpid { namespace qls_jrnl { @@ -53,7 +56,7 @@ EmptyFilePoolPartition::~EmptyFilePoolPartition() { void EmptyFilePoolPartition::findEmptyFilePools() { - //std::cout << "Reading " << partitionDir << std::endl; // DEBUG +//std::cout << "Reading " << partitionDir << std::endl; // DEBUG std::vector<std::string> dirList; jdir::read_dir(partitionDir_, dirList, true, false, false, false); bool foundEfpDir = false; @@ -65,7 +68,7 @@ EmptyFilePoolPartition::findEmptyFilePools() { } if (foundEfpDir) { std::string efpDir(partitionDir_ + "/" + s_efpTopLevelDir_); - //std::cout << "Reading " << efpDir << std::endl; // DEBUG +//std::cout << "Reading " << efpDir << std::endl; // DEBUG dirList.clear(); jdir::read_dir(efpDir, dirList, true, false, false, true); for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) { @@ -117,6 +120,26 @@ efpPartitionNumber_t EmptyFilePoolPartition::getPartitionNumber() const { return partitionNum_; } +// static +std::string EmptyFilePoolPartition::getPartionDirectoryName(const efpPartitionNumber_t partitionNumber) { + std::ostringstream oss; + oss << "p" << std::setfill('0') << std::setw(3) << partitionNumber; + return oss.str(); +} + +//static +efpPartitionNumber_t EmptyFilePoolPartition::getPartitionNumber(const std::string& name) { + if (name.length() == 4 && name[0] == 'p' && ::isdigit(name[1]) && ::isdigit(name[2]) && ::isdigit(name[3])) { + long pn = ::strtol(name.c_str() + 1, 0, 0); + if (pn == 0 && errno) { + return 0; + } else { + return (efpPartitionNumber_t)pn; + } + } + return 0; +} + // --- protected functions --- void EmptyFilePoolPartition::validatePartitionDir() { diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.h b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.h index 5b8ba7b4fa..a9ca36d07b 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.h @@ -68,6 +68,9 @@ public: std::string getPartitionDirectory() const; efpPartitionNumber_t getPartitionNumber() const; + static std::string getPartionDirectoryName(const efpPartitionNumber_t partitionNumber); + static efpPartitionNumber_t getPartitionNumber(const std::string& name); + protected: void validatePartitionDir(); }; diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolTypes.h b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolTypes.h index 4c470e4e40..2f9693fd95 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolTypes.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolTypes.h @@ -22,19 +22,27 @@ #ifndef QPID_QLS_JRNL_EMPTYFILEPOOLTYPES_H_ #define QPID_QLS_JRNL_EMPTYFILEPOOLTYPES_H_ +#include <iostream> #include <stdint.h> #include <utility> // std::pair namespace qpid { namespace qls_jrnl { - typedef uint64_t efpDataSize_kib_t; ///< Size of data part of file (excluding file header) in kib - typedef uint64_t efpFileSize_kib_t; ///< Size of file (header + data) in kib - typedef uint32_t efpDataSize_sblks_t; ///< Size of data part of file (excluding file header) in sblks - typedef uint32_t efpFileSize_sblks_t; ///< Size of file (header + data) in sblks - typedef uint32_t efpFileCount_t; ///< Number of files in a partition or pool - typedef uint16_t efpPartitionNumber_t; ///< Number assigned to a partition - typedef std::pair<efpPartitionNumber_t, efpDataSize_kib_t> efpIdentity_t; ///< Unique identity of a pool, consisting of the partition number and data size +typedef uint64_t efpDataSize_kib_t; ///< Size of data part of file (excluding file header) in kib +typedef uint64_t efpFileSize_kib_t; ///< Size of file (header + data) in kib +typedef uint32_t efpDataSize_sblks_t; ///< Size of data part of file (excluding file header) in sblks +typedef uint32_t efpFileSize_sblks_t; ///< Size of file (header + data) in sblks +typedef uint32_t efpFileCount_t; ///< Number of files in a partition or pool +typedef uint16_t efpPartitionNumber_t; ///< Number assigned to a partition + +typedef struct efpIdentity_t { + efpPartitionNumber_t pn_; + efpDataSize_kib_t ds_; + efpIdentity_t() : pn_(0), ds_(0) {} + efpIdentity_t(efpPartitionNumber_t pn, efpDataSize_kib_t ds) : pn_(pn), ds_(ds) {} + friend std::ostream& operator<<(std::ostream& os, efpIdentity_t& id) { os << "[" << id.pn_ << "," << id.ds_ << "]"; return os; } +} efpIdentity_t; }} // namespace qpid::qls_jrnl diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp index e02e965823..3de7b6c1e2 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp +++ b/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp @@ -28,10 +28,28 @@ #include "qpid/linearstore/jrnl/utils/file_hdr.h" #include <unistd.h> +//#include <iostream> // DEBUG + namespace qpid { namespace qls_jrnl { JournalFile::JournalFile(const std::string& fqFileName, + const ::file_hdr_t& fileHeader) : + fqFileName_(fqFileName), + fileSeqNum_(fileHeader._file_number), + fileHandle_(-1), + fileCloseFlag_(false), + fileHeaderBasePtr_ (0), + fileHeaderPtr_(0), + aioControlBlockPtr_(0), + fileSize_dblks_(((fileHeader._data_size_kib * 1024) + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_BYTES)) / QLS_DBLK_SIZE_BYTES), + enqueuedRecordCount_("JournalFile::enqueuedRecordCount", 0), + submittedDblkCount_("JournalFile::submittedDblkCount", 0), + completedDblkCount_("JournalFile::completedDblkCount", 0), + outstandingAioOpsCount_("JournalFile::outstandingAioOpsCount", 0) +{} + +JournalFile::JournalFile(const std::string& fqFileName, const uint64_t fileSeqNum, const efpDataSize_kib_t efpDataSize_kib) : fqFileName_(fqFileName), @@ -42,10 +60,10 @@ JournalFile::JournalFile(const std::string& fqFileName, fileHeaderPtr_(0), aioControlBlockPtr_(0), fileSize_dblks_(((efpDataSize_kib * 1024) + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_BYTES)) / QLS_DBLK_SIZE_BYTES), - enqueuedRecordCount_(0), - submittedDblkCount_(0), - completedDblkCount_(0), - outstandingAioOpsCount_(0) + enqueuedRecordCount_("JournalFile::enqueuedRecordCount", 0), + submittedDblkCount_("JournalFile::submittedDblkCount", 0), + completedDblkCount_("JournalFile::completedDblkCount", 0), + outstandingAioOpsCount_("JournalFile::outstandingAioOpsCount", 0) {} JournalFile::~JournalFile() { @@ -82,14 +100,6 @@ JournalFile::finalize() { } } -const std::string JournalFile::getDirectory() const { - return fqFileName_.substr(0, fqFileName_.rfind('/')); -} - -const std::string JournalFile::getFileName() const { - return fqFileName_.substr(fqFileName_.rfind('/')+1); -} - const std::string JournalFile::getFqFileName() const { return fqFileName_; } @@ -98,10 +108,6 @@ uint64_t JournalFile::getFileSeqNum() const { return fileSeqNum_; } -bool JournalFile::isOpen() const { - return fileHandle_ >= 0; -} - int JournalFile::open() { fileHandle_ = ::open(fqFileName_.c_str(), O_WRONLY | O_DIRECT, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); // 0644 -rw-r--r-- if (fileHandle_ < 0) { @@ -182,30 +188,10 @@ uint32_t JournalFile::incrEnqueuedRecordCount() { return enqueuedRecordCount_.increment(); } -uint32_t JournalFile::addEnqueuedRecordCount(const uint32_t a) { - return enqueuedRecordCount_.add(a); -} - uint32_t JournalFile::decrEnqueuedRecordCount() { return enqueuedRecordCount_.decrementLimit(); } -uint32_t JournalFile::subtrEnqueuedRecordCount(const uint32_t s) { - return enqueuedRecordCount_.subtractLimit(s); -} - -uint32_t JournalFile::getSubmittedDblkCount() const { - return submittedDblkCount_.get(); -} - -uint32_t JournalFile::addSubmittedDblkCount(const uint32_t a) { - return submittedDblkCount_.addLimit(a, fileSize_dblks_, jerrno::JERR_JNLF_FILEOFFSOVFL); -} - -uint32_t JournalFile::getCompletedDblkCount() const { - return completedDblkCount_.get(); -} - uint32_t JournalFile::addCompletedDblkCount(const uint32_t a) { return completedDblkCount_.addLimit(a, submittedDblkCount_.get(), jerrno::JERR_JNLF_CMPLOFFSOVFL); } @@ -214,10 +200,6 @@ uint16_t JournalFile::getOutstandingAioOperationCount() const { return outstandingAioOpsCount_.get(); } -uint16_t JournalFile::incrOutstandingAioOperationCount() { - return outstandingAioOpsCount_.increment(); -} - uint16_t JournalFile::decrOutstandingAioOperationCount() { uint16_t r = outstandingAioOpsCount_.decrementLimit(); if (fileCloseFlag_ && outstandingAioOpsCount_ == 0) { // Delayed close @@ -232,33 +214,10 @@ bool JournalFile::isEmpty() const { return submittedDblkCount_ == 0; } -bool JournalFile::isDataEmpty() const { - return submittedDblkCount_ <= QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_DBLKS; -} - -u_int32_t JournalFile::dblksRemaining() const { - return fileSize_dblks_ - submittedDblkCount_; -} - -bool JournalFile::isFull() const { - return submittedDblkCount_ == fileSize_dblks_; -} - -bool JournalFile::isFullAndComplete() const { - return completedDblkCount_ == fileSize_dblks_; -} - -u_int32_t JournalFile::getOutstandingAioDblks() const { - return submittedDblkCount_ - completedDblkCount_; -} - -bool JournalFile::getNextFile() const { - return isFull(); -} - bool JournalFile::isNoEnqueuedRecordsRemaining() const { - return !isDataEmpty() && // Must be written to, not empty - enqueuedRecordCount_ == 0; // No remaining enqueued records + return /*!enqueueStarted_ &&*/ // Not part-way through encoding an enqueue + isFullAndComplete() && // Full with all AIO returned + enqueuedRecordCount_ == 0; // No remaining enqueued records } // debug aid @@ -284,4 +243,59 @@ const std::string JournalFile::status_str(const uint8_t indentDepth) const { return oss.str(); } +// --- protected functions --- + +const std::string JournalFile::getDirectory() const { + return fqFileName_.substr(0, fqFileName_.rfind('/')); +} + +const std::string JournalFile::getFileName() const { + return fqFileName_.substr(fqFileName_.rfind('/')+1); +} + +bool JournalFile::isOpen() const { + return fileHandle_ >= 0; +} + +uint32_t JournalFile::getSubmittedDblkCount() const { + return submittedDblkCount_.get(); +} + +uint32_t JournalFile::addSubmittedDblkCount(const uint32_t a) { + return submittedDblkCount_.addLimit(a, fileSize_dblks_, jerrno::JERR_JNLF_FILEOFFSOVFL); +} + +uint32_t JournalFile::getCompletedDblkCount() const { + return completedDblkCount_.get(); +} + +uint16_t JournalFile::incrOutstandingAioOperationCount() { + return outstandingAioOpsCount_.increment(); +} + +u_int32_t JournalFile::dblksRemaining() const { + return fileSize_dblks_ - submittedDblkCount_; +} + +bool JournalFile::getNextFile() const { + return isFull(); +} + +u_int32_t JournalFile::getOutstandingAioDblks() const { + return submittedDblkCount_ - completedDblkCount_; +} + +bool JournalFile::isDataEmpty() const { + return submittedDblkCount_ <= QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_DBLKS; +} + +bool JournalFile::isFull() const { + return submittedDblkCount_ == fileSize_dblks_; +} + +bool JournalFile::isFullAndComplete() const { + return completedDblkCount_ == fileSize_dblks_; +} + + }} // namespace qpid::qls_jrnl diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.h b/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.h index 262d3ce147..0abbc7bcca 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.h @@ -52,6 +52,8 @@ protected: public: JournalFile(const std::string& fqFileName, + const ::file_hdr_t& fileHeader); + JournalFile(const std::string& fqFileName, const uint64_t fileSeqNum, const efpDataSize_kib_t efpDataSize_kib); virtual ~JournalFile(); @@ -59,12 +61,9 @@ public: void initialize(const uint32_t completedDblkCount); void finalize(); - const std::string getDirectory() const; - const std::string getFileName() const; const std::string getFqFileName() const; uint64_t getFileSeqNum() const; - bool isOpen() const; int open(); void close(); void asyncFileHeaderWrite(io_context_t ioContextPtr, @@ -81,32 +80,38 @@ public: uint32_t getEnqueuedRecordCount() const; uint32_t incrEnqueuedRecordCount(); - uint32_t addEnqueuedRecordCount(const uint32_t a); uint32_t decrEnqueuedRecordCount(); - uint32_t subtrEnqueuedRecordCount(const uint32_t s); - uint32_t getSubmittedDblkCount() const; - uint32_t addSubmittedDblkCount(const uint32_t a); - - uint32_t getCompletedDblkCount() const; uint32_t addCompletedDblkCount(const uint32_t a); uint16_t getOutstandingAioOperationCount() const; - uint16_t incrOutstandingAioOperationCount(); uint16_t decrOutstandingAioOperationCount(); // Status helper functions bool isEmpty() const; ///< True if no writes of any kind have occurred - bool isDataEmpty() const; ///< True if only file header written, data is still empty - u_int32_t dblksRemaining() const; ///< Dblks remaining until full - bool isFull() const; ///< True if all possible dblks have been submitted (but may not yet have returned from AIO) - bool isFullAndComplete() const; ///< True if all submitted dblks have returned from AIO - u_int32_t getOutstandingAioDblks() const; ///< Dblks still to be written - bool getNextFile() const; ///< True when next file is needed bool isNoEnqueuedRecordsRemaining() const; ///< True when all enqueued records (or parts) have been dequeued // debug aid const std::string status_str(const uint8_t indentDepth) const; + +protected: + const std::string getDirectory() const; + const std::string getFileName() const; + bool isOpen() const; + + uint32_t getSubmittedDblkCount() const; + uint32_t addSubmittedDblkCount(const uint32_t a); + + uint32_t getCompletedDblkCount() const; + + uint16_t incrOutstandingAioOperationCount(); + + u_int32_t dblksRemaining() const; ///< Dblks remaining until full + bool getNextFile() const; ///< True when next file is needed + u_int32_t getOutstandingAioDblks() const; ///< Dblks still to be written + bool isDataEmpty() const; ///< True if only file header written, data is still empty + bool isFull() const; ///< True if all possible dblks have been submitted (but may not yet have returned from AIO) + bool isFullAndComplete() const; ///< True if all submitted dblks have returned from AIO }; }} // namespace qpid::qls_jrnl diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.cpp index 925e6597a9..2c1327d645 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.cpp +++ b/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.cpp @@ -29,6 +29,8 @@ #include "qpid/linearstore/jrnl/slock.h" #include "qpid/linearstore/jrnl/utils/file_hdr.h" +//#include <iostream> // DEBUG + namespace qpid { namespace qls_jrnl { @@ -36,8 +38,8 @@ LinearFileController::LinearFileController(jcntl& jcntlRef) : jcntlRef_(jcntlRef), emptyFilePoolPtr_(0), currentJournalFilePtr_(0), - fileSeqCounter_(0), - recordIdCounter_(0) + fileSeqCounter_("LinearFileController::fileSeqCounter", 0), + recordIdCounter_("LinearFileController::recordIdCounter", 0) {} LinearFileController::~LinearFileController() {} @@ -47,7 +49,7 @@ void LinearFileController::initialize(const std::string& journalDirectory, uint64_t initialFileNumberVal) { journalDirectory_.assign(journalDirectory); emptyFilePoolPtr_ = emptyFilePoolPtr; - fileSeqCounter_ = initialFileNumberVal; + fileSeqCounter_.set(initialFileNumberVal); } void LinearFileController::finalize() { @@ -57,14 +59,13 @@ void LinearFileController::finalize() { } } -void LinearFileController::addJournalFile(const std::string& fileName, - const uint64_t fileNumber, - const uint32_t fileSize_kib, +void LinearFileController::addJournalFile(JournalFile* journalFilePtr, const uint32_t completedDblkCount) { - if (currentJournalFilePtr_) + if (currentJournalFilePtr_) { currentJournalFilePtr_->close(); - currentJournalFilePtr_ = new JournalFile(fileName, fileNumber, fileSize_kib); - currentJournalFilePtr_->initialize(completedDblkCount); + } + journalFilePtr->initialize(completedDblkCount); + currentJournalFilePtr_ = journalFilePtr; { slock l(journalFileListMutex_); journalFileList_.push_back(currentJournalFilePtr_); @@ -72,18 +73,10 @@ void LinearFileController::addJournalFile(const std::string& fileName, currentJournalFilePtr_->open(); } -efpDataSize_kib_t LinearFileController::dataSize_kib() const { - return emptyFilePoolPtr_->dataSize_kib(); -} - efpDataSize_sblks_t LinearFileController::dataSize_sblks() const { return emptyFilePoolPtr_->dataSize_sblks(); } -efpFileSize_kib_t LinearFileController::fileSize_kib() const { - return emptyFilePoolPtr_->fileSize_kib(); -} - efpFileSize_sblks_t LinearFileController::fileSize_sblks() const { return emptyFilePoolPtr_->fileSize_sblks(); } @@ -100,28 +93,24 @@ void LinearFileController::pullEmptyFileFromEfp() { addJournalFile(ef, getNextFileSeqNum(), emptyFilePoolPtr_->dataSize_kib(), 0); } -void LinearFileController::purgeFilesToEfp() { +void LinearFileController::purgeEmptyFilesToEfp() { slock l(journalFileListMutex_); - while (journalFileList_.front()->isNoEnqueuedRecordsRemaining()) { - emptyFilePoolPtr_->returnEmptyFile(journalFileList_.front()->getFqFileName()); - delete journalFileList_.front(); - journalFileList_.pop_front(); - } + purgeEmptyFilesToEfpNoLock(); } uint32_t LinearFileController::getEnqueuedRecordCount(const efpFileCount_t fileSeqNumber) { - slock l(journalFileListMutex_); return find(fileSeqNumber)->getEnqueuedRecordCount(); } uint32_t LinearFileController::incrEnqueuedRecordCount(const efpFileCount_t fileSeqNumber) { - assertCurrentJournalFileValid("incrEnqueuedRecordCount"); return find(fileSeqNumber)->incrEnqueuedRecordCount(); } uint32_t LinearFileController::decrEnqueuedRecordCount(const efpFileCount_t fileSeqNumber) { slock l(journalFileListMutex_); - return find(fileSeqNumber)->decrEnqueuedRecordCount(); + uint32_t r = find(fileSeqNumber)->decrEnqueuedRecordCount(); +// purgeEmptyFilesToEfpNoLock(); + return r; } uint32_t LinearFileController::addWriteCompletedDblkCount(const efpFileCount_t fileSeqNumber, const uint32_t a) { @@ -160,101 +149,11 @@ uint64_t LinearFileController::getCurrentFileSeqNum() const { return currentJournalFilePtr_->getFileSeqNum(); } -uint32_t LinearFileController::getEnqueuedRecordCount() const { - assertCurrentJournalFileValid("getEnqueuedRecordCount"); - return currentJournalFilePtr_->getEnqueuedRecordCount(); -} - -uint32_t LinearFileController::incrEnqueuedRecordCount() { - assertCurrentJournalFileValid("incrEnqueuedRecordCount"); - return currentJournalFilePtr_->incrEnqueuedRecordCount(); -} - -uint32_t LinearFileController::addEnqueuedRecordCount(const uint32_t a) { - assertCurrentJournalFileValid("addEnqueuedRecordCount"); - return currentJournalFilePtr_->addEnqueuedRecordCount(a); -} - -uint32_t LinearFileController::decrEnqueuedRecordCount() { - assertCurrentJournalFileValid("decrEnqueuedRecordCount"); - return currentJournalFilePtr_->decrEnqueuedRecordCount(); -} - -uint32_t LinearFileController::subtrEnqueuedRecordCount(const uint32_t s) { - assertCurrentJournalFileValid("subtrEnqueuedRecordCount"); - return currentJournalFilePtr_->subtrEnqueuedRecordCount(s); -} - -uint32_t LinearFileController::getWriteSubmittedDblkCount() const { - assertCurrentJournalFileValid("getWriteSubmittedDblkCount"); - return currentJournalFilePtr_->getSubmittedDblkCount(); -} - -uint32_t LinearFileController::addWriteSubmittedDblkCount(const uint32_t a) { - assertCurrentJournalFileValid("addWriteSubmittedDblkCount"); - return currentJournalFilePtr_->addSubmittedDblkCount(a); -} - -uint32_t LinearFileController::getWriteCompletedDblkCount() const { - assertCurrentJournalFileValid("getWriteCompletedDblkCount"); - return currentJournalFilePtr_->getCompletedDblkCount(); -} - -uint32_t LinearFileController::addWriteCompletedDblkCount(const uint32_t a) { - assertCurrentJournalFileValid("addWriteCompletedDblkCount"); - return currentJournalFilePtr_->addCompletedDblkCount(a); -} - -uint16_t LinearFileController::getOutstandingAioOperationCount() const { - assertCurrentJournalFileValid("getOutstandingAioOperationCount"); - return currentJournalFilePtr_->getOutstandingAioOperationCount(); -} - -uint16_t LinearFileController::incrOutstandingAioOperationCount() { - assertCurrentJournalFileValid("incrOutstandingAioOperationCount"); - return currentJournalFilePtr_->incrOutstandingAioOperationCount(); -} - -uint16_t LinearFileController::decrOutstandingAioOperationCount() { - assertCurrentJournalFileValid("decrOutstandingAioOperationCount"); - return currentJournalFilePtr_->decrOutstandingAioOperationCount(); -} - bool LinearFileController::isEmpty() const { assertCurrentJournalFileValid("isEmpty"); return currentJournalFilePtr_->isEmpty(); } -bool LinearFileController::isDataEmpty() const { - assertCurrentJournalFileValid("isDataEmpty"); - return currentJournalFilePtr_->isDataEmpty(); -} - -u_int32_t LinearFileController::dblksRemaining() const { - assertCurrentJournalFileValid("dblksRemaining"); - return currentJournalFilePtr_->dblksRemaining(); -} - -bool LinearFileController::isFull() const { - assertCurrentJournalFileValid("isFull"); - return currentJournalFilePtr_->isFull(); -} - -bool LinearFileController::isFullAndComplete() const { - assertCurrentJournalFileValid("isFullAndComplete"); - return currentJournalFilePtr_->isFullAndComplete(); -} - -u_int32_t LinearFileController::getOutstandingAioDblks() const { - assertCurrentJournalFileValid("getOutstandingAioDblks"); - return currentJournalFilePtr_->getOutstandingAioDblks(); -} - -bool LinearFileController::needNextFile() const { - assertCurrentJournalFileValid("getNextFile"); - return currentJournalFilePtr_->getNextFile(); -} - const std::string LinearFileController::status(const uint8_t indentDepth) const { std::string indent((size_t)indentDepth, '.'); std::ostringstream oss; @@ -273,8 +172,12 @@ const std::string LinearFileController::status(const uint8_t indentDepth) const // --- protected functions --- -bool LinearFileController::checkCurrentJournalFileValid() const { - return currentJournalFilePtr_ != 0; +void LinearFileController::addJournalFile(const std::string& fileName, + const uint64_t fileNumber, + const uint32_t fileSize_kib, + const uint32_t completedDblkCount) { + JournalFile* jfp = new JournalFile(fileName, fileNumber, fileSize_kib); + addJournalFile(jfp, completedDblkCount); } void LinearFileController::assertCurrentJournalFileValid(const char* const functionName) const { @@ -283,6 +186,10 @@ void LinearFileController::assertCurrentJournalFileValid(const char* const funct } } +bool LinearFileController::checkCurrentJournalFileValid() const { + return currentJournalFilePtr_ != 0; +} + // NOTE: NOT THREAD SAFE - journalFileList is accessed by multiple threads - use under external lock JournalFile* LinearFileController::find(const efpFileCount_t fileSeqNumber) { if (currentJournalFilePtr_ != 0 && currentJournalFilePtr_->getFileSeqNum() == fileSeqNumber) @@ -301,4 +208,15 @@ uint64_t LinearFileController::getNextFileSeqNum() { return fileSeqCounter_.increment(); } +void LinearFileController::purgeEmptyFilesToEfpNoLock() { +//std::cout << " >P n=" << journalFileList_.size() << " e=" << (journalFileList_.front()->isNoEnqueuedRecordsRemaining()?"T":"F") << std::flush; // DEBUG + while (journalFileList_.front()->isNoEnqueuedRecordsRemaining() && + journalFileList_.size() > 1) { // Can't purge last file, even if it has no enqueued records +//std::cout << " *f=" << journalFileList_.front()->getFqFileName() << std::flush; // DEBUG + emptyFilePoolPtr_->returnEmptyFile(journalFileList_.front()->getFqFileName()); + delete journalFileList_.front(); + journalFileList_.pop_front(); + } +} + }} // namespace qpid::qls_jrnl diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.h b/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.h index 7721685196..83d360dad8 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.h @@ -62,18 +62,14 @@ public: uint64_t initialFileNumberVal); void finalize(); - void addJournalFile(const std::string& fileName, - const uint64_t fileNumber, - const uint32_t fileSize_kib, + void addJournalFile(JournalFile* journalFilePtr, const uint32_t completedDblkCount); - efpDataSize_kib_t dataSize_kib() const; efpDataSize_sblks_t dataSize_sblks() const; - efpFileSize_kib_t fileSize_kib() const; efpFileSize_sblks_t fileSize_sblks() const; uint64_t getNextRecordId(); void pullEmptyFileFromEfp(); - void purgeFilesToEfp(); + void purgeEmptyFilesToEfp(); // Functions for manipulating counts of non-current JournalFile instances in journalFileList_ uint32_t getEnqueuedRecordCount(const efpFileCount_t fileSeqNumber); @@ -83,7 +79,7 @@ public: const uint32_t a); uint16_t decrOutstandingAioOperationCount(const efpFileCount_t fileSeqNumber); - // Pass-through functions for JournalFile class + // Pass-through functions for current JournalFile class void asyncFileHeaderWrite(io_context_t ioContextPtr, const uint16_t userFlags, const uint64_t recordId, @@ -94,42 +90,25 @@ public: uint32_t dataSize_dblks); uint64_t getCurrentFileSeqNum() const; - - uint32_t getEnqueuedRecordCount() const; - uint32_t incrEnqueuedRecordCount(); - uint32_t addEnqueuedRecordCount(const uint32_t a); - uint32_t decrEnqueuedRecordCount(); - uint32_t subtrEnqueuedRecordCount(const uint32_t s); - - uint32_t getWriteSubmittedDblkCount() const; - uint32_t addWriteSubmittedDblkCount(const uint32_t a); - - uint32_t getWriteCompletedDblkCount() const; - uint32_t addWriteCompletedDblkCount(const uint32_t a); - - uint16_t getOutstandingAioOperationCount() const; - uint16_t incrOutstandingAioOperationCount(); - uint16_t decrOutstandingAioOperationCount(); - - bool isEmpty() const; // True if no writes of any kind have occurred - bool isDataEmpty() const; // True if only file header written, data is still empty - u_int32_t dblksRemaining() const; // Dblks remaining until full - bool isFull() const; // True if all possible dblks have been submitted (but may not yet have returned from AIO) - bool isFullAndComplete() const; // True if all submitted dblks have returned from AIO - u_int32_t getOutstandingAioDblks() const; // Dblks still to be written - bool needNextFile() const; // True when next file is needed + bool isEmpty() const; // Debug aid const std::string status(const uint8_t indentDepth) const; protected: + void addJournalFile(const std::string& fileName, + const uint64_t fileNumber, + const uint32_t fileSize_kib, + const uint32_t completedDblkCount); void assertCurrentJournalFileValid(const char* const functionName) const; bool checkCurrentJournalFileValid() const; JournalFile* find(const efpFileCount_t fileSeqNumber); uint64_t getNextFileSeqNum(); + void purgeEmptyFilesToEfpNoLock(); }; -typedef void (LinearFileController::*lfcAddJournalFileFn)(const std::string&, const uint64_t, const uint32_t, const uint32_t); +typedef void (LinearFileController::*lfcAddJournalFileFn)(JournalFile* journalFilePtr, + const uint32_t completedDblkCount); }} // namespace qpid::qls_jrnl diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp index 3dc11ec27c..a1cf30bae9 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp +++ b/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp @@ -31,6 +31,7 @@ #include "qpid/linearstore/jrnl/enq_rec.h" #include "qpid/linearstore/jrnl/jcfg.h" #include "qpid/linearstore/jrnl/jdir.h" +#include "qpid/linearstore/jrnl/JournalFile.h" #include "qpid/linearstore/jrnl/JournalLog.h" #include "qpid/linearstore/jrnl/jrec.h" #include "qpid/linearstore/jrnl/LinearFileController.h" @@ -62,7 +63,7 @@ RecoveryManager::RecoveryManager(const std::string& journalDirectory, highestRecordId_(0ULL), highestFileNumber_(0ULL), lastFileFullFlag_(false), - fileSize_kib_(0) + efpFileSize_kib_(0) {} RecoveryManager::~RecoveryManager() {} @@ -74,18 +75,19 @@ void RecoveryManager::analyzeJournals(const std::vector<std::string>* preparedTr efpIdentity_t efpIdentity; analyzeJournalFileHeaders(efpIdentity); *emptyFilePoolPtrPtr = emptyFilePoolManager->getEmptyFilePool(efpIdentity); - fileSize_kib_ = (*emptyFilePoolPtrPtr)->fileSize_kib(); + efpFileSize_kib_ = (*emptyFilePoolPtrPtr)->fileSize_kib(); + // Check for file full condition lastFileFullFlag_ = endOffset_ == (std::streamoff)(*emptyFilePoolPtrPtr)->fileSize_kib() * 1024; - // Restore all read and write pointers and transactions if (!journalEmptyFlag_) { - while (getNextRecordHeader()) { - } + // Read all records, establish remaining enqueued records + while (getNextRecordHeader()) {} if (inFileStream_.is_open()) { inFileStream_.close(); } + // Remove leading files which have no enqueued records removeEmptyFiles(*emptyFilePoolPtrPtr); @@ -100,14 +102,14 @@ void RecoveryManager::analyzeJournals(const std::vector<std::string>* preparedTr txn_data_list tdl = transactionMapRef_.get_remove_tdata_list(*itr); // tdl will be empty if xid not found // Unlock any affected enqueues in emap for (tdl_itr i=tdl.begin(); i<tdl.end(); i++) { - if (i->_enq_flag) { // enq op - decrement enqueue count - enqueueCountList_[i->_pfid]--; - } else if (enqueueMapRef_.is_enqueued(i->_drid, true)) { // deq op - unlock enq record - int16_t ret = enqueueMapRef_.unlock(i->_drid); + if (i->enq_flag_) { // enq op - decrement enqueue count + fileNumberMap_[i->pfid_]->decrEnqueuedRecordCount(); + } else if (enqueueMapRef_.is_enqueued(i->drid_, true)) { // deq op - unlock enq record + int16_t ret = enqueueMapRef_.unlock(i->drid_); if (ret < enq_map::EMAP_OK) { // fail // enq_map::unlock()'s only error is enq_map::EMAP_RID_NOT_FOUND std::ostringstream oss; - oss << std::hex << "_emap.unlock(): drid=0x\"" << i->_drid; + oss << std::hex << "_emap.unlock(): drid=0x\"" << i->drid_; throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "RecoveryManager", "analyzeJournals"); } } @@ -115,7 +117,10 @@ void RecoveryManager::analyzeJournals(const std::vector<std::string>* preparedTr } } } + + // Set up recordIdList_ from enqueue map enqueueMapRef_.rid_list(recordIdList_); + recordIdListConstItr_ = recordIdList_.begin(); } } @@ -144,19 +149,13 @@ bool RecoveryManager::readNextRemainingRecord(void** const dataPtrPtr, bool& external, data_tok* const dtokp, bool /*ignore_pending_txns*/) { - if (!dtokp->is_readable()) { - std::ostringstream oss; - oss << std::hex << std::setfill('0') << "dtok_id=0x" << std::setw(8) << dtokp->id(); - oss << "; dtok_rid=0x" << std::setw(16) << dtokp->rid() << "; dtok_wstate=" << dtokp->wstate_str(); - throw jexception(jerrno::JERR_JCNTL_ENQSTATE, oss.str(), "RecoveryManager", "readNextRemainingRecord"); - } if (recordIdListConstItr_ == recordIdList_.end()) { return false; } enq_map::emap_data_struct_t eds; enqueueMapRef_.get_data(*recordIdListConstItr_, eds); uint64_t fileNumber = eds._pfid; - currentJournalFileConstItr_ = fileNumberNameMap_.find(fileNumber); + currentJournalFileConstItr_ = fileNumberMap_.find(fileNumber); getNextFile(false); inFileStream_.seekg(eds._file_posn, std::ifstream::beg); @@ -195,18 +194,26 @@ bool RecoveryManager::readNextRemainingRecord(void** const dataPtrPtr, throw jexception(jerrno::JERR__MALLOC, oss.str(), "RecoveryManager", "readNextRemainingRecord"); } readJournalData((char*)*dataPtrPtr, dataSize); + + // Set data token + dtokp->set_wstate(data_tok::ENQ); + dtokp->set_rid(enqueueHeader._rhdr._rid); + dtokp->set_dsize(dataSize); + if (xidSize) { + dtokp->set_xid(*xidPtrPtr, xidSize); + } + + ++recordIdListConstItr_; return true; } void RecoveryManager::setLinearFileControllerJournals(lfcAddJournalFileFn fnPtr, LinearFileController* lfcPtr) { -//std::cout << "****** RecoveryManager::setLinearFileControllerJournals():" << std::endl; // DEBUG - for (fileNumberNameMapConstItr_t i = fileNumberNameMap_.begin(); i != fileNumberNameMap_.end(); ++i) { + for (fileNumberMapConstItr_t i = fileNumberMap_.begin(); i != fileNumberMap_.end(); ++i) { uint32_t fileDblkCount = i->first == highestFileNumber_ ? // Is this this last file? - endOffset_ / QLS_DBLK_SIZE_BYTES : // Last file uses _endOffset - fileSize_kib_ * 1024 / QLS_DBLK_SIZE_BYTES; // All others use file size to make them full - (lfcPtr->*fnPtr)(i->second, i->first, fileSize_kib_, fileDblkCount); -//std::cout << " ** f=" << i->second.substr(i->second.rfind('/')+1) << ",fn=" << i->first << ",s=" << _fileSize_kib << ",eo=" << fileDblkCount << "(" << (fileDblkCount * QLS_DBLK_SIZE_BYTES / 1024) << "kiB)" << std::endl; // DEBUG + endOffset_ / QLS_DBLK_SIZE_BYTES : // Last file uses _endOffset + efpFileSize_kib_ * 1024 / QLS_DBLK_SIZE_BYTES; // All others use file size to make them full + (lfcPtr->*fnPtr)(i->second, fileDblkCount); } } @@ -216,14 +223,19 @@ std::string RecoveryManager::toString(const std::string& jid, if (compact) { oss << "Recovery journal analysis (jid=\"" << jid << "\"):"; oss << " jfl=["; - for (fileNumberNameMapConstItr_t i=fileNumberNameMap_.begin(); i!=fileNumberNameMap_.end(); ++i) { - if (i!=fileNumberNameMap_.begin()) oss << " "; - oss << i->first << ":" << i->second.substr(i->second.rfind('/')+1); + for (fileNumberMapConstItr_t i=fileNumberMap_.begin(); i!=fileNumberMap_.end(); ++i) { + if (i!=fileNumberMap_.begin()) { + oss << " "; + } + std::string fqFileName = i->second->getFqFileName(); + oss << i->first << ":" << fqFileName.substr(fqFileName.rfind('/')+1); } oss << "] ecl=[ "; - for (enqueueCountListConstItr_t j = enqueueCountList_.begin(); j!=enqueueCountList_.end(); ++j) { - if (j != enqueueCountList_.begin()) oss << " "; - oss << *j; + for (fileNumberMapConstItr_t j=fileNumberMap_.begin(); j!=fileNumberMap_.end(); ++j) { + if (j!=fileNumberMap_.begin()) { + oss << " "; + } + oss << j->second->getEnqueuedRecordCount(); } oss << " ] empty=" << (journalEmptyFlag_ ? "T" : "F"); oss << " fro=0x" << std::hex << firstRecordOffset_ << std::dec << " (" << (firstRecordOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)"; @@ -233,15 +245,18 @@ std::string RecoveryManager::toString(const std::string& jid, oss << " lffull=" << (lastFileFullFlag_ ? "T" : "F"); } else { oss << "Recovery journal analysis (jid=\"" << jid << "\"):" << std::endl; - oss << " Number of journal files = " << fileNumberNameMap_.size() << std::endl; + oss << " Number of journal files = " << fileNumberMap_.size() << std::endl; oss << " Journal File List:" << std::endl; - for (fileNumberNameMapConstItr_t i=fileNumberNameMap_.begin(); i!=fileNumberNameMap_.end(); ++i) { - oss << " " << i->first << ": " << i->second.substr(i->second.rfind('/')+1) << std::endl; + for (fileNumberMapConstItr_t k=fileNumberMap_.begin(); k!=fileNumberMap_.end(); ++k) { + std::string fqFileName = k->second->getFqFileName(); + oss << " " << k->first << ": " << fqFileName.substr(fqFileName.rfind('/')+1) << std::endl; } oss << " Enqueue Counts: [ " << std::endl; - for (enqueueCountListConstItr_t j = enqueueCountList_.begin(); j!=enqueueCountList_.end(); ++j) { - if (j != enqueueCountList_.begin()) oss << ", "; - oss << *j; + for (fileNumberMapConstItr_t l=fileNumberMap_.begin(); l!=fileNumberMap_.end(); ++l) { + if (l != fileNumberMap_.begin()) { + oss << ", "; + } + oss << l->second->getEnqueuedRecordCount(); } oss << " ]" << std::endl; oss << " Journal empty = " << (journalEmptyFlag_ ? "TRUE" : "FALSE") << std::endl; @@ -271,21 +286,26 @@ void RecoveryManager::analyzeJournalFileHeaders(efpIdentity_t& efpIdentity) { oss << "Journal file " << (*i) << " belongs to queue \"" << headerQueueName << "\": ignoring"; journalLogRef_.log(JournalLog::LOG_WARN, queueName_, oss.str()); } else { - fileNumberNameMap_[fileHeader._file_number] = *i; + JournalFile* jfp = new JournalFile(*i, fileHeader); + fileNumberMap_[fileHeader._file_number] = jfp; if (fileHeader._file_number > highestFileNumber_) { highestFileNumber_ = fileHeader._file_number; } } } - efpIdentity.first = fileHeader._efp_partition; - efpIdentity.second = fileHeader._file_size_kib; - enqueueCountList_.resize(fileNumberNameMap_.size(), 0); - currentJournalFileConstItr_ = fileNumberNameMap_.begin(); + efpIdentity.pn_ = fileHeader._efp_partition; + efpIdentity.ds_ = fileHeader._data_size_kib; + currentJournalFileConstItr_ = fileNumberMap_.begin(); } void RecoveryManager::checkFileStreamOk(bool checkEof) { if (inFileStream_.fail() || inFileStream_.bad() || checkEof ? inFileStream_.eof() : false) { - throw jexception("read failure"); // TODO complete exception + std::ostringstream oss; + oss << "Stream status: fail=" << (inFileStream_.fail()?"T":"F") << " bad=" << (inFileStream_.bad()?"T":"F"); + if (checkEof) { + oss << " eof=" << (inFileStream_.eof()?"T":"F"); + } + throw jexception(jerrno::JERR_RCVM_STREAMBAD, oss.str(), "RecoveryManager", "checkFileStreamOk"); } } @@ -339,7 +359,6 @@ bool RecoveryManager::decodeRecord(jrec& record, ::rec_hdr_t& headerRecord, std::streampos& fileOffset) { -// uint16_t start_fid = getCurrentFileNumber(); std::streampos start_file_offs = fileOffset; if (highestRecordId_ == 0) { @@ -354,14 +373,7 @@ bool RecoveryManager::decodeRecord(jrec& record, done = record.rcv_decode(headerRecord, &inFileStream_, cumulativeSizeRead); } catch (const jexception& e) { -// TODO - review this logic and tidy up how rd._lfid is assigned. See new jinf.get_end_file() fn. -// Original -// if (e.err_code() != jerrno::JERR_JREC_BADRECTAIL || -// fid != (rd._ffid ? rd._ffid - 1 : _num_jfiles - 1)) throw; -// Tried this, but did not work -// if (e.err_code() != jerrno::JERR_JREC_BADRECTAIL || h._magic != 0) throw; checkJournalAlignment(start_file_offs); -// rd._lfid = start_fid; return false; } if (!done && !getNextFile(false)) { @@ -373,7 +385,7 @@ bool RecoveryManager::decodeRecord(jrec& record, } std::string RecoveryManager::getCurrentFileName() const { - return currentJournalFileConstItr_->second; + return currentJournalFileConstItr_->second->getFqFileName(); } uint64_t RecoveryManager::getCurrentFileNumber() const { @@ -382,19 +394,21 @@ uint64_t RecoveryManager::getCurrentFileNumber() const { bool RecoveryManager::getNextFile(bool jumpToFirstRecordOffsetFlag) { if (inFileStream_.is_open()) { - if (inFileStream_.eof() || !inFileStream_.good()) - { + if (inFileStream_.eof() || !inFileStream_.good()) { inFileStream_.clear(); endOffset_ = inFileStream_.tellg(); // remember file offset before closing - if (endOffset_ == -1) { throw jexception("tellg() failure"); } // Check for error code -1 TODO: compelete exception + if (endOffset_ == -1) { + std::ostringstream oss; + oss << "tellg() failure: fail=" << (inFileStream_.fail()?"T":"F") << " bad=" << (inFileStream_.bad()?"T":"F"); + throw jexception(jerrno::JERR_RCVM_STREAMBAD, oss.str(), "RecoveryManager", "getNextFile"); + } inFileStream_.close(); - if (++currentJournalFileConstItr_ == fileNumberNameMap_.end()) { + if (++currentJournalFileConstItr_ == fileNumberMap_.end()) { return false; } } } - if (!inFileStream_.is_open()) - { + if (!inFileStream_.is_open()) { inFileStream_.clear(); // clear eof flag, req'd for older versions of c++ inFileStream_.open(getCurrentFileName().c_str(), std::ios_base::in | std::ios_base::binary); if (!inFileStream_.good()) { @@ -402,7 +416,6 @@ bool RecoveryManager::getNextFile(bool jumpToFirstRecordOffsetFlag) { } // Read file header -//std::cout << " F" << getCurrentFileNumber() << std::flush; // DEBUG file_hdr_t fhdr; inFileStream_.read((char*)&fhdr, sizeof(fhdr)); checkFileStreamOk(true); @@ -412,7 +425,7 @@ bool RecoveryManager::getNextFile(bool jumpToFirstRecordOffsetFlag) { inFileStream_.seekg(foffs); } else { inFileStream_.close(); - if (currentJournalFileConstItr_ == fileNumberNameMap_.begin()) { + if (currentJournalFileConstItr_ == fileNumberMap_.begin()) { journalEmptyFlag_ = true; } return false; @@ -436,7 +449,11 @@ bool RecoveryManager::getNextRecordHeader() } } file_pos = inFileStream_.tellg(); -//std::cout << " 0x" << std::hex << file_pos << std::dec; // DEBUG + if (file_pos == -1) { + std::ostringstream oss; + oss << "tellg() failure: fail=" << (inFileStream_.fail()?"T":"F") << " bad=" << (inFileStream_.bad()?"T":"F"); + throw jexception(jerrno::JERR_RCVM_STREAMBAD, oss.str(), "RecoveryManager", "getNextRecordHeader"); + } inFileStream_.read((char*)&h, sizeof(rec_hdr_t)); if (inFileStream_.gcount() == sizeof(rec_hdr_t)) { hdr_ok = true; @@ -450,19 +467,21 @@ bool RecoveryManager::getNextRecordHeader() switch(h._magic) { case QLS_ENQ_MAGIC: { -//std::cout << ".e" << std::flush; // DEBUG +//std::cout << " 0x" << std::hex << file_pos << ".e.0x" << h._rid << std::dec << std::flush; // DEBUG enq_rec er; uint64_t start_fid = getCurrentFileNumber(); // fid may increment in decode() if record folds over file boundary if (!decodeRecord(er, cum_size_read, h, file_pos)) { return false; } if (!er.is_transient()) { // Ignore transient msgs - enqueueCountList_[start_fid]++; + fileNumberMap_[start_fid]->incrEnqueuedRecordCount(); if (er.xid_size()) { er.get_xid(&xidp); - if (xidp != 0) { throw jexception("Null xid with non-null xid_size"); } // TODO complete exception + if (xidp == 0) { + throw jexception(jerrno::JERR_RCVM_NULLXID, "ENQ", "RecoveryManager", "getNextRecordHeader"); + } std::string xid((char*)xidp, er.xid_size()); - transactionMapRef_.insert_txn_data(xid, txn_data(h._rid, 0, start_fid, true)); + transactionMapRef_.insert_txn_data(xid, txn_data_t(h._rid, 0, start_fid, file_pos, true)); if (transactionMapRef_.set_aio_compl(xid, h._rid) < txn_map::TMAP_OK) { // fail - xid or rid not found std::ostringstream oss; oss << std::hex << "_tmap.set_aio_compl: txn_enq xid=\"" << xid << "\" rid=0x" << h._rid; @@ -482,7 +501,7 @@ bool RecoveryManager::getNextRecordHeader() break; case QLS_DEQ_MAGIC: { -//std::cout << ".d" << std::flush; // DEBUG +//std::cout << " 0x" << std::hex << file_pos << ".d.0x" << h._rid << std::dec << std::flush; // DEBUG deq_rec dr; uint16_t start_fid = getCurrentFileNumber(); // fid may increment in decode() if record folds over file boundary if (!decodeRecord(dr, cum_size_read, h, file_pos)) { @@ -492,10 +511,12 @@ bool RecoveryManager::getNextRecordHeader() // If the enqueue is part of a pending txn, it will not yet be in emap enqueueMapRef_.lock(dr.deq_rid()); // ignore not found error dr.get_xid(&xidp); - if (xidp != 0) { throw jexception("Null xid with non-null xid_size"); } // TODO complete exception + if (xidp == 0) { + throw jexception(jerrno::JERR_RCVM_NULLXID, "DEQ", "RecoveryManager", "getNextRecordHeader"); + } std::string xid((char*)xidp, dr.xid_size()); - transactionMapRef_.insert_txn_data(xid, txn_data(dr.rid(), dr.deq_rid(), start_fid, false, - dr.is_txn_coml_commit())); + transactionMapRef_.insert_txn_data(xid, txn_data_t(dr.rid(), dr.deq_rid(), start_fid, file_pos, + false, dr.is_txn_coml_commit())); if (transactionMapRef_.set_aio_compl(xid, dr.rid()) < txn_map::TMAP_OK) { // fail - xid or rid not found std::ostringstream oss; oss << std::hex << "_tmap.set_aio_compl: txn_deq xid=\"" << xid << "\" rid=0x" << dr.rid(); @@ -505,30 +526,30 @@ bool RecoveryManager::getNextRecordHeader() } else { uint64_t enq_fid; if (enqueueMapRef_.get_remove_pfid(dr.deq_rid(), enq_fid, true) == enq_map::EMAP_OK) { // ignore not found error - enqueueCountList_[enq_fid]--; + fileNumberMap_[enq_fid]->decrEnqueuedRecordCount(); } } } break; case QLS_TXA_MAGIC: { -//std::cout << ".a" << std::flush; // DEBUG +//std::cout << " 0x" << std::hex << file_pos << ".a.0x" << h._rid << std::dec << std::flush; // DEBUG txn_rec ar; if (!decodeRecord(ar, cum_size_read, h, file_pos)) { return false; } // Delete this txn from tmap, unlock any locked records in emap ar.get_xid(&xidp); - if (xidp != 0) { - throw jexception("Null xid with non-null xid_size"); // TODO complete exception + if (xidp == 0) { + throw jexception(jerrno::JERR_RCVM_NULLXID, "ABT", "RecoveryManager", "getNextRecordHeader"); } std::string xid((char*)xidp, ar.xid_size()); txn_data_list tdl = transactionMapRef_.get_remove_tdata_list(xid); // tdl will be empty if xid not found for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++) { - if (itr->_enq_flag) { - enqueueCountList_[itr->_pfid]--; + if (itr->enq_flag_) { + fileNumberMap_[itr->pfid_]->decrEnqueuedRecordCount(); } else { - enqueueMapRef_.unlock(itr->_drid); // ignore not found error + enqueueMapRef_.unlock(itr->drid_); // ignore not found error } } std::free(xidp); @@ -536,30 +557,31 @@ bool RecoveryManager::getNextRecordHeader() break; case QLS_TXC_MAGIC: { -//std::cout << ".t" << std::flush; // DEBUG +//std::cout << " 0x" << std::hex << file_pos << ".c.0x" << h._rid << std::dec << std::flush; // DEBUG txn_rec cr; if (!decodeRecord(cr, cum_size_read, h, file_pos)) { return false; } // Delete this txn from tmap, process records into emap cr.get_xid(&xidp); - if (xidp != 0) { - throw jexception("Null xid with non-null xid_size"); // TODO complete exception + if (xidp == 0) { + throw jexception(jerrno::JERR_RCVM_NULLXID, "CMT", "RecoveryManager", "getNextRecordHeader"); } std::string xid((char*)xidp, cr.xid_size()); txn_data_list tdl = transactionMapRef_.get_remove_tdata_list(xid); // tdl will be empty if xid not found for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++) { - if (itr->_enq_flag) { // txn enqueue - if (enqueueMapRef_.insert_pfid(itr->_rid, itr->_pfid, file_pos) < enq_map::EMAP_OK) { // fail + if (itr->enq_flag_) { // txn enqueue +//std::cout << "[rid=0x" << std::hex << itr->rid_ << std::dec << " fid=" << itr->pfid_ << " fpos=0x" << std::hex << itr->foffs_ << "]" << std::dec << std::flush; // DEBUG + if (enqueueMapRef_.insert_pfid(itr->rid_, itr->pfid_, itr->foffs_) < enq_map::EMAP_OK) { // fail // The only error code emap::insert_pfid() returns is enq_map::EMAP_DUP_RID. std::ostringstream oss; - oss << std::hex << "rid=0x" << itr->_rid << " _pfid=0x" << itr->_pfid; + oss << std::hex << "rid=0x" << itr->rid_ << " _pfid=0x" << itr->pfid_; throw jexception(jerrno::JERR_MAP_DUPLICATE, oss.str(), "RecoveryManager", "getNextRecordHeader"); } } else { // txn dequeue uint64_t enq_fid; - if (enqueueMapRef_.get_remove_pfid(itr->_drid, enq_fid, true) == enq_map::EMAP_OK) // ignore not found error - enqueueCountList_[enq_fid]--; + if (enqueueMapRef_.get_remove_pfid(itr->drid_, enq_fid, true) == enq_map::EMAP_OK) // ignore not found error + fileNumberMap_[enq_fid]->decrEnqueuedRecordCount(); } } std::free(xidp); @@ -577,11 +599,11 @@ bool RecoveryManager::getNextRecordHeader() } break; case 0: -//std::cout << ".0" << std::endl << std::flush; // DEBUG +//std::cout << " 0x" << std::hex << file_pos << ".0" << std::dec << std::endl << std::flush; // DEBUG checkJournalAlignment(file_pos); return false; default: -//std::cout << ".?" << std::endl << std::flush; // DEBUG +//std::cout << " 0x" << std::hex << file_pos << ".?" << std::dec << std::endl << std::flush; // DEBUG // Stop as this is the overwrite boundary. checkJournalAlignment(file_pos); return false; @@ -593,16 +615,24 @@ void RecoveryManager::readJournalData(char* target, const std::streamsize readSize) { std::streamoff bytesRead = 0; while (bytesRead < readSize) { - if (inFileStream_.eof()) { - getNextFile(false); + std::streampos file_pos = inFileStream_.tellg(); + if (file_pos == -1) { + std::ostringstream oss; + oss << "tellg() failure: fail=" << (inFileStream_.fail()?"T":"F") << " bad=" << (inFileStream_.bad()?"T":"F"); + throw jexception(jerrno::JERR_RCVM_STREAMBAD, oss.str(), "RecoveryManager", "readJournalData"); } - bool readFitsInFile = inFileStream_.tellg() + readSize <= fileSize_kib_ * 1024; - std::streamoff actualReadSize = readFitsInFile ? readSize : (fileSize_kib_ * 1024) - inFileStream_.tellg(); - inFileStream_.read(target + bytesRead, actualReadSize); - if (inFileStream_.gcount() != actualReadSize) { - throw jexception(); // TODO - proper exception + inFileStream_.read(target + bytesRead, readSize - bytesRead); + std::streamoff thisReadSize = inFileStream_.gcount(); + if (thisReadSize < readSize) { + getNextFile(false); + file_pos = inFileStream_.tellg(); + if (file_pos == -1) { + std::ostringstream oss; + oss << "tellg() failure: fail=" << (inFileStream_.fail()?"T":"F") << " bad=" << (inFileStream_.bad()?"T":"F"); + throw jexception(jerrno::JERR_RCVM_STREAMBAD, oss.str(), "RecoveryManager", "readJournalData"); + } } - bytesRead += actualReadSize; + bytesRead += thisReadSize; } } @@ -633,12 +663,10 @@ void RecoveryManager::readJournalFileHeader(const std::string& journalFileName, } void RecoveryManager::removeEmptyFiles(EmptyFilePool* emptyFilePoolPtr) { - while (enqueueCountList_.front() == 0 && enqueueCountList_.size() > 1) { - fileNumberNameMapItr_t i = fileNumberNameMap_.begin(); -//std::cout << "*** File " << i->first << ": " << i->second << " is empty." << std::endl; - emptyFilePoolPtr->returnEmptyFile(i->second); - fileNumberNameMap_.erase(i); - enqueueCountList_.pop_front(); + while (fileNumberMap_.begin()->second->getEnqueuedRecordCount() == 0 && fileNumberMap_.size() > 1) { +//std::cout << "*** File " << i->first << ": " << i->second << " is empty." << std::endl; // DEBUG + emptyFilePoolPtr->returnEmptyFile(fileNumberMap_.begin()->second->getFqFileName()); + fileNumberMap_.erase(fileNumberMap_.begin()->first); } } diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.h b/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.h index 91446c6abf..3cacd7cfb3 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.h @@ -49,11 +49,9 @@ protected: // Types typedef std::vector<std::string> directoryList_t; typedef directoryList_t::const_iterator directoryListConstItr_t; - typedef std::map<uint64_t, std::string> fileNumberNameMap_t; - typedef fileNumberNameMap_t::iterator fileNumberNameMapItr_t; - typedef fileNumberNameMap_t::const_iterator fileNumberNameMapConstItr_t; - typedef std::deque<uint32_t> enqueueCountList_t; - typedef enqueueCountList_t::const_iterator enqueueCountListConstItr_t; + typedef std::map<uint64_t, JournalFile*> fileNumberMap_t; + typedef fileNumberMap_t::iterator fileNumberMapItr_t; + typedef fileNumberMap_t::const_iterator fileNumberMapConstItr_t; typedef std::vector<uint64_t> recordIdList_t; typedef recordIdList_t::const_iterator recordIdListConstItr_t; @@ -65,8 +63,7 @@ protected: JournalLog& journalLogRef_; // Initial journal analysis data - fileNumberNameMap_t fileNumberNameMap_; ///< File number - name map - enqueueCountList_t enqueueCountList_; ///< Number enqueued records found for each file + fileNumberMap_t fileNumberMap_; ///< File number - JournalFilePtr map bool journalEmptyFlag_; ///< Journal data files empty std::streamoff firstRecordOffset_; ///< First record offset in ffid std::streamoff endOffset_; ///< End offset (first byte past last record) @@ -75,8 +72,8 @@ protected: bool lastFileFullFlag_; ///< Last file is full // State for recovery of individual enqueued records - uint32_t fileSize_kib_; - fileNumberNameMapConstItr_t currentJournalFileConstItr_; + uint32_t efpFileSize_kib_; + fileNumberMapConstItr_t currentJournalFileConstItr_; std::string currentFileName_; std::ifstream inFileStream_; recordIdList_t recordIdList_; diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp index e8873cddcd..f21deead5d 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp +++ b/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp @@ -103,35 +103,10 @@ jcntl::initialize(EmptyFilePool* efpp, _tmap.clear(); _linearFileController.finalize(); - -// _lpmgr.finalize(); - - // Set new file geometry parameters -// assert(num_jfiles >= JRNL_MIN_NUM_FILES); -// assert(num_jfiles <= JRNL_MAX_NUM_FILES); -// _emap.set_num_jfiles(num_jfiles); -// _tmap.set_num_jfiles(num_jfiles); - -// assert(jfsize_sblks >= JRNL_MIN_FILE_SIZE); -// assert(jfsize_sblks <= JRNL_MAX_FILE_SIZE); -// _jfsize_sblks = jfsize_sblks; - - // Clear any existing journal files - _jdir.clear_dir(); -// _lpmgr.initialize(num_jfiles, ae, ae_max_jfiles, this, &new_fcntl); // Creates new journal files - + _jdir.clear_dir(); // Clear any existing journal files _linearFileController.initialize(_jdir.dirname(), efpp, 0ULL); _linearFileController.pullEmptyFileFromEfp(); -// std::cout << _linearFileController.status(2); -// _wrfc.initialize(_jfsize_sblks); -// _rrfc.initialize(); -// _rrfc.set_findex(0); -// _rmgr.initialize(cbp); _wmgr.initialize(cbp, wcache_pgsize_sblks, wcache_num_pages, QLS_WMGR_MAXDTOKPP, QLS_WMGR_MAXWAITUS); - - // Write info file (<basename>.jinf) to disk -// write_infofile(); - _init_flag = true; } @@ -152,36 +127,14 @@ jcntl::recover(EmptyFilePoolManager* efpmp, _linearFileController.finalize(); -// _lpmgr.finalize(); - -// assert(num_jfiles >= JRNL_MIN_NUM_FILES); -// assert(num_jfiles <= JRNL_MAX_NUM_FILES); -// assert(jfsize_sblks >= JRNL_MIN_FILE_SIZE); -// assert(jfsize_sblks <= JRNL_MAX_FILE_SIZE); -// _jfsize_sblks = jfsize_sblks; - // Verify journal dir and journal files _jdir.verify_dir(); -// _rcvdat.reset(num_jfiles/*, ae, ae_max_jfiles*/); - -// rcvr_janalyze(prep_txn_list_ptr, efpm); - efpIdentity_t efpIdentity; _recoveryManager.analyzeJournals(prep_txn_list_ptr, efpmp, &_emptyFilePoolPtr); highest_rid = _recoveryManager.getHighestRecordId(); -// if (_rcvdat._jfull) -// throw jexception(jerrno::JERR_JCNTL_RECOVERJFULL, "jcntl", "recover"); _jrnl_log.log(/*LOG_DEBUG*/JournalLog::LOG_INFO, _jid, _recoveryManager.toString(_jid, true)); - -// _lpmgr.recover(_rcvdat, this, &new_fcntl); - _linearFileController.initialize(_jdir.dirname(), _emptyFilePoolPtr, _recoveryManager.getHighestFileNumber()); -// _linearFileController.setFileNumberCounter(_recoveryManager.getHighestFileNumber()); _recoveryManager.setLinearFileControllerJournals(&qpid::qls_jrnl::LinearFileController::addJournalFile, &_linearFileController); -// _wrfc.initialize(_jfsize_sblks, &_rcvdat); -// _rrfc.initialize(); -// _rrfc.set_findex(_rcvdat.ffid()); -// _rmgr.initialize(cbp); _wmgr.initialize(cbp, wcache_pgsize_sblks, wcache_num_pages, QLS_WMGR_MAXDTOKPP, QLS_WMGR_MAXWAITUS, (_recoveryManager.isLastFileFull() ? 0 : _recoveryManager.getEndOffset())); @@ -194,12 +147,6 @@ jcntl::recover_complete() { if (!_readonly_flag) throw jexception(jerrno::JERR_JCNTL_NOTRECOVERED, "jcntl", "recover_complete"); -// for (uint16_t i=0; i<_lpmgr.num_jfiles(); i++) -// _lpmgr.get_fcntlp(i)->reset(&_rcvdat); -// _wrfc.initialize(_jfsize_sblks, &_rcvdat); -// _rrfc.initialize(); -// _rrfc.set_findex(_rcvdat.ffid()); -// _rmgr.recover_complete(); _readonly_flag = false; } @@ -207,7 +154,7 @@ void jcntl::delete_jrnl_files() { stop(true); // wait for AIO to complete - _linearFileController.purgeFilesToEfp(); + _linearFileController.purgeEmptyFilesToEfp(); _jdir.delete_dir(); } @@ -288,73 +235,10 @@ jcntl::read_data_record(void** const datapp, bool ignore_pending_txns) { check_rstatus("read_data"); - if (_recoveryManager.readNextRemainingRecord(datapp, dsize, xidpp, xidsize, transient, external, dtokp, ignore_pending_txns)) + if (_recoveryManager.readNextRemainingRecord(datapp, dsize, xidpp, xidsize, transient, external, dtokp, ignore_pending_txns)) { return RHM_IORES_SUCCESS; - return RHM_IORES_EMPTY; -/* - if (!dtokp->is_readable()) { - std::ostringstream oss; - oss << std::hex << std::setfill('0'); - oss << "dtok_id=0x" << std::setw(8) << dtokp->id(); - oss << "; dtok_rid=0x" << std::setw(16) << dtokp->rid(); - oss << "; dtok_wstate=" << dtokp->wstate_str(); - throw jexception(jerrno::JERR_JCNTL_ENQSTATE, oss.str(), "jcntl", "read_data_record"); } - std::vector<uint64_t> ridl; - _emap.rid_list(ridl); - enq_map::emap_data_struct_t eds; - for (std::vector<uint64_t>::const_iterator i=ridl.begin(); i!=ridl.end(); ++i) { - short res = _emap.get_data(*i, eds); - if (res == enq_map::EMAP_OK) { - std::ifstream ifs(_recoveryManager._fm[eds._pfid].c_str(), std::ifstream::in | std::ifstream::binary); - if (!ifs.good()) { - std::ostringstream oss; - oss << "rid=" << (*i) << " pfid=" << eds._pfid << " file=" << _recoveryManager._fm[eds._pfid] << " file_posn=" << eds._file_posn; - throw jexception(jerrno::JERR_RCVM_OPENRD, oss.str(), "jcntl", "read_data_record"); - } - ifs.seekg(eds._file_posn, std::ifstream::beg); - ::enq_hdr_t eh; - ifs.read((char*)&eh, sizeof(::enq_hdr_t)); - if (!::validate_enq_hdr(&eh, QLS_ENQ_MAGIC, QLS_JRNL_VERSION, *i)) { - std::ostringstream oss; - oss << "rid=" << (*i) << " pfid=" << eds._pfid << " file=" << _recoveryManager._fm[eds._pfid] << " file_posn=" << eds._file_posn; - throw jexception(jerrno::JERR_JCNTL_INVALIDENQHDR, oss.str(), "jcntl", "read_data_record"); - } - dsize = eh._dsize; - xidsize = eh._xidsize; - transient = ::is_enq_transient(&eh); - external = ::is_enq_external(&eh); - if (xidsize) { - *xidpp = ::malloc(xidsize); - ifs.read((char*)(*xidpp), xidsize); - } else { - *xidpp = 0; - } - if (dsize) { - *datapp = ::malloc(dsize); - ifs.read((char*)(*datapp), dsize); - } else { - *datapp = 0; - } - } - } -*/ -/* - check_rstatus("read_data"); - iores res = _rmgr.read(datapp, dsize, xidpp, xidsize, transient, external, dtokp, ignore_pending_txns); - if (res == RHM_IORES_RCINVALID) - { - get_wr_events(0); // check for outstanding write events - iores sres = _rmgr.synchronize(); // flushes all outstanding read events - if (sres != RHM_IORES_SUCCESS) - return sres; - // TODO: Does linear store need this? -// _rmgr.wait_for_validity(&_aio_cmpl_timeout, true); // throw if timeout occurs - res = _rmgr.read(datapp, dsize, xidpp, xidsize, transient, external, dtokp, ignore_pending_txns); - } - return res; -*/ - return RHM_IORES_SUCCESS; + return RHM_IORES_EMPTY; } iores diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp index 7c70be1ed7..c8fd2f55fe 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp +++ b/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp @@ -53,7 +53,6 @@ const uint32_t jerrno::JERR_JCNTL_NOTRECOVERED = 0x0204; const uint32_t jerrno::JERR_JCNTL_ENQSTATE = 0x0207; const uint32_t jerrno::JERR_JCNTL_INVALIDENQHDR = 0x0208; - // class jdir const uint32_t jerrno::JERR_JDIR_NOTDIR = 0x0300; const uint32_t jerrno::JERR_JDIR_MKDIR = 0x0301; @@ -89,16 +88,11 @@ const uint32_t jerrno::JERR_WMGR_DEQRIDNOTENQ = 0x0805; const uint32_t jerrno::JERR_WMGR_BADFH = 0x0806; // class RecoveryManager -const uint32_t jerrno::JERR_RCVM_OPENRD = 0x0900; -const uint32_t jerrno::JERR_RCVM_READ = 0x0901; -const uint32_t jerrno::JERR_RCVM_WRITE = 0x0902; - -//// class rmgr -//const uint32_t jerrno::JERR_RMGR_UNKNOWNMAGIC = 0x0900; -//const uint32_t jerrno::JERR_RMGR_RIDMISMATCH = 0x0901; -////const uint32_t jerrno::JERR_RMGR_FIDMISMATCH = 0x0902; -//const uint32_t jerrno::JERR_RMGR_ENQSTATE = 0x0903; -//const uint32_t jerrno::JERR_RMGR_BADRECTYPE = 0x0904; +const uint32_t jerrno::JERR_RCVM_OPENRD = 0x0900; ///< Unable to open file for read +const uint32_t jerrno::JERR_RCVM_STREAMBAD = 0x0901; ///< Read/write stream error +const uint32_t jerrno::JERR_RCVM_READ = 0x0902; ///< Read error: no or insufficient data to read +const uint32_t jerrno::JERR_RCVM_WRITE = 0x0903; ///< Write error +const uint32_t jerrno::JERR_RCVM_NULLXID = 0x0904; ///< Null XID when XID length non-null in header // class data_tok const uint32_t jerrno::JERR_DTOK_ILLEGALSTATE = 0x0a00; @@ -184,15 +178,11 @@ jerrno::__init() _err_map[JERR_WMGR_BADFH] = "JERR_WMGR_BADFH: Bad file handle."; // class RecoveryManager - _err_map[JERR_RCVM_OPENRD] = "JERR_JCNTL_OPENRD: Unable to open file for write"; - _err_map[JERR_RCVM_READ] = "JERR_JCNTL_READ: Read error: no or insufficient data to read"; + _err_map[JERR_RCVM_OPENRD] = "JERR_RCVM_OPENRD: Unable to open file for read"; + _err_map[JERR_RCVM_STREAMBAD] = "JERR_RCVM_STREAMBAD: Read/write stream error"; + _err_map[JERR_RCVM_READ] = "JERR_RCVM_READ: Read error: no or insufficient data to read"; _err_map[JERR_RCVM_WRITE] = "JERR_RCVM_WRITE: Write error"; -// // class rmgr -// _err_map[JERR_RMGR_UNKNOWNMAGIC] = "JERR_RMGR_UNKNOWNMAGIC: Found record with unknown magic."; -// _err_map[JERR_RMGR_RIDMISMATCH] = "JERR_RMGR_RIDMISMATCH: RID mismatch between current record and dtok RID"; -// //_err_map[JERR_RMGR_FIDMISMATCH] = "JERR_RMGR_FIDMISMATCH: FID mismatch between emap and rrfc"; -// _err_map[JERR_RMGR_ENQSTATE] = "JERR_RMGR_ENQSTATE: Attempted read when data token wstate was not ENQ"; -// _err_map[JERR_RMGR_BADRECTYPE] = "JERR_RMGR_BADRECTYPE: Attempted operation on inappropriate record type"; + _err_map[JERR_RCVM_NULLXID] = "JERR_RCVM_NULLXID: Null XID when XID length non-null in header"; // class data_tok _err_map[JERR_DTOK_ILLEGALSTATE] = "JERR_MTOK_ILLEGALSTATE: Attempted to change to illegal state."; diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h b/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h index 7bd521ecbf..4c7124ae22 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h @@ -50,40 +50,40 @@ namespace qls_jrnl public: // generic errors - static const uint32_t JERR__MALLOC; ///< Buffer memory allocation failed - static const uint32_t JERR__UNDERFLOW; ///< Underflow error - static const uint32_t JERR__NINIT; ///< Operation on uninitialized class - static const uint32_t JERR__AIO; ///< AIO failure - static const uint32_t JERR__FILEIO; ///< File read or write failure - static const uint32_t JERR__RTCLOCK; ///< Reading real-time clock failed - static const uint32_t JERR__PTHREAD; ///< pthread failure - static const uint32_t JERR__TIMEOUT; ///< Timeout waiting for an event - static const uint32_t JERR__UNEXPRESPONSE; ///< Unexpected response to call or event - static const uint32_t JERR__RECNFOUND; ///< Record not found - static const uint32_t JERR__NOTIMPL; ///< Not implemented - static const uint32_t JERR__NULL; ///< Operation on null pointer + static const uint32_t JERR__MALLOC; ///< Buffer memory allocation failed + static const uint32_t JERR__UNDERFLOW; ///< Underflow error + static const uint32_t JERR__NINIT; ///< Operation on uninitialized class + static const uint32_t JERR__AIO; ///< AIO failure + static const uint32_t JERR__FILEIO; ///< File read or write failure + static const uint32_t JERR__RTCLOCK; ///< Reading real-time clock failed + static const uint32_t JERR__PTHREAD; ///< pthread failure + static const uint32_t JERR__TIMEOUT; ///< Timeout waiting for an event + static const uint32_t JERR__UNEXPRESPONSE; ///< Unexpected response to call or event + static const uint32_t JERR__RECNFOUND; ///< Record not found + static const uint32_t JERR__NOTIMPL; ///< Not implemented + static const uint32_t JERR__NULL; ///< Operation on null pointer // class jcntl - static const uint32_t JERR_JCNTL_STOPPED; ///< Operation on stopped journal - static const uint32_t JERR_JCNTL_READONLY; ///< Write operation on read-only journal - static const uint32_t JERR_JCNTL_AIOCMPLWAIT; ///< Timeout waiting for AIOs to complete - static const uint32_t JERR_JCNTL_UNKNOWNMAGIC; ///< Found record with unknown magic - static const uint32_t JERR_JCNTL_NOTRECOVERED; ///< Req' recover() to be called first - static const uint32_t JERR_JCNTL_ENQSTATE; ///< Read error: Record not in ENQ state - static const uint32_t JERR_JCNTL_INVALIDENQHDR;///< Invalid ENQ header + static const uint32_t JERR_JCNTL_STOPPED; ///< Operation on stopped journal + static const uint32_t JERR_JCNTL_READONLY; ///< Write operation on read-only journal + static const uint32_t JERR_JCNTL_AIOCMPLWAIT; ///< Timeout waiting for AIOs to complete + static const uint32_t JERR_JCNTL_UNKNOWNMAGIC; ///< Found record with unknown magic + static const uint32_t JERR_JCNTL_NOTRECOVERED; ///< Req' recover() to be called first + static const uint32_t JERR_JCNTL_ENQSTATE; ///< Read error: Record not in ENQ state + static const uint32_t JERR_JCNTL_INVALIDENQHDR; ///< Invalid ENQ header // class jdir - static const uint32_t JERR_JDIR_NOTDIR; ///< Exists but is not a directory - static const uint32_t JERR_JDIR_MKDIR; ///< Directory creation failed - static const uint32_t JERR_JDIR_OPENDIR; ///< Directory open failed - static const uint32_t JERR_JDIR_READDIR; ///< Directory read failed - static const uint32_t JERR_JDIR_CLOSEDIR; ///< Directory close failed - static const uint32_t JERR_JDIR_RMDIR; ///< Directory delete failed - static const uint32_t JERR_JDIR_NOSUCHFILE; ///< File does not exist - static const uint32_t JERR_JDIR_FMOVE; ///< File move failed - static const uint32_t JERR_JDIR_STAT; ///< File stat failed - static const uint32_t JERR_JDIR_UNLINK; ///< File delete failed - static const uint32_t JERR_JDIR_BADFTYPE; ///< Bad or unknown file type (stat mode) + static const uint32_t JERR_JDIR_NOTDIR; ///< Exists but is not a directory + static const uint32_t JERR_JDIR_MKDIR; ///< Directory creation failed + static const uint32_t JERR_JDIR_OPENDIR; ///< Directory open failed + static const uint32_t JERR_JDIR_READDIR; ///< Directory read failed + static const uint32_t JERR_JDIR_CLOSEDIR; ///< Directory close failed + static const uint32_t JERR_JDIR_RMDIR; ///< Directory delete failed + static const uint32_t JERR_JDIR_NOSUCHFILE; ///< File does not exist + static const uint32_t JERR_JDIR_FMOVE; ///< File move failed + static const uint32_t JERR_JDIR_STAT; ///< File stat failed + static const uint32_t JERR_JDIR_UNLINK; ///< File delete failed + static const uint32_t JERR_JDIR_BADFTYPE; ///< Bad or unknown file type (stat mode) // class JournalFile static const uint32_t JERR_JNLF_OPEN; ///< Unable to open file for write @@ -92,47 +92,42 @@ namespace qls_jrnl static const uint32_t JERR_JNLF_CMPLOFFSOVFL; ///< Increased cmpl offs past subm offs // class LinearFileController - static const uint32_t JERR_LFCR_SEQNUMNOTFOUND;///< File sequence number not found + static const uint32_t JERR_LFCR_SEQNUMNOTFOUND; ///< File sequence number not found // class jrec, enq_rec, deq_rec, txn_rec - static const uint32_t JERR_JREC_BADRECHDR; ///< Invalid data record header - static const uint32_t JERR_JREC_BADRECTAIL; ///< Invalid data record tail + static const uint32_t JERR_JREC_BADRECHDR; ///< Invalid data record header + static const uint32_t JERR_JREC_BADRECTAIL; ///< Invalid data record tail // class wmgr - static const uint32_t JERR_WMGR_BADPGSTATE; ///< Page buffer in illegal state. - static const uint32_t JERR_WMGR_BADDTOKSTATE; ///< Data token in illegal state. - static const uint32_t JERR_WMGR_ENQDISCONT; ///< Enq. new dtok when previous part compl. - static const uint32_t JERR_WMGR_DEQDISCONT; ///< Deq. new dtok when previous part compl. - static const uint32_t JERR_WMGR_DEQRIDNOTENQ; ///< Deq. rid not enqueued - static const uint32_t JERR_WMGR_BADFH; ///< Bad file handle + static const uint32_t JERR_WMGR_BADPGSTATE; ///< Page buffer in illegal state. + static const uint32_t JERR_WMGR_BADDTOKSTATE; ///< Data token in illegal state. + static const uint32_t JERR_WMGR_ENQDISCONT; ///< Enq. new dtok when previous part compl. + static const uint32_t JERR_WMGR_DEQDISCONT; ///< Deq. new dtok when previous part compl. + static const uint32_t JERR_WMGR_DEQRIDNOTENQ; ///< Deq. rid not enqueued + static const uint32_t JERR_WMGR_BADFH; ///< Bad file handle // class RecoveryManager - static const uint32_t JERR_RCVM_OPENRD; ///< Unable to open file for read - static const uint32_t JERR_RCVM_READ; ///< Read error: no or insufficient data to read + static const uint32_t JERR_RCVM_OPENRD; ///< Unable to open file for read + static const uint32_t JERR_RCVM_STREAMBAD; ///< Read/write stream error + static const uint32_t JERR_RCVM_READ; ///< Read error: no or insufficient data to read static const uint32_t JERR_RCVM_WRITE; ///< Write error - -// // class rmgr -// static const uint32_t JERR_RMGR_UNKNOWNMAGIC; ///< Found record with unknown magic -// static const uint32_t JERR_RMGR_RIDMISMATCH; ///< RID mismatch between rec and dtok -// //static const uint32_t JERR_RMGR_FIDMISMATCH; ///< FID mismatch between emap and rrfc -// static const uint32_t JERR_RMGR_ENQSTATE; ///< Attempted read when wstate not ENQ -// static const uint32_t JERR_RMGR_BADRECTYPE; ///< Attempted op on incorrect rec type + static const uint32_t JERR_RCVM_NULLXID; ///< Null XID when XID length non-null in header // class data_tok - static const uint32_t JERR_DTOK_ILLEGALSTATE; ///< Attempted to change to illegal state -// static const uint32_t JERR_DTOK_RIDNOTSET; ///< Record ID not set + static const uint32_t JERR_DTOK_ILLEGALSTATE; ///< Attempted to change to illegal state +// static const uint32_t JERR_DTOK_RIDNOTSET; ///< Record ID not set // class enq_map, txn_map - static const uint32_t JERR_MAP_DUPLICATE; ///< Attempted to insert using duplicate key - static const uint32_t JERR_MAP_NOTFOUND; ///< Key not found in map - static const uint32_t JERR_MAP_LOCKED; ///< rid locked by pending txn + static const uint32_t JERR_MAP_DUPLICATE; ///< Attempted to insert using duplicate key + static const uint32_t JERR_MAP_NOTFOUND; ///< Key not found in map + static const uint32_t JERR_MAP_LOCKED; ///< rid locked by pending txn // EFP errors static const uint32_t JERR_EFP_BADPARTITIONNAME; ///< Partition name invalid or of value 0 - static const uint32_t JERR_EFP_BADEFPDIRNAME; ///< Empty File Pool directory name invalid - static const uint32_t JERR_EFP_BADPARTITIONDIR; ///< Invalid partition directory - static const uint32_t JERR_EFP_NOEFP; ///< No EFP found for given partition and file size - static const uint32_t JERR_EFP_EMPTY; ///< EFP empty + static const uint32_t JERR_EFP_BADEFPDIRNAME; ///< Empty File Pool directory name invalid + static const uint32_t JERR_EFP_BADPARTITIONDIR; ///< Invalid partition directory + static const uint32_t JERR_EFP_NOEFP; ///< No EFP found for given partition and file size + static const uint32_t JERR_EFP_EMPTY; ///< EFP empty // Negative returns for some functions static const int32_t AIO_TIMEOUT; ///< Timeout waiting for AIO return diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.cpp index d1972ea1f2..9c32f4128b 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.cpp +++ b/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.cpp @@ -39,14 +39,19 @@ int16_t txn_map::TMAP_OK = 0; int16_t txn_map::TMAP_NOT_SYNCED = 0; int16_t txn_map::TMAP_SYNCED = 1; -txn_data_struct::txn_data_struct(const uint64_t rid, const uint64_t drid, const uint16_t pfid, - const bool enq_flag, const bool commit_flag): - _rid(rid), - _drid(drid), - _pfid(pfid), - _enq_flag(enq_flag), - _commit_flag(commit_flag), - _aio_compl(false) +txn_data_t::txn_data_t(const uint64_t rid, + const uint64_t drid, + const uint16_t pfid, + const uint64_t foffs, + const bool enq_flag, + const bool commit_flag): + rid_(rid), + drid_(drid), + pfid_(pfid), + foffs_(foffs), + enq_flag_(enq_flag), + commit_flag_(commit_flag), + aio_compl_(false) {} txn_map::txn_map(): @@ -56,24 +61,8 @@ txn_map::txn_map(): txn_map::~txn_map() {} -/* -void -txn_map::set_num_jfiles(const uint16_t num_jfiles) -{ - _pfid_txn_cnt.resize(num_jfiles, 0); -} -*/ - -/* -uint32_t -txn_map::get_txn_pfid_cnt(const uint16_t pfid) const -{ - return _pfid_txn_cnt.at(pfid); -} -*/ - bool -txn_map::insert_txn_data(const std::string& xid, const txn_data& td) +txn_map::insert_txn_data(const std::string& xid, const txn_data_t& td) { bool ok = true; slock s(_mutex); @@ -88,7 +77,6 @@ txn_map::insert_txn_data(const std::string& xid, const txn_data& td) } else itr->second.push_back(td); -// _pfid_txn_cnt.at(td._pfid)++; return ok; } @@ -117,8 +105,6 @@ txn_map::get_remove_tdata_list(const std::string& xid) return _empty_data_list; txn_data_list list = itr->second; _map.erase(itr); -// for (tdl_itr i=list.begin(); i!=list.end(); i++) -// _pfid_txn_cnt.at(i->_pfid)--; return list; } @@ -151,7 +137,7 @@ txn_map::cnt(const bool enq_flag) { for (tdl_itr j = i->second.begin(); j < i->second.end(); j++) { - if (j->_enq_flag == enq_flag) + if (j->enq_flag_ == enq_flag) c++; } } @@ -168,7 +154,7 @@ txn_map::is_txn_synced(const std::string& xid) bool is_synced = true; for (tdl_itr litr = itr->second.begin(); litr < itr->second.end(); litr++) { - if (!litr->_aio_compl) + if (!litr->aio_compl_) { is_synced = false; break; @@ -186,9 +172,9 @@ txn_map::set_aio_compl(const std::string& xid, const uint64_t rid) return TMAP_XID_NOT_FOUND; for (tdl_itr litr = itr->second.begin(); litr < itr->second.end(); litr++) { - if (litr->_rid == rid) + if (litr->rid_ == rid) { - litr->_aio_compl = true; + litr->aio_compl_ = true; return TMAP_OK; // rid found } } @@ -206,7 +192,7 @@ txn_map::data_exists(const std::string& xid, const uint64_t rid) tdl_itr itr = tdl.begin(); while (itr != tdl.end() && !found) { - found = itr->_rid == rid; + found = itr->rid_ == rid; itr++; } } @@ -224,10 +210,10 @@ txn_map::is_enq(const uint64_t rid) txn_data_list list = i->second; for (tdl_itr j = list.begin(); j < list.end() && !found; j++) { - if (j->_enq_flag) - found = j->_rid == rid; + if (j->enq_flag_) + found = j->rid_ == rid; else - found = j->_drid == rid; + found = j->drid_ == rid; } } } diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.h b/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.h index b69d17da1c..146997abe0 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.h @@ -45,19 +45,23 @@ namespace qls_jrnl * \brief Struct encapsulating transaction data necessary for processing a transaction * in the journal once it is closed with either a commit or abort. */ - struct txn_data_struct + typedef struct txn_data_t { - uint64_t _rid; ///< Record id for this operation - uint64_t _drid; ///< Dequeue record id for this operation - uint16_t _pfid; ///< Physical file id, to be used when transferring to emap on commit - bool _enq_flag; ///< If true, enq op, otherwise deq op - bool _commit_flag; ///< (2PC transactions) Records 2PC complete c/a mode - bool _aio_compl; ///< Initially false, set to true when record AIO returns - txn_data_struct(const uint64_t rid, const uint64_t drid, const uint16_t pfid, - const bool enq_flag, const bool commit_flag = false); - }; - typedef txn_data_struct txn_data; - typedef std::vector<txn_data> txn_data_list; + uint64_t rid_; ///< Record id for this operation + uint64_t drid_; ///< Dequeue record id for this operation + uint16_t pfid_; ///< Physical file id, to be used when transferring to emap on commit + uint64_t foffs_; ///< Offset in file for this record + bool enq_flag_; ///< If true, enq op, otherwise deq op + bool commit_flag_; ///< (2PC transactions) Records 2PC complete c/a mode + bool aio_compl_; ///< Initially false, set to true when record AIO returns + txn_data_t(const uint64_t rid, + const uint64_t drid, + const uint16_t pfid, + const uint64_t foffs, + const bool enq_flag, + const bool commit_flag = false); + } txn_data_t; + typedef std::vector<txn_data_t> txn_data_list; typedef txn_data_list::iterator tdl_itr; /** @@ -112,16 +116,13 @@ namespace qls_jrnl xmap _map; smutex _mutex; -// std::vector<uint32_t> _pfid_txn_cnt; const txn_data_list _empty_data_list; public: txn_map(); virtual ~txn_map(); -// void set_num_jfiles(const uint16_t num_jfiles); -// uint32_t get_txn_pfid_cnt(const uint16_t pfid) const; - bool insert_txn_data(const std::string& xid, const txn_data& td); + bool insert_txn_data(const std::string& xid, const txn_data_t& td); const txn_data_list get_tdata_list(const std::string& xid); const txn_data_list get_remove_tdata_list(const std::string& xid); bool in_map(const std::string& xid); diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c b/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c index 7e53be5e18..dcc24bac8a 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c +++ b/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c @@ -28,7 +28,7 @@ void file_hdr_create(file_hdr_t* dest, const uint32_t magic, const uint16_t vers dest->_fhdr_size_sblks = fhdr_size_sblks; dest->_efp_partition = efp_partition; dest->_reserved = 0; - dest->_file_size_kib = file_size; + dest->_data_size_kib = file_size; dest->_fro = 0; dest->_ts_nsec = 0; dest->_ts_sec = 0; @@ -58,7 +58,7 @@ void file_hdr_copy(file_hdr_t* dest, const file_hdr_t* src) { rec_hdr_copy(&dest->_rhdr, &src->_rhdr); dest->_fhdr_size_sblks = src->_fhdr_size_sblks; // Should this be copied? dest->_efp_partition = src->_efp_partition; // Should this be copied? - dest->_file_size_kib = src->_file_size_kib; + dest->_data_size_kib = src->_data_size_kib; dest->_fro = src->_fro; dest->_ts_sec = src->_ts_sec; dest->_ts_nsec = src->_ts_nsec; diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h b/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h index efdd97b624..6a53c631d5 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h @@ -79,7 +79,7 @@ typedef struct file_hdr_t { uint16_t _fhdr_size_sblks; /**< File header size in sblks (defined by JRNL_SBLK_SIZE) */ uint16_t _efp_partition; /**< EFP Partition number from which this file was obtained */ uint32_t _reserved; - uint64_t _file_size_kib; /**< Size of this file in KiB, excluding header sblk */ + uint64_t _data_size_kib; /**< Size of the data part of this file in KiB. (ie file size excluding file header sblk) */ uint64_t _fro; /**< First Record Offset (FRO) */ uint64_t _ts_sec; /**< Time stamp (seconds part) */ uint64_t _ts_nsec; /**< Time stamp (nanoseconds part) */ diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp index a4ed6cbf19..991e26a3b9 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp +++ b/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp @@ -33,7 +33,7 @@ #include <sstream> #include <stdint.h> -#include <iostream> // DEBUG +//#include <iostream> // DEBUG namespace qpid { @@ -99,7 +99,7 @@ wmgr::initialize(aio_callback* const cbp, if (eo) { const uint32_t wr_pg_size_dblks = _cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS; - uint32_t data_dblks = (eo / QLS_DBLK_SIZE_BYTES) - 4; // 4 dblks for file hdr + uint32_t data_dblks = (eo / QLS_DBLK_SIZE_BYTES) - (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_DBLKS); // exclude file header _pg_cntr = data_dblks / wr_pg_size_dblks; _pg_offset_dblks = data_dblks - (_pg_cntr * wr_pg_size_dblks); } @@ -115,6 +115,7 @@ wmgr::enqueue(const void* const data_buff, const bool transient, const bool external) { +//std::cout << _lfc.status(10) << std::endl; if (xid_len) assert(xid_ptr != 0); @@ -160,7 +161,7 @@ wmgr::enqueue(const void* const data_buff, dtokp->clear_xid(); _enq_busy = true; } -//std::cout << "---+++ wmgr::enqueue() ENQ rid=0x" << std::hex << rid << " " << std::dec << std::flush; // DEBUG +//std::cout << "---+++ wmgr::enqueue() ENQ rid=0x" << std::hex << rid << " po=0x" << _pg_offset_dblks << " cs=0x" << (_cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) << " " << std::dec << std::flush; // DEBUG bool done = false; while (!done) { @@ -193,12 +194,12 @@ wmgr::enqueue(const void* const data_buff, // message. AIO callbacks will then only process this token when entire message is // enqueued. _lfc.incrEnqueuedRecordCount(dtokp->fid()); -//std::cout << "[0x" << std::hex << _lfc.getEnqueuedRecordCount() << std::dec << std::flush; // DEBUG +//std::cout << "[0x" << std::hex << _lfc.getEnqueuedRecordCount(dtokp->fid()) << std::dec << std::flush; // DEBUG if (xid_len) // If part of transaction, add to transaction map { std::string xid((const char*)xid_ptr, xid_len); - _tmap.insert_txn_data(xid, txn_data(rid, 0, dtokp->fid(), true)); + _tmap.insert_txn_data(xid, txn_data_t(rid, 0, dtokp->fid(), 0, true)); } else { @@ -213,7 +214,7 @@ wmgr::enqueue(const void* const data_buff, done = true; } else { -//std::cout << "$" << std::endl << std::flush; // DEBUG +//std::cout << "$" << std::flush; // DEBUG dtokp->set_wstate(data_tok::ENQ_PART); } @@ -313,7 +314,7 @@ wmgr::dequeue(data_tok* dtokp, // If the enqueue is part of a pending txn, it will not yet be in emap _emap.lock(dequeue_rid); // ignore rid not found error std::string xid((const char*)xid_ptr, xid_len); - _tmap.insert_txn_data(xid, txn_data(rid, dequeue_rid, dtokp->fid(), false)); + _tmap.insert_txn_data(xid, txn_data_t(rid, dequeue_rid, dtokp->fid(), 0, false)); } else { @@ -427,10 +428,10 @@ wmgr::abort(data_tok* dtokp, txn_data_list tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if xid not found for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++) { - if (!itr->_enq_flag) - _emap.unlock(itr->_drid); // ignore rid not found error - if (itr->_enq_flag) - _lfc.decrEnqueuedRecordCount(itr->_pfid); + if (!itr->enq_flag_) + _emap.unlock(itr->drid_); // ignore rid not found error + if (itr->enq_flag_) + _lfc.decrEnqueuedRecordCount(itr->pfid_); } std::pair<std::set<std::string>::iterator, bool> res = _txn_pending_set.insert(xid); if (!res.second) @@ -441,9 +442,9 @@ wmgr::abort(data_tok* dtokp, } done = true; - } - else + } else { dtokp->set_wstate(data_tok::ABORT_PART); + } file_header_check(rid, cont, _txn_rec.rec_size_dblks() - data_offs_dblks); flush_check(res, cont, done, rid); @@ -525,20 +526,20 @@ wmgr::commit(data_tok* dtokp, txn_data_list tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if xid not found for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++) { - if (itr->_enq_flag) // txn enqueue + if (itr->enq_flag_) // txn enqueue { - if (_emap.insert_pfid(itr->_rid, itr->_pfid, 0) < enq_map::EMAP_OK) // fail + if (_emap.insert_pfid(itr->rid_, itr->pfid_, 0) < enq_map::EMAP_OK) // fail { // The only error code emap::insert_pfid() returns is enq_map::EMAP_DUP_RID. std::ostringstream oss; - oss << std::hex << "rid=0x" << itr->_rid << " _pfid=0x" << itr->_pfid; + oss << std::hex << "rid=0x" << itr->rid_ << " _pfid=0x" << itr->pfid_; throw jexception(jerrno::JERR_MAP_DUPLICATE, oss.str(), "wmgr", "commit"); } } else // txn dequeue { uint64_t fid; - short eres = _emap.get_remove_pfid(itr->_drid, fid, true); + short eres = _emap.get_remove_pfid(itr->drid_, fid, true); if (eres < enq_map::EMAP_OK) // fail { if (eres == enq_map::EMAP_RID_NOT_FOUND) @@ -566,9 +567,9 @@ wmgr::commit(data_tok* dtokp, } done = true; - } - else + } else { dtokp->set_wstate(data_tok::COMMIT_PART); + } file_header_check(rid, cont, _txn_rec.rec_size_dblks() - data_offs_dblks); flush_check(res, cont, done, rid); @@ -585,6 +586,7 @@ wmgr::file_header_check(const uint64_t rid, { if (_lfc.isEmpty()) // File never written (i.e. no header or data) { +//std::cout << "e" << std::flush; std::size_t fro = 0; if (cont) { bool file_fit = rec_dblks_rem <= _lfc.dataSize_sblks() * QLS_SBLK_SIZE_DBLKS; // Will fit within this journal file @@ -608,6 +610,7 @@ wmgr::flush_check(iores& res, // Is page is full, flush if (_pg_offset_dblks >= _cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) { +//std::cout << "^" << _pg_offset_dblks << ">=" << (_cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) << std::flush; res = write_flush(); assert(res == RHM_IORES_SUCCESS); @@ -621,6 +624,7 @@ wmgr::flush_check(iores& res, uint32_t fileSize_pgs = _lfc.fileSize_sblks() / _cache_pgsize_sblks; if (_pg_cntr >= fileSize_pgs) { +//std::cout << _pg_cntr << ">=" << fileSize_pgs << std::flush; get_next_file(); if (!done) { cont = true; @@ -649,7 +653,7 @@ wmgr::write_flush() if (_cached_offset_dblks) { if (_page_cb_arr[_pg_index]._state == AIO_PENDING) { -//std::cout << "#"; // DEBUG +//std::cout << "#" << std::flush; // DEBUG res = RHM_IORES_PAGE_AIOWAIT; } else { if (_page_cb_arr[_pg_index]._state != IN_USE) @@ -688,7 +692,7 @@ void wmgr::get_next_file() { _pg_cntr = 0; -//std::cout << "&&&&& wmgr::get_next_file(): " << status_str() << std::endl; // DEBUG +//std::cout << "&&&&& wmgr::get_next_file(): " << status_str() << std::flush << std::endl; // DEBUG _lfc.pullEmptyFileFromEfp(); } @@ -972,6 +976,7 @@ wmgr::dblk_roundup() uint32_t wdblks = jrec::size_blks(_cached_offset_dblks, QLS_SBLK_SIZE_DBLKS) * QLS_SBLK_SIZE_DBLKS; while (_cached_offset_dblks < wdblks) { +//std::cout << "^0x" << std::hex << _cached_offset_dblks << "<0x" << wdblks << std::dec << std::flush; void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * QLS_DBLK_SIZE_BYTES); std::memcpy(wptr, (const void*)&xmagic, sizeof(xmagic)); #ifdef QLS_CLEAN |