summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/linearstore/journal/RecoveryManager.cpp')
-rw-r--r--cpp/src/qpid/linearstore/journal/RecoveryManager.cpp195
1 files changed, 123 insertions, 72 deletions
diff --git a/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp b/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp
index 72308cc929..a1cec53ca1 100644
--- a/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp
+++ b/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp
@@ -56,11 +56,15 @@ RecoveredRecordData_t::RecoveredRecordData_t(const uint64_t rid, const uint64_t
pendingTransaction_(ptxn)
{}
-
bool recordIdListCompare(RecoveredRecordData_t a, RecoveredRecordData_t b) {
return a.recordId_ < b.recordId_;
}
+RecoveredFileData_t::RecoveredFileData_t(JournalFile* journalFilePtr, const uint32_t completedDblkCount) :
+ journalFilePtr_(journalFilePtr),
+ completedDblkCount_(completedDblkCount)
+{}
+
RecoveryManager::RecoveryManager(const std::string& journalDirectory,
const std::string& queuename,
enq_map& enqueueMapRef,
@@ -77,11 +81,17 @@ RecoveryManager::RecoveryManager(const std::string& journalDirectory,
highestRecordId_(0ULL),
highestFileNumber_(0ULL),
lastFileFullFlag_(false),
+ initial_fid_(0),
currentSerial_(0),
efpFileSize_kib_(0)
{}
-RecoveryManager::~RecoveryManager() {}
+RecoveryManager::~RecoveryManager() {
+ for (fileNumberMapItr_t i = fileNumberMap_.begin(); i != fileNumberMap_.end(); ++i) {
+ delete i->second;
+ }
+ fileNumberMap_.clear();
+}
void RecoveryManager::analyzeJournals(const std::vector<std::string>* preparedTransactionListPtr,
EmptyFilePoolManager* emptyFilePoolManager,
@@ -92,9 +102,6 @@ void RecoveryManager::analyzeJournals(const std::vector<std::string>* preparedTr
*emptyFilePoolPtrPtr = emptyFilePoolManager->getEmptyFilePool(efpIdentity);
efpFileSize_kib_ = (*emptyFilePoolPtrPtr)->fileSize_kib();
- // Check for file full condition
- lastFileFullFlag_ = endOffset_ == (std::streamoff)(*emptyFilePoolPtrPtr)->fileSize_kib() * 1024;
-
if (!journalEmptyFlag_) {
// Read all records, establish remaining enqueued records
@@ -106,6 +113,9 @@ void RecoveryManager::analyzeJournals(const std::vector<std::string>* preparedTr
inFileStream_.close();
}
+ // Check for file full condition
+ lastFileFullFlag_ = endOffset_ == (std::streamoff)(*emptyFilePoolPtrPtr)->fileSize_kib() * 1024;
+
// Remove leading files which have no enqueued records
removeEmptyFiles(*emptyFilePoolPtrPtr);
@@ -121,7 +131,7 @@ void RecoveryManager::analyzeJournals(const std::vector<std::string>* preparedTr
// Unlock any affected enqueues in emap
for (tdl_itr_t i=tdl.begin(); i<tdl.end(); i++) {
if (i->enq_flag_) { // enq op - decrement enqueue count
- fileNumberMap_[i->pfid_]->decrEnqueuedRecordCount();
+ fileNumberMap_[i->pfid_]->journalFilePtr_->decrEnqueuedRecordCount();
} else if (enqueueMapRef_.is_enqueued(i->drid_, true)) { // deq op - unlock enq record
if (enqueueMapRef_.unlock(i->drid_) < enq_map::EMAP_OK) { // fail
// enq_map::unlock()'s only error is enq_map::EMAP_RID_NOT_FOUND
@@ -174,7 +184,7 @@ bool RecoveryManager::readNextRemainingRecord(void** const dataPtrPtr,
}
} while (!foundRecord);
- if (!inFileStream_.is_open() || currentJournalFileConstItr_->first != recordIdListConstItr_->fileId_) {
+ if (!inFileStream_.is_open() || currentJournalFileItr_->first != recordIdListConstItr_->fileId_) {
if (!getFile(recordIdListConstItr_->fileId_, false)) {
std::ostringstream oss;
oss << "Failed to open file with file-id=" << recordIdListConstItr_->fileId_;
@@ -231,7 +241,6 @@ bool RecoveryManager::readNextRemainingRecord(void** const dataPtrPtr,
::rec_tail_t enqueueTail;
inFileStream_.read((char*)&enqueueTail, sizeof(::rec_tail_t));
uint32_t cs = checksum.getChecksum();
-//std::cout << std::hex << "### rid=0x" << enqueueHeader._rhdr._rid << " rtcs=0x" << enqueueTail._checksum << " cs=0x" << cs << std::dec << std::endl; // DEBUG
uint16_t res = ::rec_tail_check(&enqueueTail, &enqueueHeader._rhdr, cs);
if (res != 0) {
std::stringstream oss;
@@ -266,17 +275,30 @@ bool RecoveryManager::readNextRemainingRecord(void** const dataPtrPtr,
void RecoveryManager::setLinearFileControllerJournals(lfcAddJournalFileFn fnPtr,
LinearFileController* lfcPtr) {
if (journalEmptyFlag_) {
- if (uninitializedJournal_.size() > 0) {
- lfcPtr->restoreEmptyFile(uninitializedJournal_);
+ if (uninitFileList_.size() > 0) {
+ std::string uninitFile = uninitFileList_.back();
+ uninitFileList_.pop_back();
+ lfcPtr->restoreEmptyFile(uninitFile);
}
} else {
for (fileNumberMapConstItr_t i = fileNumberMap_.begin(); i != fileNumberMap_.end(); ++i) {
- uint32_t fileDblkCount = i->first == highestFileNumber_ ? // Is this this last file?
- endOffset_ / QLS_DBLK_SIZE_BYTES : // Last file uses _endOffset
- efpFileSize_kib_ * 1024 / QLS_DBLK_SIZE_BYTES; // All others use file size to make them full
- (lfcPtr->*fnPtr)(i->second, fileDblkCount);
+ (lfcPtr->*fnPtr)(i->second->journalFilePtr_, i->second->completedDblkCount_, i->first == initial_fid_);
}
}
+
+ std::ostringstream oss;
+ bool logFlag = !notNeededFilesList_.empty();
+ if (logFlag) {
+ oss << "Files removed from head of journal: prior truncation during recovery:";
+ }
+ while (!notNeededFilesList_.empty()) {
+ lfcPtr->removeFileToEfp(notNeededFilesList_.back());
+ oss << std::endl << " * " << notNeededFilesList_.back();
+ notNeededFilesList_.pop_back();
+ }
+ if (logFlag) {
+ journalLogRef_.log(JournalLog::LOG_NOTICE, queueName_, oss.str());
+ }
}
std::string RecoveryManager::toString(const std::string& jid) {
@@ -285,7 +307,7 @@ std::string RecoveryManager::toString(const std::string& jid) {
oss << " Number of journal files = " << fileNumberMap_.size() << std::endl;
oss << " Journal File List:" << std::endl;
for (fileNumberMapConstItr_t k=fileNumberMap_.begin(); k!=fileNumberMap_.end(); ++k) {
- std::string fqFileName = k->second->getFqFileName();
+ std::string fqFileName = k->second->journalFilePtr_->getFqFileName();
oss << " " << k->first << ": " << fqFileName.substr(fqFileName.rfind('/')+1) << std::endl;
}
oss << " Enqueue Counts: [ ";
@@ -293,7 +315,7 @@ std::string RecoveryManager::toString(const std::string& jid) {
if (l != fileNumberMap_.begin()) {
oss << ", ";
}
- oss << l->second->getEnqueuedRecordCount();
+ oss << l->second->journalFilePtr_->getEnqueuedRecordCount();
}
oss << " ]" << std::endl;
oss << " Journal empty = " << (journalEmptyFlag_ ? "TRUE" : "FALSE") << std::endl;
@@ -330,15 +352,17 @@ std::string RecoveryManager::toLog(const std::string& jid, const int indent) {
<< std::setw(10) << "--------"
<< std::endl;
for (fileNumberMapConstItr_t k=fileNumberMap_.begin(); k!=fileNumberMap_.end(); ++k) {
- std::string fqFileName = k->second->getFqFileName();
+ std::string fqFileName = k->second->journalFilePtr_->getFqFileName();
+ std::ostringstream fid;
+ fid << std::hex << "0x" << k->first;
std::ostringstream fro;
- fro << std::hex << "0x" << k->second->getFirstRecordOffset();
- oss << indentStr << std::setw(7) << k->first
+ fro << std::hex << "0x" << k->second->journalFilePtr_->getFirstRecordOffset();
+ oss << indentStr << std::setw(7) << fid.str()
<< std::setw(43) << fqFileName.substr(fqFileName.rfind('/')+1)
<< std::setw(16) << fro.str()
- << std::setw(12) << k->second->getEnqueuedRecordCount()
- << std::setw(5) << k->second->getEfpIdentity().pn_
- << std::setw(9) << k->second->getEfpIdentity().ds_ << "k"
+ << std::setw(12) << k->second->journalFilePtr_->getEnqueuedRecordCount()
+ << std::setw(5) << k->second->journalFilePtr_->getEfpIdentity().pn_
+ << std::setw(9) << k->second->journalFilePtr_->getEfpIdentity().ds_ << "k"
<< std::endl;
}
oss << indentStr << "First record offset in first file = 0x" << std::hex << firstRecordOffset_ <<
@@ -347,7 +371,7 @@ std::string RecoveryManager::toLog(const std::string& jid, const int indent) {
(endOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)" << std::endl;
oss << indentStr << "Highest rid found = 0x" << std::hex << highestRecordId_ << std::dec << std::endl;
oss << indentStr << "Last file full = " << (lastFileFullFlag_ ? "TRUE" : "FALSE") << std::endl;
- oss << indentStr << "Enqueued records (txn & non-txn):";
+ //oss << indentStr << "Enqueued records (txn & non-txn):"; // TODO: complete report
}
return oss.str();
}
@@ -357,27 +381,28 @@ std::string RecoveryManager::toLog(const std::string& jid, const int indent) {
void RecoveryManager::analyzeJournalFileHeaders(efpIdentity_t& efpIdentity) {
std::string headerQueueName;
::file_hdr_t fileHeader;
- directoryList_t directoryList;
+ stringList_t directoryList;
jdir::read_dir(journalDirectory_, directoryList, false, true, false, true);
- for (directoryListConstItr_t i = directoryList.begin(); i != directoryList.end(); ++i) {
+ for (stringListConstItr_t i = directoryList.begin(); i != directoryList.end(); ++i) {
readJournalFileHeader(*i, fileHeader, headerQueueName);
if (headerQueueName.empty()) {
std::ostringstream oss;
- if (uninitializedJournal_.empty()) {
- oss << "Journal file " << (*i) << " is first uninitialized (not yet written) journal file.";
- journalLogRef_.log(JournalLog::LOG_NOTICE, queueName_, oss.str());
- uninitializedJournal_ = *i;
- } else {
- oss << "Journal file " << (*i) << " is second or greater uninitialized journal file - ignoring";
- journalLogRef_.log(JournalLog::LOG_WARN, queueName_, oss.str());
- }
+ oss << "Journal file " << (*i) << " is uninitialized";
+ journalLogRef_.log(JournalLog::LOG_WARN, queueName_, oss.str());
+ uninitFileList_.push_back(*i);
} else if (headerQueueName.compare(queueName_) != 0) {
std::ostringstream oss;
oss << "Journal file " << (*i) << " belongs to queue \"" << headerQueueName << "\": ignoring";
journalLogRef_.log(JournalLog::LOG_WARN, queueName_, oss.str());
} else {
- JournalFile* jfp = new JournalFile(*i, fileHeader);
- fileNumberMap_[fileHeader._file_number] = jfp;
+ JournalFile* jfp = new JournalFile(*i, fileHeader, queueName_);
+ std::pair<fileNumberMapItr_t, bool> res = fileNumberMap_.insert(
+ std::pair<uint64_t, RecoveredFileData_t*>(fileHeader._file_number, new RecoveredFileData_t(jfp, 0)));
+ if (!res.second) {
+ std::ostringstream oss;
+ oss << "Journal file " << (*i) << " has fid=0x" << std::hex << jfp->getFileSeqNum() << " which already exists for this journal.";
+ throw jexception(oss.str()); // TODO: complete this exception
+ }
if (fileHeader._file_number > highestFileNumber_) {
highestFileNumber_ = fileHeader._file_number;
}
@@ -393,7 +418,7 @@ void RecoveryManager::analyzeJournalFileHeaders(efpIdentity_t& efpIdentity) {
if (fileNumberMap_.empty()) {
journalEmptyFlag_ = true;
} else {
- currentJournalFileConstItr_ = fileNumberMap_.begin();
+ currentJournalFileItr_ = fileNumberMap_.begin();
}
}
@@ -408,7 +433,7 @@ void RecoveryManager::checkFileStreamOk(bool checkEof) {
}
}
-void RecoveryManager::checkJournalAlignment(const std::streampos recordPosition) {
+void RecoveryManager::checkJournalAlignment(const uint64_t start_fid, const std::streampos recordPosition) {
if (recordPosition % QLS_DBLK_SIZE_BYTES != 0) {
std::ostringstream oss;
oss << "Current read pointer not dblk aligned: recordPosition=0x" << std::hex << recordPosition;
@@ -420,12 +445,13 @@ void RecoveryManager::checkJournalAlignment(const std::streampos recordPosition)
if (sblkOffset)
{
std::ostringstream oss1;
- oss1 << std::hex << "Bad record alignment found at fid=0x" << getCurrentFileNumber();
+ oss1 << std::hex << "Bad record alignment found at fid=0x" << start_fid;
oss1 << " offs=0x" << currentPosn << " (likely journal overwrite boundary); " << std::dec;
oss1 << (QLS_SBLK_SIZE_DBLKS - (sblkOffset/QLS_DBLK_SIZE_BYTES)) << " filler record(s) required.";
journalLogRef_.log(JournalLog::LOG_WARN, queueName_, oss1.str());
- std::ofstream outFileStream(getCurrentFileName().c_str(), std::ios_base::in | std::ios_base::out | std::ios_base::binary);
+ fileNumberMapConstItr_t fnmItr = fileNumberMap_.find(start_fid);
+ std::ofstream outFileStream(fnmItr->second->journalFilePtr_->getFqFileName().c_str(), std::ios_base::in | std::ios_base::out | std::ios_base::binary);
if (!outFileStream.good()) {
throw jexception(jerrno::JERR__FILEIO, getCurrentFileName(), "RecoveryManager", "checkJournalAlignment");
}
@@ -447,7 +473,7 @@ void RecoveryManager::checkJournalAlignment(const std::streampos recordPosition)
throw jexception(jerrno::JERR_RCVM_WRITE, "RecoveryManager", "checkJournalAlignment");
}
std::ostringstream oss2;
- oss2 << std::hex << "Recover phase write: Wrote filler record: fid=0x" << getCurrentFileNumber();
+ oss2 << std::hex << "Recover phase write: Wrote filler record: fid=0x" << start_fid;
oss2 << " offs=0x" << currentPosn;
journalLogRef_.log(JournalLog::LOG_NOTICE, queueName_, oss2.str());
currentPosn = outFileStream.tellp();
@@ -456,16 +482,15 @@ void RecoveryManager::checkJournalAlignment(const std::streampos recordPosition)
std::free(writeBuffer);
journalLogRef_.log(JournalLog::LOG_INFO, queueName_, "Bad record alignment fixed.");
}
- endOffset_ = currentPosn;
+ lastRecord(start_fid, currentPosn);
}
bool RecoveryManager::decodeRecord(jrec& record,
std::size_t& cumulativeSizeRead,
::rec_hdr_t& headerRecord,
- std::streampos& fileOffset)
+ const uint64_t start_fid,
+ const std::streampos recordOffset)
{
- std::streampos start_file_offs = fileOffset;
-
if (highestRecordId_ == 0) {
highestRecordId_ = headerRecord._rid;
} else if (headerRecord._rid - highestRecordId_ < 0x8000000000000000ULL) { // RFC 1982 comparison for unsigned 64-bit
@@ -475,7 +500,7 @@ bool RecoveryManager::decodeRecord(jrec& record,
bool done = false;
while (!done) {
try {
- done = record.decode(headerRecord, &inFileStream_, cumulativeSizeRead);
+ done = record.decode(headerRecord, &inFileStream_, cumulativeSizeRead, recordOffset);
}
catch (const jexception& e) {
if (e.err_code() == jerrno::JERR_JREC_BADRECTAIL) {
@@ -485,11 +510,12 @@ bool RecoveryManager::decodeRecord(jrec& record,
} else {
journalLogRef_.log(JournalLog::LOG_INFO, queueName_, e.what());
}
- checkJournalAlignment(start_file_offs);
+ checkJournalAlignment(start_fid, recordOffset);
return false;
}
if (!done && needNextFile()) {
if (!getNextFile(false)) {
+ checkJournalAlignment(start_fid, recordOffset);
return false;
}
}
@@ -498,11 +524,11 @@ bool RecoveryManager::decodeRecord(jrec& record,
}
std::string RecoveryManager::getCurrentFileName() const {
- return currentJournalFileConstItr_->second->getFqFileName();
+ return currentJournalFileItr_->second->journalFilePtr_->getFqFileName();
}
uint64_t RecoveryManager::getCurrentFileNumber() const {
- return currentJournalFileConstItr_->first;
+ return currentJournalFileItr_->first;
}
bool RecoveryManager::getFile(const uint64_t fileNumber, bool jumpToFirstRecordOffsetFlag) {
@@ -511,8 +537,8 @@ bool RecoveryManager::getFile(const uint64_t fileNumber, bool jumpToFirstRecordO
//std::cout << " f=" << getCurrentFileName() << "]" << std::flush; // DEBUG
inFileStream_.clear(); // clear eof flag, req'd for older versions of c++
}
- currentJournalFileConstItr_ = fileNumberMap_.find(fileNumber);
- if (currentJournalFileConstItr_ == fileNumberMap_.end()) {
+ currentJournalFileItr_ = fileNumberMap_.find(fileNumber);
+ if (currentJournalFileItr_ == fileNumberMap_.end()) {
return false;
}
inFileStream_.open(getCurrentFileName().c_str(), std::ios_base::in | std::ios_base::binary);
@@ -536,7 +562,8 @@ bool RecoveryManager::getNextFile(bool jumpToFirstRecordOffsetFlag) {
if (inFileStream_.is_open()) {
inFileStream_.close();
//std::cout << " .f=" << getCurrentFileName() << "]" << std::flush; // DEBUG
- if (++currentJournalFileConstItr_ == fileNumberMap_.end()) {
+ currentJournalFileItr_->second->completedDblkCount_ = efpFileSize_kib_ * 1024 / QLS_DBLK_SIZE_BYTES;
+ if (++currentJournalFileItr_ == fileNumberMap_.end()) {
return false;
}
inFileStream_.clear(); // clear eof flag, req'd for older versions of c++
@@ -562,13 +589,15 @@ bool RecoveryManager::getNextRecordHeader()
rec_hdr_t h;
bool hdr_ok = false;
- std::streampos file_pos;
+ uint64_t file_id = 0;
+ std::streampos file_pos = 0;
while (!hdr_ok) {
if (needNextFile()) {
if (!getNextFile(true)) {
return false;
}
}
+ file_id = currentJournalFileItr_->second->journalFilePtr_->getFileSeqNum();
file_pos = inFileStream_.tellg();
if (file_pos == std::streampos(-1)) {
std::ostringstream oss;
@@ -587,21 +616,21 @@ bool RecoveryManager::getNextRecordHeader()
}
}
+ uint64_t start_fid = getCurrentFileNumber(); // fid may increment in decode() if record folds over file boundary
switch(h._magic) {
case QLS_ENQ_MAGIC:
{
//std::cout << " 0x" << std::hex << file_pos << ".e.0x" << h._rid << std::dec << std::flush; // DEBUG
if (::rec_hdr_check(&h, QLS_ENQ_MAGIC, QLS_JRNL_VERSION, currentSerial_) != 0) {
- endOffset_ = file_pos;
+ lastRecord(file_id, file_pos);
return false;
}
enq_rec er;
- uint64_t start_fid = getCurrentFileNumber(); // fid may increment in decode() if record folds over file boundary
- if (!decodeRecord(er, cum_size_read, h, file_pos)) {
+ if (!decodeRecord(er, cum_size_read, h, start_fid, file_pos)) {
return false;
}
if (!er.is_transient()) { // Ignore transient msgs
- fileNumberMap_[start_fid]->incrEnqueuedRecordCount();
+ fileNumberMap_[start_fid]->journalFilePtr_->incrEnqueuedRecordCount();
if (er.xid_size()) {
er.get_xid(&xidp);
if (xidp == 0) {
@@ -629,12 +658,11 @@ bool RecoveryManager::getNextRecordHeader()
{
//std::cout << " 0x" << std::hex << file_pos << ".d.0x" << h._rid << std::dec << std::flush; // DEBUG
if (::rec_hdr_check(&h, QLS_DEQ_MAGIC, QLS_JRNL_VERSION, currentSerial_) != 0) {
- endOffset_ = file_pos;
+ lastRecord(file_id, file_pos);
return false;
}
deq_rec dr;
- uint16_t start_fid = getCurrentFileNumber(); // fid may increment in decode() if record folds over file boundary
- if (!decodeRecord(dr, cum_size_read, h, file_pos)) {
+ if (!decodeRecord(dr, cum_size_read, h, start_fid, file_pos)) {
return false;
}
if (dr.xid_size()) {
@@ -655,7 +683,7 @@ bool RecoveryManager::getNextRecordHeader()
} else {
uint64_t enq_fid;
if (enqueueMapRef_.get_remove_pfid(dr.deq_rid(), enq_fid, true) == enq_map::EMAP_OK) { // ignore not found error
- fileNumberMap_[enq_fid]->decrEnqueuedRecordCount();
+ fileNumberMap_[enq_fid]->journalFilePtr_->decrEnqueuedRecordCount();
}
}
}
@@ -664,11 +692,11 @@ bool RecoveryManager::getNextRecordHeader()
{
//std::cout << " 0x" << std::hex << file_pos << ".a.0x" << h._rid << std::dec << std::flush; // DEBUG
if (::rec_hdr_check(&h, QLS_TXA_MAGIC, QLS_JRNL_VERSION, currentSerial_) != 0) {
- endOffset_ = file_pos;
+ lastRecord(file_id, file_pos);
return false;
}
txn_rec ar;
- if (!decodeRecord(ar, cum_size_read, h, file_pos)) {
+ if (!decodeRecord(ar, cum_size_read, h, start_fid, file_pos)) {
return false;
}
// Delete this txn from tmap, unlock any locked records in emap
@@ -680,7 +708,7 @@ bool RecoveryManager::getNextRecordHeader()
txn_data_list_t tdl = transactionMapRef_.get_remove_tdata_list(xid); // tdl will be empty if xid not found
for (tdl_itr_t itr = tdl.begin(); itr != tdl.end(); itr++) {
if (itr->enq_flag_) {
- fileNumberMap_[itr->pfid_]->decrEnqueuedRecordCount();
+ fileNumberMap_[itr->pfid_]->journalFilePtr_->decrEnqueuedRecordCount();
} else {
enqueueMapRef_.unlock(itr->drid_); // ignore not found error
}
@@ -691,11 +719,11 @@ bool RecoveryManager::getNextRecordHeader()
{
//std::cout << " 0x" << std::hex << file_pos << ".c.0x" << h._rid << std::dec << std::flush; // DEBUG
if (::rec_hdr_check(&h, QLS_TXC_MAGIC, QLS_JRNL_VERSION, currentSerial_) != 0) {
- endOffset_ = file_pos;
+ lastRecord(file_id, file_pos);
return false;
}
txn_rec cr;
- if (!decodeRecord(cr, cum_size_read, h, file_pos)) {
+ if (!decodeRecord(cr, cum_size_read, h, start_fid, file_pos)) {
return false;
}
// Delete this txn from tmap, process records into emap
@@ -717,7 +745,7 @@ bool RecoveryManager::getNextRecordHeader()
} else { // txn dequeue
uint64_t enq_fid;
if (enqueueMapRef_.get_remove_pfid(itr->drid_, enq_fid, true) == enq_map::EMAP_OK) // ignore not found error
- fileNumberMap_[enq_fid]->decrEnqueuedRecordCount();
+ fileNumberMap_[enq_fid]->journalFilePtr_->decrEnqueuedRecordCount();
}
}
}
@@ -729,7 +757,9 @@ bool RecoveryManager::getNextRecordHeader()
inFileStream_.ignore(rec_dblks * QLS_DBLK_SIZE_BYTES - sizeof(::rec_hdr_t));
checkFileStreamOk(false);
if (needNextFile()) {
+ file_pos += rec_dblks * QLS_DBLK_SIZE_BYTES;
if (!getNextFile(false)) {
+ lastRecord(start_fid, file_pos);
return false;
}
}
@@ -737,17 +767,36 @@ bool RecoveryManager::getNextRecordHeader()
break;
case 0:
//std::cout << " 0x" << std::hex << file_pos << ".0" << std::dec << std::endl << std::flush; // DEBUG
- checkJournalAlignment(file_pos);
+ checkJournalAlignment(getCurrentFileNumber(), file_pos);
return false;
default:
//std::cout << " 0x" << std::hex << file_pos << ".?" << std::dec << std::endl << std::flush; // DEBUG
// Stop as this is the overwrite boundary.
- checkJournalAlignment(file_pos);
+ checkJournalAlignment(getCurrentFileNumber(), file_pos);
return false;
}
return true;
}
+void RecoveryManager::lastRecord(const uint64_t file_id, const std::streamoff endOffset) {
+ endOffset_ = endOffset;
+ initial_fid_ = file_id;
+ fileNumberMap_[file_id]->completedDblkCount_ = endOffset_ / QLS_DBLK_SIZE_BYTES;
+
+ // Remove any files in fileNumberMap_ beyond initial_fid_
+ fileNumberMapItr_t unwantedFirstItr = fileNumberMap_.find(file_id);
+ if (++unwantedFirstItr != fileNumberMap_.end()) {
+ fileNumberMapItr_t itr = unwantedFirstItr;
+ notNeededFilesList_.push_back(unwantedFirstItr->second->journalFilePtr_->getFqFileName());
+ while (++itr != fileNumberMap_.end()) {
+ notNeededFilesList_.push_back(itr->second->journalFilePtr_->getFqFileName());
+ delete itr->second->journalFilePtr_;
+ delete itr->second;
+ }
+ fileNumberMap_.erase(unwantedFirstItr, fileNumberMap_.end());
+ }
+}
+
bool RecoveryManager::needNextFile() {
if (inFileStream_.is_open()) {
return inFileStream_.eof() || inFileStream_.tellg() >= std::streampos(efpFileSize_kib_ * 1024);
@@ -820,7 +869,7 @@ bool RecoveryManager::readFileHeader() {
currentSerial_ = fhdr._rhdr._serial;
} else {
inFileStream_.close();
- if (currentJournalFileConstItr_ == fileNumberMap_.begin()) {
+ if (currentJournalFileItr_ == fileNumberMap_.begin()) {
journalEmptyFlag_ = true;
}
return false;
@@ -855,9 +904,11 @@ void RecoveryManager::readJournalFileHeader(const std::string& journalFileName,
}
void RecoveryManager::removeEmptyFiles(EmptyFilePool* emptyFilePoolPtr) {
- while (fileNumberMap_.begin()->second->getEnqueuedRecordCount() == 0 && fileNumberMap_.size() > 1) {
-//std::cout << "*** File " << i->first << ": " << i->second << " is empty." << std::endl; // DEBUG
- emptyFilePoolPtr->returnEmptyFile(fileNumberMap_.begin()->second->getFqFileName());
+ while (fileNumberMap_.begin()->second->journalFilePtr_->getEnqueuedRecordCount() == 0 && fileNumberMap_.size() > 1) {
+ RecoveredFileData_t* rfdp = fileNumberMap_.begin()->second;
+ emptyFilePoolPtr->returnEmptyFile(rfdp->journalFilePtr_->getFqFileName());
+ delete rfdp->journalFilePtr_;
+ delete rfdp;
fileNumberMap_.erase(fileNumberMap_.begin()->first);
}
}