summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp')
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp276
1 files changed, 136 insertions, 140 deletions
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp
index af953aa4ef..e02e965823 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp
@@ -31,21 +31,21 @@
namespace qpid {
namespace qls_jrnl {
-JournalFile::JournalFile(const std::string& fqFileName_,
- const uint64_t fileSeqNum_,
- const uint32_t fileSize_kib_) :
- fqFileName(fqFileName_),
- fileSeqNum(fileSeqNum_),
- fileHandle(-1),
- fileCloseFlag(false),
- fileHeaderBasePtr (0),
- fileHeaderPtr(0),
- aioControlBlockPtr(0),
- fileSizeDblks(((fileSize_kib_ * 1024) + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_BYTES)) / JRNL_DBLK_SIZE_BYTES),
- enqueuedRecordCount(0),
- submittedDblkCount(0),
- completedDblkCount(0),
- outstandingAioOpsCount(0)
+JournalFile::JournalFile(const std::string& fqFileName,
+ const uint64_t fileSeqNum,
+ const efpDataSize_kib_t efpDataSize_kib) :
+ fqFileName_(fqFileName),
+ fileSeqNum_(fileSeqNum),
+ fileHandle_(-1),
+ fileCloseFlag_(false),
+ fileHeaderBasePtr_ (0),
+ fileHeaderPtr_(0),
+ aioControlBlockPtr_(0),
+ fileSize_dblks_(((efpDataSize_kib * 1024) + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_BYTES)) / QLS_DBLK_SIZE_BYTES),
+ enqueuedRecordCount_(0),
+ submittedDblkCount_(0),
+ completedDblkCount_(0),
+ outstandingAioOpsCount_(0)
{}
JournalFile::~JournalFile() {
@@ -53,227 +53,223 @@ JournalFile::~JournalFile() {
}
void
-JournalFile::initialize() {
- if (::posix_memalign(&fileHeaderBasePtr, QLS_AIO_ALIGN_BOUNDARY, QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_KIB * 1024))
+JournalFile::initialize(const uint32_t completedDblkCount) {
+ if (::posix_memalign(&fileHeaderBasePtr_, QLS_AIO_ALIGN_BOUNDARY_BYTES, QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024))
{
std::ostringstream oss;
- oss << "posix_memalign(): blksize=" << QLS_AIO_ALIGN_BOUNDARY << " size=" << (QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_KIB * 1024);
+ oss << "posix_memalign(): blksize=" << QLS_AIO_ALIGN_BOUNDARY_BYTES << " size=" << (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024);
oss << FORMAT_SYSERR(errno);
throw jexception(jerrno::JERR__MALLOC, oss.str(), "JournalFile", "initialize");
}
- fileHeaderPtr = (::file_hdr_t*)fileHeaderBasePtr;
- aioControlBlockPtr = new aio_cb;
+ fileHeaderPtr_ = (::file_hdr_t*)fileHeaderBasePtr_;
+ aioControlBlockPtr_ = new aio_cb;
+ if (completedDblkCount > 0UL) {
+ submittedDblkCount_.add(completedDblkCount);
+ completedDblkCount_.add(completedDblkCount);
+ }
}
void
JournalFile::finalize() {
- if (fileHeaderBasePtr != 0) {
- std::free(fileHeaderBasePtr);
- fileHeaderBasePtr = 0;
- fileHeaderPtr = 0;
+ if (fileHeaderBasePtr_ != 0) {
+ std::free(fileHeaderBasePtr_);
+ fileHeaderBasePtr_ = 0;
+ fileHeaderPtr_ = 0;
}
- if (aioControlBlockPtr != 0) {
- std::free(aioControlBlockPtr);
- aioControlBlockPtr = 0;
+ if (aioControlBlockPtr_ != 0) {
+ delete(aioControlBlockPtr_);
+ aioControlBlockPtr_ = 0;
}
}
-const std::string
-JournalFile::getDirectory() const {
- return fqFileName.substr(0, fqFileName.rfind('/'));
+const std::string JournalFile::getDirectory() const {
+ return fqFileName_.substr(0, fqFileName_.rfind('/'));
+}
+
+const std::string JournalFile::getFileName() const {
+ return fqFileName_.substr(fqFileName_.rfind('/')+1);
}
-const std::string
-JournalFile::getFileName() const {
- return fqFileName.substr(fqFileName.rfind('/')+1);
+const std::string JournalFile::getFqFileName() const {
+ return fqFileName_;
}
-const std::string
-JournalFile::getFqFileName() const {
- return fqFileName;
+uint64_t JournalFile::getFileSeqNum() const {
+ return fileSeqNum_;
}
-uint64_t
-JournalFile::getFileSeqNum() const {
- return fileSeqNum;
+bool JournalFile::isOpen() const {
+ return fileHandle_ >= 0;
}
-int
-JournalFile::open() {
- fileHandle = ::open(fqFileName.c_str(), O_WRONLY | O_DIRECT, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); // 0644 -rw-r--r--
- if (fileHandle < 0) {
+int JournalFile::open() {
+ fileHandle_ = ::open(fqFileName_.c_str(), O_WRONLY | O_DIRECT, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); // 0644 -rw-r--r--
+ if (fileHandle_ < 0) {
std::ostringstream oss;
- oss << "file=\"" << fqFileName << "\"" << FORMAT_SYSERR(errno);
+ oss << "file=\"" << fqFileName_ << "\"" << FORMAT_SYSERR(errno);
throw jexception(jerrno::JERR_JNLF_OPEN, oss.str(), "JournalFile", "open");
}
- return fileHandle;
+ return fileHandle_;
}
-bool
-JournalFile::isOpen() const {
- return fileHandle >= 0;
-}
-
-void
-JournalFile::close() {
- if (fileHandle >= 0) {
+void JournalFile::close() {
+ if (fileHandle_ >= 0) {
if (getOutstandingAioDblks()) {
- fileCloseFlag = true; // Close later when all outstanding AIOs have returned
+ fileCloseFlag_ = true; // Close later when all outstanding AIOs have returned
} else {
- int res = ::close(fileHandle);
- fileHandle = -1;
+ int res = ::close(fileHandle_);
+ fileHandle_ = -1;
if (res != 0) {
std::ostringstream oss;
- oss << "file=\"" << fqFileName << "\"" << FORMAT_SYSERR(errno);
+ oss << "file=\"" << fqFileName_ << "\"" << FORMAT_SYSERR(errno);
throw jexception(jerrno::JERR_JNLF_CLOSE, oss.str(), "JournalFile", "open");
}
}
}
}
-void
-JournalFile::asyncFileHeaderWrite(io_context_t ioContextPtr_,
- const efpPartitionNumber_t efpPartitionNumber_,
- const efpDataSize_kib_t efpDataSize_kib_,
- const uint16_t userFlags_,
- const uint64_t recordId_,
- const uint64_t firstRecordOffset_,
- const std::string queueName_) {
- ::file_hdr_create(fileHeaderPtr, QLS_FILE_MAGIC, QLS_JRNL_VERSION, QLS_JRNL_FHDR_RES_SIZE_SBLKS, efpPartitionNumber_, efpDataSize_kib_);
- ::file_hdr_init(fileHeaderBasePtr, QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_KIB * 1024, userFlags_, recordId_, firstRecordOffset_, fileSeqNum, queueName_.size(), queueName_.data());
- aio::prep_pwrite(aioControlBlockPtr, fileHandle, (void*)fileHeaderBasePtr, QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_KIB * 1024, 0UL);
- if (aio::submit(ioContextPtr_, 1, &aioControlBlockPtr) < 0)
+void JournalFile::asyncFileHeaderWrite(io_context_t ioContextPtr,
+ const efpPartitionNumber_t efpPartitionNumber,
+ const efpDataSize_kib_t efpDataSize_kib,
+ const uint16_t userFlags,
+ const uint64_t recordId,
+ const uint64_t firstRecordOffset,
+ const std::string queueName) {
+ ::file_hdr_create(fileHeaderPtr_, QLS_FILE_MAGIC, QLS_JRNL_VERSION, QLS_JRNL_FHDR_RES_SIZE_SBLKS, efpPartitionNumber, efpDataSize_kib);
+ ::file_hdr_init(fileHeaderBasePtr_,
+ QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024,
+ userFlags,
+ recordId,
+ firstRecordOffset,
+ fileSeqNum_,
+ queueName.size(),
+ queueName.data());
+ aio::prep_pwrite(aioControlBlockPtr_,
+ fileHandle_,
+ (void*)fileHeaderBasePtr_,
+ QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024,
+ 0UL);
+ if (aio::submit(ioContextPtr, 1, &aioControlBlockPtr_) < 0)
throw jexception(jerrno::JERR__AIO, "JournalFile", "asyncPageWrite");
- addSubmittedDblkCount(QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_DBLKS);
+ addSubmittedDblkCount(QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_DBLKS);
incrOutstandingAioOperationCount();
}
-void
-JournalFile::asyncPageWrite(io_context_t ioContextPtr_,
- aio_cb* aioControlBlockPtr_,
- void* data_,
- uint32_t dataSize_dblks_) {
- aio::prep_pwrite_2(aioControlBlockPtr_, fileHandle, data_, dataSize_dblks_ * JRNL_DBLK_SIZE_BYTES, submittedDblkCount.get() * JRNL_DBLK_SIZE_BYTES);
- pmgr::page_cb* pcbp = (pmgr::page_cb*)(aioControlBlockPtr_->data); // This page's control block (pcb)
- pcbp->_wdblks = dataSize_dblks_;
+void JournalFile::asyncPageWrite(io_context_t ioContextPtr,
+ aio_cb* aioControlBlockPtr,
+ void* data,
+ uint32_t dataSize_dblks) {
+ aio::prep_pwrite_2(aioControlBlockPtr,
+ fileHandle_,
+ data,
+ dataSize_dblks * QLS_DBLK_SIZE_BYTES,
+ submittedDblkCount_.get() * QLS_DBLK_SIZE_BYTES);
+ pmgr::page_cb* pcbp = (pmgr::page_cb*)(aioControlBlockPtr->data); // This page's control block (pcb)
+ pcbp->_wdblks = dataSize_dblks;
pcbp->_jfp = this;
- if (aio::submit(ioContextPtr_, 1, &aioControlBlockPtr_) < 0)
- throw jexception(jerrno::JERR__AIO, "JournalFile", "asyncPageWrite");
- addSubmittedDblkCount(dataSize_dblks_);
+ if (aio::submit(ioContextPtr, 1, &aioControlBlockPtr) < 0) {
+ throw jexception(jerrno::JERR__AIO, "JournalFile", "asyncPageWrite"); // TODO: complete exception details
+ }
+ addSubmittedDblkCount(dataSize_dblks);
incrOutstandingAioOperationCount();
}
-uint32_t
-JournalFile::getEnqueuedRecordCount() const {
- return enqueuedRecordCount.get();
+uint32_t JournalFile::getEnqueuedRecordCount() const {
+ return enqueuedRecordCount_.get();
}
-uint32_t
-JournalFile::incrEnqueuedRecordCount() {
- return enqueuedRecordCount.increment();
+uint32_t JournalFile::incrEnqueuedRecordCount() {
+ return enqueuedRecordCount_.increment();
}
-uint32_t
-JournalFile::addEnqueuedRecordCount(const uint32_t a) {
- return enqueuedRecordCount.add(a);
+uint32_t JournalFile::addEnqueuedRecordCount(const uint32_t a) {
+ return enqueuedRecordCount_.add(a);
}
-uint32_t
-JournalFile::decrEnqueuedRecordCount() {
- return enqueuedRecordCount.decrementLimit();
+uint32_t JournalFile::decrEnqueuedRecordCount() {
+ return enqueuedRecordCount_.decrementLimit();
}
-uint32_t
-JournalFile::subtrEnqueuedRecordCount(const uint32_t s) {
- return enqueuedRecordCount.subtractLimit(s);
+uint32_t JournalFile::subtrEnqueuedRecordCount(const uint32_t s) {
+ return enqueuedRecordCount_.subtractLimit(s);
}
-uint32_t
-JournalFile::getSubmittedDblkCount() const {
- return submittedDblkCount.get();
+uint32_t JournalFile::getSubmittedDblkCount() const {
+ return submittedDblkCount_.get();
}
-uint32_t
-JournalFile::addSubmittedDblkCount(const uint32_t a) {
- return submittedDblkCount.addLimit(a, fileSizeDblks, jerrno::JERR_JNLF_FILEOFFSOVFL);
+uint32_t JournalFile::addSubmittedDblkCount(const uint32_t a) {
+ return submittedDblkCount_.addLimit(a, fileSize_dblks_, jerrno::JERR_JNLF_FILEOFFSOVFL);
}
-uint32_t
-JournalFile::getCompletedDblkCount() const {
- return completedDblkCount.get();
+uint32_t JournalFile::getCompletedDblkCount() const {
+ return completedDblkCount_.get();
}
-uint32_t
-JournalFile::addCompletedDblkCount(const uint32_t a) {
- return completedDblkCount.addLimit(a, submittedDblkCount.get(), jerrno::JERR_JNLF_CMPLOFFSOVFL);
+uint32_t JournalFile::addCompletedDblkCount(const uint32_t a) {
+ return completedDblkCount_.addLimit(a, submittedDblkCount_.get(), jerrno::JERR_JNLF_CMPLOFFSOVFL);
}
uint16_t JournalFile::getOutstandingAioOperationCount() const {
- return outstandingAioOpsCount.get();
+ return outstandingAioOpsCount_.get();
}
uint16_t JournalFile::incrOutstandingAioOperationCount() {
- return outstandingAioOpsCount.increment();
+ return outstandingAioOpsCount_.increment();
}
uint16_t JournalFile::decrOutstandingAioOperationCount() {
- uint16_t r = outstandingAioOpsCount.decrementLimit();
- if (fileCloseFlag && outstandingAioOpsCount == 0) { // Delayed close
+ uint16_t r = outstandingAioOpsCount_.decrementLimit();
+ if (fileCloseFlag_ && outstandingAioOpsCount_ == 0) { // Delayed close
close();
}
return r;
}
-bool
-JournalFile::isEmpty() const {
- return submittedDblkCount == 0;
+// --- Status helper functions ---
+
+bool JournalFile::isEmpty() const {
+ return submittedDblkCount_ == 0;
}
-bool
-JournalFile::isDataEmpty() const {
- return submittedDblkCount <= QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_DBLKS;
+bool JournalFile::isDataEmpty() const {
+ return submittedDblkCount_ <= QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_DBLKS;
}
-u_int32_t
-JournalFile::dblksRemaining() const {
- return fileSizeDblks - submittedDblkCount;
+u_int32_t JournalFile::dblksRemaining() const {
+ return fileSize_dblks_ - submittedDblkCount_;
}
-bool
-JournalFile::isFull() const {
- return submittedDblkCount == fileSizeDblks;
+bool JournalFile::isFull() const {
+ return submittedDblkCount_ == fileSize_dblks_;
}
-bool
-JournalFile::isFullAndComplete() const {
- return completedDblkCount == fileSizeDblks;
+bool JournalFile::isFullAndComplete() const {
+ return completedDblkCount_ == fileSize_dblks_;
}
-u_int32_t
-JournalFile::getOutstandingAioDblks() const {
- return submittedDblkCount - completedDblkCount;
+u_int32_t JournalFile::getOutstandingAioDblks() const {
+ return submittedDblkCount_ - completedDblkCount_;
}
-bool
-JournalFile::getNextFile() const {
+bool JournalFile::getNextFile() const {
return isFull();
}
-bool
-JournalFile::isNoEnqueuedRecordsRemaining() const {
+bool JournalFile::isNoEnqueuedRecordsRemaining() const {
return !isDataEmpty() && // Must be written to, not empty
- enqueuedRecordCount == 0; // No remaining enqueued records
+ enqueuedRecordCount_ == 0; // No remaining enqueued records
}
-const std::string
-JournalFile::status_str(const uint8_t indentDepth_) const {
- std::string indent((size_t)indentDepth_, '.');
+// debug aid
+const std::string JournalFile::status_str(const uint8_t indentDepth) const {
+ std::string indent((size_t)indentDepth, '.');
std::ostringstream oss;
oss << indent << "JournalFile: fileName=" << getFileName() << std::endl;
oss << indent << " directory=" << getDirectory() << std::endl;
- oss << indent << " fileSizeDblks=" << fileSizeDblks << std::endl;
+ oss << indent << " fileSizeDblks=" << fileSize_dblks_ << std::endl;
oss << indent << " open=" << (isOpen() ? "T" : "F") << std::endl;
- oss << indent << " fileHandle=" << fileHandle << std::endl;
+ oss << indent << " fileHandle=" << fileHandle_ << std::endl;
oss << indent << " enqueuedRecordCount=" << getEnqueuedRecordCount() << std::endl;
oss << indent << " submittedDblkCount=" << getSubmittedDblkCount() << std::endl;
oss << indent << " completedDblkCount=" << getCompletedDblkCount() << std::endl;