summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/linearstore/journal/JournalFile.cpp
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2014-02-05 18:49:32 +0000
committerKim van der Riet <kpvdr@apache.org>2014-02-05 18:49:32 +0000
commit4c26c985409d4327a537d40e966aee3b2492b285 (patch)
tree78f160c3aeeeb818b0b6fe74a9ff4f5b85c8363f /cpp/src/qpid/linearstore/journal/JournalFile.cpp
parent03feca4761cec442ca810a212cfd88278bcc44bb (diff)
downloadqpid-python-4c26c985409d4327a537d40e966aee3b2492b285.tar.gz
QPID-5480: Recovery of store failure with "JERR_MAP_NOTFOUND: Key not found in map." error message. Fixed numerous recovery issues, particularly the handling of files at the end of the file list during recovery when the last file is not used or incompletely written.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1564877 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/linearstore/journal/JournalFile.cpp')
-rw-r--r--cpp/src/qpid/linearstore/journal/JournalFile.cpp68
1 files changed, 37 insertions, 31 deletions
diff --git a/cpp/src/qpid/linearstore/journal/JournalFile.cpp b/cpp/src/qpid/linearstore/journal/JournalFile.cpp
index 1b2025bd5a..fc6ced4fd2 100644
--- a/cpp/src/qpid/linearstore/journal/JournalFile.cpp
+++ b/cpp/src/qpid/linearstore/journal/JournalFile.cpp
@@ -27,18 +27,18 @@
#include "qpid/linearstore/journal/utils/file_hdr.h"
#include <unistd.h>
-//#include <iostream> // DEBUG
-
namespace qpid {
namespace linearstore {
namespace journal {
JournalFile::JournalFile(const std::string& fqFileName,
const efpIdentity_t& efpIdentity,
- const uint64_t fileSeqNum) :
+ const uint64_t fileSeqNum,
+ const std::string queueName) :
efpIdentity_(efpIdentity),
fqFileName_(fqFileName),
fileSeqNum_(fileSeqNum),
+ queueName_(queueName),
serial_(getRandom64()),
firstRecordOffset_(0ULL),
fileHandle_(-1),
@@ -47,6 +47,7 @@ JournalFile::JournalFile(const std::string& fqFileName,
fileHeaderPtr_(0),
aioControlBlockPtr_(0),
fileSize_dblks_(((efpIdentity.ds_ * 1024) + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_BYTES)) / QLS_DBLK_SIZE_BYTES),
+ initializedFlag_(false),
enqueuedRecordCount_("JournalFile::enqueuedRecordCount", 0),
submittedDblkCount_("JournalFile::submittedDblkCount", 0),
completedDblkCount_("JournalFile::completedDblkCount", 0),
@@ -54,10 +55,12 @@ JournalFile::JournalFile(const std::string& fqFileName,
{}
JournalFile::JournalFile(const std::string& fqFileName,
- const ::file_hdr_t& fileHeader) :
+ const ::file_hdr_t& fileHeader,
+ const std::string queueName) :
efpIdentity_(fileHeader._efp_partition, fileHeader._data_size_kib),
fqFileName_(fqFileName),
fileSeqNum_(fileHeader._file_number),
+ queueName_(queueName),
serial_(fileHeader._rhdr._serial),
firstRecordOffset_(fileHeader._fro),
fileHandle_(-1),
@@ -66,6 +69,7 @@ JournalFile::JournalFile(const std::string& fqFileName,
fileHeaderPtr_(0),
aioControlBlockPtr_(0),
fileSize_dblks_(((fileHeader._data_size_kib * 1024) + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_BYTES)) / QLS_DBLK_SIZE_BYTES),
+ initializedFlag_(false),
enqueuedRecordCount_("JournalFile::enqueuedRecordCount", 0),
submittedDblkCount_("JournalFile::submittedDblkCount", 0),
completedDblkCount_("JournalFile::completedDblkCount", 0),
@@ -78,18 +82,21 @@ JournalFile::~JournalFile() {
void
JournalFile::initialize(const uint32_t completedDblkCount) {
- if (::posix_memalign(&fileHeaderBasePtr_, QLS_AIO_ALIGN_BOUNDARY_BYTES, QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024))
- {
- std::ostringstream oss;
- oss << "posix_memalign(): blksize=" << QLS_AIO_ALIGN_BOUNDARY_BYTES << " size=" << (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024);
- oss << FORMAT_SYSERR(errno);
- throw jexception(jerrno::JERR__MALLOC, oss.str(), "JournalFile", "initialize");
+ if (!initializedFlag_) {
+ if (::posix_memalign(&fileHeaderBasePtr_, QLS_AIO_ALIGN_BOUNDARY_BYTES, QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024))
+ {
+ std::ostringstream oss;
+ oss << "posix_memalign(): blksize=" << QLS_AIO_ALIGN_BOUNDARY_BYTES << " size=" << (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024);
+ oss << FORMAT_SYSERR(errno);
+ throw jexception(jerrno::JERR__MALLOC, oss.str(), "JournalFile", "initialize");
+ }
+ fileHeaderPtr_ = (::file_hdr_t*)fileHeaderBasePtr_;
+ aioControlBlockPtr_ = new aio_cb;
+ initializedFlag_ = true;
}
- fileHeaderPtr_ = (::file_hdr_t*)fileHeaderBasePtr_;
- aioControlBlockPtr_ = new aio_cb;
if (completedDblkCount > 0UL) {
- submittedDblkCount_.add(completedDblkCount);
- completedDblkCount_.add(completedDblkCount);
+ submittedDblkCount_.set(completedDblkCount);
+ completedDblkCount_.set(completedDblkCount);
}
}
@@ -149,8 +156,7 @@ void JournalFile::asyncFileHeaderWrite(io_context_t ioContextPtr,
const efpDataSize_kib_t efpDataSize_kib,
const uint16_t userFlags,
const uint64_t recordId,
- const uint64_t firstRecordOffset,
- const std::string queueName) {
+ const uint64_t firstRecordOffset) {
firstRecordOffset_ = firstRecordOffset;
::file_hdr_create(fileHeaderPtr_, QLS_FILE_MAGIC, QLS_JRNL_VERSION, QLS_JRNL_FHDR_RES_SIZE_SBLKS, efpPartitionNumber, efpDataSize_kib);
::file_hdr_init(fileHeaderBasePtr_,
@@ -160,15 +166,15 @@ void JournalFile::asyncFileHeaderWrite(io_context_t ioContextPtr,
recordId,
firstRecordOffset,
fileSeqNum_,
- queueName.size(),
- queueName.data());
- aio::prep_pwrite(aioControlBlockPtr_,
- fileHandle_,
- (void*)fileHeaderBasePtr_,
- QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024,
- 0UL);
- if (aio::submit(ioContextPtr, 1, &aioControlBlockPtr_) < 0)
- throw jexception(jerrno::JERR__AIO, "JournalFile", "asyncPageWrite");
+ queueName_.size(),
+ queueName_.data());
+ const std::size_t wr_size = QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024;
+ aio::prep_pwrite(aioControlBlockPtr_, fileHandle_, (void*)fileHeaderBasePtr_, wr_size, 0UL);
+ if (aio::submit(ioContextPtr, 1, &aioControlBlockPtr_) < 0) {
+ std::ostringstream oss;
+ oss << "queue=\"" << queueName_ << "\" fid=0x" << std::hex << fileSeqNum_ << " wr_size=0x" << wr_size << " foffs=0x0";
+ throw jexception(jerrno::JERR__AIO, oss.str(), "JournalFile", "asyncFileHeaderWrite");
+ }
addSubmittedDblkCount(QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_DBLKS);
incrOutstandingAioOperationCount();
}
@@ -177,16 +183,16 @@ void JournalFile::asyncPageWrite(io_context_t ioContextPtr,
aio_cb* aioControlBlockPtr,
void* data,
uint32_t dataSize_dblks) {
- aio::prep_pwrite_2(aioControlBlockPtr,
- fileHandle_,
- data,
- dataSize_dblks * QLS_DBLK_SIZE_BYTES,
- submittedDblkCount_.get() * QLS_DBLK_SIZE_BYTES);
+ const std::size_t wr_size = dataSize_dblks * QLS_DBLK_SIZE_BYTES;
+ const uint64_t foffs = submittedDblkCount_.get() * QLS_DBLK_SIZE_BYTES;
+ aio::prep_pwrite_2(aioControlBlockPtr, fileHandle_, data, wr_size, foffs);
pmgr::page_cb* pcbp = (pmgr::page_cb*)(aioControlBlockPtr->data); // This page's control block (pcb)
pcbp->_wdblks = dataSize_dblks;
pcbp->_jfp = this;
if (aio::submit(ioContextPtr, 1, &aioControlBlockPtr) < 0) {
- throw jexception(jerrno::JERR__AIO, "JournalFile", "asyncPageWrite"); // TODO: complete exception details
+ std::ostringstream oss;
+ oss << "queue=\"" << queueName_ << "\" fid=0x" << std::hex << fileSeqNum_ << " wr_size=0x" << wr_size << " foffs=0x" << foffs;
+ throw jexception(jerrno::JERR__AIO, oss.str(), "JournalFile", "asyncPageWrite");
}
addSubmittedDblkCount(dataSize_dblks);
incrOutstandingAioOperationCount();