diff options
Diffstat (limited to 'cpp/src/qpid/linearstore/journal/LinearFileController.cpp')
-rw-r--r-- | cpp/src/qpid/linearstore/journal/LinearFileController.cpp | 82 |
1 files changed, 42 insertions, 40 deletions
diff --git a/cpp/src/qpid/linearstore/journal/LinearFileController.cpp b/cpp/src/qpid/linearstore/journal/LinearFileController.cpp index 5483f3bb94..86d1b0e93c 100644 --- a/cpp/src/qpid/linearstore/journal/LinearFileController.cpp +++ b/cpp/src/qpid/linearstore/journal/LinearFileController.cpp @@ -25,8 +25,6 @@ #include "qpid/linearstore/journal/jcntl.h" #include "qpid/linearstore/journal/JournalFile.h" -//#include <iostream> // DEBUG - namespace qpid { namespace linearstore { namespace journal { @@ -34,10 +32,10 @@ namespace journal { LinearFileController::LinearFileController(jcntl& jcntlRef) : jcntlRef_(jcntlRef), emptyFilePoolPtr_(0), - currentJournalFilePtr_(0), fileSeqCounter_("LinearFileController::fileSeqCounter", 0), recordIdCounter_("LinearFileController::recordIdCounter", 0), - decrCounter_("LinearFileController::decrCounter", 0) + decrCounter_("LinearFileController::decrCounter", 0), + currentJournalFilePtr_(0) {} LinearFileController::~LinearFileController() {} @@ -53,7 +51,7 @@ void LinearFileController::initialize(const std::string& journalDirectory, void LinearFileController::finalize() { if (currentJournalFilePtr_) { currentJournalFilePtr_->close(); - currentJournalFilePtr_ = NULL; + currentJournalFilePtr_ = 0; } while (!journalFileList_.empty()) { delete journalFileList_.front(); @@ -62,17 +60,21 @@ void LinearFileController::finalize() { } void LinearFileController::addJournalFile(JournalFile* journalFilePtr, - const uint32_t completedDblkCount) { - if (currentJournalFilePtr_) { + const uint32_t completedDblkCount, + const bool makeCurrentFlag) { + if (makeCurrentFlag && currentJournalFilePtr_) { currentJournalFilePtr_->close(); + currentJournalFilePtr_ = 0; } journalFilePtr->initialize(completedDblkCount); - currentJournalFilePtr_ = journalFilePtr; { slock l(journalFileListMutex_); - journalFileList_.push_back(currentJournalFilePtr_); + journalFileList_.push_back(journalFilePtr); + } + if (makeCurrentFlag) { + currentJournalFilePtr_ = journalFilePtr; + currentJournalFilePtr_->open(); } - currentJournalFilePtr_->open(); } efpDataSize_sblks_t LinearFileController::dataSize_sblks() const { @@ -83,16 +85,20 @@ efpFileSize_sblks_t LinearFileController::fileSize_sblks() const { return emptyFilePoolPtr_->fileSize_sblks(); } +void LinearFileController::getNextJournalFile() { + if (currentJournalFilePtr_) + currentJournalFilePtr_->close(); + pullEmptyFileFromEfp(); +} + uint64_t LinearFileController::getNextRecordId() { return recordIdCounter_.increment(); } -void LinearFileController::pullEmptyFileFromEfp() { - if (currentJournalFilePtr_) - currentJournalFilePtr_->close(); - std::string ef = emptyFilePoolPtr_->takeEmptyFile(journalDirectory_); // Moves file from EFP only, returns new file name -//std::cout << "*** LinearFileController::pullEmptyFileFromEfp() qn=" << jcntlRef.id() << " ef=" << ef << std::endl; // DEBUG - addJournalFile(ef, emptyFilePoolPtr_->getIdentity(), getNextFileSeqNum(), 0); +void LinearFileController::removeFileToEfp(const std::string& fileName) { + if (emptyFilePoolPtr_) { + emptyFilePoolPtr_->returnEmptyFile(fileName); + } } void LinearFileController::restoreEmptyFile(const std::string& fileName) { @@ -101,7 +107,11 @@ void LinearFileController::restoreEmptyFile(const std::string& fileName) { void LinearFileController::purgeEmptyFilesToEfp() { slock l(journalFileListMutex_); - purgeEmptyFilesToEfpNoLock(); + while (journalFileList_.front()->isNoEnqueuedRecordsRemaining() && journalFileList_.size() > 1) { // Can't purge last file, even if it has no enqueued records + emptyFilePoolPtr_->returnEmptyFile(journalFileList_.front()->getFqFileName()); + delete journalFileList_.front(); + journalFileList_.pop_front(); + } } uint32_t LinearFileController::getEnqueuedRecordCount(const efpFileCount_t fileSeqNumber) { @@ -113,7 +123,6 @@ uint32_t LinearFileController::incrEnqueuedRecordCount(const efpFileCount_t file } uint32_t LinearFileController::decrEnqueuedRecordCount(const efpFileCount_t fileSeqNumber) { - slock l(journalFileListMutex_); uint32_t r = find(fileSeqNumber)->decrEnqueuedRecordCount(); // TODO: Re-evaluate after testing and profiling @@ -122,18 +131,16 @@ uint32_t LinearFileController::decrEnqueuedRecordCount(const efpFileCount_t file // records). We need to check this rather simple scheme works for outlying scenarios (large and tiny data // records) without impacting performance or performing badly (leaving excessive empty files in the journals). if (decrCounter_.increment() % 100ULL == 0ULL) { - purgeEmptyFilesToEfpNoLock(); + purgeEmptyFilesToEfp(); } return r; } uint32_t LinearFileController::addWriteCompletedDblkCount(const efpFileCount_t fileSeqNumber, const uint32_t a) { - slock l(journalFileListMutex_); return find(fileSeqNumber)->addCompletedDblkCount(a); } uint16_t LinearFileController::decrOutstandingAioOperationCount(const efpFileCount_t fileSeqNumber) { - slock l(journalFileListMutex_); return find(fileSeqNumber)->decrOutstandingAioOperationCount(); } @@ -142,12 +149,11 @@ void LinearFileController::asyncFileHeaderWrite(io_context_t ioContextPtr, const uint64_t recordId, const uint64_t firstRecordOffset) { currentJournalFilePtr_->asyncFileHeaderWrite(ioContextPtr, - emptyFilePoolPtr_->getPartitionNumber(), - emptyFilePoolPtr_->dataSize_kib(), - userFlags, - recordId, - firstRecordOffset, - jcntlRef_.id()); + emptyFilePoolPtr_->getPartitionNumber(), + emptyFilePoolPtr_->dataSize_kib(), + userFlags, + recordId, + firstRecordOffset); } void LinearFileController::asyncPageWrite(io_context_t ioContextPtr, @@ -195,8 +201,8 @@ void LinearFileController::addJournalFile(const std::string& fileName, const efpIdentity_t& efpIdentity, const uint64_t fileNumber, const uint32_t completedDblkCount) { - JournalFile* jfp = new JournalFile(fileName, efpIdentity, fileNumber); - addJournalFile(jfp, completedDblkCount); + JournalFile* jfp = new JournalFile(fileName, efpIdentity, fileNumber, jcntlRef_.id()); + addJournalFile(jfp, completedDblkCount, true); } void LinearFileController::assertCurrentJournalFileValid(const char* const functionName) const { @@ -209,15 +215,17 @@ bool LinearFileController::checkCurrentJournalFileValid() const { return currentJournalFilePtr_ != 0; } -// NOTE: NOT THREAD SAFE - journalFileList is accessed by multiple threads - use under external lock JournalFile* LinearFileController::find(const efpFileCount_t fileSeqNumber) { - if (currentJournalFilePtr_ != 0 && currentJournalFilePtr_->getFileSeqNum() == fileSeqNumber) + if (currentJournalFilePtr_ && currentJournalFilePtr_->getFileSeqNum() == fileSeqNumber) return currentJournalFilePtr_; + + slock l(journalFileListMutex_); for (JournalFileListItr_t i=journalFileList_.begin(); i!=journalFileList_.end(); ++i) { if ((*i)->getFileSeqNum() == fileSeqNumber) { return *i; } } + std::ostringstream oss; oss << "fileSeqNumber=" << fileSeqNumber; throw jexception(jerrno::JERR_LFCR_SEQNUMNOTFOUND, oss.str(), "LinearFileController", "find"); @@ -227,15 +235,9 @@ uint64_t LinearFileController::getNextFileSeqNum() { return fileSeqCounter_.increment(); } -void LinearFileController::purgeEmptyFilesToEfpNoLock() { -//std::cout << " >P n=" << journalFileList_.size() << " e=" << (journalFileList_.front()->isNoEnqueuedRecordsRemaining()?"T":"F") << std::flush; // DEBUG - while (journalFileList_.front()->isNoEnqueuedRecordsRemaining() && - journalFileList_.size() > 1) { // Can't purge last file, even if it has no enqueued records -//std::cout << " *f=" << journalFileList_.front()->getFqFileName() << std::flush; // DEBUG - emptyFilePoolPtr_->returnEmptyFile(journalFileList_.front()->getFqFileName()); - delete journalFileList_.front(); - journalFileList_.pop_front(); - } +void LinearFileController::pullEmptyFileFromEfp() { + std::string efn = emptyFilePoolPtr_->takeEmptyFile(journalDirectory_); // Moves file from EFP only (ie no file init), returns new file name + addJournalFile(efn, emptyFilePoolPtr_->getIdentity(), getNextFileSeqNum(), 0); } }}} |