summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp')
-rw-r--r--cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp269
1 files changed, 173 insertions, 96 deletions
diff --git a/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp b/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp
index cba560750b..361ee1aeda 100644
--- a/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp
+++ b/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp
@@ -44,8 +44,8 @@
#include <vector>
namespace qpid {
-namespace qls_jrnl
-{
+namespace linearstore {
+namespace journal {
RecoveryManager::RecoveryManager(const std::string& journalDirectory,
const std::string& queuename,
@@ -63,6 +63,7 @@ RecoveryManager::RecoveryManager(const std::string& journalDirectory,
highestRecordId_(0ULL),
highestFileNumber_(0ULL),
lastFileFullFlag_(false),
+ currentSerial_(0),
efpFileSize_kib_(0)
{}
@@ -154,9 +155,10 @@ bool RecoveryManager::readNextRemainingRecord(void** const dataPtrPtr,
}
enq_map::emap_data_struct_t eds;
enqueueMapRef_.get_data(*recordIdListConstItr_, eds);
- uint64_t fileNumber = eds._pfid;
- currentJournalFileConstItr_ = fileNumberMap_.find(fileNumber);
- getNextFile(false);
+ if (!inFileStream_.is_open() || currentJournalFileConstItr_->first != eds._pfid) {
+ getFile(eds._pfid, false);
+ }
+//std::cout << " " << eds._pfid << std::hex << ",0x" << eds._file_posn << std::flush; // DEBUG
inFileStream_.seekg(eds._file_posn, std::ifstream::beg);
if (!inFileStream_.good()) {
@@ -174,7 +176,10 @@ bool RecoveryManager::readNextRemainingRecord(void** const dataPtrPtr,
// check flags
transient = ::is_enq_transient(&enqueueHeader);
external = ::is_enq_external(&enqueueHeader);
-
+//char magicBuff[5]; // DEBUG
+//::memcpy(magicBuff, &enqueueHeader, 4); // DEBUG
+//magicBuff[4] = 0; // DEBUG
+//std::cout << std::hex << ":" << (char*)magicBuff << ",rid=0x" << enqueueHeader._rhdr._rid << ",xs=0x" << enqueueHeader._xidsize << ",ds=0x" << enqueueHeader._dsize << std::dec << std::flush; // DEBUG
// read xid
xidSize = enqueueHeader._xidsize;
*xidPtrPtr = ::malloc(xidSize);
@@ -217,57 +222,75 @@ void RecoveryManager::setLinearFileControllerJournals(lfcAddJournalFileFn fnPtr,
}
}
-std::string RecoveryManager::toString(const std::string& jid,
- bool compact) {
+std::string RecoveryManager::toString(const std::string& jid) {
std::ostringstream oss;
- if (compact) {
- oss << "Recovery journal analysis (jid=\"" << jid << "\"):";
- oss << " jfl=[";
- for (fileNumberMapConstItr_t i=fileNumberMap_.begin(); i!=fileNumberMap_.end(); ++i) {
- if (i!=fileNumberMap_.begin()) {
- oss << " ";
- }
- std::string fqFileName = i->second->getFqFileName();
- oss << i->first << ":" << fqFileName.substr(fqFileName.rfind('/')+1);
- }
- oss << "] ecl=[ ";
- for (fileNumberMapConstItr_t j=fileNumberMap_.begin(); j!=fileNumberMap_.end(); ++j) {
- if (j!=fileNumberMap_.begin()) {
- oss << " ";
- }
- oss << j->second->getEnqueuedRecordCount();
+ oss << "Recovery journal analysis (jid=\"" << jid << "\"):" << std::endl;
+ oss << " Number of journal files = " << fileNumberMap_.size() << std::endl;
+ oss << " Journal File List:" << std::endl;
+ for (fileNumberMapConstItr_t k=fileNumberMap_.begin(); k!=fileNumberMap_.end(); ++k) {
+ std::string fqFileName = k->second->getFqFileName();
+ oss << " " << k->first << ": " << fqFileName.substr(fqFileName.rfind('/')+1) << std::endl;
+ }
+ oss << " Enqueue Counts: [ ";
+ for (fileNumberMapConstItr_t l=fileNumberMap_.begin(); l!=fileNumberMap_.end(); ++l) {
+ if (l != fileNumberMap_.begin()) {
+ oss << ", ";
}
- oss << " ] empty=" << (journalEmptyFlag_ ? "T" : "F");
- oss << " fro=0x" << std::hex << firstRecordOffset_ << std::dec << " (" << (firstRecordOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)";
- oss << " eo=0x" << std::hex << endOffset_ << std::dec << " (" << (endOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)";
- oss << " hrid=0x" << std::hex << highestRecordId_ << std::dec;
- oss << " hfnum=0x" << std::hex << highestFileNumber_ << std::dec;
- oss << " lffull=" << (lastFileFullFlag_ ? "T" : "F");
+ oss << l->second->getEnqueuedRecordCount();
+ }
+ oss << " ]" << std::endl;
+ oss << " Journal empty = " << (journalEmptyFlag_ ? "TRUE" : "FALSE") << std::endl;
+ oss << " First record offset in first file = 0x" << std::hex << firstRecordOffset_ <<
+ std::dec << " (" << (firstRecordOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)" << std::endl;
+ oss << " End offset = 0x" << std::hex << endOffset_ << std::dec << " (" <<
+ (endOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)" << std::endl;
+ oss << " Highest rid = 0x" << std::hex << highestRecordId_ << std::dec << std::endl;
+ oss << " Highest file number = 0x" << std::hex << highestFileNumber_ << std::dec << std::endl;
+ oss << " Last file full = " << (lastFileFullFlag_ ? "TRUE" : "FALSE") << std::endl;
+ oss << " Enqueued records (txn & non-txn):" << std::endl;
+ return oss.str();
+}
+
+std::string RecoveryManager::toLog(const std::string& jid, const int indent) {
+ std::string indentStr(indent, ' ');
+ std::ostringstream oss;
+ oss << std::endl << indentStr << "Journal recovery analysis (jid=\"" << jid << "\"):" << std::endl;
+ if (journalEmptyFlag_) {
+ oss << indentStr << "<Journal empty, no journal files found>" << std::endl;
} else {
- oss << "Recovery journal analysis (jid=\"" << jid << "\"):" << std::endl;
- oss << " Number of journal files = " << fileNumberMap_.size() << std::endl;
- oss << " Journal File List:" << std::endl;
+ oss << indentStr << std::setw(7) << "file_id"
+ << std::setw(43) << "file_name"
+ << std::setw(16) << "fro"
+ << std::setw(12) << "record_cnt"
+ << std::setw(5) << "ptn"
+ << std::setw(10) << "efp"
+ << std::endl;
+ oss << indentStr << std::setw(7) << "-------"
+ << std::setw(43) << "-----------------------------------------"
+ << std::setw(16) << "--------------"
+ << std::setw(12) << "----------"
+ << std::setw(5) << "---"
+ << std::setw(10) << "--------"
+ << std::endl;
for (fileNumberMapConstItr_t k=fileNumberMap_.begin(); k!=fileNumberMap_.end(); ++k) {
std::string fqFileName = k->second->getFqFileName();
- oss << " " << k->first << ": " << fqFileName.substr(fqFileName.rfind('/')+1) << std::endl;
- }
- oss << " Enqueue Counts: [ " << std::endl;
- for (fileNumberMapConstItr_t l=fileNumberMap_.begin(); l!=fileNumberMap_.end(); ++l) {
- if (l != fileNumberMap_.begin()) {
- oss << ", ";
- }
- oss << l->second->getEnqueuedRecordCount();
+ std::ostringstream fro;
+ fro << std::hex << "0x" << k->second->getFirstRecordOffset();
+ oss << indentStr << std::setw(7) << k->first
+ << std::setw(43) << fqFileName.substr(fqFileName.rfind('/')+1)
+ << std::setw(16) << fro.str()
+ << std::setw(12) << k->second->getEnqueuedRecordCount()
+ << std::setw(5) << k->second->getEfpIdentity().pn_
+ << std::setw(9) << k->second->getEfpIdentity().ds_ << "k"
+ << std::endl;
}
- oss << " ]" << std::endl;
- oss << " Journal empty = " << (journalEmptyFlag_ ? "TRUE" : "FALSE") << std::endl;
- oss << " First record offset in first file = 0x" << std::hex << firstRecordOffset_ <<
+ oss << indentStr << "First record offset in first file = 0x" << std::hex << firstRecordOffset_ <<
std::dec << " (" << (firstRecordOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)" << std::endl;
- oss << " End offset = 0x" << std::hex << endOffset_ << std::dec << " (" <<
+ oss << indentStr << "End offset in last file = 0x" << std::hex << endOffset_ << std::dec << " (" <<
(endOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)" << std::endl;
- oss << " Highest rid = 0x" << std::hex << highestRecordId_ << std::dec << std::endl;
- oss << " Highest file number = 0x" << std::hex << highestFileNumber_ << std::dec << std::endl;
- oss << " Last file full = " << (lastFileFullFlag_ ? "TRUE" : "FALSE") << std::endl;
- oss << " Enqueued records (txn & non-txn):" << std::endl;
+ oss << indentStr << "Highest rid found = 0x" << std::hex << highestRecordId_ << std::dec << std::endl;
+ oss << indentStr << "Last file full = " << (lastFileFullFlag_ ? "TRUE" : "FALSE") << std::endl;
+ oss << indentStr << "Enqueued records (txn & non-txn):";
}
return oss.str();
}
@@ -355,9 +378,9 @@ void RecoveryManager::checkJournalAlignment(const std::streampos recordPosition)
}
bool RecoveryManager::decodeRecord(jrec& record,
- std::size_t& cumulativeSizeRead,
- ::rec_hdr_t& headerRecord,
- std::streampos& fileOffset)
+ std::size_t& cumulativeSizeRead,
+ ::rec_hdr_t& headerRecord,
+ std::streampos& fileOffset)
{
std::streampos start_file_offs = fileOffset;
@@ -370,15 +393,18 @@ bool RecoveryManager::decodeRecord(jrec& record,
bool done = false;
while (!done) {
try {
- done = record.rcv_decode(headerRecord, &inFileStream_, cumulativeSizeRead);
+ done = record.decode(headerRecord, &inFileStream_, cumulativeSizeRead);
}
catch (const jexception& e) {
+ journalLogRef_.log(JournalLog::LOG_INFO, queueName_, e.what());
checkJournalAlignment(start_file_offs);
return false;
}
- if (!done && !getNextFile(false)) {
- checkJournalAlignment(start_file_offs);
- return false;
+ if (!done && needNextFile()) {
+ if (!getNextFile(false)) {
+ checkJournalAlignment(start_file_offs);
+ return false;
+ }
}
}
return true;
@@ -392,45 +418,50 @@ uint64_t RecoveryManager::getCurrentFileNumber() const {
return currentJournalFileConstItr_->first;
}
-bool RecoveryManager::getNextFile(bool jumpToFirstRecordOffsetFlag) {
+bool RecoveryManager::getFile(const uint64_t fileNumber, bool jumpToFirstRecordOffsetFlag) {
if (inFileStream_.is_open()) {
- if (inFileStream_.eof() || !inFileStream_.good()) {
- inFileStream_.clear();
- endOffset_ = inFileStream_.tellg(); // remember file offset before closing
- if (endOffset_ == -1) {
- std::ostringstream oss;
- oss << "tellg() failure: fail=" << (inFileStream_.fail()?"T":"F") << " bad=" << (inFileStream_.bad()?"T":"F");
- throw jexception(jerrno::JERR_RCVM_STREAMBAD, oss.str(), "RecoveryManager", "getNextFile");
- }
- inFileStream_.close();
- if (++currentJournalFileConstItr_ == fileNumberMap_.end()) {
- return false;
- }
- }
- }
- if (!inFileStream_.is_open()) {
+ inFileStream_.close();
+//std::cout << " f=" << getCurrentFileName() << "]" << std::flush; // DEBUG
inFileStream_.clear(); // clear eof flag, req'd for older versions of c++
- inFileStream_.open(getCurrentFileName().c_str(), std::ios_base::in | std::ios_base::binary);
- if (!inFileStream_.good()) {
- throw jexception(jerrno::JERR__FILEIO, getCurrentFileName(), "RecoveryManager", "getNextFile");
- }
+ }
+ currentJournalFileConstItr_ = fileNumberMap_.find(fileNumber);
+ if (currentJournalFileConstItr_ == fileNumberMap_.end()) {
+ return false;
+ }
+ inFileStream_.open(getCurrentFileName().c_str(), std::ios_base::in | std::ios_base::binary);
+ if (!inFileStream_.good()) {
+ throw jexception(jerrno::JERR__FILEIO, getCurrentFileName(), "RecoveryManager", "getFile");
+ }
+//std::cout << " [F=" << getCurrentFileName() << std::flush; // DEBUG
- // Read file header
- file_hdr_t fhdr;
- inFileStream_.read((char*)&fhdr, sizeof(fhdr));
- checkFileStreamOk(true);
- if (fhdr._rhdr._magic == QLS_FILE_MAGIC) {
- firstRecordOffset_ = fhdr._fro;
- std::streamoff foffs = jumpToFirstRecordOffsetFlag ? firstRecordOffset_ : QLS_SBLK_SIZE_BYTES;
- inFileStream_.seekg(foffs);
- } else {
- inFileStream_.close();
- if (currentJournalFileConstItr_ == fileNumberMap_.begin()) {
- journalEmptyFlag_ = true;
- }
+ if (!readFileHeader()) {
+ return false;
+ }
+ std::streamoff foffs = jumpToFirstRecordOffsetFlag ? firstRecordOffset_ : QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_BYTES;
+ inFileStream_.seekg(foffs);
+ return true;
+}
+
+bool RecoveryManager::getNextFile(bool jumpToFirstRecordOffsetFlag) {
+ if (inFileStream_.is_open()) {
+ inFileStream_.close();
+//std::cout << " .f=" << getCurrentFileName() << "]" << std::flush; // DEBUG
+ if (++currentJournalFileConstItr_ == fileNumberMap_.end()) {
return false;
}
+ inFileStream_.clear(); // clear eof flag, req'd for older versions of c++
+ }
+ inFileStream_.open(getCurrentFileName().c_str(), std::ios_base::in | std::ios_base::binary);
+ if (!inFileStream_.good()) {
+ throw jexception(jerrno::JERR__FILEIO, getCurrentFileName(), "RecoveryManager", "getNextFile");
+ }
+//std::cout << " [.F=" << getCurrentFileName() << std::flush; // DEBUG
+
+ if (!readFileHeader()) {
+ return false;
}
+ std::streamoff foffs = jumpToFirstRecordOffsetFlag ? firstRecordOffset_ : QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_BYTES;
+ inFileStream_.seekg(foffs);
return true;
}
@@ -443,7 +474,7 @@ bool RecoveryManager::getNextRecordHeader()
bool hdr_ok = false;
std::streampos file_pos;
while (!hdr_ok) {
- if (!inFileStream_.is_open()) {
+ if (needNextFile()) {
if (!getNextFile(true)) {
return false;
}
@@ -458,8 +489,10 @@ bool RecoveryManager::getNextRecordHeader()
if (inFileStream_.gcount() == sizeof(rec_hdr_t)) {
hdr_ok = true;
} else {
- if (!getNextFile(true)) {
- return false;
+ if (needNextFile()) {
+ if (!getNextFile(true)) {
+ return false;
+ }
}
}
}
@@ -468,6 +501,10 @@ bool RecoveryManager::getNextRecordHeader()
case QLS_ENQ_MAGIC:
{
//std::cout << " 0x" << std::hex << file_pos << ".e.0x" << h._rid << std::dec << std::flush; // DEBUG
+ if (::rec_hdr_check(&h, QLS_ENQ_MAGIC, QLS_JRNL_VERSION, currentSerial_) != 0) {
+ endOffset_ = file_pos;
+ return false;
+ }
enq_rec er;
uint64_t start_fid = getCurrentFileNumber(); // fid may increment in decode() if record folds over file boundary
if (!decodeRecord(er, cum_size_read, h, file_pos)) {
@@ -502,6 +539,10 @@ bool RecoveryManager::getNextRecordHeader()
case QLS_DEQ_MAGIC:
{
//std::cout << " 0x" << std::hex << file_pos << ".d.0x" << h._rid << std::dec << std::flush; // DEBUG
+ if (::rec_hdr_check(&h, QLS_DEQ_MAGIC, QLS_JRNL_VERSION, currentSerial_) != 0) {
+ endOffset_ = file_pos;
+ return false;
+ }
deq_rec dr;
uint16_t start_fid = getCurrentFileNumber(); // fid may increment in decode() if record folds over file boundary
if (!decodeRecord(dr, cum_size_read, h, file_pos)) {
@@ -534,6 +575,10 @@ bool RecoveryManager::getNextRecordHeader()
case QLS_TXA_MAGIC:
{
//std::cout << " 0x" << std::hex << file_pos << ".a.0x" << h._rid << std::dec << std::flush; // DEBUG
+ if (::rec_hdr_check(&h, QLS_TXA_MAGIC, QLS_JRNL_VERSION, currentSerial_) != 0) {
+ endOffset_ = file_pos;
+ return false;
+ }
txn_rec ar;
if (!decodeRecord(ar, cum_size_read, h, file_pos)) {
return false;
@@ -558,6 +603,10 @@ bool RecoveryManager::getNextRecordHeader()
case QLS_TXC_MAGIC:
{
//std::cout << " 0x" << std::hex << file_pos << ".c.0x" << h._rid << std::dec << std::flush; // DEBUG
+ if (::rec_hdr_check(&h, QLS_TXC_MAGIC, QLS_JRNL_VERSION, currentSerial_) != 0) {
+ endOffset_ = file_pos;
+ return false;
+ }
txn_rec cr;
if (!decodeRecord(cr, cum_size_read, h, file_pos)) {
return false;
@@ -593,8 +642,10 @@ bool RecoveryManager::getNextRecordHeader()
uint32_t rec_dblks = jrec::size_dblks(sizeof(::rec_hdr_t));
inFileStream_.ignore(rec_dblks * QLS_DBLK_SIZE_BYTES - sizeof(::rec_hdr_t));
checkFileStreamOk(false);
- if (!getNextFile(false)) {
- return false;
+ if (needNextFile()) {
+ if (!getNextFile(false)) {
+ return false;
+ }
}
}
break;
@@ -611,6 +662,13 @@ bool RecoveryManager::getNextRecordHeader()
return true;
}
+bool RecoveryManager::needNextFile() {
+ if (inFileStream_.is_open()) {
+ return inFileStream_.eof() || inFileStream_.tellg() >= std::streampos(efpFileSize_kib_ * 1024);
+ }
+ return true;
+}
+
void RecoveryManager::readJournalData(char* target,
const std::streamsize readSize) {
std::streamoff bytesRead = 0;
@@ -624,7 +682,9 @@ void RecoveryManager::readJournalData(char* target,
inFileStream_.read(target + bytesRead, readSize - bytesRead);
std::streamoff thisReadSize = inFileStream_.gcount();
if (thisReadSize < readSize) {
- getNextFile(false);
+ if (needNextFile()) {
+ getNextFile(false);
+ }
file_pos = inFileStream_.tellg();
if (file_pos == std::streampos(-1)) {
std::ostringstream oss;
@@ -636,6 +696,23 @@ void RecoveryManager::readJournalData(char* target,
}
}
+bool RecoveryManager::readFileHeader() {
+ file_hdr_t fhdr;
+ inFileStream_.read((char*)&fhdr, sizeof(fhdr));
+ checkFileStreamOk(true);
+ if (::file_hdr_check(&fhdr, QLS_FILE_MAGIC, QLS_JRNL_VERSION, efpFileSize_kib_) != 0) {
+ firstRecordOffset_ = fhdr._fro;
+ currentSerial_ = fhdr._rhdr._serial;
+ } else {
+ inFileStream_.close();
+ if (currentJournalFileConstItr_ == fileNumberMap_.begin()) {
+ journalEmptyFlag_ = true;
+ }
+ return false;
+ }
+ return true;
+}
+
// static private
void RecoveryManager::readJournalFileHeader(const std::string& journalFileName,
::file_hdr_t& fileHeaderRef,
@@ -670,4 +747,4 @@ void RecoveryManager::removeEmptyFiles(EmptyFilePool* emptyFilePoolPtr) {
}
}
-}} // namespace qpid::qls_jrnl
+}}}