diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2014-02-05 18:49:32 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2014-02-05 18:49:32 +0000 |
| commit | 4c26c985409d4327a537d40e966aee3b2492b285 (patch) | |
| tree | 78f160c3aeeeb818b0b6fe74a9ff4f5b85c8363f /cpp/src/qpid/linearstore/journal/JournalFile.cpp | |
| parent | 03feca4761cec442ca810a212cfd88278bcc44bb (diff) | |
| download | qpid-python-4c26c985409d4327a537d40e966aee3b2492b285.tar.gz | |
QPID-5480: Recovery of store failure with "JERR_MAP_NOTFOUND: Key not found in map." error message. Fixed numerous recovery issues, particularly the handling of files at the end of the file list during recovery when the last file is not used or incompletely written.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1564877 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/linearstore/journal/JournalFile.cpp')
| -rw-r--r-- | cpp/src/qpid/linearstore/journal/JournalFile.cpp | 68 |
1 files changed, 37 insertions, 31 deletions
diff --git a/cpp/src/qpid/linearstore/journal/JournalFile.cpp b/cpp/src/qpid/linearstore/journal/JournalFile.cpp index 1b2025bd5a..fc6ced4fd2 100644 --- a/cpp/src/qpid/linearstore/journal/JournalFile.cpp +++ b/cpp/src/qpid/linearstore/journal/JournalFile.cpp @@ -27,18 +27,18 @@ #include "qpid/linearstore/journal/utils/file_hdr.h" #include <unistd.h> -//#include <iostream> // DEBUG - namespace qpid { namespace linearstore { namespace journal { JournalFile::JournalFile(const std::string& fqFileName, const efpIdentity_t& efpIdentity, - const uint64_t fileSeqNum) : + const uint64_t fileSeqNum, + const std::string queueName) : efpIdentity_(efpIdentity), fqFileName_(fqFileName), fileSeqNum_(fileSeqNum), + queueName_(queueName), serial_(getRandom64()), firstRecordOffset_(0ULL), fileHandle_(-1), @@ -47,6 +47,7 @@ JournalFile::JournalFile(const std::string& fqFileName, fileHeaderPtr_(0), aioControlBlockPtr_(0), fileSize_dblks_(((efpIdentity.ds_ * 1024) + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_BYTES)) / QLS_DBLK_SIZE_BYTES), + initializedFlag_(false), enqueuedRecordCount_("JournalFile::enqueuedRecordCount", 0), submittedDblkCount_("JournalFile::submittedDblkCount", 0), completedDblkCount_("JournalFile::completedDblkCount", 0), @@ -54,10 +55,12 @@ JournalFile::JournalFile(const std::string& fqFileName, {} JournalFile::JournalFile(const std::string& fqFileName, - const ::file_hdr_t& fileHeader) : + const ::file_hdr_t& fileHeader, + const std::string queueName) : efpIdentity_(fileHeader._efp_partition, fileHeader._data_size_kib), fqFileName_(fqFileName), fileSeqNum_(fileHeader._file_number), + queueName_(queueName), serial_(fileHeader._rhdr._serial), firstRecordOffset_(fileHeader._fro), fileHandle_(-1), @@ -66,6 +69,7 @@ JournalFile::JournalFile(const std::string& fqFileName, fileHeaderPtr_(0), aioControlBlockPtr_(0), fileSize_dblks_(((fileHeader._data_size_kib * 1024) + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_BYTES)) / QLS_DBLK_SIZE_BYTES), + initializedFlag_(false), enqueuedRecordCount_("JournalFile::enqueuedRecordCount", 0), submittedDblkCount_("JournalFile::submittedDblkCount", 0), completedDblkCount_("JournalFile::completedDblkCount", 0), @@ -78,18 +82,21 @@ JournalFile::~JournalFile() { void JournalFile::initialize(const uint32_t completedDblkCount) { - if (::posix_memalign(&fileHeaderBasePtr_, QLS_AIO_ALIGN_BOUNDARY_BYTES, QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024)) - { - std::ostringstream oss; - oss << "posix_memalign(): blksize=" << QLS_AIO_ALIGN_BOUNDARY_BYTES << " size=" << (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024); - oss << FORMAT_SYSERR(errno); - throw jexception(jerrno::JERR__MALLOC, oss.str(), "JournalFile", "initialize"); + if (!initializedFlag_) { + if (::posix_memalign(&fileHeaderBasePtr_, QLS_AIO_ALIGN_BOUNDARY_BYTES, QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024)) + { + std::ostringstream oss; + oss << "posix_memalign(): blksize=" << QLS_AIO_ALIGN_BOUNDARY_BYTES << " size=" << (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024); + oss << FORMAT_SYSERR(errno); + throw jexception(jerrno::JERR__MALLOC, oss.str(), "JournalFile", "initialize"); + } + fileHeaderPtr_ = (::file_hdr_t*)fileHeaderBasePtr_; + aioControlBlockPtr_ = new aio_cb; + initializedFlag_ = true; } - fileHeaderPtr_ = (::file_hdr_t*)fileHeaderBasePtr_; - aioControlBlockPtr_ = new aio_cb; if (completedDblkCount > 0UL) { - submittedDblkCount_.add(completedDblkCount); - completedDblkCount_.add(completedDblkCount); + submittedDblkCount_.set(completedDblkCount); + completedDblkCount_.set(completedDblkCount); } } @@ -149,8 +156,7 @@ void JournalFile::asyncFileHeaderWrite(io_context_t ioContextPtr, const efpDataSize_kib_t efpDataSize_kib, const uint16_t userFlags, const uint64_t recordId, - const uint64_t firstRecordOffset, - const std::string queueName) { + const uint64_t firstRecordOffset) { firstRecordOffset_ = firstRecordOffset; ::file_hdr_create(fileHeaderPtr_, QLS_FILE_MAGIC, QLS_JRNL_VERSION, QLS_JRNL_FHDR_RES_SIZE_SBLKS, efpPartitionNumber, efpDataSize_kib); ::file_hdr_init(fileHeaderBasePtr_, @@ -160,15 +166,15 @@ void JournalFile::asyncFileHeaderWrite(io_context_t ioContextPtr, recordId, firstRecordOffset, fileSeqNum_, - queueName.size(), - queueName.data()); - aio::prep_pwrite(aioControlBlockPtr_, - fileHandle_, - (void*)fileHeaderBasePtr_, - QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024, - 0UL); - if (aio::submit(ioContextPtr, 1, &aioControlBlockPtr_) < 0) - throw jexception(jerrno::JERR__AIO, "JournalFile", "asyncPageWrite"); + queueName_.size(), + queueName_.data()); + const std::size_t wr_size = QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024; + aio::prep_pwrite(aioControlBlockPtr_, fileHandle_, (void*)fileHeaderBasePtr_, wr_size, 0UL); + if (aio::submit(ioContextPtr, 1, &aioControlBlockPtr_) < 0) { + std::ostringstream oss; + oss << "queue=\"" << queueName_ << "\" fid=0x" << std::hex << fileSeqNum_ << " wr_size=0x" << wr_size << " foffs=0x0"; + throw jexception(jerrno::JERR__AIO, oss.str(), "JournalFile", "asyncFileHeaderWrite"); + } addSubmittedDblkCount(QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_DBLKS); incrOutstandingAioOperationCount(); } @@ -177,16 +183,16 @@ void JournalFile::asyncPageWrite(io_context_t ioContextPtr, aio_cb* aioControlBlockPtr, void* data, uint32_t dataSize_dblks) { - aio::prep_pwrite_2(aioControlBlockPtr, - fileHandle_, - data, - dataSize_dblks * QLS_DBLK_SIZE_BYTES, - submittedDblkCount_.get() * QLS_DBLK_SIZE_BYTES); + const std::size_t wr_size = dataSize_dblks * QLS_DBLK_SIZE_BYTES; + const uint64_t foffs = submittedDblkCount_.get() * QLS_DBLK_SIZE_BYTES; + aio::prep_pwrite_2(aioControlBlockPtr, fileHandle_, data, wr_size, foffs); pmgr::page_cb* pcbp = (pmgr::page_cb*)(aioControlBlockPtr->data); // This page's control block (pcb) pcbp->_wdblks = dataSize_dblks; pcbp->_jfp = this; if (aio::submit(ioContextPtr, 1, &aioControlBlockPtr) < 0) { - throw jexception(jerrno::JERR__AIO, "JournalFile", "asyncPageWrite"); // TODO: complete exception details + std::ostringstream oss; + oss << "queue=\"" << queueName_ << "\" fid=0x" << std::hex << fileSeqNum_ << " wr_size=0x" << wr_size << " foffs=0x" << foffs; + throw jexception(jerrno::JERR__AIO, oss.str(), "JournalFile", "asyncPageWrite"); } addSubmittedDblkCount(dataSize_dblks); incrOutstandingAioOperationCount(); |
