diff options
Diffstat (limited to 'qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp | 276 |
1 files changed, 136 insertions, 140 deletions
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp index af953aa4ef..e02e965823 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp +++ b/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp @@ -31,21 +31,21 @@ namespace qpid { namespace qls_jrnl { -JournalFile::JournalFile(const std::string& fqFileName_, - const uint64_t fileSeqNum_, - const uint32_t fileSize_kib_) : - fqFileName(fqFileName_), - fileSeqNum(fileSeqNum_), - fileHandle(-1), - fileCloseFlag(false), - fileHeaderBasePtr (0), - fileHeaderPtr(0), - aioControlBlockPtr(0), - fileSizeDblks(((fileSize_kib_ * 1024) + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_BYTES)) / JRNL_DBLK_SIZE_BYTES), - enqueuedRecordCount(0), - submittedDblkCount(0), - completedDblkCount(0), - outstandingAioOpsCount(0) +JournalFile::JournalFile(const std::string& fqFileName, + const uint64_t fileSeqNum, + const efpDataSize_kib_t efpDataSize_kib) : + fqFileName_(fqFileName), + fileSeqNum_(fileSeqNum), + fileHandle_(-1), + fileCloseFlag_(false), + fileHeaderBasePtr_ (0), + fileHeaderPtr_(0), + aioControlBlockPtr_(0), + fileSize_dblks_(((efpDataSize_kib * 1024) + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_BYTES)) / QLS_DBLK_SIZE_BYTES), + enqueuedRecordCount_(0), + submittedDblkCount_(0), + completedDblkCount_(0), + outstandingAioOpsCount_(0) {} JournalFile::~JournalFile() { @@ -53,227 +53,223 @@ JournalFile::~JournalFile() { } void -JournalFile::initialize() { - if (::posix_memalign(&fileHeaderBasePtr, QLS_AIO_ALIGN_BOUNDARY, QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_KIB * 1024)) +JournalFile::initialize(const uint32_t completedDblkCount) { + if (::posix_memalign(&fileHeaderBasePtr_, QLS_AIO_ALIGN_BOUNDARY_BYTES, QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024)) { std::ostringstream oss; - oss << "posix_memalign(): blksize=" << QLS_AIO_ALIGN_BOUNDARY << " size=" << (QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_KIB * 1024); + oss << "posix_memalign(): blksize=" << QLS_AIO_ALIGN_BOUNDARY_BYTES << " size=" << (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024); oss << FORMAT_SYSERR(errno); throw jexception(jerrno::JERR__MALLOC, oss.str(), "JournalFile", "initialize"); } - fileHeaderPtr = (::file_hdr_t*)fileHeaderBasePtr; - aioControlBlockPtr = new aio_cb; + fileHeaderPtr_ = (::file_hdr_t*)fileHeaderBasePtr_; + aioControlBlockPtr_ = new aio_cb; + if (completedDblkCount > 0UL) { + submittedDblkCount_.add(completedDblkCount); + completedDblkCount_.add(completedDblkCount); + } } void JournalFile::finalize() { - if (fileHeaderBasePtr != 0) { - std::free(fileHeaderBasePtr); - fileHeaderBasePtr = 0; - fileHeaderPtr = 0; + if (fileHeaderBasePtr_ != 0) { + std::free(fileHeaderBasePtr_); + fileHeaderBasePtr_ = 0; + fileHeaderPtr_ = 0; } - if (aioControlBlockPtr != 0) { - std::free(aioControlBlockPtr); - aioControlBlockPtr = 0; + if (aioControlBlockPtr_ != 0) { + delete(aioControlBlockPtr_); + aioControlBlockPtr_ = 0; } } -const std::string -JournalFile::getDirectory() const { - return fqFileName.substr(0, fqFileName.rfind('/')); +const std::string JournalFile::getDirectory() const { + return fqFileName_.substr(0, fqFileName_.rfind('/')); +} + +const std::string JournalFile::getFileName() const { + return fqFileName_.substr(fqFileName_.rfind('/')+1); } -const std::string -JournalFile::getFileName() const { - return fqFileName.substr(fqFileName.rfind('/')+1); +const std::string JournalFile::getFqFileName() const { + return fqFileName_; } -const std::string -JournalFile::getFqFileName() const { - return fqFileName; +uint64_t JournalFile::getFileSeqNum() const { + return fileSeqNum_; } -uint64_t -JournalFile::getFileSeqNum() const { - return fileSeqNum; +bool JournalFile::isOpen() const { + return fileHandle_ >= 0; } -int -JournalFile::open() { - fileHandle = ::open(fqFileName.c_str(), O_WRONLY | O_DIRECT, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); // 0644 -rw-r--r-- - if (fileHandle < 0) { +int JournalFile::open() { + fileHandle_ = ::open(fqFileName_.c_str(), O_WRONLY | O_DIRECT, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); // 0644 -rw-r--r-- + if (fileHandle_ < 0) { std::ostringstream oss; - oss << "file=\"" << fqFileName << "\"" << FORMAT_SYSERR(errno); + oss << "file=\"" << fqFileName_ << "\"" << FORMAT_SYSERR(errno); throw jexception(jerrno::JERR_JNLF_OPEN, oss.str(), "JournalFile", "open"); } - return fileHandle; + return fileHandle_; } -bool -JournalFile::isOpen() const { - return fileHandle >= 0; -} - -void -JournalFile::close() { - if (fileHandle >= 0) { +void JournalFile::close() { + if (fileHandle_ >= 0) { if (getOutstandingAioDblks()) { - fileCloseFlag = true; // Close later when all outstanding AIOs have returned + fileCloseFlag_ = true; // Close later when all outstanding AIOs have returned } else { - int res = ::close(fileHandle); - fileHandle = -1; + int res = ::close(fileHandle_); + fileHandle_ = -1; if (res != 0) { std::ostringstream oss; - oss << "file=\"" << fqFileName << "\"" << FORMAT_SYSERR(errno); + oss << "file=\"" << fqFileName_ << "\"" << FORMAT_SYSERR(errno); throw jexception(jerrno::JERR_JNLF_CLOSE, oss.str(), "JournalFile", "open"); } } } } -void -JournalFile::asyncFileHeaderWrite(io_context_t ioContextPtr_, - const efpPartitionNumber_t efpPartitionNumber_, - const efpDataSize_kib_t efpDataSize_kib_, - const uint16_t userFlags_, - const uint64_t recordId_, - const uint64_t firstRecordOffset_, - const std::string queueName_) { - ::file_hdr_create(fileHeaderPtr, QLS_FILE_MAGIC, QLS_JRNL_VERSION, QLS_JRNL_FHDR_RES_SIZE_SBLKS, efpPartitionNumber_, efpDataSize_kib_); - ::file_hdr_init(fileHeaderBasePtr, QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_KIB * 1024, userFlags_, recordId_, firstRecordOffset_, fileSeqNum, queueName_.size(), queueName_.data()); - aio::prep_pwrite(aioControlBlockPtr, fileHandle, (void*)fileHeaderBasePtr, QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_KIB * 1024, 0UL); - if (aio::submit(ioContextPtr_, 1, &aioControlBlockPtr) < 0) +void JournalFile::asyncFileHeaderWrite(io_context_t ioContextPtr, + const efpPartitionNumber_t efpPartitionNumber, + const efpDataSize_kib_t efpDataSize_kib, + const uint16_t userFlags, + const uint64_t recordId, + const uint64_t firstRecordOffset, + const std::string queueName) { + ::file_hdr_create(fileHeaderPtr_, QLS_FILE_MAGIC, QLS_JRNL_VERSION, QLS_JRNL_FHDR_RES_SIZE_SBLKS, efpPartitionNumber, efpDataSize_kib); + ::file_hdr_init(fileHeaderBasePtr_, + QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024, + userFlags, + recordId, + firstRecordOffset, + fileSeqNum_, + queueName.size(), + queueName.data()); + aio::prep_pwrite(aioControlBlockPtr_, + fileHandle_, + (void*)fileHeaderBasePtr_, + QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024, + 0UL); + if (aio::submit(ioContextPtr, 1, &aioControlBlockPtr_) < 0) throw jexception(jerrno::JERR__AIO, "JournalFile", "asyncPageWrite"); - addSubmittedDblkCount(QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_DBLKS); + addSubmittedDblkCount(QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_DBLKS); incrOutstandingAioOperationCount(); } -void -JournalFile::asyncPageWrite(io_context_t ioContextPtr_, - aio_cb* aioControlBlockPtr_, - void* data_, - uint32_t dataSize_dblks_) { - aio::prep_pwrite_2(aioControlBlockPtr_, fileHandle, data_, dataSize_dblks_ * JRNL_DBLK_SIZE_BYTES, submittedDblkCount.get() * JRNL_DBLK_SIZE_BYTES); - pmgr::page_cb* pcbp = (pmgr::page_cb*)(aioControlBlockPtr_->data); // This page's control block (pcb) - pcbp->_wdblks = dataSize_dblks_; +void JournalFile::asyncPageWrite(io_context_t ioContextPtr, + aio_cb* aioControlBlockPtr, + void* data, + uint32_t dataSize_dblks) { + aio::prep_pwrite_2(aioControlBlockPtr, + fileHandle_, + data, + dataSize_dblks * QLS_DBLK_SIZE_BYTES, + submittedDblkCount_.get() * QLS_DBLK_SIZE_BYTES); + pmgr::page_cb* pcbp = (pmgr::page_cb*)(aioControlBlockPtr->data); // This page's control block (pcb) + pcbp->_wdblks = dataSize_dblks; pcbp->_jfp = this; - if (aio::submit(ioContextPtr_, 1, &aioControlBlockPtr_) < 0) - throw jexception(jerrno::JERR__AIO, "JournalFile", "asyncPageWrite"); - addSubmittedDblkCount(dataSize_dblks_); + if (aio::submit(ioContextPtr, 1, &aioControlBlockPtr) < 0) { + throw jexception(jerrno::JERR__AIO, "JournalFile", "asyncPageWrite"); // TODO: complete exception details + } + addSubmittedDblkCount(dataSize_dblks); incrOutstandingAioOperationCount(); } -uint32_t -JournalFile::getEnqueuedRecordCount() const { - return enqueuedRecordCount.get(); +uint32_t JournalFile::getEnqueuedRecordCount() const { + return enqueuedRecordCount_.get(); } -uint32_t -JournalFile::incrEnqueuedRecordCount() { - return enqueuedRecordCount.increment(); +uint32_t JournalFile::incrEnqueuedRecordCount() { + return enqueuedRecordCount_.increment(); } -uint32_t -JournalFile::addEnqueuedRecordCount(const uint32_t a) { - return enqueuedRecordCount.add(a); +uint32_t JournalFile::addEnqueuedRecordCount(const uint32_t a) { + return enqueuedRecordCount_.add(a); } -uint32_t -JournalFile::decrEnqueuedRecordCount() { - return enqueuedRecordCount.decrementLimit(); +uint32_t JournalFile::decrEnqueuedRecordCount() { + return enqueuedRecordCount_.decrementLimit(); } -uint32_t -JournalFile::subtrEnqueuedRecordCount(const uint32_t s) { - return enqueuedRecordCount.subtractLimit(s); +uint32_t JournalFile::subtrEnqueuedRecordCount(const uint32_t s) { + return enqueuedRecordCount_.subtractLimit(s); } -uint32_t -JournalFile::getSubmittedDblkCount() const { - return submittedDblkCount.get(); +uint32_t JournalFile::getSubmittedDblkCount() const { + return submittedDblkCount_.get(); } -uint32_t -JournalFile::addSubmittedDblkCount(const uint32_t a) { - return submittedDblkCount.addLimit(a, fileSizeDblks, jerrno::JERR_JNLF_FILEOFFSOVFL); +uint32_t JournalFile::addSubmittedDblkCount(const uint32_t a) { + return submittedDblkCount_.addLimit(a, fileSize_dblks_, jerrno::JERR_JNLF_FILEOFFSOVFL); } -uint32_t -JournalFile::getCompletedDblkCount() const { - return completedDblkCount.get(); +uint32_t JournalFile::getCompletedDblkCount() const { + return completedDblkCount_.get(); } -uint32_t -JournalFile::addCompletedDblkCount(const uint32_t a) { - return completedDblkCount.addLimit(a, submittedDblkCount.get(), jerrno::JERR_JNLF_CMPLOFFSOVFL); +uint32_t JournalFile::addCompletedDblkCount(const uint32_t a) { + return completedDblkCount_.addLimit(a, submittedDblkCount_.get(), jerrno::JERR_JNLF_CMPLOFFSOVFL); } uint16_t JournalFile::getOutstandingAioOperationCount() const { - return outstandingAioOpsCount.get(); + return outstandingAioOpsCount_.get(); } uint16_t JournalFile::incrOutstandingAioOperationCount() { - return outstandingAioOpsCount.increment(); + return outstandingAioOpsCount_.increment(); } uint16_t JournalFile::decrOutstandingAioOperationCount() { - uint16_t r = outstandingAioOpsCount.decrementLimit(); - if (fileCloseFlag && outstandingAioOpsCount == 0) { // Delayed close + uint16_t r = outstandingAioOpsCount_.decrementLimit(); + if (fileCloseFlag_ && outstandingAioOpsCount_ == 0) { // Delayed close close(); } return r; } -bool -JournalFile::isEmpty() const { - return submittedDblkCount == 0; +// --- Status helper functions --- + +bool JournalFile::isEmpty() const { + return submittedDblkCount_ == 0; } -bool -JournalFile::isDataEmpty() const { - return submittedDblkCount <= QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_DBLKS; +bool JournalFile::isDataEmpty() const { + return submittedDblkCount_ <= QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_DBLKS; } -u_int32_t -JournalFile::dblksRemaining() const { - return fileSizeDblks - submittedDblkCount; +u_int32_t JournalFile::dblksRemaining() const { + return fileSize_dblks_ - submittedDblkCount_; } -bool -JournalFile::isFull() const { - return submittedDblkCount == fileSizeDblks; +bool JournalFile::isFull() const { + return submittedDblkCount_ == fileSize_dblks_; } -bool -JournalFile::isFullAndComplete() const { - return completedDblkCount == fileSizeDblks; +bool JournalFile::isFullAndComplete() const { + return completedDblkCount_ == fileSize_dblks_; } -u_int32_t -JournalFile::getOutstandingAioDblks() const { - return submittedDblkCount - completedDblkCount; +u_int32_t JournalFile::getOutstandingAioDblks() const { + return submittedDblkCount_ - completedDblkCount_; } -bool -JournalFile::getNextFile() const { +bool JournalFile::getNextFile() const { return isFull(); } -bool -JournalFile::isNoEnqueuedRecordsRemaining() const { +bool JournalFile::isNoEnqueuedRecordsRemaining() const { return !isDataEmpty() && // Must be written to, not empty - enqueuedRecordCount == 0; // No remaining enqueued records + enqueuedRecordCount_ == 0; // No remaining enqueued records } -const std::string -JournalFile::status_str(const uint8_t indentDepth_) const { - std::string indent((size_t)indentDepth_, '.'); +// debug aid +const std::string JournalFile::status_str(const uint8_t indentDepth) const { + std::string indent((size_t)indentDepth, '.'); std::ostringstream oss; oss << indent << "JournalFile: fileName=" << getFileName() << std::endl; oss << indent << " directory=" << getDirectory() << std::endl; - oss << indent << " fileSizeDblks=" << fileSizeDblks << std::endl; + oss << indent << " fileSizeDblks=" << fileSize_dblks_ << std::endl; oss << indent << " open=" << (isOpen() ? "T" : "F") << std::endl; - oss << indent << " fileHandle=" << fileHandle << std::endl; + oss << indent << " fileHandle=" << fileHandle_ << std::endl; oss << indent << " enqueuedRecordCount=" << getEnqueuedRecordCount() << std::endl; oss << indent << " submittedDblkCount=" << getSubmittedDblkCount() << std::endl; oss << indent << " completedDblkCount=" << getCompletedDblkCount() << std::endl; |