diff options
Diffstat (limited to 'cpp/src/qpid/linearstore/journal/JournalFile.cpp')
| -rw-r--r-- | cpp/src/qpid/linearstore/journal/JournalFile.cpp | 68 |
1 files changed, 37 insertions, 31 deletions
diff --git a/cpp/src/qpid/linearstore/journal/JournalFile.cpp b/cpp/src/qpid/linearstore/journal/JournalFile.cpp index 1b2025bd5a..fc6ced4fd2 100644 --- a/cpp/src/qpid/linearstore/journal/JournalFile.cpp +++ b/cpp/src/qpid/linearstore/journal/JournalFile.cpp @@ -27,18 +27,18 @@ #include "qpid/linearstore/journal/utils/file_hdr.h" #include <unistd.h> -//#include <iostream> // DEBUG - namespace qpid { namespace linearstore { namespace journal { JournalFile::JournalFile(const std::string& fqFileName, const efpIdentity_t& efpIdentity, - const uint64_t fileSeqNum) : + const uint64_t fileSeqNum, + const std::string queueName) : efpIdentity_(efpIdentity), fqFileName_(fqFileName), fileSeqNum_(fileSeqNum), + queueName_(queueName), serial_(getRandom64()), firstRecordOffset_(0ULL), fileHandle_(-1), @@ -47,6 +47,7 @@ JournalFile::JournalFile(const std::string& fqFileName, fileHeaderPtr_(0), aioControlBlockPtr_(0), fileSize_dblks_(((efpIdentity.ds_ * 1024) + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_BYTES)) / QLS_DBLK_SIZE_BYTES), + initializedFlag_(false), enqueuedRecordCount_("JournalFile::enqueuedRecordCount", 0), submittedDblkCount_("JournalFile::submittedDblkCount", 0), completedDblkCount_("JournalFile::completedDblkCount", 0), @@ -54,10 +55,12 @@ JournalFile::JournalFile(const std::string& fqFileName, {} JournalFile::JournalFile(const std::string& fqFileName, - const ::file_hdr_t& fileHeader) : + const ::file_hdr_t& fileHeader, + const std::string queueName) : efpIdentity_(fileHeader._efp_partition, fileHeader._data_size_kib), fqFileName_(fqFileName), fileSeqNum_(fileHeader._file_number), + queueName_(queueName), serial_(fileHeader._rhdr._serial), firstRecordOffset_(fileHeader._fro), fileHandle_(-1), @@ -66,6 +69,7 @@ JournalFile::JournalFile(const std::string& fqFileName, fileHeaderPtr_(0), aioControlBlockPtr_(0), fileSize_dblks_(((fileHeader._data_size_kib * 1024) + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_BYTES)) / QLS_DBLK_SIZE_BYTES), + initializedFlag_(false), enqueuedRecordCount_("JournalFile::enqueuedRecordCount", 0), submittedDblkCount_("JournalFile::submittedDblkCount", 0), completedDblkCount_("JournalFile::completedDblkCount", 0), @@ -78,18 +82,21 @@ JournalFile::~JournalFile() { void JournalFile::initialize(const uint32_t completedDblkCount) { - if (::posix_memalign(&fileHeaderBasePtr_, QLS_AIO_ALIGN_BOUNDARY_BYTES, QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024)) - { - std::ostringstream oss; - oss << "posix_memalign(): blksize=" << QLS_AIO_ALIGN_BOUNDARY_BYTES << " size=" << (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024); - oss << FORMAT_SYSERR(errno); - throw jexception(jerrno::JERR__MALLOC, oss.str(), "JournalFile", "initialize"); + if (!initializedFlag_) { + if (::posix_memalign(&fileHeaderBasePtr_, QLS_AIO_ALIGN_BOUNDARY_BYTES, QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024)) + { + std::ostringstream oss; + oss << "posix_memalign(): blksize=" << QLS_AIO_ALIGN_BOUNDARY_BYTES << " size=" << (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024); + oss << FORMAT_SYSERR(errno); + throw jexception(jerrno::JERR__MALLOC, oss.str(), "JournalFile", "initialize"); + } + fileHeaderPtr_ = (::file_hdr_t*)fileHeaderBasePtr_; + aioControlBlockPtr_ = new aio_cb; + initializedFlag_ = true; } - fileHeaderPtr_ = (::file_hdr_t*)fileHeaderBasePtr_; - aioControlBlockPtr_ = new aio_cb; if (completedDblkCount > 0UL) { - submittedDblkCount_.add(completedDblkCount); - completedDblkCount_.add(completedDblkCount); + submittedDblkCount_.set(completedDblkCount); + completedDblkCount_.set(completedDblkCount); } } @@ -149,8 +156,7 @@ void JournalFile::asyncFileHeaderWrite(io_context_t ioContextPtr, const efpDataSize_kib_t efpDataSize_kib, const uint16_t userFlags, const uint64_t recordId, - const uint64_t firstRecordOffset, - const std::string queueName) { + const uint64_t firstRecordOffset) { firstRecordOffset_ = firstRecordOffset; ::file_hdr_create(fileHeaderPtr_, QLS_FILE_MAGIC, QLS_JRNL_VERSION, QLS_JRNL_FHDR_RES_SIZE_SBLKS, efpPartitionNumber, efpDataSize_kib); ::file_hdr_init(fileHeaderBasePtr_, @@ -160,15 +166,15 @@ void JournalFile::asyncFileHeaderWrite(io_context_t ioContextPtr, recordId, firstRecordOffset, fileSeqNum_, - queueName.size(), - queueName.data()); - aio::prep_pwrite(aioControlBlockPtr_, - fileHandle_, - (void*)fileHeaderBasePtr_, - QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024, - 0UL); - if (aio::submit(ioContextPtr, 1, &aioControlBlockPtr_) < 0) - throw jexception(jerrno::JERR__AIO, "JournalFile", "asyncPageWrite"); + queueName_.size(), + queueName_.data()); + const std::size_t wr_size = QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024; + aio::prep_pwrite(aioControlBlockPtr_, fileHandle_, (void*)fileHeaderBasePtr_, wr_size, 0UL); + if (aio::submit(ioContextPtr, 1, &aioControlBlockPtr_) < 0) { + std::ostringstream oss; + oss << "queue=\"" << queueName_ << "\" fid=0x" << std::hex << fileSeqNum_ << " wr_size=0x" << wr_size << " foffs=0x0"; + throw jexception(jerrno::JERR__AIO, oss.str(), "JournalFile", "asyncFileHeaderWrite"); + } addSubmittedDblkCount(QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_DBLKS); incrOutstandingAioOperationCount(); } @@ -177,16 +183,16 @@ void JournalFile::asyncPageWrite(io_context_t ioContextPtr, aio_cb* aioControlBlockPtr, void* data, uint32_t dataSize_dblks) { - aio::prep_pwrite_2(aioControlBlockPtr, - fileHandle_, - data, - dataSize_dblks * QLS_DBLK_SIZE_BYTES, - submittedDblkCount_.get() * QLS_DBLK_SIZE_BYTES); + const std::size_t wr_size = dataSize_dblks * QLS_DBLK_SIZE_BYTES; + const uint64_t foffs = submittedDblkCount_.get() * QLS_DBLK_SIZE_BYTES; + aio::prep_pwrite_2(aioControlBlockPtr, fileHandle_, data, wr_size, foffs); pmgr::page_cb* pcbp = (pmgr::page_cb*)(aioControlBlockPtr->data); // This page's control block (pcb) pcbp->_wdblks = dataSize_dblks; pcbp->_jfp = this; if (aio::submit(ioContextPtr, 1, &aioControlBlockPtr) < 0) { - throw jexception(jerrno::JERR__AIO, "JournalFile", "asyncPageWrite"); // TODO: complete exception details + std::ostringstream oss; + oss << "queue=\"" << queueName_ << "\" fid=0x" << std::hex << fileSeqNum_ << " wr_size=0x" << wr_size << " foffs=0x" << foffs; + throw jexception(jerrno::JERR__AIO, oss.str(), "JournalFile", "asyncPageWrite"); } addSubmittedDblkCount(dataSize_dblks); incrOutstandingAioOperationCount(); |
