From c75685031b2d46bb522e55da24ebaf37bc3b369c Mon Sep 17 00:00:00 2001 From: Pavel Moravec Date: Thu, 21 May 2015 12:48:30 +0000 Subject: QPID-6551: [C++ broker]: linearstore raising JERR_LFCR_SEQNUMNOTFOUND after sending many DTX transactions git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1680861 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/linearstore/journal/LinearFileController.cpp | 16 ++++++++-------- .../src/qpid/linearstore/journal/LinearFileController.h | 14 +++++++------- .../cpp/src/qpid/linearstore/journal/RecoveryManager.cpp | 12 ++++++------ qpid/cpp/src/qpid/linearstore/journal/txn_map.cpp | 4 ++-- qpid/cpp/src/qpid/linearstore/journal/txn_map.h | 4 ++-- qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp | 8 ++++---- 6 files changed, 29 insertions(+), 29 deletions(-) diff --git a/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp b/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp index e6e07dc8e2..08d565ca2e 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp @@ -114,15 +114,15 @@ void LinearFileController::purgeEmptyFilesToEfp() { } } -uint32_t LinearFileController::getEnqueuedRecordCount(const efpFileCount_t fileSeqNumber) { +uint32_t LinearFileController::getEnqueuedRecordCount(const uint64_t fileSeqNumber) { return find(fileSeqNumber)->getEnqueuedRecordCount(); } -uint32_t LinearFileController::incrEnqueuedRecordCount(const efpFileCount_t fileSeqNumber) { +uint32_t LinearFileController::incrEnqueuedRecordCount(const uint64_t fileSeqNumber) { return find(fileSeqNumber)->incrEnqueuedRecordCount(); } -uint32_t LinearFileController::decrEnqueuedRecordCount(const efpFileCount_t fileSeqNumber) { +uint32_t LinearFileController::decrEnqueuedRecordCount(const uint64_t fileSeqNumber) { uint32_t r = find(fileSeqNumber)->decrEnqueuedRecordCount(); // TODO: Re-evaluate after testing and profiling @@ -136,11 +136,11 @@ uint32_t LinearFileController::decrEnqueuedRecordCount(const efpFileCount_t file return r; } -uint32_t LinearFileController::addWriteCompletedDblkCount(const efpFileCount_t fileSeqNumber, const uint32_t a) { +uint32_t LinearFileController::addWriteCompletedDblkCount(const uint64_t fileSeqNumber, const uint32_t a) { return find(fileSeqNumber)->addCompletedDblkCount(a); } -uint16_t LinearFileController::decrOutstandingAioOperationCount(const efpFileCount_t fileSeqNumber) { +uint16_t LinearFileController::decrOutstandingAioOperationCount(const uint64_t fileSeqNumber) { return find(fileSeqNumber)->decrOutstandingAioOperationCount(); } @@ -199,9 +199,9 @@ const std::string LinearFileController::status(const uint8_t indentDepth) const void LinearFileController::addJournalFile(const std::string& fileName, const efpIdentity_t& efpIdentity, - const uint64_t fileNumber, + const uint64_t fileSeqNumber, const uint32_t completedDblkCount) { - JournalFile* jfp = new JournalFile(fileName, efpIdentity, fileNumber, jcntlRef_.id()); + JournalFile* jfp = new JournalFile(fileName, efpIdentity, fileSeqNumber, jcntlRef_.id()); addJournalFile(jfp, completedDblkCount, true); } @@ -215,7 +215,7 @@ bool LinearFileController::checkCurrentJournalFileValid() const { return currentJournalFilePtr_ != 0; } -JournalFile* LinearFileController::find(const efpFileCount_t fileSeqNumber) { +JournalFile* LinearFileController::find(const uint64_t fileSeqNumber) { if (currentJournalFilePtr_ && currentJournalFilePtr_->getFileSeqNum() == fileSeqNumber) return currentJournalFilePtr_; diff --git a/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.h b/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.h index 05f08144b9..3cdfb72a37 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.h +++ b/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.h @@ -74,12 +74,12 @@ public: void purgeEmptyFilesToEfp(); // Functions for manipulating counts of non-current JournalFile instances in journalFileList_ - uint32_t getEnqueuedRecordCount(const efpFileCount_t fileSeqNumber); - uint32_t incrEnqueuedRecordCount(const efpFileCount_t fileSeqNumber); - uint32_t decrEnqueuedRecordCount(const efpFileCount_t fileSeqNumber); - uint32_t addWriteCompletedDblkCount(const efpFileCount_t fileSeqNumber, + uint32_t getEnqueuedRecordCount(const uint64_t fileSeqNumber); + uint32_t incrEnqueuedRecordCount(const uint64_t fileSeqNumber); + uint32_t decrEnqueuedRecordCount(const uint64_t fileSeqNumber); + uint32_t addWriteCompletedDblkCount(const uint64_t fileSeqNumber, const uint32_t a); - uint16_t decrOutstandingAioOperationCount(const efpFileCount_t fileSeqNumber); + uint16_t decrOutstandingAioOperationCount(const uint64_t fileSeqNumber); // Pass-through functions for current JournalFile class void asyncFileHeaderWrite(io_context_t ioContextPtr, @@ -101,11 +101,11 @@ public: protected: void addJournalFile(const std::string& fileName, const efpIdentity_t& efpIdentity, - const uint64_t fileNumber, + const uint64_t fileSeqNumber, const uint32_t completedDblkCount); void assertCurrentJournalFileValid(const char* const functionName) const; bool checkCurrentJournalFileValid() const; - JournalFile* find(const efpFileCount_t fileSeqNumber); + JournalFile* find(const uint64_t fileSeqNumber); uint64_t getNextFileSeqNum(); void pullEmptyFileFromEfp(); }; diff --git a/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp b/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp index 73a16f01b7..254566e824 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp @@ -148,7 +148,7 @@ void RecoveryManager::analyzeJournals(const std::vector* preparedTr // Unlock any affected enqueues in emap for (tdl_itr_t i=tdl.begin(); ienq_flag_) { // enq op - decrement enqueue count - fileNumberMap_[i->pfid_]->journalFilePtr_->decrEnqueuedRecordCount(); + fileNumberMap_[i->fid_]->journalFilePtr_->decrEnqueuedRecordCount(); } else if (enqueueMapRef_.is_enqueued(i->drid_, true)) { // deq op - unlock enq record if (enqueueMapRef_.unlock(i->drid_) < enq_map::EMAP_OK) { // fail // enq_map::unlock()'s only error is enq_map::EMAP_RID_NOT_FOUND @@ -738,7 +738,7 @@ bool RecoveryManager::getNextRecordHeader() txn_data_list_t tdl = transactionMapRef_.get_remove_tdata_list(xid); // tdl will be empty if xid not found for (tdl_itr_t itr = tdl.begin(); itr != tdl.end(); itr++) { if (itr->enq_flag_) { - fileNumberMap_[itr->pfid_]->journalFilePtr_->decrEnqueuedRecordCount(); + fileNumberMap_[itr->fid_]->journalFilePtr_->decrEnqueuedRecordCount(); } else { enqueueMapRef_.unlock(itr->drid_); // ignore not found error } @@ -765,11 +765,11 @@ bool RecoveryManager::getNextRecordHeader() txn_data_list_t tdl = transactionMapRef_.get_remove_tdata_list(xid); // tdl will be empty if xid not found for (tdl_itr_t itr = tdl.begin(); itr != tdl.end(); itr++) { if (itr->enq_flag_) { // txn enqueue -//std::cout << "[rid=0x" << std::hex << itr->rid_ << std::dec << " fid=" << itr->pfid_ << " fpos=0x" << std::hex << itr->foffs_ << "]" << std::dec << std::flush; // DEBUG - if (enqueueMapRef_.insert_pfid(itr->rid_, itr->pfid_, itr->foffs_) < enq_map::EMAP_OK) { // fail +//std::cout << "[rid=0x" << std::hex << itr->rid_ << std::dec << " fid=" << itr->fid_ << " fpos=0x" << std::hex << itr->foffs_ << "]" << std::dec << std::flush; // DEBUG + if (enqueueMapRef_.insert_pfid(itr->rid_, itr->fid_, itr->foffs_) < enq_map::EMAP_OK) { // fail // The only error code emap::insert_pfid() returns is enq_map::EMAP_DUP_RID. std::ostringstream oss; - oss << std::hex << "rid=0x" << itr->rid_ << " _pfid=0x" << itr->pfid_; + oss << std::hex << "rid=0x" << itr->rid_ << " _pfid=0x" << itr->fid_; throw jexception(jerrno::JERR_MAP_DUPLICATE, oss.str(), "RecoveryManager", "getNextRecordHeader"); } } else { // txn dequeue @@ -854,7 +854,7 @@ void RecoveryManager::prepareRecordList() { qpid::linearstore::journal::txn_data_list_t tdsl = transactionMapRef_.get_tdata_list(*j); for (qpid::linearstore::journal::tdl_itr_t k=tdsl.begin(); k!=tdsl.end(); ++k) { if (k->enq_flag_) { - recordIdList_.push_back(RecoveredRecordData_t(k->rid_, k->pfid_, k->foffs_, true)); + recordIdList_.push_back(RecoveredRecordData_t(k->rid_, k->fid_, k->foffs_, true)); } } } diff --git a/qpid/cpp/src/qpid/linearstore/journal/txn_map.cpp b/qpid/cpp/src/qpid/linearstore/journal/txn_map.cpp index 3ada3d2504..8336d36b80 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/txn_map.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/txn_map.cpp @@ -36,14 +36,14 @@ int16_t txn_map::TMAP_SYNCED = 1; txn_data_t::txn_data_t(const uint64_t rid, const uint64_t drid, - const uint16_t pfid, + const uint64_t fid, const uint64_t foffs, const bool enq_flag, const bool tpc_flag, const bool commit_flag): rid_(rid), drid_(drid), - pfid_(pfid), + fid_(fid), foffs_(foffs), enq_flag_(enq_flag), tpc_flag_(tpc_flag), diff --git a/qpid/cpp/src/qpid/linearstore/journal/txn_map.h b/qpid/cpp/src/qpid/linearstore/journal/txn_map.h index 996d54bdac..e79c0522d8 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/txn_map.h +++ b/qpid/cpp/src/qpid/linearstore/journal/txn_map.h @@ -39,7 +39,7 @@ namespace journal { { uint64_t rid_; ///< Record id for this operation uint64_t drid_; ///< Dequeue record id for this operation - uint16_t pfid_; ///< Physical file id, to be used when transferring to emap on commit + uint64_t fid_; ///< File seq number, to be used when transferring to emap on commit uint64_t foffs_; ///< Offset in file for this record bool enq_flag_; ///< If true, enq op, otherwise deq op bool tpc_flag_; ///< 2PC transaction if true @@ -47,7 +47,7 @@ namespace journal { bool aio_compl_; ///< Initially false, set to true when record AIO returns txn_data_t(const uint64_t rid, const uint64_t drid, - const uint16_t pfid, + const uint64_t fid, const uint64_t foffs, const bool enq_flag, const bool tpc_flag, diff --git a/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp b/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp index 7dc8f1d416..1ff18da663 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp @@ -307,7 +307,7 @@ wmgr::dequeue(data_tok* dtokp, for (tdl_const_itr_t i=tdl.begin(); i!=tdl.end() && !found; ++i) { if (i->rid_ == dtokp->dequeue_rid()) { found = true; - dtokp->set_fid(i->pfid_); + dtokp->set_fid(i->fid_); break; } } @@ -451,7 +451,7 @@ wmgr::abort(data_tok* dtokp, if (!itr->enq_flag_) _emap.unlock(itr->drid_); // ignore rid not found error if (itr->enq_flag_) { - fidl.push_back(itr->pfid_); + fidl.push_back(itr->fid_); } } std::pair res = _txn_pending_map.insert(std::pair(xid, fidl)); @@ -551,11 +551,11 @@ wmgr::commit(data_tok* dtokp, { if (itr->enq_flag_) // txn enqueue { - if (_emap.insert_pfid(itr->rid_, itr->pfid_, 0) < enq_map::EMAP_OK) // fail + if (_emap.insert_pfid(itr->rid_, itr->fid_, 0) < enq_map::EMAP_OK) // fail { // The only error code emap::insert_pfid() returns is enq_map::EMAP_DUP_RID. std::ostringstream oss; - oss << std::hex << "rid=0x" << itr->rid_ << " _pfid=0x" << itr->pfid_; + oss << std::hex << "rid=0x" << itr->rid_ << " _pfid=0x" << itr->fid_; throw jexception(jerrno::JERR_MAP_DUPLICATE, oss.str(), "wmgr", "commit"); } } -- cgit v1.2.1