diff options
Diffstat (limited to 'cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp')
| -rw-r--r-- | cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp | 269 |
1 files changed, 173 insertions, 96 deletions
diff --git a/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp b/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp index cba560750b..361ee1aeda 100644 --- a/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp +++ b/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp @@ -44,8 +44,8 @@ #include <vector> namespace qpid { -namespace qls_jrnl -{ +namespace linearstore { +namespace journal { RecoveryManager::RecoveryManager(const std::string& journalDirectory, const std::string& queuename, @@ -63,6 +63,7 @@ RecoveryManager::RecoveryManager(const std::string& journalDirectory, highestRecordId_(0ULL), highestFileNumber_(0ULL), lastFileFullFlag_(false), + currentSerial_(0), efpFileSize_kib_(0) {} @@ -154,9 +155,10 @@ bool RecoveryManager::readNextRemainingRecord(void** const dataPtrPtr, } enq_map::emap_data_struct_t eds; enqueueMapRef_.get_data(*recordIdListConstItr_, eds); - uint64_t fileNumber = eds._pfid; - currentJournalFileConstItr_ = fileNumberMap_.find(fileNumber); - getNextFile(false); + if (!inFileStream_.is_open() || currentJournalFileConstItr_->first != eds._pfid) { + getFile(eds._pfid, false); + } +//std::cout << " " << eds._pfid << std::hex << ",0x" << eds._file_posn << std::flush; // DEBUG inFileStream_.seekg(eds._file_posn, std::ifstream::beg); if (!inFileStream_.good()) { @@ -174,7 +176,10 @@ bool RecoveryManager::readNextRemainingRecord(void** const dataPtrPtr, // check flags transient = ::is_enq_transient(&enqueueHeader); external = ::is_enq_external(&enqueueHeader); - +//char magicBuff[5]; // DEBUG +//::memcpy(magicBuff, &enqueueHeader, 4); // DEBUG +//magicBuff[4] = 0; // DEBUG +//std::cout << std::hex << ":" << (char*)magicBuff << ",rid=0x" << enqueueHeader._rhdr._rid << ",xs=0x" << enqueueHeader._xidsize << ",ds=0x" << enqueueHeader._dsize << std::dec << std::flush; // DEBUG // read xid xidSize = enqueueHeader._xidsize; *xidPtrPtr = ::malloc(xidSize); @@ -217,57 +222,75 @@ void RecoveryManager::setLinearFileControllerJournals(lfcAddJournalFileFn fnPtr, } } -std::string RecoveryManager::toString(const std::string& jid, - bool compact) { +std::string RecoveryManager::toString(const std::string& jid) { std::ostringstream oss; - if (compact) { - oss << "Recovery journal analysis (jid=\"" << jid << "\"):"; - oss << " jfl=["; - for (fileNumberMapConstItr_t i=fileNumberMap_.begin(); i!=fileNumberMap_.end(); ++i) { - if (i!=fileNumberMap_.begin()) { - oss << " "; - } - std::string fqFileName = i->second->getFqFileName(); - oss << i->first << ":" << fqFileName.substr(fqFileName.rfind('/')+1); - } - oss << "] ecl=[ "; - for (fileNumberMapConstItr_t j=fileNumberMap_.begin(); j!=fileNumberMap_.end(); ++j) { - if (j!=fileNumberMap_.begin()) { - oss << " "; - } - oss << j->second->getEnqueuedRecordCount(); + oss << "Recovery journal analysis (jid=\"" << jid << "\"):" << std::endl; + oss << " Number of journal files = " << fileNumberMap_.size() << std::endl; + oss << " Journal File List:" << std::endl; + for (fileNumberMapConstItr_t k=fileNumberMap_.begin(); k!=fileNumberMap_.end(); ++k) { + std::string fqFileName = k->second->getFqFileName(); + oss << " " << k->first << ": " << fqFileName.substr(fqFileName.rfind('/')+1) << std::endl; + } + oss << " Enqueue Counts: [ "; + for (fileNumberMapConstItr_t l=fileNumberMap_.begin(); l!=fileNumberMap_.end(); ++l) { + if (l != fileNumberMap_.begin()) { + oss << ", "; } - oss << " ] empty=" << (journalEmptyFlag_ ? "T" : "F"); - oss << " fro=0x" << std::hex << firstRecordOffset_ << std::dec << " (" << (firstRecordOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)"; - oss << " eo=0x" << std::hex << endOffset_ << std::dec << " (" << (endOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)"; - oss << " hrid=0x" << std::hex << highestRecordId_ << std::dec; - oss << " hfnum=0x" << std::hex << highestFileNumber_ << std::dec; - oss << " lffull=" << (lastFileFullFlag_ ? "T" : "F"); + oss << l->second->getEnqueuedRecordCount(); + } + oss << " ]" << std::endl; + oss << " Journal empty = " << (journalEmptyFlag_ ? "TRUE" : "FALSE") << std::endl; + oss << " First record offset in first file = 0x" << std::hex << firstRecordOffset_ << + std::dec << " (" << (firstRecordOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)" << std::endl; + oss << " End offset = 0x" << std::hex << endOffset_ << std::dec << " (" << + (endOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)" << std::endl; + oss << " Highest rid = 0x" << std::hex << highestRecordId_ << std::dec << std::endl; + oss << " Highest file number = 0x" << std::hex << highestFileNumber_ << std::dec << std::endl; + oss << " Last file full = " << (lastFileFullFlag_ ? "TRUE" : "FALSE") << std::endl; + oss << " Enqueued records (txn & non-txn):" << std::endl; + return oss.str(); +} + +std::string RecoveryManager::toLog(const std::string& jid, const int indent) { + std::string indentStr(indent, ' '); + std::ostringstream oss; + oss << std::endl << indentStr << "Journal recovery analysis (jid=\"" << jid << "\"):" << std::endl; + if (journalEmptyFlag_) { + oss << indentStr << "<Journal empty, no journal files found>" << std::endl; } else { - oss << "Recovery journal analysis (jid=\"" << jid << "\"):" << std::endl; - oss << " Number of journal files = " << fileNumberMap_.size() << std::endl; - oss << " Journal File List:" << std::endl; + oss << indentStr << std::setw(7) << "file_id" + << std::setw(43) << "file_name" + << std::setw(16) << "fro" + << std::setw(12) << "record_cnt" + << std::setw(5) << "ptn" + << std::setw(10) << "efp" + << std::endl; + oss << indentStr << std::setw(7) << "-------" + << std::setw(43) << "-----------------------------------------" + << std::setw(16) << "--------------" + << std::setw(12) << "----------" + << std::setw(5) << "---" + << std::setw(10) << "--------" + << std::endl; for (fileNumberMapConstItr_t k=fileNumberMap_.begin(); k!=fileNumberMap_.end(); ++k) { std::string fqFileName = k->second->getFqFileName(); - oss << " " << k->first << ": " << fqFileName.substr(fqFileName.rfind('/')+1) << std::endl; - } - oss << " Enqueue Counts: [ " << std::endl; - for (fileNumberMapConstItr_t l=fileNumberMap_.begin(); l!=fileNumberMap_.end(); ++l) { - if (l != fileNumberMap_.begin()) { - oss << ", "; - } - oss << l->second->getEnqueuedRecordCount(); + std::ostringstream fro; + fro << std::hex << "0x" << k->second->getFirstRecordOffset(); + oss << indentStr << std::setw(7) << k->first + << std::setw(43) << fqFileName.substr(fqFileName.rfind('/')+1) + << std::setw(16) << fro.str() + << std::setw(12) << k->second->getEnqueuedRecordCount() + << std::setw(5) << k->second->getEfpIdentity().pn_ + << std::setw(9) << k->second->getEfpIdentity().ds_ << "k" + << std::endl; } - oss << " ]" << std::endl; - oss << " Journal empty = " << (journalEmptyFlag_ ? "TRUE" : "FALSE") << std::endl; - oss << " First record offset in first file = 0x" << std::hex << firstRecordOffset_ << + oss << indentStr << "First record offset in first file = 0x" << std::hex << firstRecordOffset_ << std::dec << " (" << (firstRecordOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)" << std::endl; - oss << " End offset = 0x" << std::hex << endOffset_ << std::dec << " (" << + oss << indentStr << "End offset in last file = 0x" << std::hex << endOffset_ << std::dec << " (" << (endOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)" << std::endl; - oss << " Highest rid = 0x" << std::hex << highestRecordId_ << std::dec << std::endl; - oss << " Highest file number = 0x" << std::hex << highestFileNumber_ << std::dec << std::endl; - oss << " Last file full = " << (lastFileFullFlag_ ? "TRUE" : "FALSE") << std::endl; - oss << " Enqueued records (txn & non-txn):" << std::endl; + oss << indentStr << "Highest rid found = 0x" << std::hex << highestRecordId_ << std::dec << std::endl; + oss << indentStr << "Last file full = " << (lastFileFullFlag_ ? "TRUE" : "FALSE") << std::endl; + oss << indentStr << "Enqueued records (txn & non-txn):"; } return oss.str(); } @@ -355,9 +378,9 @@ void RecoveryManager::checkJournalAlignment(const std::streampos recordPosition) } bool RecoveryManager::decodeRecord(jrec& record, - std::size_t& cumulativeSizeRead, - ::rec_hdr_t& headerRecord, - std::streampos& fileOffset) + std::size_t& cumulativeSizeRead, + ::rec_hdr_t& headerRecord, + std::streampos& fileOffset) { std::streampos start_file_offs = fileOffset; @@ -370,15 +393,18 @@ bool RecoveryManager::decodeRecord(jrec& record, bool done = false; while (!done) { try { - done = record.rcv_decode(headerRecord, &inFileStream_, cumulativeSizeRead); + done = record.decode(headerRecord, &inFileStream_, cumulativeSizeRead); } catch (const jexception& e) { + journalLogRef_.log(JournalLog::LOG_INFO, queueName_, e.what()); checkJournalAlignment(start_file_offs); return false; } - if (!done && !getNextFile(false)) { - checkJournalAlignment(start_file_offs); - return false; + if (!done && needNextFile()) { + if (!getNextFile(false)) { + checkJournalAlignment(start_file_offs); + return false; + } } } return true; @@ -392,45 +418,50 @@ uint64_t RecoveryManager::getCurrentFileNumber() const { return currentJournalFileConstItr_->first; } -bool RecoveryManager::getNextFile(bool jumpToFirstRecordOffsetFlag) { +bool RecoveryManager::getFile(const uint64_t fileNumber, bool jumpToFirstRecordOffsetFlag) { if (inFileStream_.is_open()) { - if (inFileStream_.eof() || !inFileStream_.good()) { - inFileStream_.clear(); - endOffset_ = inFileStream_.tellg(); // remember file offset before closing - if (endOffset_ == -1) { - std::ostringstream oss; - oss << "tellg() failure: fail=" << (inFileStream_.fail()?"T":"F") << " bad=" << (inFileStream_.bad()?"T":"F"); - throw jexception(jerrno::JERR_RCVM_STREAMBAD, oss.str(), "RecoveryManager", "getNextFile"); - } - inFileStream_.close(); - if (++currentJournalFileConstItr_ == fileNumberMap_.end()) { - return false; - } - } - } - if (!inFileStream_.is_open()) { + inFileStream_.close(); +//std::cout << " f=" << getCurrentFileName() << "]" << std::flush; // DEBUG inFileStream_.clear(); // clear eof flag, req'd for older versions of c++ - inFileStream_.open(getCurrentFileName().c_str(), std::ios_base::in | std::ios_base::binary); - if (!inFileStream_.good()) { - throw jexception(jerrno::JERR__FILEIO, getCurrentFileName(), "RecoveryManager", "getNextFile"); - } + } + currentJournalFileConstItr_ = fileNumberMap_.find(fileNumber); + if (currentJournalFileConstItr_ == fileNumberMap_.end()) { + return false; + } + inFileStream_.open(getCurrentFileName().c_str(), std::ios_base::in | std::ios_base::binary); + if (!inFileStream_.good()) { + throw jexception(jerrno::JERR__FILEIO, getCurrentFileName(), "RecoveryManager", "getFile"); + } +//std::cout << " [F=" << getCurrentFileName() << std::flush; // DEBUG - // Read file header - file_hdr_t fhdr; - inFileStream_.read((char*)&fhdr, sizeof(fhdr)); - checkFileStreamOk(true); - if (fhdr._rhdr._magic == QLS_FILE_MAGIC) { - firstRecordOffset_ = fhdr._fro; - std::streamoff foffs = jumpToFirstRecordOffsetFlag ? firstRecordOffset_ : QLS_SBLK_SIZE_BYTES; - inFileStream_.seekg(foffs); - } else { - inFileStream_.close(); - if (currentJournalFileConstItr_ == fileNumberMap_.begin()) { - journalEmptyFlag_ = true; - } + if (!readFileHeader()) { + return false; + } + std::streamoff foffs = jumpToFirstRecordOffsetFlag ? firstRecordOffset_ : QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_BYTES; + inFileStream_.seekg(foffs); + return true; +} + +bool RecoveryManager::getNextFile(bool jumpToFirstRecordOffsetFlag) { + if (inFileStream_.is_open()) { + inFileStream_.close(); +//std::cout << " .f=" << getCurrentFileName() << "]" << std::flush; // DEBUG + if (++currentJournalFileConstItr_ == fileNumberMap_.end()) { return false; } + inFileStream_.clear(); // clear eof flag, req'd for older versions of c++ + } + inFileStream_.open(getCurrentFileName().c_str(), std::ios_base::in | std::ios_base::binary); + if (!inFileStream_.good()) { + throw jexception(jerrno::JERR__FILEIO, getCurrentFileName(), "RecoveryManager", "getNextFile"); + } +//std::cout << " [.F=" << getCurrentFileName() << std::flush; // DEBUG + + if (!readFileHeader()) { + return false; } + std::streamoff foffs = jumpToFirstRecordOffsetFlag ? firstRecordOffset_ : QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_BYTES; + inFileStream_.seekg(foffs); return true; } @@ -443,7 +474,7 @@ bool RecoveryManager::getNextRecordHeader() bool hdr_ok = false; std::streampos file_pos; while (!hdr_ok) { - if (!inFileStream_.is_open()) { + if (needNextFile()) { if (!getNextFile(true)) { return false; } @@ -458,8 +489,10 @@ bool RecoveryManager::getNextRecordHeader() if (inFileStream_.gcount() == sizeof(rec_hdr_t)) { hdr_ok = true; } else { - if (!getNextFile(true)) { - return false; + if (needNextFile()) { + if (!getNextFile(true)) { + return false; + } } } } @@ -468,6 +501,10 @@ bool RecoveryManager::getNextRecordHeader() case QLS_ENQ_MAGIC: { //std::cout << " 0x" << std::hex << file_pos << ".e.0x" << h._rid << std::dec << std::flush; // DEBUG + if (::rec_hdr_check(&h, QLS_ENQ_MAGIC, QLS_JRNL_VERSION, currentSerial_) != 0) { + endOffset_ = file_pos; + return false; + } enq_rec er; uint64_t start_fid = getCurrentFileNumber(); // fid may increment in decode() if record folds over file boundary if (!decodeRecord(er, cum_size_read, h, file_pos)) { @@ -502,6 +539,10 @@ bool RecoveryManager::getNextRecordHeader() case QLS_DEQ_MAGIC: { //std::cout << " 0x" << std::hex << file_pos << ".d.0x" << h._rid << std::dec << std::flush; // DEBUG + if (::rec_hdr_check(&h, QLS_DEQ_MAGIC, QLS_JRNL_VERSION, currentSerial_) != 0) { + endOffset_ = file_pos; + return false; + } deq_rec dr; uint16_t start_fid = getCurrentFileNumber(); // fid may increment in decode() if record folds over file boundary if (!decodeRecord(dr, cum_size_read, h, file_pos)) { @@ -534,6 +575,10 @@ bool RecoveryManager::getNextRecordHeader() case QLS_TXA_MAGIC: { //std::cout << " 0x" << std::hex << file_pos << ".a.0x" << h._rid << std::dec << std::flush; // DEBUG + if (::rec_hdr_check(&h, QLS_TXA_MAGIC, QLS_JRNL_VERSION, currentSerial_) != 0) { + endOffset_ = file_pos; + return false; + } txn_rec ar; if (!decodeRecord(ar, cum_size_read, h, file_pos)) { return false; @@ -558,6 +603,10 @@ bool RecoveryManager::getNextRecordHeader() case QLS_TXC_MAGIC: { //std::cout << " 0x" << std::hex << file_pos << ".c.0x" << h._rid << std::dec << std::flush; // DEBUG + if (::rec_hdr_check(&h, QLS_TXC_MAGIC, QLS_JRNL_VERSION, currentSerial_) != 0) { + endOffset_ = file_pos; + return false; + } txn_rec cr; if (!decodeRecord(cr, cum_size_read, h, file_pos)) { return false; @@ -593,8 +642,10 @@ bool RecoveryManager::getNextRecordHeader() uint32_t rec_dblks = jrec::size_dblks(sizeof(::rec_hdr_t)); inFileStream_.ignore(rec_dblks * QLS_DBLK_SIZE_BYTES - sizeof(::rec_hdr_t)); checkFileStreamOk(false); - if (!getNextFile(false)) { - return false; + if (needNextFile()) { + if (!getNextFile(false)) { + return false; + } } } break; @@ -611,6 +662,13 @@ bool RecoveryManager::getNextRecordHeader() return true; } +bool RecoveryManager::needNextFile() { + if (inFileStream_.is_open()) { + return inFileStream_.eof() || inFileStream_.tellg() >= std::streampos(efpFileSize_kib_ * 1024); + } + return true; +} + void RecoveryManager::readJournalData(char* target, const std::streamsize readSize) { std::streamoff bytesRead = 0; @@ -624,7 +682,9 @@ void RecoveryManager::readJournalData(char* target, inFileStream_.read(target + bytesRead, readSize - bytesRead); std::streamoff thisReadSize = inFileStream_.gcount(); if (thisReadSize < readSize) { - getNextFile(false); + if (needNextFile()) { + getNextFile(false); + } file_pos = inFileStream_.tellg(); if (file_pos == std::streampos(-1)) { std::ostringstream oss; @@ -636,6 +696,23 @@ void RecoveryManager::readJournalData(char* target, } } +bool RecoveryManager::readFileHeader() { + file_hdr_t fhdr; + inFileStream_.read((char*)&fhdr, sizeof(fhdr)); + checkFileStreamOk(true); + if (::file_hdr_check(&fhdr, QLS_FILE_MAGIC, QLS_JRNL_VERSION, efpFileSize_kib_) != 0) { + firstRecordOffset_ = fhdr._fro; + currentSerial_ = fhdr._rhdr._serial; + } else { + inFileStream_.close(); + if (currentJournalFileConstItr_ == fileNumberMap_.begin()) { + journalEmptyFlag_ = true; + } + return false; + } + return true; +} + // static private void RecoveryManager::readJournalFileHeader(const std::string& journalFileName, ::file_hdr_t& fileHeaderRef, @@ -670,4 +747,4 @@ void RecoveryManager::removeEmptyFiles(EmptyFilePool* emptyFilePoolPtr) { } } -}} // namespace qpid::qls_jrnl +}}} |
