diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2013-12-20 18:07:31 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2013-12-20 18:07:31 +0000 |
| commit | 03c59b8e046edd380485af253bf3962f3feff39d (patch) | |
| tree | 999f331622be3485e2f6a09e4c75108d519b20ed /cpp/src/qpid/linearstore/journal/RecoveryManager.cpp | |
| parent | 44297255a2a408dbb9114395467da6384b9bc012 (diff) | |
| download | qpid-python-03c59b8e046edd380485af253bf3962f3feff39d.tar.gz | |
QPID-5422: DTX test failure, and some tidying up of code in JournalImpl.cpp/h
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1552772 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/linearstore/journal/RecoveryManager.cpp')
| -rw-r--r-- | cpp/src/qpid/linearstore/journal/RecoveryManager.cpp | 99 |
1 files changed, 76 insertions, 23 deletions
diff --git a/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp b/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp index 66ac7c3a2d..16ca1e0994 100644 --- a/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp +++ b/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp @@ -49,6 +49,18 @@ namespace qpid { namespace linearstore { namespace journal { +RecoveredRecordData_t::RecoveredRecordData_t(const uint64_t rid, const uint64_t fid, const std::streampos foffs, bool ptxn) : + recordId_(rid), + fileId_(fid), + fileOffset_(foffs), + pendingTransaction_(ptxn) +{} + + +bool recordIdListCompare(RecoveredRecordData_t a, RecoveredRecordData_t b) { + return a.recordId_ < b.recordId_; +} + RecoveryManager::RecoveryManager(const std::string& journalDirectory, const std::string& queuename, enq_map& enqueueMapRef, @@ -86,6 +98,9 @@ void RecoveryManager::analyzeJournals(const std::vector<std::string>* preparedTr if (!journalEmptyFlag_) { // Read all records, establish remaining enqueued records + if (inFileStream_.is_open()) { + inFileStream_.close(); + } while (getNextRecordHeader()) {} if (inFileStream_.is_open()) { inFileStream_.close(); @@ -120,11 +135,7 @@ void RecoveryManager::analyzeJournals(const std::vector<std::string>* preparedTr } } } - - // Set up recordIdList_ from enqueue map - enqueueMapRef_.rid_list(recordIdList_); - - recordIdListConstItr_ = recordIdList_.begin(); + prepareRecordList(); } } @@ -151,37 +162,44 @@ bool RecoveryManager::readNextRemainingRecord(void** const dataPtrPtr, bool& transient, bool& external, data_tok* const dtokp, - bool /*ignore_pending_txns*/) { - if (recordIdListConstItr_ == recordIdList_.end()) { - return false; - } - enq_map::emap_data_struct_t eds; - enqueueMapRef_.get_data(*recordIdListConstItr_, eds); - if (!inFileStream_.is_open() || currentJournalFileConstItr_->first != eds._pfid) { - getFile(eds._pfid, false); - } -//std::cout << " " << eds._pfid << std::hex << ",0x" << eds._file_posn << std::flush; // DEBUG + bool ignore_pending_txns) { + bool foundRecord = false; + do { + if (recordIdListConstItr_ == recordIdList_.end()) { + return false; + } + if (recordIdListConstItr_->pendingTransaction_ && ignore_pending_txns) { // Pending transaction + ++recordIdListConstItr_; // ignore, go to next record + } else { + foundRecord = true; + } + } while (!foundRecord); - inFileStream_.seekg(eds._file_posn, std::ifstream::beg); + if (!inFileStream_.is_open() || currentJournalFileConstItr_->first != recordIdListConstItr_->fileId_) { + if (!getFile(recordIdListConstItr_->fileId_, false)) { + std::ostringstream oss; + oss << "Failed to open file with file-id=" << recordIdListConstItr_->fileId_; + throw jexception(jerrno::JERR__FILEIO, oss.str(), "RecoveryManager", "readNextRemainingRecord"); + } + } + inFileStream_.seekg(recordIdListConstItr_->fileOffset_, std::ifstream::beg); if (!inFileStream_.good()) { std::ostringstream oss; - oss << "Could not find offset 0x" << std::hex << eds._file_posn << " in file " << getCurrentFileName(); + oss << "Could not find offset 0x" << std::hex << recordIdListConstItr_->fileOffset_ << " in file " << getCurrentFileName(); throw jexception(jerrno::JERR__FILEIO, oss.str(), "RecoveryManager", "readNextRemainingRecord"); } + ::enq_hdr_t enqueueHeader; inFileStream_.read((char*)&enqueueHeader, sizeof(::enq_hdr_t)); if (inFileStream_.gcount() != sizeof(::enq_hdr_t)) { std::ostringstream oss; - oss << "Could not read enqueue header from file " << getCurrentFileName() << " at offset 0x" << std::hex << eds._file_posn; + oss << "Could not read enqueue header from file " << getCurrentFileName() << " at offset 0x" << std::hex << recordIdListConstItr_->fileOffset_; throw jexception(jerrno::JERR__FILEIO, oss.str(), "RecoveryManager", "readNextRemainingRecord"); } // check flags transient = ::is_enq_transient(&enqueueHeader); external = ::is_enq_external(&enqueueHeader); -//char magicBuff[5]; // DEBUG -//::memcpy(magicBuff, &enqueueHeader, 4); // DEBUG -//magicBuff[4] = 0; // DEBUG -//std::cout << std::hex << ":" << (char*)magicBuff << ",rid=0x" << enqueueHeader._rhdr._rid << ",xs=0x" << enqueueHeader._xidsize << ",ds=0x" << enqueueHeader._dsize << std::dec << std::flush; // DEBUG + // read xid xidSize = enqueueHeader._xidsize; *xidPtrPtr = ::malloc(xidSize); @@ -386,6 +404,12 @@ void RecoveryManager::checkFileStreamOk(bool checkEof) { } void RecoveryManager::checkJournalAlignment(const std::streampos recordPosition) { + if (recordPosition % QLS_DBLK_SIZE_BYTES != 0) { + std::ostringstream oss; + oss << "Current read pointer not dblk aligned: recordPosition=0x" << std::hex << recordPosition; + oss << " (dblk alignment offset = 0x" << (recordPosition % QLS_DBLK_SIZE_BYTES); + throw jexception(jerrno::JERR_RCVM_NOTDBLKALIGNED, oss.str(), "RecoveryManager", "checkJournalAlignment"); + } std::streampos currentPosn = recordPosition; unsigned sblkOffset = currentPosn % QLS_SBLK_SIZE_BYTES; if (sblkOffset) @@ -574,7 +598,7 @@ bool RecoveryManager::getNextRecordHeader() throw jexception(jerrno::JERR_RCVM_NULLXID, "ENQ", "RecoveryManager", "getNextRecordHeader"); } std::string xid((char*)xidp, er.xid_size()); - transactionMapRef_.insert_txn_data(xid, txn_data_t(h._rid, 0, start_fid, file_pos, true)); + transactionMapRef_.insert_txn_data(xid, txn_data_t(h._rid, 0, start_fid, file_pos, true, false /*tpcFlag*/)); if (transactionMapRef_.set_aio_compl(xid, h._rid) < txn_map::TMAP_OK) { // fail - xid or rid not found std::ostringstream oss; oss << std::hex << "_tmap.set_aio_compl: txn_enq xid=\"" << xid << "\" rid=0x" << h._rid; @@ -725,6 +749,35 @@ bool RecoveryManager::needNextFile() { return true; } +void RecoveryManager::prepareRecordList() { + // Set up recordIdList_ from enqueue map and transaction map + recordIdList_.clear(); + + // Extract records from enqueue list + std::vector<uint64_t> ridList; + enqueueMapRef_.rid_list(ridList); + qpid::linearstore::journal::enq_map::emap_data_struct_t eds; + for (std::vector<uint64_t>::const_iterator i=ridList.begin(); i!=ridList.end(); ++i) { + enqueueMapRef_.get_data(*i, eds); + recordIdList_.push_back(RecoveredRecordData_t(*i, eds._pfid, eds._file_posn, false)); + } + + // Extract records from pending transaction enqueues + std::vector<std::string> xidList; + transactionMapRef_.xid_list(xidList); + for (std::vector<std::string>::const_iterator j=xidList.begin(); j!=xidList.end(); ++j) { + qpid::linearstore::journal::txn_data_list tdsl = transactionMapRef_.get_tdata_list(*j); + for (qpid::linearstore::journal::tdl_itr k=tdsl.begin(); k!=tdsl.end(); ++k) { + if (k->enq_flag_) { + recordIdList_.push_back(RecoveredRecordData_t(k->rid_, k->pfid_, k->foffs_, true)); + } + } + } + + std::sort(recordIdList_.begin(), recordIdList_.end(), recordIdListCompare); + recordIdListConstItr_ = recordIdList_.begin(); +} + void RecoveryManager::readJournalData(char* target, const std::streamsize readSize) { std::streamoff bytesRead = 0; |
