diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2014-02-05 18:49:32 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2014-02-05 18:49:32 +0000 |
| commit | 4c26c985409d4327a537d40e966aee3b2492b285 (patch) | |
| tree | 78f160c3aeeeb818b0b6fe74a9ff4f5b85c8363f /cpp/src/qpid/linearstore/journal/LinearFileController.cpp | |
| parent | 03feca4761cec442ca810a212cfd88278bcc44bb (diff) | |
| download | qpid-python-4c26c985409d4327a537d40e966aee3b2492b285.tar.gz | |
QPID-5480: Recovery of store failure with "JERR_MAP_NOTFOUND: Key not found in map." error message. Fixed numerous recovery issues, particularly the handling of files at the end of the file list during recovery when the last file is not used or incompletely written.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1564877 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/linearstore/journal/LinearFileController.cpp')
| -rw-r--r-- | cpp/src/qpid/linearstore/journal/LinearFileController.cpp | 82 |
1 files changed, 42 insertions, 40 deletions
diff --git a/cpp/src/qpid/linearstore/journal/LinearFileController.cpp b/cpp/src/qpid/linearstore/journal/LinearFileController.cpp index 5483f3bb94..86d1b0e93c 100644 --- a/cpp/src/qpid/linearstore/journal/LinearFileController.cpp +++ b/cpp/src/qpid/linearstore/journal/LinearFileController.cpp @@ -25,8 +25,6 @@ #include "qpid/linearstore/journal/jcntl.h" #include "qpid/linearstore/journal/JournalFile.h" -//#include <iostream> // DEBUG - namespace qpid { namespace linearstore { namespace journal { @@ -34,10 +32,10 @@ namespace journal { LinearFileController::LinearFileController(jcntl& jcntlRef) : jcntlRef_(jcntlRef), emptyFilePoolPtr_(0), - currentJournalFilePtr_(0), fileSeqCounter_("LinearFileController::fileSeqCounter", 0), recordIdCounter_("LinearFileController::recordIdCounter", 0), - decrCounter_("LinearFileController::decrCounter", 0) + decrCounter_("LinearFileController::decrCounter", 0), + currentJournalFilePtr_(0) {} LinearFileController::~LinearFileController() {} @@ -53,7 +51,7 @@ void LinearFileController::initialize(const std::string& journalDirectory, void LinearFileController::finalize() { if (currentJournalFilePtr_) { currentJournalFilePtr_->close(); - currentJournalFilePtr_ = NULL; + currentJournalFilePtr_ = 0; } while (!journalFileList_.empty()) { delete journalFileList_.front(); @@ -62,17 +60,21 @@ void LinearFileController::finalize() { } void LinearFileController::addJournalFile(JournalFile* journalFilePtr, - const uint32_t completedDblkCount) { - if (currentJournalFilePtr_) { + const uint32_t completedDblkCount, + const bool makeCurrentFlag) { + if (makeCurrentFlag && currentJournalFilePtr_) { currentJournalFilePtr_->close(); + currentJournalFilePtr_ = 0; } journalFilePtr->initialize(completedDblkCount); - currentJournalFilePtr_ = journalFilePtr; { slock l(journalFileListMutex_); - journalFileList_.push_back(currentJournalFilePtr_); + journalFileList_.push_back(journalFilePtr); + } + if (makeCurrentFlag) { + currentJournalFilePtr_ = journalFilePtr; + currentJournalFilePtr_->open(); } - currentJournalFilePtr_->open(); } efpDataSize_sblks_t LinearFileController::dataSize_sblks() const { @@ -83,16 +85,20 @@ efpFileSize_sblks_t LinearFileController::fileSize_sblks() const { return emptyFilePoolPtr_->fileSize_sblks(); } +void LinearFileController::getNextJournalFile() { + if (currentJournalFilePtr_) + currentJournalFilePtr_->close(); + pullEmptyFileFromEfp(); +} + uint64_t LinearFileController::getNextRecordId() { return recordIdCounter_.increment(); } -void LinearFileController::pullEmptyFileFromEfp() { - if (currentJournalFilePtr_) - currentJournalFilePtr_->close(); - std::string ef = emptyFilePoolPtr_->takeEmptyFile(journalDirectory_); // Moves file from EFP only, returns new file name -//std::cout << "*** LinearFileController::pullEmptyFileFromEfp() qn=" << jcntlRef.id() << " ef=" << ef << std::endl; // DEBUG - addJournalFile(ef, emptyFilePoolPtr_->getIdentity(), getNextFileSeqNum(), 0); +void LinearFileController::removeFileToEfp(const std::string& fileName) { + if (emptyFilePoolPtr_) { + emptyFilePoolPtr_->returnEmptyFile(fileName); + } } void LinearFileController::restoreEmptyFile(const std::string& fileName) { @@ -101,7 +107,11 @@ void LinearFileController::restoreEmptyFile(const std::string& fileName) { void LinearFileController::purgeEmptyFilesToEfp() { slock l(journalFileListMutex_); - purgeEmptyFilesToEfpNoLock(); + while (journalFileList_.front()->isNoEnqueuedRecordsRemaining() && journalFileList_.size() > 1) { // Can't purge last file, even if it has no enqueued records + emptyFilePoolPtr_->returnEmptyFile(journalFileList_.front()->getFqFileName()); + delete journalFileList_.front(); + journalFileList_.pop_front(); + } } uint32_t LinearFileController::getEnqueuedRecordCount(const efpFileCount_t fileSeqNumber) { @@ -113,7 +123,6 @@ uint32_t LinearFileController::incrEnqueuedRecordCount(const efpFileCount_t file } uint32_t LinearFileController::decrEnqueuedRecordCount(const efpFileCount_t fileSeqNumber) { - slock l(journalFileListMutex_); uint32_t r = find(fileSeqNumber)->decrEnqueuedRecordCount(); // TODO: Re-evaluate after testing and profiling @@ -122,18 +131,16 @@ uint32_t LinearFileController::decrEnqueuedRecordCount(const efpFileCount_t file // records). We need to check this rather simple scheme works for outlying scenarios (large and tiny data // records) without impacting performance or performing badly (leaving excessive empty files in the journals). if (decrCounter_.increment() % 100ULL == 0ULL) { - purgeEmptyFilesToEfpNoLock(); + purgeEmptyFilesToEfp(); } return r; } uint32_t LinearFileController::addWriteCompletedDblkCount(const efpFileCount_t fileSeqNumber, const uint32_t a) { - slock l(journalFileListMutex_); return find(fileSeqNumber)->addCompletedDblkCount(a); } uint16_t LinearFileController::decrOutstandingAioOperationCount(const efpFileCount_t fileSeqNumber) { - slock l(journalFileListMutex_); return find(fileSeqNumber)->decrOutstandingAioOperationCount(); } @@ -142,12 +149,11 @@ void LinearFileController::asyncFileHeaderWrite(io_context_t ioContextPtr, const uint64_t recordId, const uint64_t firstRecordOffset) { currentJournalFilePtr_->asyncFileHeaderWrite(ioContextPtr, - emptyFilePoolPtr_->getPartitionNumber(), - emptyFilePoolPtr_->dataSize_kib(), - userFlags, - recordId, - firstRecordOffset, - jcntlRef_.id()); + emptyFilePoolPtr_->getPartitionNumber(), + emptyFilePoolPtr_->dataSize_kib(), + userFlags, + recordId, + firstRecordOffset); } void LinearFileController::asyncPageWrite(io_context_t ioContextPtr, @@ -195,8 +201,8 @@ void LinearFileController::addJournalFile(const std::string& fileName, const efpIdentity_t& efpIdentity, const uint64_t fileNumber, const uint32_t completedDblkCount) { - JournalFile* jfp = new JournalFile(fileName, efpIdentity, fileNumber); - addJournalFile(jfp, completedDblkCount); + JournalFile* jfp = new JournalFile(fileName, efpIdentity, fileNumber, jcntlRef_.id()); + addJournalFile(jfp, completedDblkCount, true); } void LinearFileController::assertCurrentJournalFileValid(const char* const functionName) const { @@ -209,15 +215,17 @@ bool LinearFileController::checkCurrentJournalFileValid() const { return currentJournalFilePtr_ != 0; } -// NOTE: NOT THREAD SAFE - journalFileList is accessed by multiple threads - use under external lock JournalFile* LinearFileController::find(const efpFileCount_t fileSeqNumber) { - if (currentJournalFilePtr_ != 0 && currentJournalFilePtr_->getFileSeqNum() == fileSeqNumber) + if (currentJournalFilePtr_ && currentJournalFilePtr_->getFileSeqNum() == fileSeqNumber) return currentJournalFilePtr_; + + slock l(journalFileListMutex_); for (JournalFileListItr_t i=journalFileList_.begin(); i!=journalFileList_.end(); ++i) { if ((*i)->getFileSeqNum() == fileSeqNumber) { return *i; } } + std::ostringstream oss; oss << "fileSeqNumber=" << fileSeqNumber; throw jexception(jerrno::JERR_LFCR_SEQNUMNOTFOUND, oss.str(), "LinearFileController", "find"); @@ -227,15 +235,9 @@ uint64_t LinearFileController::getNextFileSeqNum() { return fileSeqCounter_.increment(); } -void LinearFileController::purgeEmptyFilesToEfpNoLock() { -//std::cout << " >P n=" << journalFileList_.size() << " e=" << (journalFileList_.front()->isNoEnqueuedRecordsRemaining()?"T":"F") << std::flush; // DEBUG - while (journalFileList_.front()->isNoEnqueuedRecordsRemaining() && - journalFileList_.size() > 1) { // Can't purge last file, even if it has no enqueued records -//std::cout << " *f=" << journalFileList_.front()->getFqFileName() << std::flush; // DEBUG - emptyFilePoolPtr_->returnEmptyFile(journalFileList_.front()->getFqFileName()); - delete journalFileList_.front(); - journalFileList_.pop_front(); - } +void LinearFileController::pullEmptyFileFromEfp() { + std::string efn = emptyFilePoolPtr_->takeEmptyFile(journalDirectory_); // Moves file from EFP only (ie no file init), returns new file name + addJournalFile(efn, emptyFilePoolPtr_->getIdentity(), getNextFileSeqNum(), 0); } }}} |
