diff options
Diffstat (limited to 'cpp/src/qpid/linearstore/journal/RecoveryManager.cpp')
-rw-r--r-- | cpp/src/qpid/linearstore/journal/RecoveryManager.cpp | 195 |
1 files changed, 123 insertions, 72 deletions
diff --git a/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp b/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp index 72308cc929..a1cec53ca1 100644 --- a/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp +++ b/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp @@ -56,11 +56,15 @@ RecoveredRecordData_t::RecoveredRecordData_t(const uint64_t rid, const uint64_t pendingTransaction_(ptxn) {} - bool recordIdListCompare(RecoveredRecordData_t a, RecoveredRecordData_t b) { return a.recordId_ < b.recordId_; } +RecoveredFileData_t::RecoveredFileData_t(JournalFile* journalFilePtr, const uint32_t completedDblkCount) : + journalFilePtr_(journalFilePtr), + completedDblkCount_(completedDblkCount) +{} + RecoveryManager::RecoveryManager(const std::string& journalDirectory, const std::string& queuename, enq_map& enqueueMapRef, @@ -77,11 +81,17 @@ RecoveryManager::RecoveryManager(const std::string& journalDirectory, highestRecordId_(0ULL), highestFileNumber_(0ULL), lastFileFullFlag_(false), + initial_fid_(0), currentSerial_(0), efpFileSize_kib_(0) {} -RecoveryManager::~RecoveryManager() {} +RecoveryManager::~RecoveryManager() { + for (fileNumberMapItr_t i = fileNumberMap_.begin(); i != fileNumberMap_.end(); ++i) { + delete i->second; + } + fileNumberMap_.clear(); +} void RecoveryManager::analyzeJournals(const std::vector<std::string>* preparedTransactionListPtr, EmptyFilePoolManager* emptyFilePoolManager, @@ -92,9 +102,6 @@ void RecoveryManager::analyzeJournals(const std::vector<std::string>* preparedTr *emptyFilePoolPtrPtr = emptyFilePoolManager->getEmptyFilePool(efpIdentity); efpFileSize_kib_ = (*emptyFilePoolPtrPtr)->fileSize_kib(); - // Check for file full condition - lastFileFullFlag_ = endOffset_ == (std::streamoff)(*emptyFilePoolPtrPtr)->fileSize_kib() * 1024; - if (!journalEmptyFlag_) { // Read all records, establish remaining enqueued records @@ -106,6 +113,9 @@ void RecoveryManager::analyzeJournals(const std::vector<std::string>* preparedTr inFileStream_.close(); } + // Check for file full condition + lastFileFullFlag_ = endOffset_ == (std::streamoff)(*emptyFilePoolPtrPtr)->fileSize_kib() * 1024; + // Remove leading files which have no enqueued records removeEmptyFiles(*emptyFilePoolPtrPtr); @@ -121,7 +131,7 @@ void RecoveryManager::analyzeJournals(const std::vector<std::string>* preparedTr // Unlock any affected enqueues in emap for (tdl_itr_t i=tdl.begin(); i<tdl.end(); i++) { if (i->enq_flag_) { // enq op - decrement enqueue count - fileNumberMap_[i->pfid_]->decrEnqueuedRecordCount(); + fileNumberMap_[i->pfid_]->journalFilePtr_->decrEnqueuedRecordCount(); } else if (enqueueMapRef_.is_enqueued(i->drid_, true)) { // deq op - unlock enq record if (enqueueMapRef_.unlock(i->drid_) < enq_map::EMAP_OK) { // fail // enq_map::unlock()'s only error is enq_map::EMAP_RID_NOT_FOUND @@ -174,7 +184,7 @@ bool RecoveryManager::readNextRemainingRecord(void** const dataPtrPtr, } } while (!foundRecord); - if (!inFileStream_.is_open() || currentJournalFileConstItr_->first != recordIdListConstItr_->fileId_) { + if (!inFileStream_.is_open() || currentJournalFileItr_->first != recordIdListConstItr_->fileId_) { if (!getFile(recordIdListConstItr_->fileId_, false)) { std::ostringstream oss; oss << "Failed to open file with file-id=" << recordIdListConstItr_->fileId_; @@ -231,7 +241,6 @@ bool RecoveryManager::readNextRemainingRecord(void** const dataPtrPtr, ::rec_tail_t enqueueTail; inFileStream_.read((char*)&enqueueTail, sizeof(::rec_tail_t)); uint32_t cs = checksum.getChecksum(); -//std::cout << std::hex << "### rid=0x" << enqueueHeader._rhdr._rid << " rtcs=0x" << enqueueTail._checksum << " cs=0x" << cs << std::dec << std::endl; // DEBUG uint16_t res = ::rec_tail_check(&enqueueTail, &enqueueHeader._rhdr, cs); if (res != 0) { std::stringstream oss; @@ -266,17 +275,30 @@ bool RecoveryManager::readNextRemainingRecord(void** const dataPtrPtr, void RecoveryManager::setLinearFileControllerJournals(lfcAddJournalFileFn fnPtr, LinearFileController* lfcPtr) { if (journalEmptyFlag_) { - if (uninitializedJournal_.size() > 0) { - lfcPtr->restoreEmptyFile(uninitializedJournal_); + if (uninitFileList_.size() > 0) { + std::string uninitFile = uninitFileList_.back(); + uninitFileList_.pop_back(); + lfcPtr->restoreEmptyFile(uninitFile); } } else { for (fileNumberMapConstItr_t i = fileNumberMap_.begin(); i != fileNumberMap_.end(); ++i) { - uint32_t fileDblkCount = i->first == highestFileNumber_ ? // Is this this last file? - endOffset_ / QLS_DBLK_SIZE_BYTES : // Last file uses _endOffset - efpFileSize_kib_ * 1024 / QLS_DBLK_SIZE_BYTES; // All others use file size to make them full - (lfcPtr->*fnPtr)(i->second, fileDblkCount); + (lfcPtr->*fnPtr)(i->second->journalFilePtr_, i->second->completedDblkCount_, i->first == initial_fid_); } } + + std::ostringstream oss; + bool logFlag = !notNeededFilesList_.empty(); + if (logFlag) { + oss << "Files removed from head of journal: prior truncation during recovery:"; + } + while (!notNeededFilesList_.empty()) { + lfcPtr->removeFileToEfp(notNeededFilesList_.back()); + oss << std::endl << " * " << notNeededFilesList_.back(); + notNeededFilesList_.pop_back(); + } + if (logFlag) { + journalLogRef_.log(JournalLog::LOG_NOTICE, queueName_, oss.str()); + } } std::string RecoveryManager::toString(const std::string& jid) { @@ -285,7 +307,7 @@ std::string RecoveryManager::toString(const std::string& jid) { oss << " Number of journal files = " << fileNumberMap_.size() << std::endl; oss << " Journal File List:" << std::endl; for (fileNumberMapConstItr_t k=fileNumberMap_.begin(); k!=fileNumberMap_.end(); ++k) { - std::string fqFileName = k->second->getFqFileName(); + std::string fqFileName = k->second->journalFilePtr_->getFqFileName(); oss << " " << k->first << ": " << fqFileName.substr(fqFileName.rfind('/')+1) << std::endl; } oss << " Enqueue Counts: [ "; @@ -293,7 +315,7 @@ std::string RecoveryManager::toString(const std::string& jid) { if (l != fileNumberMap_.begin()) { oss << ", "; } - oss << l->second->getEnqueuedRecordCount(); + oss << l->second->journalFilePtr_->getEnqueuedRecordCount(); } oss << " ]" << std::endl; oss << " Journal empty = " << (journalEmptyFlag_ ? "TRUE" : "FALSE") << std::endl; @@ -330,15 +352,17 @@ std::string RecoveryManager::toLog(const std::string& jid, const int indent) { << std::setw(10) << "--------" << std::endl; for (fileNumberMapConstItr_t k=fileNumberMap_.begin(); k!=fileNumberMap_.end(); ++k) { - std::string fqFileName = k->second->getFqFileName(); + std::string fqFileName = k->second->journalFilePtr_->getFqFileName(); + std::ostringstream fid; + fid << std::hex << "0x" << k->first; std::ostringstream fro; - fro << std::hex << "0x" << k->second->getFirstRecordOffset(); - oss << indentStr << std::setw(7) << k->first + fro << std::hex << "0x" << k->second->journalFilePtr_->getFirstRecordOffset(); + oss << indentStr << std::setw(7) << fid.str() << std::setw(43) << fqFileName.substr(fqFileName.rfind('/')+1) << std::setw(16) << fro.str() - << std::setw(12) << k->second->getEnqueuedRecordCount() - << std::setw(5) << k->second->getEfpIdentity().pn_ - << std::setw(9) << k->second->getEfpIdentity().ds_ << "k" + << std::setw(12) << k->second->journalFilePtr_->getEnqueuedRecordCount() + << std::setw(5) << k->second->journalFilePtr_->getEfpIdentity().pn_ + << std::setw(9) << k->second->journalFilePtr_->getEfpIdentity().ds_ << "k" << std::endl; } oss << indentStr << "First record offset in first file = 0x" << std::hex << firstRecordOffset_ << @@ -347,7 +371,7 @@ std::string RecoveryManager::toLog(const std::string& jid, const int indent) { (endOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)" << std::endl; oss << indentStr << "Highest rid found = 0x" << std::hex << highestRecordId_ << std::dec << std::endl; oss << indentStr << "Last file full = " << (lastFileFullFlag_ ? "TRUE" : "FALSE") << std::endl; - oss << indentStr << "Enqueued records (txn & non-txn):"; + //oss << indentStr << "Enqueued records (txn & non-txn):"; // TODO: complete report } return oss.str(); } @@ -357,27 +381,28 @@ std::string RecoveryManager::toLog(const std::string& jid, const int indent) { void RecoveryManager::analyzeJournalFileHeaders(efpIdentity_t& efpIdentity) { std::string headerQueueName; ::file_hdr_t fileHeader; - directoryList_t directoryList; + stringList_t directoryList; jdir::read_dir(journalDirectory_, directoryList, false, true, false, true); - for (directoryListConstItr_t i = directoryList.begin(); i != directoryList.end(); ++i) { + for (stringListConstItr_t i = directoryList.begin(); i != directoryList.end(); ++i) { readJournalFileHeader(*i, fileHeader, headerQueueName); if (headerQueueName.empty()) { std::ostringstream oss; - if (uninitializedJournal_.empty()) { - oss << "Journal file " << (*i) << " is first uninitialized (not yet written) journal file."; - journalLogRef_.log(JournalLog::LOG_NOTICE, queueName_, oss.str()); - uninitializedJournal_ = *i; - } else { - oss << "Journal file " << (*i) << " is second or greater uninitialized journal file - ignoring"; - journalLogRef_.log(JournalLog::LOG_WARN, queueName_, oss.str()); - } + oss << "Journal file " << (*i) << " is uninitialized"; + journalLogRef_.log(JournalLog::LOG_WARN, queueName_, oss.str()); + uninitFileList_.push_back(*i); } else if (headerQueueName.compare(queueName_) != 0) { std::ostringstream oss; oss << "Journal file " << (*i) << " belongs to queue \"" << headerQueueName << "\": ignoring"; journalLogRef_.log(JournalLog::LOG_WARN, queueName_, oss.str()); } else { - JournalFile* jfp = new JournalFile(*i, fileHeader); - fileNumberMap_[fileHeader._file_number] = jfp; + JournalFile* jfp = new JournalFile(*i, fileHeader, queueName_); + std::pair<fileNumberMapItr_t, bool> res = fileNumberMap_.insert( + std::pair<uint64_t, RecoveredFileData_t*>(fileHeader._file_number, new RecoveredFileData_t(jfp, 0))); + if (!res.second) { + std::ostringstream oss; + oss << "Journal file " << (*i) << " has fid=0x" << std::hex << jfp->getFileSeqNum() << " which already exists for this journal."; + throw jexception(oss.str()); // TODO: complete this exception + } if (fileHeader._file_number > highestFileNumber_) { highestFileNumber_ = fileHeader._file_number; } @@ -393,7 +418,7 @@ void RecoveryManager::analyzeJournalFileHeaders(efpIdentity_t& efpIdentity) { if (fileNumberMap_.empty()) { journalEmptyFlag_ = true; } else { - currentJournalFileConstItr_ = fileNumberMap_.begin(); + currentJournalFileItr_ = fileNumberMap_.begin(); } } @@ -408,7 +433,7 @@ void RecoveryManager::checkFileStreamOk(bool checkEof) { } } -void RecoveryManager::checkJournalAlignment(const std::streampos recordPosition) { +void RecoveryManager::checkJournalAlignment(const uint64_t start_fid, const std::streampos recordPosition) { if (recordPosition % QLS_DBLK_SIZE_BYTES != 0) { std::ostringstream oss; oss << "Current read pointer not dblk aligned: recordPosition=0x" << std::hex << recordPosition; @@ -420,12 +445,13 @@ void RecoveryManager::checkJournalAlignment(const std::streampos recordPosition) if (sblkOffset) { std::ostringstream oss1; - oss1 << std::hex << "Bad record alignment found at fid=0x" << getCurrentFileNumber(); + oss1 << std::hex << "Bad record alignment found at fid=0x" << start_fid; oss1 << " offs=0x" << currentPosn << " (likely journal overwrite boundary); " << std::dec; oss1 << (QLS_SBLK_SIZE_DBLKS - (sblkOffset/QLS_DBLK_SIZE_BYTES)) << " filler record(s) required."; journalLogRef_.log(JournalLog::LOG_WARN, queueName_, oss1.str()); - std::ofstream outFileStream(getCurrentFileName().c_str(), std::ios_base::in | std::ios_base::out | std::ios_base::binary); + fileNumberMapConstItr_t fnmItr = fileNumberMap_.find(start_fid); + std::ofstream outFileStream(fnmItr->second->journalFilePtr_->getFqFileName().c_str(), std::ios_base::in | std::ios_base::out | std::ios_base::binary); if (!outFileStream.good()) { throw jexception(jerrno::JERR__FILEIO, getCurrentFileName(), "RecoveryManager", "checkJournalAlignment"); } @@ -447,7 +473,7 @@ void RecoveryManager::checkJournalAlignment(const std::streampos recordPosition) throw jexception(jerrno::JERR_RCVM_WRITE, "RecoveryManager", "checkJournalAlignment"); } std::ostringstream oss2; - oss2 << std::hex << "Recover phase write: Wrote filler record: fid=0x" << getCurrentFileNumber(); + oss2 << std::hex << "Recover phase write: Wrote filler record: fid=0x" << start_fid; oss2 << " offs=0x" << currentPosn; journalLogRef_.log(JournalLog::LOG_NOTICE, queueName_, oss2.str()); currentPosn = outFileStream.tellp(); @@ -456,16 +482,15 @@ void RecoveryManager::checkJournalAlignment(const std::streampos recordPosition) std::free(writeBuffer); journalLogRef_.log(JournalLog::LOG_INFO, queueName_, "Bad record alignment fixed."); } - endOffset_ = currentPosn; + lastRecord(start_fid, currentPosn); } bool RecoveryManager::decodeRecord(jrec& record, std::size_t& cumulativeSizeRead, ::rec_hdr_t& headerRecord, - std::streampos& fileOffset) + const uint64_t start_fid, + const std::streampos recordOffset) { - std::streampos start_file_offs = fileOffset; - if (highestRecordId_ == 0) { highestRecordId_ = headerRecord._rid; } else if (headerRecord._rid - highestRecordId_ < 0x8000000000000000ULL) { // RFC 1982 comparison for unsigned 64-bit @@ -475,7 +500,7 @@ bool RecoveryManager::decodeRecord(jrec& record, bool done = false; while (!done) { try { - done = record.decode(headerRecord, &inFileStream_, cumulativeSizeRead); + done = record.decode(headerRecord, &inFileStream_, cumulativeSizeRead, recordOffset); } catch (const jexception& e) { if (e.err_code() == jerrno::JERR_JREC_BADRECTAIL) { @@ -485,11 +510,12 @@ bool RecoveryManager::decodeRecord(jrec& record, } else { journalLogRef_.log(JournalLog::LOG_INFO, queueName_, e.what()); } - checkJournalAlignment(start_file_offs); + checkJournalAlignment(start_fid, recordOffset); return false; } if (!done && needNextFile()) { if (!getNextFile(false)) { + checkJournalAlignment(start_fid, recordOffset); return false; } } @@ -498,11 +524,11 @@ bool RecoveryManager::decodeRecord(jrec& record, } std::string RecoveryManager::getCurrentFileName() const { - return currentJournalFileConstItr_->second->getFqFileName(); + return currentJournalFileItr_->second->journalFilePtr_->getFqFileName(); } uint64_t RecoveryManager::getCurrentFileNumber() const { - return currentJournalFileConstItr_->first; + return currentJournalFileItr_->first; } bool RecoveryManager::getFile(const uint64_t fileNumber, bool jumpToFirstRecordOffsetFlag) { @@ -511,8 +537,8 @@ bool RecoveryManager::getFile(const uint64_t fileNumber, bool jumpToFirstRecordO //std::cout << " f=" << getCurrentFileName() << "]" << std::flush; // DEBUG inFileStream_.clear(); // clear eof flag, req'd for older versions of c++ } - currentJournalFileConstItr_ = fileNumberMap_.find(fileNumber); - if (currentJournalFileConstItr_ == fileNumberMap_.end()) { + currentJournalFileItr_ = fileNumberMap_.find(fileNumber); + if (currentJournalFileItr_ == fileNumberMap_.end()) { return false; } inFileStream_.open(getCurrentFileName().c_str(), std::ios_base::in | std::ios_base::binary); @@ -536,7 +562,8 @@ bool RecoveryManager::getNextFile(bool jumpToFirstRecordOffsetFlag) { if (inFileStream_.is_open()) { inFileStream_.close(); //std::cout << " .f=" << getCurrentFileName() << "]" << std::flush; // DEBUG - if (++currentJournalFileConstItr_ == fileNumberMap_.end()) { + currentJournalFileItr_->second->completedDblkCount_ = efpFileSize_kib_ * 1024 / QLS_DBLK_SIZE_BYTES; + if (++currentJournalFileItr_ == fileNumberMap_.end()) { return false; } inFileStream_.clear(); // clear eof flag, req'd for older versions of c++ @@ -562,13 +589,15 @@ bool RecoveryManager::getNextRecordHeader() rec_hdr_t h; bool hdr_ok = false; - std::streampos file_pos; + uint64_t file_id = 0; + std::streampos file_pos = 0; while (!hdr_ok) { if (needNextFile()) { if (!getNextFile(true)) { return false; } } + file_id = currentJournalFileItr_->second->journalFilePtr_->getFileSeqNum(); file_pos = inFileStream_.tellg(); if (file_pos == std::streampos(-1)) { std::ostringstream oss; @@ -587,21 +616,21 @@ bool RecoveryManager::getNextRecordHeader() } } + uint64_t start_fid = getCurrentFileNumber(); // fid may increment in decode() if record folds over file boundary switch(h._magic) { case QLS_ENQ_MAGIC: { //std::cout << " 0x" << std::hex << file_pos << ".e.0x" << h._rid << std::dec << std::flush; // DEBUG if (::rec_hdr_check(&h, QLS_ENQ_MAGIC, QLS_JRNL_VERSION, currentSerial_) != 0) { - endOffset_ = file_pos; + lastRecord(file_id, file_pos); return false; } enq_rec er; - uint64_t start_fid = getCurrentFileNumber(); // fid may increment in decode() if record folds over file boundary - if (!decodeRecord(er, cum_size_read, h, file_pos)) { + if (!decodeRecord(er, cum_size_read, h, start_fid, file_pos)) { return false; } if (!er.is_transient()) { // Ignore transient msgs - fileNumberMap_[start_fid]->incrEnqueuedRecordCount(); + fileNumberMap_[start_fid]->journalFilePtr_->incrEnqueuedRecordCount(); if (er.xid_size()) { er.get_xid(&xidp); if (xidp == 0) { @@ -629,12 +658,11 @@ bool RecoveryManager::getNextRecordHeader() { //std::cout << " 0x" << std::hex << file_pos << ".d.0x" << h._rid << std::dec << std::flush; // DEBUG if (::rec_hdr_check(&h, QLS_DEQ_MAGIC, QLS_JRNL_VERSION, currentSerial_) != 0) { - endOffset_ = file_pos; + lastRecord(file_id, file_pos); return false; } deq_rec dr; - uint16_t start_fid = getCurrentFileNumber(); // fid may increment in decode() if record folds over file boundary - if (!decodeRecord(dr, cum_size_read, h, file_pos)) { + if (!decodeRecord(dr, cum_size_read, h, start_fid, file_pos)) { return false; } if (dr.xid_size()) { @@ -655,7 +683,7 @@ bool RecoveryManager::getNextRecordHeader() } else { uint64_t enq_fid; if (enqueueMapRef_.get_remove_pfid(dr.deq_rid(), enq_fid, true) == enq_map::EMAP_OK) { // ignore not found error - fileNumberMap_[enq_fid]->decrEnqueuedRecordCount(); + fileNumberMap_[enq_fid]->journalFilePtr_->decrEnqueuedRecordCount(); } } } @@ -664,11 +692,11 @@ bool RecoveryManager::getNextRecordHeader() { //std::cout << " 0x" << std::hex << file_pos << ".a.0x" << h._rid << std::dec << std::flush; // DEBUG if (::rec_hdr_check(&h, QLS_TXA_MAGIC, QLS_JRNL_VERSION, currentSerial_) != 0) { - endOffset_ = file_pos; + lastRecord(file_id, file_pos); return false; } txn_rec ar; - if (!decodeRecord(ar, cum_size_read, h, file_pos)) { + if (!decodeRecord(ar, cum_size_read, h, start_fid, file_pos)) { return false; } // Delete this txn from tmap, unlock any locked records in emap @@ -680,7 +708,7 @@ bool RecoveryManager::getNextRecordHeader() txn_data_list_t tdl = transactionMapRef_.get_remove_tdata_list(xid); // tdl will be empty if xid not found for (tdl_itr_t itr = tdl.begin(); itr != tdl.end(); itr++) { if (itr->enq_flag_) { - fileNumberMap_[itr->pfid_]->decrEnqueuedRecordCount(); + fileNumberMap_[itr->pfid_]->journalFilePtr_->decrEnqueuedRecordCount(); } else { enqueueMapRef_.unlock(itr->drid_); // ignore not found error } @@ -691,11 +719,11 @@ bool RecoveryManager::getNextRecordHeader() { //std::cout << " 0x" << std::hex << file_pos << ".c.0x" << h._rid << std::dec << std::flush; // DEBUG if (::rec_hdr_check(&h, QLS_TXC_MAGIC, QLS_JRNL_VERSION, currentSerial_) != 0) { - endOffset_ = file_pos; + lastRecord(file_id, file_pos); return false; } txn_rec cr; - if (!decodeRecord(cr, cum_size_read, h, file_pos)) { + if (!decodeRecord(cr, cum_size_read, h, start_fid, file_pos)) { return false; } // Delete this txn from tmap, process records into emap @@ -717,7 +745,7 @@ bool RecoveryManager::getNextRecordHeader() } else { // txn dequeue uint64_t enq_fid; if (enqueueMapRef_.get_remove_pfid(itr->drid_, enq_fid, true) == enq_map::EMAP_OK) // ignore not found error - fileNumberMap_[enq_fid]->decrEnqueuedRecordCount(); + fileNumberMap_[enq_fid]->journalFilePtr_->decrEnqueuedRecordCount(); } } } @@ -729,7 +757,9 @@ bool RecoveryManager::getNextRecordHeader() inFileStream_.ignore(rec_dblks * QLS_DBLK_SIZE_BYTES - sizeof(::rec_hdr_t)); checkFileStreamOk(false); if (needNextFile()) { + file_pos += rec_dblks * QLS_DBLK_SIZE_BYTES; if (!getNextFile(false)) { + lastRecord(start_fid, file_pos); return false; } } @@ -737,17 +767,36 @@ bool RecoveryManager::getNextRecordHeader() break; case 0: //std::cout << " 0x" << std::hex << file_pos << ".0" << std::dec << std::endl << std::flush; // DEBUG - checkJournalAlignment(file_pos); + checkJournalAlignment(getCurrentFileNumber(), file_pos); return false; default: //std::cout << " 0x" << std::hex << file_pos << ".?" << std::dec << std::endl << std::flush; // DEBUG // Stop as this is the overwrite boundary. - checkJournalAlignment(file_pos); + checkJournalAlignment(getCurrentFileNumber(), file_pos); return false; } return true; } +void RecoveryManager::lastRecord(const uint64_t file_id, const std::streamoff endOffset) { + endOffset_ = endOffset; + initial_fid_ = file_id; + fileNumberMap_[file_id]->completedDblkCount_ = endOffset_ / QLS_DBLK_SIZE_BYTES; + + // Remove any files in fileNumberMap_ beyond initial_fid_ + fileNumberMapItr_t unwantedFirstItr = fileNumberMap_.find(file_id); + if (++unwantedFirstItr != fileNumberMap_.end()) { + fileNumberMapItr_t itr = unwantedFirstItr; + notNeededFilesList_.push_back(unwantedFirstItr->second->journalFilePtr_->getFqFileName()); + while (++itr != fileNumberMap_.end()) { + notNeededFilesList_.push_back(itr->second->journalFilePtr_->getFqFileName()); + delete itr->second->journalFilePtr_; + delete itr->second; + } + fileNumberMap_.erase(unwantedFirstItr, fileNumberMap_.end()); + } +} + bool RecoveryManager::needNextFile() { if (inFileStream_.is_open()) { return inFileStream_.eof() || inFileStream_.tellg() >= std::streampos(efpFileSize_kib_ * 1024); @@ -820,7 +869,7 @@ bool RecoveryManager::readFileHeader() { currentSerial_ = fhdr._rhdr._serial; } else { inFileStream_.close(); - if (currentJournalFileConstItr_ == fileNumberMap_.begin()) { + if (currentJournalFileItr_ == fileNumberMap_.begin()) { journalEmptyFlag_ = true; } return false; @@ -855,9 +904,11 @@ void RecoveryManager::readJournalFileHeader(const std::string& journalFileName, } void RecoveryManager::removeEmptyFiles(EmptyFilePool* emptyFilePoolPtr) { - while (fileNumberMap_.begin()->second->getEnqueuedRecordCount() == 0 && fileNumberMap_.size() > 1) { -//std::cout << "*** File " << i->first << ": " << i->second << " is empty." << std::endl; // DEBUG - emptyFilePoolPtr->returnEmptyFile(fileNumberMap_.begin()->second->getFqFileName()); + while (fileNumberMap_.begin()->second->journalFilePtr_->getEnqueuedRecordCount() == 0 && fileNumberMap_.size() > 1) { + RecoveredFileData_t* rfdp = fileNumberMap_.begin()->second; + emptyFilePoolPtr->returnEmptyFile(rfdp->journalFilePtr_->getFqFileName()); + delete rfdp->journalFilePtr_; + delete rfdp; fileNumberMap_.erase(fileNumberMap_.begin()->first); } } |