diff options
22 files changed, 371 insertions, 247 deletions
diff --git a/qpid/cpp/src/qpid/linearstore/ISSUES b/qpid/cpp/src/qpid/linearstore/ISSUES index 8c5b08bb61..3aec281858 100644 --- a/qpid/cpp/src/qpid/linearstore/ISSUES +++ b/qpid/cpp/src/qpid/linearstore/ISSUES @@ -34,29 +34,36 @@ Current/pending: ** Basic performance tests 5362 - Linearstore: No store tools exist for examining the journals svn r.1558888 2014-01-09: WIP checkin for linearstore version of qpid_qls_analyze. Needs testing and tidy-up. + svn r.1560530 2014-01-22: Bugfixes for qpid_qls_analyze + svn r.1561848 2014-01-27: Bugfixes and enhancements for qpid_qls_analyze + svn r.1564808 2014-02-05: Bugfixes and enhancements for qpid_qls_analyze * Store analysis and status * Recovery/reading of message content * Empty file pool status and management 5464 - [linearstore] Incompletely created journal files accumulate in EFP - 5479 1053701 [linearstore] Using recovered store results in "JERR_JNLF_FILEOFFSOVFL: Attempted to increase submitted offset past file size. (JournalFile::submittedDblkCount)" error message - * Probablilty: 2 of 600 (0.3%) using tx-test-soak.sh - 5480 1053749 [linearstore] Recovery of store failure with "JERR_MAP_NOTFOUND: Key not found in map." error message +# 5479 1053701 [linearstore] Using recovered store results in "JERR_JNLF_FILEOFFSOVFL: Attempted to increase submitted offset past file size. (JournalFile::submittedDblkCount)" error message + * Probability: 2 of 600 (0.3%) using tx-test-soak.sh +# 5480 1053749 [linearstore] Recovery of store failure with "JERR_MAP_NOTFOUND: Key not found in map." error message * Probability: 6 of 600 (1.0%) using tx-test-soak.sh * If broker is started a second time after failure, it starts correctly and test completes ok. + * Problem: File is being recycled to EFP with still-locked enqueues in it (ie dequeued transactionally). + * Problem: Record alignment check writes filler records to wrong file when decoding bad record moves across a file boundary 5484 1035843 Slow performance for producers svn r.1558592 fixes an issue with using /dev/random as a source of random numbers for Journal serial numbers. - - 1036026 [LinearStore] Qpid linear store unable to create durable queue - framing-error: Queue <q-name>: create() failed: jexception 0x0000 + svn r.1558913 replaces use of /dev/urandom with several calls to rand() to construct a 64-bit random number. + * Recommend rebuilding and testing for performance again with these two fixes. +# - 1036026 [LinearStore] Qpid linear store unable to create durable queue - framing-error: Queue <q-name>: create() failed: jexception 0x0000 UNABLE TO REPRODUCE - but Frantizek has additional info - 1039522 Qpid crashes while recovering from linear store around apid::linearstore::journal::JournalFile::getFqFileName() including enq_rec::decode() threw JERR_JREC_BAD_RECTAIL * Possible dup of 1039525 - * May be fixed by QPID-5483 - waiting for needinfo + * May be fixed by QPID-5483 - waiting for needinfo, recommend rebuilding with QPID-5483 fix and re-testing - 1039525 Qpid crashes while recovering from linear store around apid::linearstore::journal::jexception::format including enq_rec::decode() threw JERR_JREC_BAD_REC_TAIL * Possible dup of 1039522 - * May be fixed by QPID-5483 - waiting for needinfo - 5487 - [linearstore] Replace use of /dev/urandom with c random generator calls + * May be fixed by QPID-5483 - waiting for needinfo, recommend rebuilding with QPID-5483 fix and re-testing +# - 1049870 [LinearStore] auto-delete property does not survive restart -Fixed/closed: -============= +Fixed/closed (in commit order): +=============================== Q-JIRA RHBZ Description / Comments ------ ------- ---------------------- 5357 1052518 Linearstore: Empty file recycling not functional @@ -85,6 +92,8 @@ NO-JIRA - Added missing Apache copyright/license text svn r.1558589 2014-01-15: Proposed fix * May be linked to RHBZ 1039522 - waiting for needinfo * May be linked to RHBZ 1039525 - waiting for needinfo + 5487 1054448 [linearstore] Replace use of /dev/urandom with c random generator calls + svn r.1558913 2014-01-16: Proposed fix Future: ======= @@ -101,3 +110,4 @@ Code tidy-up * Member names: xxx_ * Rename classes, functions and variables to camel-case * Add Doxygen docs to classes +* Make fid's consistent in name (fid, file_id, pfid) and format (hex vs decimal) diff --git a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp index 483b494c2c..ff5b41b962 100644 --- a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp +++ b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp @@ -593,7 +593,7 @@ void MessageStoreImpl::recover(qpid::broker::RecoveryManager& registry_) std::ostringstream oss; oss << "Recovered transaction prepared list:"; for (txn_list::iterator i = prepared.begin(); i != prepared.end(); i++) { - oss << std::endl << " " << str2hexnum(i->xid); + oss << std::endl << " " << qpid::linearstore::journal::jcntl::str2hexnum(i->xid); } QLS_LOG(debug, oss.str()); @@ -1292,7 +1292,7 @@ void MessageStoreImpl::completed(TxnCtxt& txn_, mgmtObject->inc_tplTxnAborts(); } } catch (const std::exception& e) { - QLS_LOG(error, "Error completing xid " << txn_.getXid() << ": " << e.what()); + QLS_LOG(error, "Error completing xid " << qpid::linearstore::journal::jcntl::str2hexnum(txn_.getXid()) << ": " << e.what()); throw; } } @@ -1516,15 +1516,6 @@ void MessageStoreImpl::journalDeleted(JournalImpl& j_) { journalList.erase(j_.id()); } -std::string MessageStoreImpl::str2hexnum(const std::string& str) { - std::ostringstream oss; - oss << "(" << str.size() << ")0x" << std::hex; - for (unsigned i=str.size(); i>0; --i) { - oss << std::setfill('0') << std::setw(2) << (uint16_t)(uint8_t)str[i-1]; - } - return oss.str(); -} - MessageStoreImpl::StoreOptions::StoreOptions(const std::string& name_) : qpid::Options(name_), truncateFlag(defTruncateFlag), diff --git a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h index 3157b9be9d..c2eb0deab0 100644 --- a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h +++ b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h @@ -235,8 +235,6 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem } void chkTplStoreInit(); - static std::string str2hexnum(const std::string& str); - public: typedef boost::shared_ptr<MessageStoreImpl> shared_ptr; diff --git a/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp b/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp index 743d12989a..df2e7a442d 100644 --- a/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp +++ b/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp @@ -52,7 +52,9 @@ void TxnCtxt::commitTxn(JournalImpl* jc, bool commit) { jc->txn_abort(dtokp.get(), getXid()); } } catch (const qpid::linearstore::journal::jexception& e) { - THROW_STORE_EXCEPTION(std::string("Error commit") + e.what()); + std::ostringstream oss; + oss << "Error during " << (commit ? "commit" : "abort") << ": " << e.what(); + THROW_STORE_EXCEPTION(oss.str()); } } } diff --git a/qpid/cpp/src/qpid/linearstore/journal/JournalFile.cpp b/qpid/cpp/src/qpid/linearstore/journal/JournalFile.cpp index 1b2025bd5a..fc6ced4fd2 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/JournalFile.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/JournalFile.cpp @@ -27,18 +27,18 @@ #include "qpid/linearstore/journal/utils/file_hdr.h" #include <unistd.h> -//#include <iostream> // DEBUG - namespace qpid { namespace linearstore { namespace journal { JournalFile::JournalFile(const std::string& fqFileName, const efpIdentity_t& efpIdentity, - const uint64_t fileSeqNum) : + const uint64_t fileSeqNum, + const std::string queueName) : efpIdentity_(efpIdentity), fqFileName_(fqFileName), fileSeqNum_(fileSeqNum), + queueName_(queueName), serial_(getRandom64()), firstRecordOffset_(0ULL), fileHandle_(-1), @@ -47,6 +47,7 @@ JournalFile::JournalFile(const std::string& fqFileName, fileHeaderPtr_(0), aioControlBlockPtr_(0), fileSize_dblks_(((efpIdentity.ds_ * 1024) + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_BYTES)) / QLS_DBLK_SIZE_BYTES), + initializedFlag_(false), enqueuedRecordCount_("JournalFile::enqueuedRecordCount", 0), submittedDblkCount_("JournalFile::submittedDblkCount", 0), completedDblkCount_("JournalFile::completedDblkCount", 0), @@ -54,10 +55,12 @@ JournalFile::JournalFile(const std::string& fqFileName, {} JournalFile::JournalFile(const std::string& fqFileName, - const ::file_hdr_t& fileHeader) : + const ::file_hdr_t& fileHeader, + const std::string queueName) : efpIdentity_(fileHeader._efp_partition, fileHeader._data_size_kib), fqFileName_(fqFileName), fileSeqNum_(fileHeader._file_number), + queueName_(queueName), serial_(fileHeader._rhdr._serial), firstRecordOffset_(fileHeader._fro), fileHandle_(-1), @@ -66,6 +69,7 @@ JournalFile::JournalFile(const std::string& fqFileName, fileHeaderPtr_(0), aioControlBlockPtr_(0), fileSize_dblks_(((fileHeader._data_size_kib * 1024) + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_BYTES)) / QLS_DBLK_SIZE_BYTES), + initializedFlag_(false), enqueuedRecordCount_("JournalFile::enqueuedRecordCount", 0), submittedDblkCount_("JournalFile::submittedDblkCount", 0), completedDblkCount_("JournalFile::completedDblkCount", 0), @@ -78,18 +82,21 @@ JournalFile::~JournalFile() { void JournalFile::initialize(const uint32_t completedDblkCount) { - if (::posix_memalign(&fileHeaderBasePtr_, QLS_AIO_ALIGN_BOUNDARY_BYTES, QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024)) - { - std::ostringstream oss; - oss << "posix_memalign(): blksize=" << QLS_AIO_ALIGN_BOUNDARY_BYTES << " size=" << (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024); - oss << FORMAT_SYSERR(errno); - throw jexception(jerrno::JERR__MALLOC, oss.str(), "JournalFile", "initialize"); + if (!initializedFlag_) { + if (::posix_memalign(&fileHeaderBasePtr_, QLS_AIO_ALIGN_BOUNDARY_BYTES, QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024)) + { + std::ostringstream oss; + oss << "posix_memalign(): blksize=" << QLS_AIO_ALIGN_BOUNDARY_BYTES << " size=" << (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024); + oss << FORMAT_SYSERR(errno); + throw jexception(jerrno::JERR__MALLOC, oss.str(), "JournalFile", "initialize"); + } + fileHeaderPtr_ = (::file_hdr_t*)fileHeaderBasePtr_; + aioControlBlockPtr_ = new aio_cb; + initializedFlag_ = true; } - fileHeaderPtr_ = (::file_hdr_t*)fileHeaderBasePtr_; - aioControlBlockPtr_ = new aio_cb; if (completedDblkCount > 0UL) { - submittedDblkCount_.add(completedDblkCount); - completedDblkCount_.add(completedDblkCount); + submittedDblkCount_.set(completedDblkCount); + completedDblkCount_.set(completedDblkCount); } } @@ -149,8 +156,7 @@ void JournalFile::asyncFileHeaderWrite(io_context_t ioContextPtr, const efpDataSize_kib_t efpDataSize_kib, const uint16_t userFlags, const uint64_t recordId, - const uint64_t firstRecordOffset, - const std::string queueName) { + const uint64_t firstRecordOffset) { firstRecordOffset_ = firstRecordOffset; ::file_hdr_create(fileHeaderPtr_, QLS_FILE_MAGIC, QLS_JRNL_VERSION, QLS_JRNL_FHDR_RES_SIZE_SBLKS, efpPartitionNumber, efpDataSize_kib); ::file_hdr_init(fileHeaderBasePtr_, @@ -160,15 +166,15 @@ void JournalFile::asyncFileHeaderWrite(io_context_t ioContextPtr, recordId, firstRecordOffset, fileSeqNum_, - queueName.size(), - queueName.data()); - aio::prep_pwrite(aioControlBlockPtr_, - fileHandle_, - (void*)fileHeaderBasePtr_, - QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024, - 0UL); - if (aio::submit(ioContextPtr, 1, &aioControlBlockPtr_) < 0) - throw jexception(jerrno::JERR__AIO, "JournalFile", "asyncPageWrite"); + queueName_.size(), + queueName_.data()); + const std::size_t wr_size = QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024; + aio::prep_pwrite(aioControlBlockPtr_, fileHandle_, (void*)fileHeaderBasePtr_, wr_size, 0UL); + if (aio::submit(ioContextPtr, 1, &aioControlBlockPtr_) < 0) { + std::ostringstream oss; + oss << "queue=\"" << queueName_ << "\" fid=0x" << std::hex << fileSeqNum_ << " wr_size=0x" << wr_size << " foffs=0x0"; + throw jexception(jerrno::JERR__AIO, oss.str(), "JournalFile", "asyncFileHeaderWrite"); + } addSubmittedDblkCount(QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_DBLKS); incrOutstandingAioOperationCount(); } @@ -177,16 +183,16 @@ void JournalFile::asyncPageWrite(io_context_t ioContextPtr, aio_cb* aioControlBlockPtr, void* data, uint32_t dataSize_dblks) { - aio::prep_pwrite_2(aioControlBlockPtr, - fileHandle_, - data, - dataSize_dblks * QLS_DBLK_SIZE_BYTES, - submittedDblkCount_.get() * QLS_DBLK_SIZE_BYTES); + const std::size_t wr_size = dataSize_dblks * QLS_DBLK_SIZE_BYTES; + const uint64_t foffs = submittedDblkCount_.get() * QLS_DBLK_SIZE_BYTES; + aio::prep_pwrite_2(aioControlBlockPtr, fileHandle_, data, wr_size, foffs); pmgr::page_cb* pcbp = (pmgr::page_cb*)(aioControlBlockPtr->data); // This page's control block (pcb) pcbp->_wdblks = dataSize_dblks; pcbp->_jfp = this; if (aio::submit(ioContextPtr, 1, &aioControlBlockPtr) < 0) { - throw jexception(jerrno::JERR__AIO, "JournalFile", "asyncPageWrite"); // TODO: complete exception details + std::ostringstream oss; + oss << "queue=\"" << queueName_ << "\" fid=0x" << std::hex << fileSeqNum_ << " wr_size=0x" << wr_size << " foffs=0x" << foffs; + throw jexception(jerrno::JERR__AIO, oss.str(), "JournalFile", "asyncPageWrite"); } addSubmittedDblkCount(dataSize_dblks); incrOutstandingAioOperationCount(); diff --git a/qpid/cpp/src/qpid/linearstore/journal/JournalFile.h b/qpid/cpp/src/qpid/linearstore/journal/JournalFile.h index f0ad432fd8..e33830ef7f 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/JournalFile.h +++ b/qpid/cpp/src/qpid/linearstore/journal/JournalFile.h @@ -38,6 +38,7 @@ protected: const efpIdentity_t efpIdentity_; const std::string fqFileName_; const uint64_t fileSeqNum_; + const std::string queueName_; const uint64_t serial_; uint64_t firstRecordOffset_; int fileHandle_; @@ -46,6 +47,7 @@ protected: ::file_hdr_t* fileHeaderPtr_; aio_cb* aioControlBlockPtr_; uint32_t fileSize_dblks_; ///< File size in data blocks, including file header + bool initializedFlag_; AtomicCounter<uint32_t> enqueuedRecordCount_; ///< Count of enqueued records AtomicCounter<uint32_t> submittedDblkCount_; ///< Write file count (data blocks) for submitted AIO @@ -56,10 +58,12 @@ public: // Constructor for creating new file with known fileSeqNum and random serial JournalFile(const std::string& fqFileName, const efpIdentity_t& efpIdentity, - const uint64_t fileSeqNum); + const uint64_t fileSeqNum, + const std::string queueName); // Constructor for recovery in which fileSeqNum and serial are recovered from fileHeader param JournalFile(const std::string& fqFileName, - const ::file_hdr_t& fileHeader); + const ::file_hdr_t& fileHeader, + const std::string queueName); virtual ~JournalFile(); void initialize(const uint32_t completedDblkCount); @@ -76,13 +80,13 @@ public: const efpDataSize_kib_t efpDataSize_kib, const uint16_t userFlags, const uint64_t recordId, - const uint64_t firstRecordOffset, - const std::string queueName); + const uint64_t firstRecordOffset); void asyncPageWrite(io_context_t ioContextPtr, aio_cb* aioControlBlockPtr, void* data, uint32_t dataSize_dblks); + uint32_t getSubmittedDblkCount() const; uint32_t getEnqueuedRecordCount() const; uint32_t incrEnqueuedRecordCount(); uint32_t decrEnqueuedRecordCount(); @@ -109,7 +113,6 @@ protected: static uint64_t getRandom64(); bool isOpen() const; - uint32_t getSubmittedDblkCount() const; uint32_t addSubmittedDblkCount(const uint32_t a); uint32_t getCompletedDblkCount() const; diff --git a/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp b/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp index 5483f3bb94..86d1b0e93c 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp @@ -25,8 +25,6 @@ #include "qpid/linearstore/journal/jcntl.h" #include "qpid/linearstore/journal/JournalFile.h" -//#include <iostream> // DEBUG - namespace qpid { namespace linearstore { namespace journal { @@ -34,10 +32,10 @@ namespace journal { LinearFileController::LinearFileController(jcntl& jcntlRef) : jcntlRef_(jcntlRef), emptyFilePoolPtr_(0), - currentJournalFilePtr_(0), fileSeqCounter_("LinearFileController::fileSeqCounter", 0), recordIdCounter_("LinearFileController::recordIdCounter", 0), - decrCounter_("LinearFileController::decrCounter", 0) + decrCounter_("LinearFileController::decrCounter", 0), + currentJournalFilePtr_(0) {} LinearFileController::~LinearFileController() {} @@ -53,7 +51,7 @@ void LinearFileController::initialize(const std::string& journalDirectory, void LinearFileController::finalize() { if (currentJournalFilePtr_) { currentJournalFilePtr_->close(); - currentJournalFilePtr_ = NULL; + currentJournalFilePtr_ = 0; } while (!journalFileList_.empty()) { delete journalFileList_.front(); @@ -62,17 +60,21 @@ void LinearFileController::finalize() { } void LinearFileController::addJournalFile(JournalFile* journalFilePtr, - const uint32_t completedDblkCount) { - if (currentJournalFilePtr_) { + const uint32_t completedDblkCount, + const bool makeCurrentFlag) { + if (makeCurrentFlag && currentJournalFilePtr_) { currentJournalFilePtr_->close(); + currentJournalFilePtr_ = 0; } journalFilePtr->initialize(completedDblkCount); - currentJournalFilePtr_ = journalFilePtr; { slock l(journalFileListMutex_); - journalFileList_.push_back(currentJournalFilePtr_); + journalFileList_.push_back(journalFilePtr); + } + if (makeCurrentFlag) { + currentJournalFilePtr_ = journalFilePtr; + currentJournalFilePtr_->open(); } - currentJournalFilePtr_->open(); } efpDataSize_sblks_t LinearFileController::dataSize_sblks() const { @@ -83,16 +85,20 @@ efpFileSize_sblks_t LinearFileController::fileSize_sblks() const { return emptyFilePoolPtr_->fileSize_sblks(); } +void LinearFileController::getNextJournalFile() { + if (currentJournalFilePtr_) + currentJournalFilePtr_->close(); + pullEmptyFileFromEfp(); +} + uint64_t LinearFileController::getNextRecordId() { return recordIdCounter_.increment(); } -void LinearFileController::pullEmptyFileFromEfp() { - if (currentJournalFilePtr_) - currentJournalFilePtr_->close(); - std::string ef = emptyFilePoolPtr_->takeEmptyFile(journalDirectory_); // Moves file from EFP only, returns new file name -//std::cout << "*** LinearFileController::pullEmptyFileFromEfp() qn=" << jcntlRef.id() << " ef=" << ef << std::endl; // DEBUG - addJournalFile(ef, emptyFilePoolPtr_->getIdentity(), getNextFileSeqNum(), 0); +void LinearFileController::removeFileToEfp(const std::string& fileName) { + if (emptyFilePoolPtr_) { + emptyFilePoolPtr_->returnEmptyFile(fileName); + } } void LinearFileController::restoreEmptyFile(const std::string& fileName) { @@ -101,7 +107,11 @@ void LinearFileController::restoreEmptyFile(const std::string& fileName) { void LinearFileController::purgeEmptyFilesToEfp() { slock l(journalFileListMutex_); - purgeEmptyFilesToEfpNoLock(); + while (journalFileList_.front()->isNoEnqueuedRecordsRemaining() && journalFileList_.size() > 1) { // Can't purge last file, even if it has no enqueued records + emptyFilePoolPtr_->returnEmptyFile(journalFileList_.front()->getFqFileName()); + delete journalFileList_.front(); + journalFileList_.pop_front(); + } } uint32_t LinearFileController::getEnqueuedRecordCount(const efpFileCount_t fileSeqNumber) { @@ -113,7 +123,6 @@ uint32_t LinearFileController::incrEnqueuedRecordCount(const efpFileCount_t file } uint32_t LinearFileController::decrEnqueuedRecordCount(const efpFileCount_t fileSeqNumber) { - slock l(journalFileListMutex_); uint32_t r = find(fileSeqNumber)->decrEnqueuedRecordCount(); // TODO: Re-evaluate after testing and profiling @@ -122,18 +131,16 @@ uint32_t LinearFileController::decrEnqueuedRecordCount(const efpFileCount_t file // records). We need to check this rather simple scheme works for outlying scenarios (large and tiny data // records) without impacting performance or performing badly (leaving excessive empty files in the journals). if (decrCounter_.increment() % 100ULL == 0ULL) { - purgeEmptyFilesToEfpNoLock(); + purgeEmptyFilesToEfp(); } return r; } uint32_t LinearFileController::addWriteCompletedDblkCount(const efpFileCount_t fileSeqNumber, const uint32_t a) { - slock l(journalFileListMutex_); return find(fileSeqNumber)->addCompletedDblkCount(a); } uint16_t LinearFileController::decrOutstandingAioOperationCount(const efpFileCount_t fileSeqNumber) { - slock l(journalFileListMutex_); return find(fileSeqNumber)->decrOutstandingAioOperationCount(); } @@ -142,12 +149,11 @@ void LinearFileController::asyncFileHeaderWrite(io_context_t ioContextPtr, const uint64_t recordId, const uint64_t firstRecordOffset) { currentJournalFilePtr_->asyncFileHeaderWrite(ioContextPtr, - emptyFilePoolPtr_->getPartitionNumber(), - emptyFilePoolPtr_->dataSize_kib(), - userFlags, - recordId, - firstRecordOffset, - jcntlRef_.id()); + emptyFilePoolPtr_->getPartitionNumber(), + emptyFilePoolPtr_->dataSize_kib(), + userFlags, + recordId, + firstRecordOffset); } void LinearFileController::asyncPageWrite(io_context_t ioContextPtr, @@ -195,8 +201,8 @@ void LinearFileController::addJournalFile(const std::string& fileName, const efpIdentity_t& efpIdentity, const uint64_t fileNumber, const uint32_t completedDblkCount) { - JournalFile* jfp = new JournalFile(fileName, efpIdentity, fileNumber); - addJournalFile(jfp, completedDblkCount); + JournalFile* jfp = new JournalFile(fileName, efpIdentity, fileNumber, jcntlRef_.id()); + addJournalFile(jfp, completedDblkCount, true); } void LinearFileController::assertCurrentJournalFileValid(const char* const functionName) const { @@ -209,15 +215,17 @@ bool LinearFileController::checkCurrentJournalFileValid() const { return currentJournalFilePtr_ != 0; } -// NOTE: NOT THREAD SAFE - journalFileList is accessed by multiple threads - use under external lock JournalFile* LinearFileController::find(const efpFileCount_t fileSeqNumber) { - if (currentJournalFilePtr_ != 0 && currentJournalFilePtr_->getFileSeqNum() == fileSeqNumber) + if (currentJournalFilePtr_ && currentJournalFilePtr_->getFileSeqNum() == fileSeqNumber) return currentJournalFilePtr_; + + slock l(journalFileListMutex_); for (JournalFileListItr_t i=journalFileList_.begin(); i!=journalFileList_.end(); ++i) { if ((*i)->getFileSeqNum() == fileSeqNumber) { return *i; } } + std::ostringstream oss; oss << "fileSeqNumber=" << fileSeqNumber; throw jexception(jerrno::JERR_LFCR_SEQNUMNOTFOUND, oss.str(), "LinearFileController", "find"); @@ -227,15 +235,9 @@ uint64_t LinearFileController::getNextFileSeqNum() { return fileSeqCounter_.increment(); } -void LinearFileController::purgeEmptyFilesToEfpNoLock() { -//std::cout << " >P n=" << journalFileList_.size() << " e=" << (journalFileList_.front()->isNoEnqueuedRecordsRemaining()?"T":"F") << std::flush; // DEBUG - while (journalFileList_.front()->isNoEnqueuedRecordsRemaining() && - journalFileList_.size() > 1) { // Can't purge last file, even if it has no enqueued records -//std::cout << " *f=" << journalFileList_.front()->getFqFileName() << std::flush; // DEBUG - emptyFilePoolPtr_->returnEmptyFile(journalFileList_.front()->getFqFileName()); - delete journalFileList_.front(); - journalFileList_.pop_front(); - } +void LinearFileController::pullEmptyFileFromEfp() { + std::string efn = emptyFilePoolPtr_->takeEmptyFile(journalDirectory_); // Moves file from EFP only (ie no file init), returns new file name + addJournalFile(efn, emptyFilePoolPtr_->getIdentity(), getNextFileSeqNum(), 0); } }}} diff --git a/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.h b/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.h index 933b9792a4..05f08144b9 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.h +++ b/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.h @@ -44,12 +44,12 @@ protected: jcntl& jcntlRef_; std::string journalDirectory_; EmptyFilePool* emptyFilePoolPtr_; - JournalFile* currentJournalFilePtr_; AtomicCounter<uint64_t> fileSeqCounter_; AtomicCounter<uint64_t> recordIdCounter_; AtomicCounter<uint64_t> decrCounter_; JournalFileList_t journalFileList_; + JournalFile* currentJournalFilePtr_; smutex journalFileListMutex_; public: @@ -62,12 +62,14 @@ public: void finalize(); void addJournalFile(JournalFile* journalFilePtr, - const uint32_t completedDblkCount); + const uint32_t completedDblkCount, + const bool makeCurrentFlag); efpDataSize_sblks_t dataSize_sblks() const; efpFileSize_sblks_t fileSize_sblks() const; + void getNextJournalFile(); uint64_t getNextRecordId(); - void pullEmptyFileFromEfp(); + void removeFileToEfp(const std::string& fileName); void restoreEmptyFile(const std::string& fileName); void purgeEmptyFilesToEfp(); @@ -105,11 +107,12 @@ protected: bool checkCurrentJournalFileValid() const; JournalFile* find(const efpFileCount_t fileSeqNumber); uint64_t getNextFileSeqNum(); - void purgeEmptyFilesToEfpNoLock(); + void pullEmptyFileFromEfp(); }; typedef void (LinearFileController::*lfcAddJournalFileFn)(JournalFile* journalFilePtr, - const uint32_t completedDblkCount); + const uint32_t completedDblkCount, + const bool makeCurrentFlag); }}} diff --git a/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp b/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp index 72308cc929..a1cec53ca1 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp @@ -56,11 +56,15 @@ RecoveredRecordData_t::RecoveredRecordData_t(const uint64_t rid, const uint64_t pendingTransaction_(ptxn) {} - bool recordIdListCompare(RecoveredRecordData_t a, RecoveredRecordData_t b) { return a.recordId_ < b.recordId_; } +RecoveredFileData_t::RecoveredFileData_t(JournalFile* journalFilePtr, const uint32_t completedDblkCount) : + journalFilePtr_(journalFilePtr), + completedDblkCount_(completedDblkCount) +{} + RecoveryManager::RecoveryManager(const std::string& journalDirectory, const std::string& queuename, enq_map& enqueueMapRef, @@ -77,11 +81,17 @@ RecoveryManager::RecoveryManager(const std::string& journalDirectory, highestRecordId_(0ULL), highestFileNumber_(0ULL), lastFileFullFlag_(false), + initial_fid_(0), currentSerial_(0), efpFileSize_kib_(0) {} -RecoveryManager::~RecoveryManager() {} +RecoveryManager::~RecoveryManager() { + for (fileNumberMapItr_t i = fileNumberMap_.begin(); i != fileNumberMap_.end(); ++i) { + delete i->second; + } + fileNumberMap_.clear(); +} void RecoveryManager::analyzeJournals(const std::vector<std::string>* preparedTransactionListPtr, EmptyFilePoolManager* emptyFilePoolManager, @@ -92,9 +102,6 @@ void RecoveryManager::analyzeJournals(const std::vector<std::string>* preparedTr *emptyFilePoolPtrPtr = emptyFilePoolManager->getEmptyFilePool(efpIdentity); efpFileSize_kib_ = (*emptyFilePoolPtrPtr)->fileSize_kib(); - // Check for file full condition - lastFileFullFlag_ = endOffset_ == (std::streamoff)(*emptyFilePoolPtrPtr)->fileSize_kib() * 1024; - if (!journalEmptyFlag_) { // Read all records, establish remaining enqueued records @@ -106,6 +113,9 @@ void RecoveryManager::analyzeJournals(const std::vector<std::string>* preparedTr inFileStream_.close(); } + // Check for file full condition + lastFileFullFlag_ = endOffset_ == (std::streamoff)(*emptyFilePoolPtrPtr)->fileSize_kib() * 1024; + // Remove leading files which have no enqueued records removeEmptyFiles(*emptyFilePoolPtrPtr); @@ -121,7 +131,7 @@ void RecoveryManager::analyzeJournals(const std::vector<std::string>* preparedTr // Unlock any affected enqueues in emap for (tdl_itr_t i=tdl.begin(); i<tdl.end(); i++) { if (i->enq_flag_) { // enq op - decrement enqueue count - fileNumberMap_[i->pfid_]->decrEnqueuedRecordCount(); + fileNumberMap_[i->pfid_]->journalFilePtr_->decrEnqueuedRecordCount(); } else if (enqueueMapRef_.is_enqueued(i->drid_, true)) { // deq op - unlock enq record if (enqueueMapRef_.unlock(i->drid_) < enq_map::EMAP_OK) { // fail // enq_map::unlock()'s only error is enq_map::EMAP_RID_NOT_FOUND @@ -174,7 +184,7 @@ bool RecoveryManager::readNextRemainingRecord(void** const dataPtrPtr, } } while (!foundRecord); - if (!inFileStream_.is_open() || currentJournalFileConstItr_->first != recordIdListConstItr_->fileId_) { + if (!inFileStream_.is_open() || currentJournalFileItr_->first != recordIdListConstItr_->fileId_) { if (!getFile(recordIdListConstItr_->fileId_, false)) { std::ostringstream oss; oss << "Failed to open file with file-id=" << recordIdListConstItr_->fileId_; @@ -231,7 +241,6 @@ bool RecoveryManager::readNextRemainingRecord(void** const dataPtrPtr, ::rec_tail_t enqueueTail; inFileStream_.read((char*)&enqueueTail, sizeof(::rec_tail_t)); uint32_t cs = checksum.getChecksum(); -//std::cout << std::hex << "### rid=0x" << enqueueHeader._rhdr._rid << " rtcs=0x" << enqueueTail._checksum << " cs=0x" << cs << std::dec << std::endl; // DEBUG uint16_t res = ::rec_tail_check(&enqueueTail, &enqueueHeader._rhdr, cs); if (res != 0) { std::stringstream oss; @@ -266,17 +275,30 @@ bool RecoveryManager::readNextRemainingRecord(void** const dataPtrPtr, void RecoveryManager::setLinearFileControllerJournals(lfcAddJournalFileFn fnPtr, LinearFileController* lfcPtr) { if (journalEmptyFlag_) { - if (uninitializedJournal_.size() > 0) { - lfcPtr->restoreEmptyFile(uninitializedJournal_); + if (uninitFileList_.size() > 0) { + std::string uninitFile = uninitFileList_.back(); + uninitFileList_.pop_back(); + lfcPtr->restoreEmptyFile(uninitFile); } } else { for (fileNumberMapConstItr_t i = fileNumberMap_.begin(); i != fileNumberMap_.end(); ++i) { - uint32_t fileDblkCount = i->first == highestFileNumber_ ? // Is this this last file? - endOffset_ / QLS_DBLK_SIZE_BYTES : // Last file uses _endOffset - efpFileSize_kib_ * 1024 / QLS_DBLK_SIZE_BYTES; // All others use file size to make them full - (lfcPtr->*fnPtr)(i->second, fileDblkCount); + (lfcPtr->*fnPtr)(i->second->journalFilePtr_, i->second->completedDblkCount_, i->first == initial_fid_); } } + + std::ostringstream oss; + bool logFlag = !notNeededFilesList_.empty(); + if (logFlag) { + oss << "Files removed from head of journal: prior truncation during recovery:"; + } + while (!notNeededFilesList_.empty()) { + lfcPtr->removeFileToEfp(notNeededFilesList_.back()); + oss << std::endl << " * " << notNeededFilesList_.back(); + notNeededFilesList_.pop_back(); + } + if (logFlag) { + journalLogRef_.log(JournalLog::LOG_NOTICE, queueName_, oss.str()); + } } std::string RecoveryManager::toString(const std::string& jid) { @@ -285,7 +307,7 @@ std::string RecoveryManager::toString(const std::string& jid) { oss << " Number of journal files = " << fileNumberMap_.size() << std::endl; oss << " Journal File List:" << std::endl; for (fileNumberMapConstItr_t k=fileNumberMap_.begin(); k!=fileNumberMap_.end(); ++k) { - std::string fqFileName = k->second->getFqFileName(); + std::string fqFileName = k->second->journalFilePtr_->getFqFileName(); oss << " " << k->first << ": " << fqFileName.substr(fqFileName.rfind('/')+1) << std::endl; } oss << " Enqueue Counts: [ "; @@ -293,7 +315,7 @@ std::string RecoveryManager::toString(const std::string& jid) { if (l != fileNumberMap_.begin()) { oss << ", "; } - oss << l->second->getEnqueuedRecordCount(); + oss << l->second->journalFilePtr_->getEnqueuedRecordCount(); } oss << " ]" << std::endl; oss << " Journal empty = " << (journalEmptyFlag_ ? "TRUE" : "FALSE") << std::endl; @@ -330,15 +352,17 @@ std::string RecoveryManager::toLog(const std::string& jid, const int indent) { << std::setw(10) << "--------" << std::endl; for (fileNumberMapConstItr_t k=fileNumberMap_.begin(); k!=fileNumberMap_.end(); ++k) { - std::string fqFileName = k->second->getFqFileName(); + std::string fqFileName = k->second->journalFilePtr_->getFqFileName(); + std::ostringstream fid; + fid << std::hex << "0x" << k->first; std::ostringstream fro; - fro << std::hex << "0x" << k->second->getFirstRecordOffset(); - oss << indentStr << std::setw(7) << k->first + fro << std::hex << "0x" << k->second->journalFilePtr_->getFirstRecordOffset(); + oss << indentStr << std::setw(7) << fid.str() << std::setw(43) << fqFileName.substr(fqFileName.rfind('/')+1) << std::setw(16) << fro.str() - << std::setw(12) << k->second->getEnqueuedRecordCount() - << std::setw(5) << k->second->getEfpIdentity().pn_ - << std::setw(9) << k->second->getEfpIdentity().ds_ << "k" + << std::setw(12) << k->second->journalFilePtr_->getEnqueuedRecordCount() + << std::setw(5) << k->second->journalFilePtr_->getEfpIdentity().pn_ + << std::setw(9) << k->second->journalFilePtr_->getEfpIdentity().ds_ << "k" << std::endl; } oss << indentStr << "First record offset in first file = 0x" << std::hex << firstRecordOffset_ << @@ -347,7 +371,7 @@ std::string RecoveryManager::toLog(const std::string& jid, const int indent) { (endOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)" << std::endl; oss << indentStr << "Highest rid found = 0x" << std::hex << highestRecordId_ << std::dec << std::endl; oss << indentStr << "Last file full = " << (lastFileFullFlag_ ? "TRUE" : "FALSE") << std::endl; - oss << indentStr << "Enqueued records (txn & non-txn):"; + //oss << indentStr << "Enqueued records (txn & non-txn):"; // TODO: complete report } return oss.str(); } @@ -357,27 +381,28 @@ std::string RecoveryManager::toLog(const std::string& jid, const int indent) { void RecoveryManager::analyzeJournalFileHeaders(efpIdentity_t& efpIdentity) { std::string headerQueueName; ::file_hdr_t fileHeader; - directoryList_t directoryList; + stringList_t directoryList; jdir::read_dir(journalDirectory_, directoryList, false, true, false, true); - for (directoryListConstItr_t i = directoryList.begin(); i != directoryList.end(); ++i) { + for (stringListConstItr_t i = directoryList.begin(); i != directoryList.end(); ++i) { readJournalFileHeader(*i, fileHeader, headerQueueName); if (headerQueueName.empty()) { std::ostringstream oss; - if (uninitializedJournal_.empty()) { - oss << "Journal file " << (*i) << " is first uninitialized (not yet written) journal file."; - journalLogRef_.log(JournalLog::LOG_NOTICE, queueName_, oss.str()); - uninitializedJournal_ = *i; - } else { - oss << "Journal file " << (*i) << " is second or greater uninitialized journal file - ignoring"; - journalLogRef_.log(JournalLog::LOG_WARN, queueName_, oss.str()); - } + oss << "Journal file " << (*i) << " is uninitialized"; + journalLogRef_.log(JournalLog::LOG_WARN, queueName_, oss.str()); + uninitFileList_.push_back(*i); } else if (headerQueueName.compare(queueName_) != 0) { std::ostringstream oss; oss << "Journal file " << (*i) << " belongs to queue \"" << headerQueueName << "\": ignoring"; journalLogRef_.log(JournalLog::LOG_WARN, queueName_, oss.str()); } else { - JournalFile* jfp = new JournalFile(*i, fileHeader); - fileNumberMap_[fileHeader._file_number] = jfp; + JournalFile* jfp = new JournalFile(*i, fileHeader, queueName_); + std::pair<fileNumberMapItr_t, bool> res = fileNumberMap_.insert( + std::pair<uint64_t, RecoveredFileData_t*>(fileHeader._file_number, new RecoveredFileData_t(jfp, 0))); + if (!res.second) { + std::ostringstream oss; + oss << "Journal file " << (*i) << " has fid=0x" << std::hex << jfp->getFileSeqNum() << " which already exists for this journal."; + throw jexception(oss.str()); // TODO: complete this exception + } if (fileHeader._file_number > highestFileNumber_) { highestFileNumber_ = fileHeader._file_number; } @@ -393,7 +418,7 @@ void RecoveryManager::analyzeJournalFileHeaders(efpIdentity_t& efpIdentity) { if (fileNumberMap_.empty()) { journalEmptyFlag_ = true; } else { - currentJournalFileConstItr_ = fileNumberMap_.begin(); + currentJournalFileItr_ = fileNumberMap_.begin(); } } @@ -408,7 +433,7 @@ void RecoveryManager::checkFileStreamOk(bool checkEof) { } } -void RecoveryManager::checkJournalAlignment(const std::streampos recordPosition) { +void RecoveryManager::checkJournalAlignment(const uint64_t start_fid, const std::streampos recordPosition) { if (recordPosition % QLS_DBLK_SIZE_BYTES != 0) { std::ostringstream oss; oss << "Current read pointer not dblk aligned: recordPosition=0x" << std::hex << recordPosition; @@ -420,12 +445,13 @@ void RecoveryManager::checkJournalAlignment(const std::streampos recordPosition) if (sblkOffset) { std::ostringstream oss1; - oss1 << std::hex << "Bad record alignment found at fid=0x" << getCurrentFileNumber(); + oss1 << std::hex << "Bad record alignment found at fid=0x" << start_fid; oss1 << " offs=0x" << currentPosn << " (likely journal overwrite boundary); " << std::dec; oss1 << (QLS_SBLK_SIZE_DBLKS - (sblkOffset/QLS_DBLK_SIZE_BYTES)) << " filler record(s) required."; journalLogRef_.log(JournalLog::LOG_WARN, queueName_, oss1.str()); - std::ofstream outFileStream(getCurrentFileName().c_str(), std::ios_base::in | std::ios_base::out | std::ios_base::binary); + fileNumberMapConstItr_t fnmItr = fileNumberMap_.find(start_fid); + std::ofstream outFileStream(fnmItr->second->journalFilePtr_->getFqFileName().c_str(), std::ios_base::in | std::ios_base::out | std::ios_base::binary); if (!outFileStream.good()) { throw jexception(jerrno::JERR__FILEIO, getCurrentFileName(), "RecoveryManager", "checkJournalAlignment"); } @@ -447,7 +473,7 @@ void RecoveryManager::checkJournalAlignment(const std::streampos recordPosition) throw jexception(jerrno::JERR_RCVM_WRITE, "RecoveryManager", "checkJournalAlignment"); } std::ostringstream oss2; - oss2 << std::hex << "Recover phase write: Wrote filler record: fid=0x" << getCurrentFileNumber(); + oss2 << std::hex << "Recover phase write: Wrote filler record: fid=0x" << start_fid; oss2 << " offs=0x" << currentPosn; journalLogRef_.log(JournalLog::LOG_NOTICE, queueName_, oss2.str()); currentPosn = outFileStream.tellp(); @@ -456,16 +482,15 @@ void RecoveryManager::checkJournalAlignment(const std::streampos recordPosition) std::free(writeBuffer); journalLogRef_.log(JournalLog::LOG_INFO, queueName_, "Bad record alignment fixed."); } - endOffset_ = currentPosn; + lastRecord(start_fid, currentPosn); } bool RecoveryManager::decodeRecord(jrec& record, std::size_t& cumulativeSizeRead, ::rec_hdr_t& headerRecord, - std::streampos& fileOffset) + const uint64_t start_fid, + const std::streampos recordOffset) { - std::streampos start_file_offs = fileOffset; - if (highestRecordId_ == 0) { highestRecordId_ = headerRecord._rid; } else if (headerRecord._rid - highestRecordId_ < 0x8000000000000000ULL) { // RFC 1982 comparison for unsigned 64-bit @@ -475,7 +500,7 @@ bool RecoveryManager::decodeRecord(jrec& record, bool done = false; while (!done) { try { - done = record.decode(headerRecord, &inFileStream_, cumulativeSizeRead); + done = record.decode(headerRecord, &inFileStream_, cumulativeSizeRead, recordOffset); } catch (const jexception& e) { if (e.err_code() == jerrno::JERR_JREC_BADRECTAIL) { @@ -485,11 +510,12 @@ bool RecoveryManager::decodeRecord(jrec& record, } else { journalLogRef_.log(JournalLog::LOG_INFO, queueName_, e.what()); } - checkJournalAlignment(start_file_offs); + checkJournalAlignment(start_fid, recordOffset); return false; } if (!done && needNextFile()) { if (!getNextFile(false)) { + checkJournalAlignment(start_fid, recordOffset); return false; } } @@ -498,11 +524,11 @@ bool RecoveryManager::decodeRecord(jrec& record, } std::string RecoveryManager::getCurrentFileName() const { - return currentJournalFileConstItr_->second->getFqFileName(); + return currentJournalFileItr_->second->journalFilePtr_->getFqFileName(); } uint64_t RecoveryManager::getCurrentFileNumber() const { - return currentJournalFileConstItr_->first; + return currentJournalFileItr_->first; } bool RecoveryManager::getFile(const uint64_t fileNumber, bool jumpToFirstRecordOffsetFlag) { @@ -511,8 +537,8 @@ bool RecoveryManager::getFile(const uint64_t fileNumber, bool jumpToFirstRecordO //std::cout << " f=" << getCurrentFileName() << "]" << std::flush; // DEBUG inFileStream_.clear(); // clear eof flag, req'd for older versions of c++ } - currentJournalFileConstItr_ = fileNumberMap_.find(fileNumber); - if (currentJournalFileConstItr_ == fileNumberMap_.end()) { + currentJournalFileItr_ = fileNumberMap_.find(fileNumber); + if (currentJournalFileItr_ == fileNumberMap_.end()) { return false; } inFileStream_.open(getCurrentFileName().c_str(), std::ios_base::in | std::ios_base::binary); @@ -536,7 +562,8 @@ bool RecoveryManager::getNextFile(bool jumpToFirstRecordOffsetFlag) { if (inFileStream_.is_open()) { inFileStream_.close(); //std::cout << " .f=" << getCurrentFileName() << "]" << std::flush; // DEBUG - if (++currentJournalFileConstItr_ == fileNumberMap_.end()) { + currentJournalFileItr_->second->completedDblkCount_ = efpFileSize_kib_ * 1024 / QLS_DBLK_SIZE_BYTES; + if (++currentJournalFileItr_ == fileNumberMap_.end()) { return false; } inFileStream_.clear(); // clear eof flag, req'd for older versions of c++ @@ -562,13 +589,15 @@ bool RecoveryManager::getNextRecordHeader() rec_hdr_t h; bool hdr_ok = false; - std::streampos file_pos; + uint64_t file_id = 0; + std::streampos file_pos = 0; while (!hdr_ok) { if (needNextFile()) { if (!getNextFile(true)) { return false; } } + file_id = currentJournalFileItr_->second->journalFilePtr_->getFileSeqNum(); file_pos = inFileStream_.tellg(); if (file_pos == std::streampos(-1)) { std::ostringstream oss; @@ -587,21 +616,21 @@ bool RecoveryManager::getNextRecordHeader() } } + uint64_t start_fid = getCurrentFileNumber(); // fid may increment in decode() if record folds over file boundary switch(h._magic) { case QLS_ENQ_MAGIC: { //std::cout << " 0x" << std::hex << file_pos << ".e.0x" << h._rid << std::dec << std::flush; // DEBUG if (::rec_hdr_check(&h, QLS_ENQ_MAGIC, QLS_JRNL_VERSION, currentSerial_) != 0) { - endOffset_ = file_pos; + lastRecord(file_id, file_pos); return false; } enq_rec er; - uint64_t start_fid = getCurrentFileNumber(); // fid may increment in decode() if record folds over file boundary - if (!decodeRecord(er, cum_size_read, h, file_pos)) { + if (!decodeRecord(er, cum_size_read, h, start_fid, file_pos)) { return false; } if (!er.is_transient()) { // Ignore transient msgs - fileNumberMap_[start_fid]->incrEnqueuedRecordCount(); + fileNumberMap_[start_fid]->journalFilePtr_->incrEnqueuedRecordCount(); if (er.xid_size()) { er.get_xid(&xidp); if (xidp == 0) { @@ -629,12 +658,11 @@ bool RecoveryManager::getNextRecordHeader() { //std::cout << " 0x" << std::hex << file_pos << ".d.0x" << h._rid << std::dec << std::flush; // DEBUG if (::rec_hdr_check(&h, QLS_DEQ_MAGIC, QLS_JRNL_VERSION, currentSerial_) != 0) { - endOffset_ = file_pos; + lastRecord(file_id, file_pos); return false; } deq_rec dr; - uint16_t start_fid = getCurrentFileNumber(); // fid may increment in decode() if record folds over file boundary - if (!decodeRecord(dr, cum_size_read, h, file_pos)) { + if (!decodeRecord(dr, cum_size_read, h, start_fid, file_pos)) { return false; } if (dr.xid_size()) { @@ -655,7 +683,7 @@ bool RecoveryManager::getNextRecordHeader() } else { uint64_t enq_fid; if (enqueueMapRef_.get_remove_pfid(dr.deq_rid(), enq_fid, true) == enq_map::EMAP_OK) { // ignore not found error - fileNumberMap_[enq_fid]->decrEnqueuedRecordCount(); + fileNumberMap_[enq_fid]->journalFilePtr_->decrEnqueuedRecordCount(); } } } @@ -664,11 +692,11 @@ bool RecoveryManager::getNextRecordHeader() { //std::cout << " 0x" << std::hex << file_pos << ".a.0x" << h._rid << std::dec << std::flush; // DEBUG if (::rec_hdr_check(&h, QLS_TXA_MAGIC, QLS_JRNL_VERSION, currentSerial_) != 0) { - endOffset_ = file_pos; + lastRecord(file_id, file_pos); return false; } txn_rec ar; - if (!decodeRecord(ar, cum_size_read, h, file_pos)) { + if (!decodeRecord(ar, cum_size_read, h, start_fid, file_pos)) { return false; } // Delete this txn from tmap, unlock any locked records in emap @@ -680,7 +708,7 @@ bool RecoveryManager::getNextRecordHeader() txn_data_list_t tdl = transactionMapRef_.get_remove_tdata_list(xid); // tdl will be empty if xid not found for (tdl_itr_t itr = tdl.begin(); itr != tdl.end(); itr++) { if (itr->enq_flag_) { - fileNumberMap_[itr->pfid_]->decrEnqueuedRecordCount(); + fileNumberMap_[itr->pfid_]->journalFilePtr_->decrEnqueuedRecordCount(); } else { enqueueMapRef_.unlock(itr->drid_); // ignore not found error } @@ -691,11 +719,11 @@ bool RecoveryManager::getNextRecordHeader() { //std::cout << " 0x" << std::hex << file_pos << ".c.0x" << h._rid << std::dec << std::flush; // DEBUG if (::rec_hdr_check(&h, QLS_TXC_MAGIC, QLS_JRNL_VERSION, currentSerial_) != 0) { - endOffset_ = file_pos; + lastRecord(file_id, file_pos); return false; } txn_rec cr; - if (!decodeRecord(cr, cum_size_read, h, file_pos)) { + if (!decodeRecord(cr, cum_size_read, h, start_fid, file_pos)) { return false; } // Delete this txn from tmap, process records into emap @@ -717,7 +745,7 @@ bool RecoveryManager::getNextRecordHeader() } else { // txn dequeue uint64_t enq_fid; if (enqueueMapRef_.get_remove_pfid(itr->drid_, enq_fid, true) == enq_map::EMAP_OK) // ignore not found error - fileNumberMap_[enq_fid]->decrEnqueuedRecordCount(); + fileNumberMap_[enq_fid]->journalFilePtr_->decrEnqueuedRecordCount(); } } } @@ -729,7 +757,9 @@ bool RecoveryManager::getNextRecordHeader() inFileStream_.ignore(rec_dblks * QLS_DBLK_SIZE_BYTES - sizeof(::rec_hdr_t)); checkFileStreamOk(false); if (needNextFile()) { + file_pos += rec_dblks * QLS_DBLK_SIZE_BYTES; if (!getNextFile(false)) { + lastRecord(start_fid, file_pos); return false; } } @@ -737,17 +767,36 @@ bool RecoveryManager::getNextRecordHeader() break; case 0: //std::cout << " 0x" << std::hex << file_pos << ".0" << std::dec << std::endl << std::flush; // DEBUG - checkJournalAlignment(file_pos); + checkJournalAlignment(getCurrentFileNumber(), file_pos); return false; default: //std::cout << " 0x" << std::hex << file_pos << ".?" << std::dec << std::endl << std::flush; // DEBUG // Stop as this is the overwrite boundary. - checkJournalAlignment(file_pos); + checkJournalAlignment(getCurrentFileNumber(), file_pos); return false; } return true; } +void RecoveryManager::lastRecord(const uint64_t file_id, const std::streamoff endOffset) { + endOffset_ = endOffset; + initial_fid_ = file_id; + fileNumberMap_[file_id]->completedDblkCount_ = endOffset_ / QLS_DBLK_SIZE_BYTES; + + // Remove any files in fileNumberMap_ beyond initial_fid_ + fileNumberMapItr_t unwantedFirstItr = fileNumberMap_.find(file_id); + if (++unwantedFirstItr != fileNumberMap_.end()) { + fileNumberMapItr_t itr = unwantedFirstItr; + notNeededFilesList_.push_back(unwantedFirstItr->second->journalFilePtr_->getFqFileName()); + while (++itr != fileNumberMap_.end()) { + notNeededFilesList_.push_back(itr->second->journalFilePtr_->getFqFileName()); + delete itr->second->journalFilePtr_; + delete itr->second; + } + fileNumberMap_.erase(unwantedFirstItr, fileNumberMap_.end()); + } +} + bool RecoveryManager::needNextFile() { if (inFileStream_.is_open()) { return inFileStream_.eof() || inFileStream_.tellg() >= std::streampos(efpFileSize_kib_ * 1024); @@ -820,7 +869,7 @@ bool RecoveryManager::readFileHeader() { currentSerial_ = fhdr._rhdr._serial; } else { inFileStream_.close(); - if (currentJournalFileConstItr_ == fileNumberMap_.begin()) { + if (currentJournalFileItr_ == fileNumberMap_.begin()) { journalEmptyFlag_ = true; } return false; @@ -855,9 +904,11 @@ void RecoveryManager::readJournalFileHeader(const std::string& journalFileName, } void RecoveryManager::removeEmptyFiles(EmptyFilePool* emptyFilePoolPtr) { - while (fileNumberMap_.begin()->second->getEnqueuedRecordCount() == 0 && fileNumberMap_.size() > 1) { -//std::cout << "*** File " << i->first << ": " << i->second << " is empty." << std::endl; // DEBUG - emptyFilePoolPtr->returnEmptyFile(fileNumberMap_.begin()->second->getFqFileName()); + while (fileNumberMap_.begin()->second->journalFilePtr_->getEnqueuedRecordCount() == 0 && fileNumberMap_.size() > 1) { + RecoveredFileData_t* rfdp = fileNumberMap_.begin()->second; + emptyFilePoolPtr->returnEmptyFile(rfdp->journalFilePtr_->getFqFileName()); + delete rfdp->journalFilePtr_; + delete rfdp; fileNumberMap_.erase(fileNumberMap_.begin()->first); } } diff --git a/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.h b/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.h index 997596938b..e19f92e305 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.h +++ b/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.h @@ -51,15 +51,21 @@ struct RecoveredRecordData_t { RecoveredRecordData_t(const uint64_t rid, const uint64_t fid, const std::streampos foffs, bool ptxn); }; +struct RecoveredFileData_t { + JournalFile* journalFilePtr_; + uint32_t completedDblkCount_; + RecoveredFileData_t(JournalFile* journalFilePtr, const uint32_t completedDblkCount); +}; + bool recordIdListCompare(RecoveredRecordData_t a, RecoveredRecordData_t b); class RecoveryManager { protected: // Types - typedef std::vector<std::string> directoryList_t; - typedef directoryList_t::const_iterator directoryListConstItr_t; - typedef std::map<uint64_t, JournalFile*> fileNumberMap_t; + typedef std::vector<std::string> stringList_t; + typedef stringList_t::const_iterator stringListConstItr_t; + typedef std::map<uint64_t, RecoveredFileData_t*> fileNumberMap_t; typedef fileNumberMap_t::iterator fileNumberMapItr_t; typedef fileNumberMap_t::const_iterator fileNumberMapConstItr_t; typedef std::vector<RecoveredRecordData_t> recordIdList_t; @@ -74,18 +80,20 @@ protected: // Initial journal analysis data fileNumberMap_t fileNumberMap_; ///< File number - JournalFilePtr map + stringList_t notNeededFilesList_; ///< Files not needed and to be returned to EFP + stringList_t uninitFileList_; ///< File name of uninitialized journal files found during header analysis bool journalEmptyFlag_; ///< Journal data files empty std::streamoff firstRecordOffset_; ///< First record offset in ffid std::streamoff endOffset_; ///< End offset (first byte past last record) uint64_t highestRecordId_; ///< Highest rid found uint64_t highestFileNumber_; ///< Highest file number found bool lastFileFullFlag_; ///< Last file is full - std::string uninitializedJournal_; ///< File name of uninitialized journal found during header analysis + uint64_t initial_fid_; ///< File id where initial write after recovery will occur // State for recovery of individual enqueued records uint64_t currentSerial_; uint32_t efpFileSize_kib_; - fileNumberMapConstItr_t currentJournalFileConstItr_; + fileNumberMapConstItr_t currentJournalFileItr_; std::string currentFileName_; std::ifstream inFileStream_; recordIdList_t recordIdList_; @@ -121,16 +129,18 @@ public: protected: void analyzeJournalFileHeaders(efpIdentity_t& efpIdentity); void checkFileStreamOk(bool checkEof); - void checkJournalAlignment(const std::streampos recordPosition); + void checkJournalAlignment(const uint64_t start_fid, const std::streampos recordPosition); bool decodeRecord(jrec& record, std::size_t& cumulativeSizeRead, ::rec_hdr_t& recordHeader, - std::streampos& fileOffset); + const uint64_t start_fid, + const std::streampos recordOffset); std::string getCurrentFileName() const; uint64_t getCurrentFileNumber() const; bool getFile(const uint64_t fileNumber, bool jumpToFirstRecordOffsetFlag); bool getNextFile(bool jumpToFirstRecordOffsetFlag); bool getNextRecordHeader(); + void lastRecord(const uint64_t file_id, const std::streamoff endOffset); bool needNextFile(); void prepareRecordList(); bool readFileHeader(); diff --git a/qpid/cpp/src/qpid/linearstore/journal/deq_rec.cpp b/qpid/cpp/src/qpid/linearstore/journal/deq_rec.cpp index a4882aaa9c..90ca27d082 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/deq_rec.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/deq_rec.cpp @@ -181,7 +181,7 @@ deq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks, Ch } bool -deq_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs) +deq_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs, const std::streampos rec_start) { if (rec_offs == 0) { @@ -228,7 +228,7 @@ deq_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs) assert(!ifsp->fail() && !ifsp->bad()); return false; } - check_rec_tail(); + check_rec_tail(rec_start); } ifsp->ignore(rec_size_dblks() * QLS_DBLK_SIZE_BYTES - rec_size()); assert(!ifsp->fail() && !ifsp->bad()); @@ -274,7 +274,7 @@ deq_rec::rec_size() const } void -deq_rec::check_rec_tail() const { +deq_rec::check_rec_tail(const std::streampos rec_start) const { Checksum checksum; checksum.addData((const unsigned char*)&_deq_hdr, sizeof(::deq_hdr_t)); if (_deq_hdr._xidsize > 0) { @@ -284,7 +284,7 @@ deq_rec::check_rec_tail() const { uint16_t res = ::rec_tail_check(&_deq_tail, &_deq_hdr._rhdr, cs); if (res != 0) { std::stringstream oss; - oss << std::hex; + oss << std::endl << " Record offset: 0x" << std::hex << rec_start; if (res & ::REC_TAIL_MAGIC_ERR_MASK) { oss << std::endl << " Magic: expected 0x" << ~_deq_hdr._rhdr._magic << "; found 0x" << _deq_tail._xmagic; } diff --git a/qpid/cpp/src/qpid/linearstore/journal/deq_rec.h b/qpid/cpp/src/qpid/linearstore/journal/deq_rec.h index ead0eed72a..9f55032e76 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/deq_rec.h +++ b/qpid/cpp/src/qpid/linearstore/journal/deq_rec.h @@ -49,7 +49,7 @@ public: void reset(const uint64_t serial, const uint64_t rid, const uint64_t drid, const void* const xidp, const std::size_t xidlen, const bool txn_coml_commit); uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks, Checksum& checksum); - bool decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs); + bool decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs, const std::streampos rec_start); inline bool is_txn_coml_commit() const { return ::is_txn_coml_commit(&_deq_hdr); } inline uint64_t rid() const { return _deq_hdr._rhdr._rid; } @@ -59,7 +59,7 @@ public: inline std::size_t data_size() const { return 0; } // This record never carries data std::size_t xid_size() const; std::size_t rec_size() const; - void check_rec_tail() const; + void check_rec_tail(const std::streampos rec_start) const; private: virtual void clean(); diff --git a/qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp b/qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp index f95a722308..0fecd90cbf 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp @@ -218,7 +218,7 @@ enq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks, Ch } bool -enq_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs) +enq_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs, const std::streampos rec_start) { if (rec_offs == 0) { @@ -291,7 +291,7 @@ enq_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs) assert(!ifsp->fail() && !ifsp->bad()); return false; } - check_rec_tail(); + check_rec_tail(rec_start); } ifsp->ignore(rec_size_dblks() * QLS_DBLK_SIZE_BYTES - rec_size()); assert(!ifsp->fail() && !ifsp->bad()); @@ -352,7 +352,7 @@ enq_rec::rec_size(const std::size_t xidsize, const std::size_t dsize, const bool } void -enq_rec::check_rec_tail() const { +enq_rec::check_rec_tail(const std::streampos rec_start) const { Checksum checksum; checksum.addData((const unsigned char*)&_enq_hdr, sizeof(::enq_hdr_t)); if (_enq_hdr._xidsize > 0) { @@ -365,7 +365,7 @@ enq_rec::check_rec_tail() const { uint16_t res = ::rec_tail_check(&_enq_tail, &_enq_hdr._rhdr, cs); if (res != 0) { std::stringstream oss; - oss << std::hex; + oss << std::endl << " Record offset: 0x" << std::hex << rec_start; if (res & ::REC_TAIL_MAGIC_ERR_MASK) { oss << std::endl << " Magic: expected 0x" << ~_enq_hdr._rhdr._magic << "; found 0x" << _enq_tail._xmagic; } diff --git a/qpid/cpp/src/qpid/linearstore/journal/enq_rec.h b/qpid/cpp/src/qpid/linearstore/journal/enq_rec.h index 1655e2cc4d..d85cde42f5 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/enq_rec.h +++ b/qpid/cpp/src/qpid/linearstore/journal/enq_rec.h @@ -51,7 +51,7 @@ public: void reset(const uint64_t serial, const uint64_t rid, const void* const dbuf, const std::size_t dlen, const void* const xidp, const std::size_t xidlen, const bool transient, const bool external); uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks, Checksum& checksum); - bool decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs); + bool decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs, const std::streampos rec_start); std::size_t get_xid(void** const xidpp); std::size_t get_data(void** const datapp); @@ -63,7 +63,7 @@ public: std::size_t rec_size() const; static std::size_t rec_size(const std::size_t xidsize, const std::size_t dsize, const bool external); inline uint64_t rid() const { return _enq_hdr._rhdr._rid; } - void check_rec_tail() const; + void check_rec_tail(const std::streampos rec_start) const; private: virtual void clean(); diff --git a/qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp b/qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp index 55bac4a2e5..ab367754d5 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp @@ -21,6 +21,7 @@ #include "qpid/linearstore/journal/jcntl.h" +#include <iomanip> #include "qpid/linearstore/journal/data_tok.h" #include "qpid/linearstore/journal/JournalLog.h" @@ -90,7 +91,7 @@ jcntl::initialize(EmptyFilePool* efpp, _linearFileController.finalize(); _jdir.clear_dir(); // Clear any existing journal files _linearFileController.initialize(_jdir.dirname(), efpp, 0ULL); - _linearFileController.pullEmptyFileFromEfp(); + _linearFileController.getNextJournalFile(); _wmgr.initialize(cbp, wcache_pgsize_sblks, wcache_num_pages, QLS_WMGR_MAXDTOKPP, QLS_WMGR_MAXWAITUS); _init_flag = true; } @@ -120,6 +121,9 @@ jcntl::recover(EmptyFilePoolManager* efpmp, _jrnl_log.log(/*LOG_DEBUG*/JournalLog::LOG_INFO, _jid, _recoveryManager.toLog(_jid, 5)); _linearFileController.initialize(_jdir.dirname(), _emptyFilePoolPtr, _recoveryManager.getHighestFileNumber()); _recoveryManager.setLinearFileControllerJournals(&qpid::linearstore::journal::LinearFileController::addJournalFile, &_linearFileController); + if (_recoveryManager.isLastFileFull()) { + _linearFileController.getNextJournalFile(); + } _wmgr.initialize(cbp, wcache_pgsize_sblks, wcache_num_pages, QLS_WMGR_MAXDTOKPP, QLS_WMGR_MAXWAITUS, (_recoveryManager.isLastFileFull() ? 0 : _recoveryManager.getEndOffset())); @@ -316,6 +320,20 @@ jcntl::getLinearFileControllerRef() { return _linearFileController; } +// static +std::string +jcntl::str2hexnum(const std::string& str) { + if (str.empty()) { + return "<null>"; + } + std::ostringstream oss; + oss << "(" << str.size() << ")0x" << std::hex; + for (unsigned i=str.size(); i>0; --i) { + oss << std::setfill('0') << std::setw(2) << (uint16_t)(uint8_t)str[i-1]; + } + return oss.str(); +} + iores jcntl::flush(const bool block_till_aio_cmpl) { diff --git a/qpid/cpp/src/qpid/linearstore/journal/jcntl.h b/qpid/cpp/src/qpid/linearstore/journal/jcntl.h index 2db0e707a7..94c00d2fab 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/jcntl.h +++ b/qpid/cpp/src/qpid/linearstore/journal/jcntl.h @@ -537,6 +537,8 @@ public: inline virtual void instr_incr_outstanding_aio_cnt() {} inline virtual void instr_decr_outstanding_aio_cnt() {} + static std::string str2hexnum(const std::string& str); + protected: static bool _init; static bool init_statics(); diff --git a/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp b/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp index 01c432d37b..8765396b31 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp @@ -167,8 +167,8 @@ jerrno::__init() _err_map[JERR_LFCR_SEQNUMNOTFOUND] = "JERR_LFCR_SEQNUMNOTFOUND: File sequence number not found"; // class jrec, enq_rec, deq_rec, txn_rec - _err_map[JERR_JREC_BADRECHDR] = "JERR_JREC_BADRECHDR: Invalid data record header."; - _err_map[JERR_JREC_BADRECTAIL] = "JERR_JREC_BADRECTAIL: Invalid data record tail."; + _err_map[JERR_JREC_BADRECHDR] = "JERR_JREC_BADRECHDR: Invalid record header."; + _err_map[JERR_JREC_BADRECTAIL] = "JERR_JREC_BADRECTAIL: Invalid record tail."; // class wmgr _err_map[JERR_WMGR_BADPGSTATE] = "JERR_WMGR_BADPGSTATE: Page buffer in illegal state for operation."; diff --git a/qpid/cpp/src/qpid/linearstore/journal/jrec.h b/qpid/cpp/src/qpid/linearstore/journal/jrec.h index 7645e646f6..cad0e5d7a2 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/jrec.h +++ b/qpid/cpp/src/qpid/linearstore/journal/jrec.h @@ -98,7 +98,7 @@ public: * \returns Number of data-blocks encoded. */ virtual uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks, Checksum& checksum) = 0; - virtual bool decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs) = 0; + virtual bool decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs, const std::streampos rec_start) = 0; virtual std::string& str(std::string& str) const = 0; virtual std::size_t data_size() const = 0; diff --git a/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp b/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp index 1368fd4be2..298ab608b1 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp @@ -176,7 +176,7 @@ txn_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks, Ch } bool -txn_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs) +txn_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs, const std::streampos rec_start) { if (rec_offs == 0) { @@ -218,7 +218,7 @@ txn_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs) assert(!ifsp->fail() && !ifsp->bad()); return false; } - check_rec_tail(); + check_rec_tail(rec_start); } ifsp->ignore(rec_size_dblks() * QLS_DBLK_SIZE_BYTES - rec_size()); assert(!ifsp->fail() && !ifsp->bad()); @@ -266,7 +266,7 @@ txn_rec::rec_size() const } void -txn_rec::check_rec_tail() const { +txn_rec::check_rec_tail(const std::streampos rec_start) const { Checksum checksum; checksum.addData((const unsigned char*)&_txn_hdr, sizeof(::txn_hdr_t)); if (_txn_hdr._xidsize > 0) { @@ -276,7 +276,7 @@ txn_rec::check_rec_tail() const { uint16_t res = ::rec_tail_check(&_txn_tail, &_txn_hdr._rhdr, cs); if (res != 0) { std::stringstream oss; - oss << std::hex; + oss << std::endl << " Record offset: 0x" << std::hex << rec_start; if (res & ::REC_TAIL_MAGIC_ERR_MASK) { oss << std::endl << " Magic: expected 0x" << ~_txn_hdr._rhdr._magic << "; found 0x" << _txn_tail._xmagic; } diff --git a/qpid/cpp/src/qpid/linearstore/journal/txn_rec.h b/qpid/cpp/src/qpid/linearstore/journal/txn_rec.h index daca16d9d4..4552071595 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/txn_rec.h +++ b/qpid/cpp/src/qpid/linearstore/journal/txn_rec.h @@ -49,7 +49,7 @@ public: void reset(const bool commitFlag, const uint64_t serial, const uint64_t rid, const void* const xidp, const std::size_t xidlen); uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks, Checksum& checksum); - bool decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs); + bool decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs, const std::streampos rec_start); std::size_t get_xid(void** const xidpp); std::string& str(std::string& str) const; @@ -57,7 +57,7 @@ public: std::size_t xid_size() const; std::size_t rec_size() const; inline uint64_t rid() const { return _txn_hdr._rhdr._rid; } - void check_rec_tail() const; + void check_rec_tail(const std::streampos rec_start) const; private: virtual void clean(); diff --git a/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp b/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp index c246837a8d..7c590713f5 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp @@ -30,8 +30,6 @@ #include "qpid/linearstore/journal/LinearFileController.h" #include "qpid/linearstore/journal/utils/file_hdr.h" -//#include <iostream> // DEBUG - namespace qpid { namespace linearstore { namespace journal { @@ -49,7 +47,7 @@ wmgr::wmgr(jcntl* jc, _deq_busy(false), _abort_busy(false), _commit_busy(false), - _txn_pending_set() + _txn_pending_map() {} wmgr::wmgr(jcntl* jc, @@ -67,7 +65,7 @@ wmgr::wmgr(jcntl* jc, _deq_busy(false), _abort_busy(false), _commit_busy(false), - _txn_pending_set() + _txn_pending_map() {} wmgr::~wmgr() @@ -281,6 +279,7 @@ wmgr::dequeue(data_tok* dtokp, _deq_busy = true; } //std::cout << "---+++ wmgr::dequeue() DEQ rid=0x" << std::hex << rid << " drid=0x" << dequeue_rid << " " << std::dec << std::flush; // DEBUG + std::string xid((const char*)xid_ptr, xid_len); bool done = false; Checksum checksum; while (!done) @@ -292,9 +291,27 @@ wmgr::dequeue(data_tok* dtokp, uint32_t ret = _deq_rec.encode(wptr, data_offs_dblks, (_cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) - _pg_offset_dblks, checksum); - // Remember fid which contains the record header in case record is split over several files if (data_offs_dblks == 0) { - dtokp->set_fid(_lfc.getCurrentFileSeqNum()); + uint64_t fid; + short eres = _emap.get_pfid(dtokp->dequeue_rid(), fid); + if (eres == enq_map::EMAP_OK) { + dtokp->set_fid(fid); + } else if (xid_len > 0) { + txn_data_list_t tdl = _tmap.get_tdata_list(xid); + bool found = false; + for (tdl_const_itr_t i=tdl.begin(); i!=tdl.end() && !found; ++i) { + if (i->rid_ == dtokp->dequeue_rid()) { + found = true; + dtokp->set_fid(i->pfid_); + break; + } + } + if (!found) { + throw jexception("rid found in neither emap nor tmap, transactional"); + } + } else { + throw jexception("rid not found in emap, non-transactional"); + } } _pg_offset_dblks += ret; _cached_offset_dblks += ret; @@ -325,7 +342,7 @@ wmgr::dequeue(data_tok* dtokp, if (eres == enq_map::EMAP_RID_NOT_FOUND) { std::ostringstream oss; - oss << std::hex << "rid=0x" << rid; + oss << std::hex << "emap: rid=0x" << rid; throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr", "dequeue"); } if (eres == enq_map::EMAP_LOCKED) @@ -335,10 +352,6 @@ wmgr::dequeue(data_tok* dtokp, throw jexception(jerrno::JERR_MAP_LOCKED, oss.str(), "wmgr", "dequeue"); } } -//std::cout << "[0x" << std::hex << _lfc.getEnqueuedRecordCount(fid) << std::dec << std::flush; // DEBUG -//try { - _lfc.decrEnqueuedRecordCount(fid); -//} catch (std::exception& e) { std::cout << "***OOPS*** " << e.what() << " cfid=" << _lfc.getCurrentFileSeqNum() << " fid=" << fid << std::flush; throw; } } done = true; @@ -427,14 +440,16 @@ wmgr::abort(data_tok* dtokp, // Delete this txn from tmap, unlock any locked records in emap std::string xid((const char*)xid_ptr, xid_len); txn_data_list_t tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if xid not found + fidl_t fidl; for (tdl_itr_t itr = tdl.begin(); itr != tdl.end(); itr++) { if (!itr->enq_flag_) _emap.unlock(itr->drid_); // ignore rid not found error - if (itr->enq_flag_) - _lfc.decrEnqueuedRecordCount(itr->pfid_); + if (itr->enq_flag_) { + fidl.push_back(itr->pfid_); + } } - std::pair<std::set<std::string>::iterator, bool> res = _txn_pending_set.insert(xid); + std::pair<pending_txn_map_itr_t, bool> res = _txn_pending_map.insert(std::pair<std::string, fidl_t>(xid, fidl)); if (!res.second) { std::ostringstream oss; @@ -526,6 +541,7 @@ wmgr::commit(data_tok* dtokp, // Delete this txn from tmap, process records into emap std::string xid((const char*)xid_ptr, xid_len); txn_data_list_t tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if xid not found + fidl_t fidl; for (tdl_itr_t itr = tdl.begin(); itr != tdl.end(); itr++) { if (itr->enq_flag_) // txn enqueue @@ -547,20 +563,20 @@ wmgr::commit(data_tok* dtokp, if (eres == enq_map::EMAP_RID_NOT_FOUND) { std::ostringstream oss; - oss << std::hex << "rid=0x" << rid; - throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr", "dequeue"); + oss << std::hex << "emap: rid=0x" << itr->drid_; + throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr", "commit"); } if (eres == enq_map::EMAP_LOCKED) { std::ostringstream oss; - oss << std::hex << "rid=0x" << rid; - throw jexception(jerrno::JERR_MAP_LOCKED, oss.str(), "wmgr", "dequeue"); + oss << std::hex << "rid=0x" << itr->drid_; + throw jexception(jerrno::JERR_MAP_LOCKED, oss.str(), "wmgr", "commit"); } } - _lfc.decrEnqueuedRecordCount(fid); + fidl.push_back(fid); } } - std::pair<std::set<std::string>::iterator, bool> res = _txn_pending_set.insert(xid); + std::pair<pending_txn_map_itr_t, bool> res = _txn_pending_map.insert(std::pair<std::string, fidl_t>(xid, fidl)); if (!res.second) { std::ostringstream oss; @@ -695,7 +711,7 @@ wmgr::get_next_file() { _pg_cntr = 0; //std::cout << "&&&&& wmgr::get_next_file(): " << status_str() << std::flush << std::endl; // DEBUG - _lfc.pullEmptyFileFromEfp(); + _lfc.getNextJournalFile(); } int32_t @@ -757,7 +773,7 @@ wmgr::get_events(timespec* const timeout, data_tok* dtokp = pcbp->_pdtokl->at(k); if (dtokp->decr_pg_cnt() == 0) { - std::set<std::string>::iterator it; + pending_txn_map_itr_t it; switch (dtokp->wstate()) { case data_tok::ENQ_SUBM: @@ -770,6 +786,9 @@ wmgr::get_events(timespec* const timeout, _tmap.set_aio_compl(dtokp->xid(), dtokp->rid()); break; case data_tok::DEQ_SUBM: + if (!dtokp->has_xid()) { + _lfc.decrEnqueuedRecordCount(dtokp->fid()); + } dtokl.push_back(dtokp); tot_data_toks++; dtokp->set_wstate(data_tok::DEQ); @@ -781,31 +800,35 @@ wmgr::get_events(timespec* const timeout, dtokl.push_back(dtokp); tot_data_toks++; dtokp->set_wstate(data_tok::ABORTED); - it = _txn_pending_set.find(dtokp->xid()); - if (it == _txn_pending_set.end()) + it = _txn_pending_map.find(dtokp->xid()); + if (it == _txn_pending_map.end()) { std::ostringstream oss; - oss << std::hex << "_txn_pending_set: abort xid=\""; - oss << dtokp->xid() << "\""; - throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr", - "get_events"); + oss << std::hex << "_txn_pending_set: abort xid=\"" + << qpid::linearstore::journal::jcntl::str2hexnum(dtokp->xid()) << "\""; + throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr", "get_events"); + } + for (fidl_itr_t i=it->second.begin(); i!=it->second.end(); ++i) { + _lfc.decrEnqueuedRecordCount(*i); } - _txn_pending_set.erase(it); + _txn_pending_map.erase(it); break; case data_tok::COMMIT_SUBM: dtokl.push_back(dtokp); tot_data_toks++; dtokp->set_wstate(data_tok::COMMITTED); - it = _txn_pending_set.find(dtokp->xid()); - if (it == _txn_pending_set.end()) + it = _txn_pending_map.find(dtokp->xid()); + if (it == _txn_pending_map.end()) { std::ostringstream oss; - oss << std::hex << "_txn_pending_set: commit xid=\""; - oss << dtokp->xid() << "\""; - throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr", - "get_events"); + oss << std::hex << "_txn_pending_set: commit xid=\"" + << qpid::linearstore::journal::jcntl::str2hexnum(dtokp->xid()) << "\""; + throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr", "get_events"); + } + for (fidl_itr_t i=it->second.begin(); i!=it->second.end(); ++i) { + _lfc.decrEnqueuedRecordCount(*i); } - _txn_pending_set.erase(it); + _txn_pending_map.erase(it); break; case data_tok::ENQ_PART: case data_tok::DEQ_PART: @@ -858,8 +881,8 @@ wmgr::is_txn_synced(const std::string& xid) if (_tmap.is_txn_synced(xid) == txn_map::TMAP_NOT_SYNCED) return false; // Check for outstanding commit/aborts - std::set<std::string>::iterator it = _txn_pending_set.find(xid); - return it == _txn_pending_set.end(); + pending_txn_map_itr_t it = _txn_pending_map.find(xid); + return it == _txn_pending_map.end(); } void @@ -871,7 +894,6 @@ wmgr::initialize(aio_callback* const cbp, pmgr::initialize(cbp, wcache_pgsize_sblks, wcache_num_pages); wmgr::clean(); _page_cb_arr[0]._state = IN_USE; - _ddtokl.clear(); _cached_offset_dblks = 0; _enq_busy = false; } diff --git a/qpid/cpp/src/qpid/linearstore/journal/wmgr.h b/qpid/cpp/src/qpid/linearstore/journal/wmgr.h index 8837e51e97..8eaa2364ad 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/wmgr.h +++ b/qpid/cpp/src/qpid/linearstore/journal/wmgr.h @@ -22,9 +22,11 @@ #ifndef QPID_LINEARSTORE_JOURNAL_WMGR_H #define QPID_LINEARSTORE_JOURNAL_WMGR_H +#include <deque> +#include <map> #include "qpid/linearstore/journal/enums.h" #include "qpid/linearstore/journal/pmgr.h" -#include <set> +#include <vector> namespace qpid { namespace linearstore { @@ -51,11 +53,15 @@ class LinearFileController; class wmgr : public pmgr { private: + typedef std::vector<uint64_t> fidl_t; + typedef fidl_t::iterator fidl_itr_t; + typedef std::map<std::string, fidl_t> pending_txn_map_t; + typedef pending_txn_map_t::iterator pending_txn_map_itr_t; + LinearFileController& _lfc; ///< Linear File Controller ref uint32_t _max_dtokpp; ///< Max data writes per page uint32_t _max_io_wait_us; ///< Max wait in microseconds till submit uint32_t _cached_offset_dblks; ///< Amount of unwritten data in page (dblocks) - std::deque<data_tok*> _ddtokl; ///< Deferred dequeue data_tok list // TODO: Convert _enq_busy etc into a proper threadsafe lock // TODO: Convert to enum? Are these encodes mutually exclusive? @@ -70,7 +76,7 @@ private: enq_rec _enq_rec; ///< Enqueue record used for encoding/decoding deq_rec _deq_rec; ///< Dequeue record used for encoding/decoding txn_rec _txn_rec; ///< Transaction record used for encoding/decoding - std::set<std::string> _txn_pending_set; ///< Set containing xids of pending commits/aborts + pending_txn_map_t _txn_pending_map; ///< Set containing xids of pending commits/aborts public: wmgr(jcntl* jc, |