summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/linearstore/journal/LinearFileController.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/linearstore/journal/LinearFileController.cpp')
-rw-r--r--cpp/src/qpid/linearstore/journal/LinearFileController.cpp82
1 files changed, 42 insertions, 40 deletions
diff --git a/cpp/src/qpid/linearstore/journal/LinearFileController.cpp b/cpp/src/qpid/linearstore/journal/LinearFileController.cpp
index 5483f3bb94..86d1b0e93c 100644
--- a/cpp/src/qpid/linearstore/journal/LinearFileController.cpp
+++ b/cpp/src/qpid/linearstore/journal/LinearFileController.cpp
@@ -25,8 +25,6 @@
#include "qpid/linearstore/journal/jcntl.h"
#include "qpid/linearstore/journal/JournalFile.h"
-//#include <iostream> // DEBUG
-
namespace qpid {
namespace linearstore {
namespace journal {
@@ -34,10 +32,10 @@ namespace journal {
LinearFileController::LinearFileController(jcntl& jcntlRef) :
jcntlRef_(jcntlRef),
emptyFilePoolPtr_(0),
- currentJournalFilePtr_(0),
fileSeqCounter_("LinearFileController::fileSeqCounter", 0),
recordIdCounter_("LinearFileController::recordIdCounter", 0),
- decrCounter_("LinearFileController::decrCounter", 0)
+ decrCounter_("LinearFileController::decrCounter", 0),
+ currentJournalFilePtr_(0)
{}
LinearFileController::~LinearFileController() {}
@@ -53,7 +51,7 @@ void LinearFileController::initialize(const std::string& journalDirectory,
void LinearFileController::finalize() {
if (currentJournalFilePtr_) {
currentJournalFilePtr_->close();
- currentJournalFilePtr_ = NULL;
+ currentJournalFilePtr_ = 0;
}
while (!journalFileList_.empty()) {
delete journalFileList_.front();
@@ -62,17 +60,21 @@ void LinearFileController::finalize() {
}
void LinearFileController::addJournalFile(JournalFile* journalFilePtr,
- const uint32_t completedDblkCount) {
- if (currentJournalFilePtr_) {
+ const uint32_t completedDblkCount,
+ const bool makeCurrentFlag) {
+ if (makeCurrentFlag && currentJournalFilePtr_) {
currentJournalFilePtr_->close();
+ currentJournalFilePtr_ = 0;
}
journalFilePtr->initialize(completedDblkCount);
- currentJournalFilePtr_ = journalFilePtr;
{
slock l(journalFileListMutex_);
- journalFileList_.push_back(currentJournalFilePtr_);
+ journalFileList_.push_back(journalFilePtr);
+ }
+ if (makeCurrentFlag) {
+ currentJournalFilePtr_ = journalFilePtr;
+ currentJournalFilePtr_->open();
}
- currentJournalFilePtr_->open();
}
efpDataSize_sblks_t LinearFileController::dataSize_sblks() const {
@@ -83,16 +85,20 @@ efpFileSize_sblks_t LinearFileController::fileSize_sblks() const {
return emptyFilePoolPtr_->fileSize_sblks();
}
+void LinearFileController::getNextJournalFile() {
+ if (currentJournalFilePtr_)
+ currentJournalFilePtr_->close();
+ pullEmptyFileFromEfp();
+}
+
uint64_t LinearFileController::getNextRecordId() {
return recordIdCounter_.increment();
}
-void LinearFileController::pullEmptyFileFromEfp() {
- if (currentJournalFilePtr_)
- currentJournalFilePtr_->close();
- std::string ef = emptyFilePoolPtr_->takeEmptyFile(journalDirectory_); // Moves file from EFP only, returns new file name
-//std::cout << "*** LinearFileController::pullEmptyFileFromEfp() qn=" << jcntlRef.id() << " ef=" << ef << std::endl; // DEBUG
- addJournalFile(ef, emptyFilePoolPtr_->getIdentity(), getNextFileSeqNum(), 0);
+void LinearFileController::removeFileToEfp(const std::string& fileName) {
+ if (emptyFilePoolPtr_) {
+ emptyFilePoolPtr_->returnEmptyFile(fileName);
+ }
}
void LinearFileController::restoreEmptyFile(const std::string& fileName) {
@@ -101,7 +107,11 @@ void LinearFileController::restoreEmptyFile(const std::string& fileName) {
void LinearFileController::purgeEmptyFilesToEfp() {
slock l(journalFileListMutex_);
- purgeEmptyFilesToEfpNoLock();
+ while (journalFileList_.front()->isNoEnqueuedRecordsRemaining() && journalFileList_.size() > 1) { // Can't purge last file, even if it has no enqueued records
+ emptyFilePoolPtr_->returnEmptyFile(journalFileList_.front()->getFqFileName());
+ delete journalFileList_.front();
+ journalFileList_.pop_front();
+ }
}
uint32_t LinearFileController::getEnqueuedRecordCount(const efpFileCount_t fileSeqNumber) {
@@ -113,7 +123,6 @@ uint32_t LinearFileController::incrEnqueuedRecordCount(const efpFileCount_t file
}
uint32_t LinearFileController::decrEnqueuedRecordCount(const efpFileCount_t fileSeqNumber) {
- slock l(journalFileListMutex_);
uint32_t r = find(fileSeqNumber)->decrEnqueuedRecordCount();
// TODO: Re-evaluate after testing and profiling
@@ -122,18 +131,16 @@ uint32_t LinearFileController::decrEnqueuedRecordCount(const efpFileCount_t file
// records). We need to check this rather simple scheme works for outlying scenarios (large and tiny data
// records) without impacting performance or performing badly (leaving excessive empty files in the journals).
if (decrCounter_.increment() % 100ULL == 0ULL) {
- purgeEmptyFilesToEfpNoLock();
+ purgeEmptyFilesToEfp();
}
return r;
}
uint32_t LinearFileController::addWriteCompletedDblkCount(const efpFileCount_t fileSeqNumber, const uint32_t a) {
- slock l(journalFileListMutex_);
return find(fileSeqNumber)->addCompletedDblkCount(a);
}
uint16_t LinearFileController::decrOutstandingAioOperationCount(const efpFileCount_t fileSeqNumber) {
- slock l(journalFileListMutex_);
return find(fileSeqNumber)->decrOutstandingAioOperationCount();
}
@@ -142,12 +149,11 @@ void LinearFileController::asyncFileHeaderWrite(io_context_t ioContextPtr,
const uint64_t recordId,
const uint64_t firstRecordOffset) {
currentJournalFilePtr_->asyncFileHeaderWrite(ioContextPtr,
- emptyFilePoolPtr_->getPartitionNumber(),
- emptyFilePoolPtr_->dataSize_kib(),
- userFlags,
- recordId,
- firstRecordOffset,
- jcntlRef_.id());
+ emptyFilePoolPtr_->getPartitionNumber(),
+ emptyFilePoolPtr_->dataSize_kib(),
+ userFlags,
+ recordId,
+ firstRecordOffset);
}
void LinearFileController::asyncPageWrite(io_context_t ioContextPtr,
@@ -195,8 +201,8 @@ void LinearFileController::addJournalFile(const std::string& fileName,
const efpIdentity_t& efpIdentity,
const uint64_t fileNumber,
const uint32_t completedDblkCount) {
- JournalFile* jfp = new JournalFile(fileName, efpIdentity, fileNumber);
- addJournalFile(jfp, completedDblkCount);
+ JournalFile* jfp = new JournalFile(fileName, efpIdentity, fileNumber, jcntlRef_.id());
+ addJournalFile(jfp, completedDblkCount, true);
}
void LinearFileController::assertCurrentJournalFileValid(const char* const functionName) const {
@@ -209,15 +215,17 @@ bool LinearFileController::checkCurrentJournalFileValid() const {
return currentJournalFilePtr_ != 0;
}
-// NOTE: NOT THREAD SAFE - journalFileList is accessed by multiple threads - use under external lock
JournalFile* LinearFileController::find(const efpFileCount_t fileSeqNumber) {
- if (currentJournalFilePtr_ != 0 && currentJournalFilePtr_->getFileSeqNum() == fileSeqNumber)
+ if (currentJournalFilePtr_ && currentJournalFilePtr_->getFileSeqNum() == fileSeqNumber)
return currentJournalFilePtr_;
+
+ slock l(journalFileListMutex_);
for (JournalFileListItr_t i=journalFileList_.begin(); i!=journalFileList_.end(); ++i) {
if ((*i)->getFileSeqNum() == fileSeqNumber) {
return *i;
}
}
+
std::ostringstream oss;
oss << "fileSeqNumber=" << fileSeqNumber;
throw jexception(jerrno::JERR_LFCR_SEQNUMNOTFOUND, oss.str(), "LinearFileController", "find");
@@ -227,15 +235,9 @@ uint64_t LinearFileController::getNextFileSeqNum() {
return fileSeqCounter_.increment();
}
-void LinearFileController::purgeEmptyFilesToEfpNoLock() {
-//std::cout << " >P n=" << journalFileList_.size() << " e=" << (journalFileList_.front()->isNoEnqueuedRecordsRemaining()?"T":"F") << std::flush; // DEBUG
- while (journalFileList_.front()->isNoEnqueuedRecordsRemaining() &&
- journalFileList_.size() > 1) { // Can't purge last file, even if it has no enqueued records
-//std::cout << " *f=" << journalFileList_.front()->getFqFileName() << std::flush; // DEBUG
- emptyFilePoolPtr_->returnEmptyFile(journalFileList_.front()->getFqFileName());
- delete journalFileList_.front();
- journalFileList_.pop_front();
- }
+void LinearFileController::pullEmptyFileFromEfp() {
+ std::string efn = emptyFilePoolPtr_->takeEmptyFile(journalDirectory_); // Moves file from EFP only (ie no file init), returns new file name
+ addJournalFile(efn, emptyFilePoolPtr_->getIdentity(), getNextFileSeqNum(), 0);
}
}}}