summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2013-12-20 18:07:31 +0000
committerKim van der Riet <kpvdr@apache.org>2013-12-20 18:07:31 +0000
commit03c59b8e046edd380485af253bf3962f3feff39d (patch)
tree999f331622be3485e2f6a09e4c75108d519b20ed /cpp/src/qpid/linearstore/journal/RecoveryManager.cpp
parent44297255a2a408dbb9114395467da6384b9bc012 (diff)
downloadqpid-python-03c59b8e046edd380485af253bf3962f3feff39d.tar.gz
QPID-5422: DTX test failure, and some tidying up of code in JournalImpl.cpp/h
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1552772 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/linearstore/journal/RecoveryManager.cpp')
-rw-r--r--cpp/src/qpid/linearstore/journal/RecoveryManager.cpp99
1 files changed, 76 insertions, 23 deletions
diff --git a/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp b/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp
index 66ac7c3a2d..16ca1e0994 100644
--- a/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp
+++ b/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp
@@ -49,6 +49,18 @@ namespace qpid {
namespace linearstore {
namespace journal {
+RecoveredRecordData_t::RecoveredRecordData_t(const uint64_t rid, const uint64_t fid, const std::streampos foffs, bool ptxn) :
+ recordId_(rid),
+ fileId_(fid),
+ fileOffset_(foffs),
+ pendingTransaction_(ptxn)
+{}
+
+
+bool recordIdListCompare(RecoveredRecordData_t a, RecoveredRecordData_t b) {
+ return a.recordId_ < b.recordId_;
+}
+
RecoveryManager::RecoveryManager(const std::string& journalDirectory,
const std::string& queuename,
enq_map& enqueueMapRef,
@@ -86,6 +98,9 @@ void RecoveryManager::analyzeJournals(const std::vector<std::string>* preparedTr
if (!journalEmptyFlag_) {
// Read all records, establish remaining enqueued records
+ if (inFileStream_.is_open()) {
+ inFileStream_.close();
+ }
while (getNextRecordHeader()) {}
if (inFileStream_.is_open()) {
inFileStream_.close();
@@ -120,11 +135,7 @@ void RecoveryManager::analyzeJournals(const std::vector<std::string>* preparedTr
}
}
}
-
- // Set up recordIdList_ from enqueue map
- enqueueMapRef_.rid_list(recordIdList_);
-
- recordIdListConstItr_ = recordIdList_.begin();
+ prepareRecordList();
}
}
@@ -151,37 +162,44 @@ bool RecoveryManager::readNextRemainingRecord(void** const dataPtrPtr,
bool& transient,
bool& external,
data_tok* const dtokp,
- bool /*ignore_pending_txns*/) {
- if (recordIdListConstItr_ == recordIdList_.end()) {
- return false;
- }
- enq_map::emap_data_struct_t eds;
- enqueueMapRef_.get_data(*recordIdListConstItr_, eds);
- if (!inFileStream_.is_open() || currentJournalFileConstItr_->first != eds._pfid) {
- getFile(eds._pfid, false);
- }
-//std::cout << " " << eds._pfid << std::hex << ",0x" << eds._file_posn << std::flush; // DEBUG
+ bool ignore_pending_txns) {
+ bool foundRecord = false;
+ do {
+ if (recordIdListConstItr_ == recordIdList_.end()) {
+ return false;
+ }
+ if (recordIdListConstItr_->pendingTransaction_ && ignore_pending_txns) { // Pending transaction
+ ++recordIdListConstItr_; // ignore, go to next record
+ } else {
+ foundRecord = true;
+ }
+ } while (!foundRecord);
- inFileStream_.seekg(eds._file_posn, std::ifstream::beg);
+ if (!inFileStream_.is_open() || currentJournalFileConstItr_->first != recordIdListConstItr_->fileId_) {
+ if (!getFile(recordIdListConstItr_->fileId_, false)) {
+ std::ostringstream oss;
+ oss << "Failed to open file with file-id=" << recordIdListConstItr_->fileId_;
+ throw jexception(jerrno::JERR__FILEIO, oss.str(), "RecoveryManager", "readNextRemainingRecord");
+ }
+ }
+ inFileStream_.seekg(recordIdListConstItr_->fileOffset_, std::ifstream::beg);
if (!inFileStream_.good()) {
std::ostringstream oss;
- oss << "Could not find offset 0x" << std::hex << eds._file_posn << " in file " << getCurrentFileName();
+ oss << "Could not find offset 0x" << std::hex << recordIdListConstItr_->fileOffset_ << " in file " << getCurrentFileName();
throw jexception(jerrno::JERR__FILEIO, oss.str(), "RecoveryManager", "readNextRemainingRecord");
}
+
::enq_hdr_t enqueueHeader;
inFileStream_.read((char*)&enqueueHeader, sizeof(::enq_hdr_t));
if (inFileStream_.gcount() != sizeof(::enq_hdr_t)) {
std::ostringstream oss;
- oss << "Could not read enqueue header from file " << getCurrentFileName() << " at offset 0x" << std::hex << eds._file_posn;
+ oss << "Could not read enqueue header from file " << getCurrentFileName() << " at offset 0x" << std::hex << recordIdListConstItr_->fileOffset_;
throw jexception(jerrno::JERR__FILEIO, oss.str(), "RecoveryManager", "readNextRemainingRecord");
}
// check flags
transient = ::is_enq_transient(&enqueueHeader);
external = ::is_enq_external(&enqueueHeader);
-//char magicBuff[5]; // DEBUG
-//::memcpy(magicBuff, &enqueueHeader, 4); // DEBUG
-//magicBuff[4] = 0; // DEBUG
-//std::cout << std::hex << ":" << (char*)magicBuff << ",rid=0x" << enqueueHeader._rhdr._rid << ",xs=0x" << enqueueHeader._xidsize << ",ds=0x" << enqueueHeader._dsize << std::dec << std::flush; // DEBUG
+
// read xid
xidSize = enqueueHeader._xidsize;
*xidPtrPtr = ::malloc(xidSize);
@@ -386,6 +404,12 @@ void RecoveryManager::checkFileStreamOk(bool checkEof) {
}
void RecoveryManager::checkJournalAlignment(const std::streampos recordPosition) {
+ if (recordPosition % QLS_DBLK_SIZE_BYTES != 0) {
+ std::ostringstream oss;
+ oss << "Current read pointer not dblk aligned: recordPosition=0x" << std::hex << recordPosition;
+ oss << " (dblk alignment offset = 0x" << (recordPosition % QLS_DBLK_SIZE_BYTES);
+ throw jexception(jerrno::JERR_RCVM_NOTDBLKALIGNED, oss.str(), "RecoveryManager", "checkJournalAlignment");
+ }
std::streampos currentPosn = recordPosition;
unsigned sblkOffset = currentPosn % QLS_SBLK_SIZE_BYTES;
if (sblkOffset)
@@ -574,7 +598,7 @@ bool RecoveryManager::getNextRecordHeader()
throw jexception(jerrno::JERR_RCVM_NULLXID, "ENQ", "RecoveryManager", "getNextRecordHeader");
}
std::string xid((char*)xidp, er.xid_size());
- transactionMapRef_.insert_txn_data(xid, txn_data_t(h._rid, 0, start_fid, file_pos, true));
+ transactionMapRef_.insert_txn_data(xid, txn_data_t(h._rid, 0, start_fid, file_pos, true, false /*tpcFlag*/));
if (transactionMapRef_.set_aio_compl(xid, h._rid) < txn_map::TMAP_OK) { // fail - xid or rid not found
std::ostringstream oss;
oss << std::hex << "_tmap.set_aio_compl: txn_enq xid=\"" << xid << "\" rid=0x" << h._rid;
@@ -725,6 +749,35 @@ bool RecoveryManager::needNextFile() {
return true;
}
+void RecoveryManager::prepareRecordList() {
+ // Set up recordIdList_ from enqueue map and transaction map
+ recordIdList_.clear();
+
+ // Extract records from enqueue list
+ std::vector<uint64_t> ridList;
+ enqueueMapRef_.rid_list(ridList);
+ qpid::linearstore::journal::enq_map::emap_data_struct_t eds;
+ for (std::vector<uint64_t>::const_iterator i=ridList.begin(); i!=ridList.end(); ++i) {
+ enqueueMapRef_.get_data(*i, eds);
+ recordIdList_.push_back(RecoveredRecordData_t(*i, eds._pfid, eds._file_posn, false));
+ }
+
+ // Extract records from pending transaction enqueues
+ std::vector<std::string> xidList;
+ transactionMapRef_.xid_list(xidList);
+ for (std::vector<std::string>::const_iterator j=xidList.begin(); j!=xidList.end(); ++j) {
+ qpid::linearstore::journal::txn_data_list tdsl = transactionMapRef_.get_tdata_list(*j);
+ for (qpid::linearstore::journal::tdl_itr k=tdsl.begin(); k!=tdsl.end(); ++k) {
+ if (k->enq_flag_) {
+ recordIdList_.push_back(RecoveredRecordData_t(k->rid_, k->pfid_, k->foffs_, true));
+ }
+ }
+ }
+
+ std::sort(recordIdList_.begin(), recordIdList_.end(), recordIdListCompare);
+ recordIdListConstItr_ = recordIdList_.begin();
+}
+
void RecoveryManager::readJournalData(char* target,
const std::streamsize readSize) {
std::streamoff bytesRead = 0;