summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2014-02-05 18:49:32 +0000
committerKim van der Riet <kpvdr@apache.org>2014-02-05 18:49:32 +0000
commit3ad602825cf4954a09372822fc168f5b03d916e9 (patch)
tree047f51be4cc0d2a36398a7f64378704ca302feab
parentcfae4006fc5f91a693c5edef075ab3c9d848a111 (diff)
downloadqpid-python-3ad602825cf4954a09372822fc168f5b03d916e9.tar.gz
QPID-5480: Recovery of store failure with "JERR_MAP_NOTFOUND: Key not found in map." error message. Fixed numerous recovery issues, particularly the handling of files at the end of the file list during recovery when the last file is not used or incompletely written.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1564877 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/linearstore/ISSUES28
-rw-r--r--qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp13
-rw-r--r--qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h2
-rw-r--r--qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp4
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/JournalFile.cpp68
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/JournalFile.h13
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp82
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/LinearFileController.h13
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp195
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.h24
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/deq_rec.cpp8
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/deq_rec.h4
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp8
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/enq_rec.h4
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp20
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/jcntl.h2
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp4
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/jrec.h2
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp8
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/txn_rec.h4
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp100
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/wmgr.h12
22 files changed, 371 insertions, 247 deletions
diff --git a/qpid/cpp/src/qpid/linearstore/ISSUES b/qpid/cpp/src/qpid/linearstore/ISSUES
index 8c5b08bb61..3aec281858 100644
--- a/qpid/cpp/src/qpid/linearstore/ISSUES
+++ b/qpid/cpp/src/qpid/linearstore/ISSUES
@@ -34,29 +34,36 @@ Current/pending:
** Basic performance tests
5362 - Linearstore: No store tools exist for examining the journals
svn r.1558888 2014-01-09: WIP checkin for linearstore version of qpid_qls_analyze. Needs testing and tidy-up.
+ svn r.1560530 2014-01-22: Bugfixes for qpid_qls_analyze
+ svn r.1561848 2014-01-27: Bugfixes and enhancements for qpid_qls_analyze
+ svn r.1564808 2014-02-05: Bugfixes and enhancements for qpid_qls_analyze
* Store analysis and status
* Recovery/reading of message content
* Empty file pool status and management
5464 - [linearstore] Incompletely created journal files accumulate in EFP
- 5479 1053701 [linearstore] Using recovered store results in "JERR_JNLF_FILEOFFSOVFL: Attempted to increase submitted offset past file size. (JournalFile::submittedDblkCount)" error message
- * Probablilty: 2 of 600 (0.3%) using tx-test-soak.sh
- 5480 1053749 [linearstore] Recovery of store failure with "JERR_MAP_NOTFOUND: Key not found in map." error message
+# 5479 1053701 [linearstore] Using recovered store results in "JERR_JNLF_FILEOFFSOVFL: Attempted to increase submitted offset past file size. (JournalFile::submittedDblkCount)" error message
+ * Probability: 2 of 600 (0.3%) using tx-test-soak.sh
+# 5480 1053749 [linearstore] Recovery of store failure with "JERR_MAP_NOTFOUND: Key not found in map." error message
* Probability: 6 of 600 (1.0%) using tx-test-soak.sh
* If broker is started a second time after failure, it starts correctly and test completes ok.
+ * Problem: File is being recycled to EFP with still-locked enqueues in it (ie dequeued transactionally).
+ * Problem: Record alignment check writes filler records to wrong file when decoding bad record moves across a file boundary
5484 1035843 Slow performance for producers
svn r.1558592 fixes an issue with using /dev/random as a source of random numbers for Journal serial numbers.
- - 1036026 [LinearStore] Qpid linear store unable to create durable queue - framing-error: Queue <q-name>: create() failed: jexception 0x0000
+ svn r.1558913 replaces use of /dev/urandom with several calls to rand() to construct a 64-bit random number.
+ * Recommend rebuilding and testing for performance again with these two fixes.
+# - 1036026 [LinearStore] Qpid linear store unable to create durable queue - framing-error: Queue <q-name>: create() failed: jexception 0x0000
UNABLE TO REPRODUCE - but Frantizek has additional info
- 1039522 Qpid crashes while recovering from linear store around apid::linearstore::journal::JournalFile::getFqFileName() including enq_rec::decode() threw JERR_JREC_BAD_RECTAIL
* Possible dup of 1039525
- * May be fixed by QPID-5483 - waiting for needinfo
+ * May be fixed by QPID-5483 - waiting for needinfo, recommend rebuilding with QPID-5483 fix and re-testing
- 1039525 Qpid crashes while recovering from linear store around apid::linearstore::journal::jexception::format including enq_rec::decode() threw JERR_JREC_BAD_REC_TAIL
* Possible dup of 1039522
- * May be fixed by QPID-5483 - waiting for needinfo
- 5487 - [linearstore] Replace use of /dev/urandom with c random generator calls
+ * May be fixed by QPID-5483 - waiting for needinfo, recommend rebuilding with QPID-5483 fix and re-testing
+# - 1049870 [LinearStore] auto-delete property does not survive restart
-Fixed/closed:
-=============
+Fixed/closed (in commit order):
+===============================
Q-JIRA RHBZ Description / Comments
------ ------- ----------------------
5357 1052518 Linearstore: Empty file recycling not functional
@@ -85,6 +92,8 @@ NO-JIRA - Added missing Apache copyright/license text
svn r.1558589 2014-01-15: Proposed fix
* May be linked to RHBZ 1039522 - waiting for needinfo
* May be linked to RHBZ 1039525 - waiting for needinfo
+ 5487 1054448 [linearstore] Replace use of /dev/urandom with c random generator calls
+ svn r.1558913 2014-01-16: Proposed fix
Future:
=======
@@ -101,3 +110,4 @@ Code tidy-up
* Member names: xxx_
* Rename classes, functions and variables to camel-case
* Add Doxygen docs to classes
+* Make fid's consistent in name (fid, file_id, pfid) and format (hex vs decimal)
diff --git a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
index 483b494c2c..ff5b41b962 100644
--- a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
+++ b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
@@ -593,7 +593,7 @@ void MessageStoreImpl::recover(qpid::broker::RecoveryManager& registry_)
std::ostringstream oss;
oss << "Recovered transaction prepared list:";
for (txn_list::iterator i = prepared.begin(); i != prepared.end(); i++) {
- oss << std::endl << " " << str2hexnum(i->xid);
+ oss << std::endl << " " << qpid::linearstore::journal::jcntl::str2hexnum(i->xid);
}
QLS_LOG(debug, oss.str());
@@ -1292,7 +1292,7 @@ void MessageStoreImpl::completed(TxnCtxt& txn_,
mgmtObject->inc_tplTxnAborts();
}
} catch (const std::exception& e) {
- QLS_LOG(error, "Error completing xid " << txn_.getXid() << ": " << e.what());
+ QLS_LOG(error, "Error completing xid " << qpid::linearstore::journal::jcntl::str2hexnum(txn_.getXid()) << ": " << e.what());
throw;
}
}
@@ -1516,15 +1516,6 @@ void MessageStoreImpl::journalDeleted(JournalImpl& j_) {
journalList.erase(j_.id());
}
-std::string MessageStoreImpl::str2hexnum(const std::string& str) {
- std::ostringstream oss;
- oss << "(" << str.size() << ")0x" << std::hex;
- for (unsigned i=str.size(); i>0; --i) {
- oss << std::setfill('0') << std::setw(2) << (uint16_t)(uint8_t)str[i-1];
- }
- return oss.str();
-}
-
MessageStoreImpl::StoreOptions::StoreOptions(const std::string& name_) :
qpid::Options(name_),
truncateFlag(defTruncateFlag),
diff --git a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h
index 3157b9be9d..c2eb0deab0 100644
--- a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h
+++ b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h
@@ -235,8 +235,6 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem
}
void chkTplStoreInit();
- static std::string str2hexnum(const std::string& str);
-
public:
typedef boost::shared_ptr<MessageStoreImpl> shared_ptr;
diff --git a/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp b/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp
index 743d12989a..df2e7a442d 100644
--- a/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp
+++ b/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp
@@ -52,7 +52,9 @@ void TxnCtxt::commitTxn(JournalImpl* jc, bool commit) {
jc->txn_abort(dtokp.get(), getXid());
}
} catch (const qpid::linearstore::journal::jexception& e) {
- THROW_STORE_EXCEPTION(std::string("Error commit") + e.what());
+ std::ostringstream oss;
+ oss << "Error during " << (commit ? "commit" : "abort") << ": " << e.what();
+ THROW_STORE_EXCEPTION(oss.str());
}
}
}
diff --git a/qpid/cpp/src/qpid/linearstore/journal/JournalFile.cpp b/qpid/cpp/src/qpid/linearstore/journal/JournalFile.cpp
index 1b2025bd5a..fc6ced4fd2 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/JournalFile.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/JournalFile.cpp
@@ -27,18 +27,18 @@
#include "qpid/linearstore/journal/utils/file_hdr.h"
#include <unistd.h>
-//#include <iostream> // DEBUG
-
namespace qpid {
namespace linearstore {
namespace journal {
JournalFile::JournalFile(const std::string& fqFileName,
const efpIdentity_t& efpIdentity,
- const uint64_t fileSeqNum) :
+ const uint64_t fileSeqNum,
+ const std::string queueName) :
efpIdentity_(efpIdentity),
fqFileName_(fqFileName),
fileSeqNum_(fileSeqNum),
+ queueName_(queueName),
serial_(getRandom64()),
firstRecordOffset_(0ULL),
fileHandle_(-1),
@@ -47,6 +47,7 @@ JournalFile::JournalFile(const std::string& fqFileName,
fileHeaderPtr_(0),
aioControlBlockPtr_(0),
fileSize_dblks_(((efpIdentity.ds_ * 1024) + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_BYTES)) / QLS_DBLK_SIZE_BYTES),
+ initializedFlag_(false),
enqueuedRecordCount_("JournalFile::enqueuedRecordCount", 0),
submittedDblkCount_("JournalFile::submittedDblkCount", 0),
completedDblkCount_("JournalFile::completedDblkCount", 0),
@@ -54,10 +55,12 @@ JournalFile::JournalFile(const std::string& fqFileName,
{}
JournalFile::JournalFile(const std::string& fqFileName,
- const ::file_hdr_t& fileHeader) :
+ const ::file_hdr_t& fileHeader,
+ const std::string queueName) :
efpIdentity_(fileHeader._efp_partition, fileHeader._data_size_kib),
fqFileName_(fqFileName),
fileSeqNum_(fileHeader._file_number),
+ queueName_(queueName),
serial_(fileHeader._rhdr._serial),
firstRecordOffset_(fileHeader._fro),
fileHandle_(-1),
@@ -66,6 +69,7 @@ JournalFile::JournalFile(const std::string& fqFileName,
fileHeaderPtr_(0),
aioControlBlockPtr_(0),
fileSize_dblks_(((fileHeader._data_size_kib * 1024) + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_BYTES)) / QLS_DBLK_SIZE_BYTES),
+ initializedFlag_(false),
enqueuedRecordCount_("JournalFile::enqueuedRecordCount", 0),
submittedDblkCount_("JournalFile::submittedDblkCount", 0),
completedDblkCount_("JournalFile::completedDblkCount", 0),
@@ -78,18 +82,21 @@ JournalFile::~JournalFile() {
void
JournalFile::initialize(const uint32_t completedDblkCount) {
- if (::posix_memalign(&fileHeaderBasePtr_, QLS_AIO_ALIGN_BOUNDARY_BYTES, QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024))
- {
- std::ostringstream oss;
- oss << "posix_memalign(): blksize=" << QLS_AIO_ALIGN_BOUNDARY_BYTES << " size=" << (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024);
- oss << FORMAT_SYSERR(errno);
- throw jexception(jerrno::JERR__MALLOC, oss.str(), "JournalFile", "initialize");
+ if (!initializedFlag_) {
+ if (::posix_memalign(&fileHeaderBasePtr_, QLS_AIO_ALIGN_BOUNDARY_BYTES, QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024))
+ {
+ std::ostringstream oss;
+ oss << "posix_memalign(): blksize=" << QLS_AIO_ALIGN_BOUNDARY_BYTES << " size=" << (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024);
+ oss << FORMAT_SYSERR(errno);
+ throw jexception(jerrno::JERR__MALLOC, oss.str(), "JournalFile", "initialize");
+ }
+ fileHeaderPtr_ = (::file_hdr_t*)fileHeaderBasePtr_;
+ aioControlBlockPtr_ = new aio_cb;
+ initializedFlag_ = true;
}
- fileHeaderPtr_ = (::file_hdr_t*)fileHeaderBasePtr_;
- aioControlBlockPtr_ = new aio_cb;
if (completedDblkCount > 0UL) {
- submittedDblkCount_.add(completedDblkCount);
- completedDblkCount_.add(completedDblkCount);
+ submittedDblkCount_.set(completedDblkCount);
+ completedDblkCount_.set(completedDblkCount);
}
}
@@ -149,8 +156,7 @@ void JournalFile::asyncFileHeaderWrite(io_context_t ioContextPtr,
const efpDataSize_kib_t efpDataSize_kib,
const uint16_t userFlags,
const uint64_t recordId,
- const uint64_t firstRecordOffset,
- const std::string queueName) {
+ const uint64_t firstRecordOffset) {
firstRecordOffset_ = firstRecordOffset;
::file_hdr_create(fileHeaderPtr_, QLS_FILE_MAGIC, QLS_JRNL_VERSION, QLS_JRNL_FHDR_RES_SIZE_SBLKS, efpPartitionNumber, efpDataSize_kib);
::file_hdr_init(fileHeaderBasePtr_,
@@ -160,15 +166,15 @@ void JournalFile::asyncFileHeaderWrite(io_context_t ioContextPtr,
recordId,
firstRecordOffset,
fileSeqNum_,
- queueName.size(),
- queueName.data());
- aio::prep_pwrite(aioControlBlockPtr_,
- fileHandle_,
- (void*)fileHeaderBasePtr_,
- QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024,
- 0UL);
- if (aio::submit(ioContextPtr, 1, &aioControlBlockPtr_) < 0)
- throw jexception(jerrno::JERR__AIO, "JournalFile", "asyncPageWrite");
+ queueName_.size(),
+ queueName_.data());
+ const std::size_t wr_size = QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024;
+ aio::prep_pwrite(aioControlBlockPtr_, fileHandle_, (void*)fileHeaderBasePtr_, wr_size, 0UL);
+ if (aio::submit(ioContextPtr, 1, &aioControlBlockPtr_) < 0) {
+ std::ostringstream oss;
+ oss << "queue=\"" << queueName_ << "\" fid=0x" << std::hex << fileSeqNum_ << " wr_size=0x" << wr_size << " foffs=0x0";
+ throw jexception(jerrno::JERR__AIO, oss.str(), "JournalFile", "asyncFileHeaderWrite");
+ }
addSubmittedDblkCount(QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_DBLKS);
incrOutstandingAioOperationCount();
}
@@ -177,16 +183,16 @@ void JournalFile::asyncPageWrite(io_context_t ioContextPtr,
aio_cb* aioControlBlockPtr,
void* data,
uint32_t dataSize_dblks) {
- aio::prep_pwrite_2(aioControlBlockPtr,
- fileHandle_,
- data,
- dataSize_dblks * QLS_DBLK_SIZE_BYTES,
- submittedDblkCount_.get() * QLS_DBLK_SIZE_BYTES);
+ const std::size_t wr_size = dataSize_dblks * QLS_DBLK_SIZE_BYTES;
+ const uint64_t foffs = submittedDblkCount_.get() * QLS_DBLK_SIZE_BYTES;
+ aio::prep_pwrite_2(aioControlBlockPtr, fileHandle_, data, wr_size, foffs);
pmgr::page_cb* pcbp = (pmgr::page_cb*)(aioControlBlockPtr->data); // This page's control block (pcb)
pcbp->_wdblks = dataSize_dblks;
pcbp->_jfp = this;
if (aio::submit(ioContextPtr, 1, &aioControlBlockPtr) < 0) {
- throw jexception(jerrno::JERR__AIO, "JournalFile", "asyncPageWrite"); // TODO: complete exception details
+ std::ostringstream oss;
+ oss << "queue=\"" << queueName_ << "\" fid=0x" << std::hex << fileSeqNum_ << " wr_size=0x" << wr_size << " foffs=0x" << foffs;
+ throw jexception(jerrno::JERR__AIO, oss.str(), "JournalFile", "asyncPageWrite");
}
addSubmittedDblkCount(dataSize_dblks);
incrOutstandingAioOperationCount();
diff --git a/qpid/cpp/src/qpid/linearstore/journal/JournalFile.h b/qpid/cpp/src/qpid/linearstore/journal/JournalFile.h
index f0ad432fd8..e33830ef7f 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/JournalFile.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/JournalFile.h
@@ -38,6 +38,7 @@ protected:
const efpIdentity_t efpIdentity_;
const std::string fqFileName_;
const uint64_t fileSeqNum_;
+ const std::string queueName_;
const uint64_t serial_;
uint64_t firstRecordOffset_;
int fileHandle_;
@@ -46,6 +47,7 @@ protected:
::file_hdr_t* fileHeaderPtr_;
aio_cb* aioControlBlockPtr_;
uint32_t fileSize_dblks_; ///< File size in data blocks, including file header
+ bool initializedFlag_;
AtomicCounter<uint32_t> enqueuedRecordCount_; ///< Count of enqueued records
AtomicCounter<uint32_t> submittedDblkCount_; ///< Write file count (data blocks) for submitted AIO
@@ -56,10 +58,12 @@ public:
// Constructor for creating new file with known fileSeqNum and random serial
JournalFile(const std::string& fqFileName,
const efpIdentity_t& efpIdentity,
- const uint64_t fileSeqNum);
+ const uint64_t fileSeqNum,
+ const std::string queueName);
// Constructor for recovery in which fileSeqNum and serial are recovered from fileHeader param
JournalFile(const std::string& fqFileName,
- const ::file_hdr_t& fileHeader);
+ const ::file_hdr_t& fileHeader,
+ const std::string queueName);
virtual ~JournalFile();
void initialize(const uint32_t completedDblkCount);
@@ -76,13 +80,13 @@ public:
const efpDataSize_kib_t efpDataSize_kib,
const uint16_t userFlags,
const uint64_t recordId,
- const uint64_t firstRecordOffset,
- const std::string queueName);
+ const uint64_t firstRecordOffset);
void asyncPageWrite(io_context_t ioContextPtr,
aio_cb* aioControlBlockPtr,
void* data,
uint32_t dataSize_dblks);
+ uint32_t getSubmittedDblkCount() const;
uint32_t getEnqueuedRecordCount() const;
uint32_t incrEnqueuedRecordCount();
uint32_t decrEnqueuedRecordCount();
@@ -109,7 +113,6 @@ protected:
static uint64_t getRandom64();
bool isOpen() const;
- uint32_t getSubmittedDblkCount() const;
uint32_t addSubmittedDblkCount(const uint32_t a);
uint32_t getCompletedDblkCount() const;
diff --git a/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp b/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp
index 5483f3bb94..86d1b0e93c 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp
@@ -25,8 +25,6 @@
#include "qpid/linearstore/journal/jcntl.h"
#include "qpid/linearstore/journal/JournalFile.h"
-//#include <iostream> // DEBUG
-
namespace qpid {
namespace linearstore {
namespace journal {
@@ -34,10 +32,10 @@ namespace journal {
LinearFileController::LinearFileController(jcntl& jcntlRef) :
jcntlRef_(jcntlRef),
emptyFilePoolPtr_(0),
- currentJournalFilePtr_(0),
fileSeqCounter_("LinearFileController::fileSeqCounter", 0),
recordIdCounter_("LinearFileController::recordIdCounter", 0),
- decrCounter_("LinearFileController::decrCounter", 0)
+ decrCounter_("LinearFileController::decrCounter", 0),
+ currentJournalFilePtr_(0)
{}
LinearFileController::~LinearFileController() {}
@@ -53,7 +51,7 @@ void LinearFileController::initialize(const std::string& journalDirectory,
void LinearFileController::finalize() {
if (currentJournalFilePtr_) {
currentJournalFilePtr_->close();
- currentJournalFilePtr_ = NULL;
+ currentJournalFilePtr_ = 0;
}
while (!journalFileList_.empty()) {
delete journalFileList_.front();
@@ -62,17 +60,21 @@ void LinearFileController::finalize() {
}
void LinearFileController::addJournalFile(JournalFile* journalFilePtr,
- const uint32_t completedDblkCount) {
- if (currentJournalFilePtr_) {
+ const uint32_t completedDblkCount,
+ const bool makeCurrentFlag) {
+ if (makeCurrentFlag && currentJournalFilePtr_) {
currentJournalFilePtr_->close();
+ currentJournalFilePtr_ = 0;
}
journalFilePtr->initialize(completedDblkCount);
- currentJournalFilePtr_ = journalFilePtr;
{
slock l(journalFileListMutex_);
- journalFileList_.push_back(currentJournalFilePtr_);
+ journalFileList_.push_back(journalFilePtr);
+ }
+ if (makeCurrentFlag) {
+ currentJournalFilePtr_ = journalFilePtr;
+ currentJournalFilePtr_->open();
}
- currentJournalFilePtr_->open();
}
efpDataSize_sblks_t LinearFileController::dataSize_sblks() const {
@@ -83,16 +85,20 @@ efpFileSize_sblks_t LinearFileController::fileSize_sblks() const {
return emptyFilePoolPtr_->fileSize_sblks();
}
+void LinearFileController::getNextJournalFile() {
+ if (currentJournalFilePtr_)
+ currentJournalFilePtr_->close();
+ pullEmptyFileFromEfp();
+}
+
uint64_t LinearFileController::getNextRecordId() {
return recordIdCounter_.increment();
}
-void LinearFileController::pullEmptyFileFromEfp() {
- if (currentJournalFilePtr_)
- currentJournalFilePtr_->close();
- std::string ef = emptyFilePoolPtr_->takeEmptyFile(journalDirectory_); // Moves file from EFP only, returns new file name
-//std::cout << "*** LinearFileController::pullEmptyFileFromEfp() qn=" << jcntlRef.id() << " ef=" << ef << std::endl; // DEBUG
- addJournalFile(ef, emptyFilePoolPtr_->getIdentity(), getNextFileSeqNum(), 0);
+void LinearFileController::removeFileToEfp(const std::string& fileName) {
+ if (emptyFilePoolPtr_) {
+ emptyFilePoolPtr_->returnEmptyFile(fileName);
+ }
}
void LinearFileController::restoreEmptyFile(const std::string& fileName) {
@@ -101,7 +107,11 @@ void LinearFileController::restoreEmptyFile(const std::string& fileName) {
void LinearFileController::purgeEmptyFilesToEfp() {
slock l(journalFileListMutex_);
- purgeEmptyFilesToEfpNoLock();
+ while (journalFileList_.front()->isNoEnqueuedRecordsRemaining() && journalFileList_.size() > 1) { // Can't purge last file, even if it has no enqueued records
+ emptyFilePoolPtr_->returnEmptyFile(journalFileList_.front()->getFqFileName());
+ delete journalFileList_.front();
+ journalFileList_.pop_front();
+ }
}
uint32_t LinearFileController::getEnqueuedRecordCount(const efpFileCount_t fileSeqNumber) {
@@ -113,7 +123,6 @@ uint32_t LinearFileController::incrEnqueuedRecordCount(const efpFileCount_t file
}
uint32_t LinearFileController::decrEnqueuedRecordCount(const efpFileCount_t fileSeqNumber) {
- slock l(journalFileListMutex_);
uint32_t r = find(fileSeqNumber)->decrEnqueuedRecordCount();
// TODO: Re-evaluate after testing and profiling
@@ -122,18 +131,16 @@ uint32_t LinearFileController::decrEnqueuedRecordCount(const efpFileCount_t file
// records). We need to check this rather simple scheme works for outlying scenarios (large and tiny data
// records) without impacting performance or performing badly (leaving excessive empty files in the journals).
if (decrCounter_.increment() % 100ULL == 0ULL) {
- purgeEmptyFilesToEfpNoLock();
+ purgeEmptyFilesToEfp();
}
return r;
}
uint32_t LinearFileController::addWriteCompletedDblkCount(const efpFileCount_t fileSeqNumber, const uint32_t a) {
- slock l(journalFileListMutex_);
return find(fileSeqNumber)->addCompletedDblkCount(a);
}
uint16_t LinearFileController::decrOutstandingAioOperationCount(const efpFileCount_t fileSeqNumber) {
- slock l(journalFileListMutex_);
return find(fileSeqNumber)->decrOutstandingAioOperationCount();
}
@@ -142,12 +149,11 @@ void LinearFileController::asyncFileHeaderWrite(io_context_t ioContextPtr,
const uint64_t recordId,
const uint64_t firstRecordOffset) {
currentJournalFilePtr_->asyncFileHeaderWrite(ioContextPtr,
- emptyFilePoolPtr_->getPartitionNumber(),
- emptyFilePoolPtr_->dataSize_kib(),
- userFlags,
- recordId,
- firstRecordOffset,
- jcntlRef_.id());
+ emptyFilePoolPtr_->getPartitionNumber(),
+ emptyFilePoolPtr_->dataSize_kib(),
+ userFlags,
+ recordId,
+ firstRecordOffset);
}
void LinearFileController::asyncPageWrite(io_context_t ioContextPtr,
@@ -195,8 +201,8 @@ void LinearFileController::addJournalFile(const std::string& fileName,
const efpIdentity_t& efpIdentity,
const uint64_t fileNumber,
const uint32_t completedDblkCount) {
- JournalFile* jfp = new JournalFile(fileName, efpIdentity, fileNumber);
- addJournalFile(jfp, completedDblkCount);
+ JournalFile* jfp = new JournalFile(fileName, efpIdentity, fileNumber, jcntlRef_.id());
+ addJournalFile(jfp, completedDblkCount, true);
}
void LinearFileController::assertCurrentJournalFileValid(const char* const functionName) const {
@@ -209,15 +215,17 @@ bool LinearFileController::checkCurrentJournalFileValid() const {
return currentJournalFilePtr_ != 0;
}
-// NOTE: NOT THREAD SAFE - journalFileList is accessed by multiple threads - use under external lock
JournalFile* LinearFileController::find(const efpFileCount_t fileSeqNumber) {
- if (currentJournalFilePtr_ != 0 && currentJournalFilePtr_->getFileSeqNum() == fileSeqNumber)
+ if (currentJournalFilePtr_ && currentJournalFilePtr_->getFileSeqNum() == fileSeqNumber)
return currentJournalFilePtr_;
+
+ slock l(journalFileListMutex_);
for (JournalFileListItr_t i=journalFileList_.begin(); i!=journalFileList_.end(); ++i) {
if ((*i)->getFileSeqNum() == fileSeqNumber) {
return *i;
}
}
+
std::ostringstream oss;
oss << "fileSeqNumber=" << fileSeqNumber;
throw jexception(jerrno::JERR_LFCR_SEQNUMNOTFOUND, oss.str(), "LinearFileController", "find");
@@ -227,15 +235,9 @@ uint64_t LinearFileController::getNextFileSeqNum() {
return fileSeqCounter_.increment();
}
-void LinearFileController::purgeEmptyFilesToEfpNoLock() {
-//std::cout << " >P n=" << journalFileList_.size() << " e=" << (journalFileList_.front()->isNoEnqueuedRecordsRemaining()?"T":"F") << std::flush; // DEBUG
- while (journalFileList_.front()->isNoEnqueuedRecordsRemaining() &&
- journalFileList_.size() > 1) { // Can't purge last file, even if it has no enqueued records
-//std::cout << " *f=" << journalFileList_.front()->getFqFileName() << std::flush; // DEBUG
- emptyFilePoolPtr_->returnEmptyFile(journalFileList_.front()->getFqFileName());
- delete journalFileList_.front();
- journalFileList_.pop_front();
- }
+void LinearFileController::pullEmptyFileFromEfp() {
+ std::string efn = emptyFilePoolPtr_->takeEmptyFile(journalDirectory_); // Moves file from EFP only (ie no file init), returns new file name
+ addJournalFile(efn, emptyFilePoolPtr_->getIdentity(), getNextFileSeqNum(), 0);
}
}}}
diff --git a/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.h b/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.h
index 933b9792a4..05f08144b9 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.h
@@ -44,12 +44,12 @@ protected:
jcntl& jcntlRef_;
std::string journalDirectory_;
EmptyFilePool* emptyFilePoolPtr_;
- JournalFile* currentJournalFilePtr_;
AtomicCounter<uint64_t> fileSeqCounter_;
AtomicCounter<uint64_t> recordIdCounter_;
AtomicCounter<uint64_t> decrCounter_;
JournalFileList_t journalFileList_;
+ JournalFile* currentJournalFilePtr_;
smutex journalFileListMutex_;
public:
@@ -62,12 +62,14 @@ public:
void finalize();
void addJournalFile(JournalFile* journalFilePtr,
- const uint32_t completedDblkCount);
+ const uint32_t completedDblkCount,
+ const bool makeCurrentFlag);
efpDataSize_sblks_t dataSize_sblks() const;
efpFileSize_sblks_t fileSize_sblks() const;
+ void getNextJournalFile();
uint64_t getNextRecordId();
- void pullEmptyFileFromEfp();
+ void removeFileToEfp(const std::string& fileName);
void restoreEmptyFile(const std::string& fileName);
void purgeEmptyFilesToEfp();
@@ -105,11 +107,12 @@ protected:
bool checkCurrentJournalFileValid() const;
JournalFile* find(const efpFileCount_t fileSeqNumber);
uint64_t getNextFileSeqNum();
- void purgeEmptyFilesToEfpNoLock();
+ void pullEmptyFileFromEfp();
};
typedef void (LinearFileController::*lfcAddJournalFileFn)(JournalFile* journalFilePtr,
- const uint32_t completedDblkCount);
+ const uint32_t completedDblkCount,
+ const bool makeCurrentFlag);
}}}
diff --git a/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp b/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp
index 72308cc929..a1cec53ca1 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp
@@ -56,11 +56,15 @@ RecoveredRecordData_t::RecoveredRecordData_t(const uint64_t rid, const uint64_t
pendingTransaction_(ptxn)
{}
-
bool recordIdListCompare(RecoveredRecordData_t a, RecoveredRecordData_t b) {
return a.recordId_ < b.recordId_;
}
+RecoveredFileData_t::RecoveredFileData_t(JournalFile* journalFilePtr, const uint32_t completedDblkCount) :
+ journalFilePtr_(journalFilePtr),
+ completedDblkCount_(completedDblkCount)
+{}
+
RecoveryManager::RecoveryManager(const std::string& journalDirectory,
const std::string& queuename,
enq_map& enqueueMapRef,
@@ -77,11 +81,17 @@ RecoveryManager::RecoveryManager(const std::string& journalDirectory,
highestRecordId_(0ULL),
highestFileNumber_(0ULL),
lastFileFullFlag_(false),
+ initial_fid_(0),
currentSerial_(0),
efpFileSize_kib_(0)
{}
-RecoveryManager::~RecoveryManager() {}
+RecoveryManager::~RecoveryManager() {
+ for (fileNumberMapItr_t i = fileNumberMap_.begin(); i != fileNumberMap_.end(); ++i) {
+ delete i->second;
+ }
+ fileNumberMap_.clear();
+}
void RecoveryManager::analyzeJournals(const std::vector<std::string>* preparedTransactionListPtr,
EmptyFilePoolManager* emptyFilePoolManager,
@@ -92,9 +102,6 @@ void RecoveryManager::analyzeJournals(const std::vector<std::string>* preparedTr
*emptyFilePoolPtrPtr = emptyFilePoolManager->getEmptyFilePool(efpIdentity);
efpFileSize_kib_ = (*emptyFilePoolPtrPtr)->fileSize_kib();
- // Check for file full condition
- lastFileFullFlag_ = endOffset_ == (std::streamoff)(*emptyFilePoolPtrPtr)->fileSize_kib() * 1024;
-
if (!journalEmptyFlag_) {
// Read all records, establish remaining enqueued records
@@ -106,6 +113,9 @@ void RecoveryManager::analyzeJournals(const std::vector<std::string>* preparedTr
inFileStream_.close();
}
+ // Check for file full condition
+ lastFileFullFlag_ = endOffset_ == (std::streamoff)(*emptyFilePoolPtrPtr)->fileSize_kib() * 1024;
+
// Remove leading files which have no enqueued records
removeEmptyFiles(*emptyFilePoolPtrPtr);
@@ -121,7 +131,7 @@ void RecoveryManager::analyzeJournals(const std::vector<std::string>* preparedTr
// Unlock any affected enqueues in emap
for (tdl_itr_t i=tdl.begin(); i<tdl.end(); i++) {
if (i->enq_flag_) { // enq op - decrement enqueue count
- fileNumberMap_[i->pfid_]->decrEnqueuedRecordCount();
+ fileNumberMap_[i->pfid_]->journalFilePtr_->decrEnqueuedRecordCount();
} else if (enqueueMapRef_.is_enqueued(i->drid_, true)) { // deq op - unlock enq record
if (enqueueMapRef_.unlock(i->drid_) < enq_map::EMAP_OK) { // fail
// enq_map::unlock()'s only error is enq_map::EMAP_RID_NOT_FOUND
@@ -174,7 +184,7 @@ bool RecoveryManager::readNextRemainingRecord(void** const dataPtrPtr,
}
} while (!foundRecord);
- if (!inFileStream_.is_open() || currentJournalFileConstItr_->first != recordIdListConstItr_->fileId_) {
+ if (!inFileStream_.is_open() || currentJournalFileItr_->first != recordIdListConstItr_->fileId_) {
if (!getFile(recordIdListConstItr_->fileId_, false)) {
std::ostringstream oss;
oss << "Failed to open file with file-id=" << recordIdListConstItr_->fileId_;
@@ -231,7 +241,6 @@ bool RecoveryManager::readNextRemainingRecord(void** const dataPtrPtr,
::rec_tail_t enqueueTail;
inFileStream_.read((char*)&enqueueTail, sizeof(::rec_tail_t));
uint32_t cs = checksum.getChecksum();
-//std::cout << std::hex << "### rid=0x" << enqueueHeader._rhdr._rid << " rtcs=0x" << enqueueTail._checksum << " cs=0x" << cs << std::dec << std::endl; // DEBUG
uint16_t res = ::rec_tail_check(&enqueueTail, &enqueueHeader._rhdr, cs);
if (res != 0) {
std::stringstream oss;
@@ -266,17 +275,30 @@ bool RecoveryManager::readNextRemainingRecord(void** const dataPtrPtr,
void RecoveryManager::setLinearFileControllerJournals(lfcAddJournalFileFn fnPtr,
LinearFileController* lfcPtr) {
if (journalEmptyFlag_) {
- if (uninitializedJournal_.size() > 0) {
- lfcPtr->restoreEmptyFile(uninitializedJournal_);
+ if (uninitFileList_.size() > 0) {
+ std::string uninitFile = uninitFileList_.back();
+ uninitFileList_.pop_back();
+ lfcPtr->restoreEmptyFile(uninitFile);
}
} else {
for (fileNumberMapConstItr_t i = fileNumberMap_.begin(); i != fileNumberMap_.end(); ++i) {
- uint32_t fileDblkCount = i->first == highestFileNumber_ ? // Is this this last file?
- endOffset_ / QLS_DBLK_SIZE_BYTES : // Last file uses _endOffset
- efpFileSize_kib_ * 1024 / QLS_DBLK_SIZE_BYTES; // All others use file size to make them full
- (lfcPtr->*fnPtr)(i->second, fileDblkCount);
+ (lfcPtr->*fnPtr)(i->second->journalFilePtr_, i->second->completedDblkCount_, i->first == initial_fid_);
}
}
+
+ std::ostringstream oss;
+ bool logFlag = !notNeededFilesList_.empty();
+ if (logFlag) {
+ oss << "Files removed from head of journal: prior truncation during recovery:";
+ }
+ while (!notNeededFilesList_.empty()) {
+ lfcPtr->removeFileToEfp(notNeededFilesList_.back());
+ oss << std::endl << " * " << notNeededFilesList_.back();
+ notNeededFilesList_.pop_back();
+ }
+ if (logFlag) {
+ journalLogRef_.log(JournalLog::LOG_NOTICE, queueName_, oss.str());
+ }
}
std::string RecoveryManager::toString(const std::string& jid) {
@@ -285,7 +307,7 @@ std::string RecoveryManager::toString(const std::string& jid) {
oss << " Number of journal files = " << fileNumberMap_.size() << std::endl;
oss << " Journal File List:" << std::endl;
for (fileNumberMapConstItr_t k=fileNumberMap_.begin(); k!=fileNumberMap_.end(); ++k) {
- std::string fqFileName = k->second->getFqFileName();
+ std::string fqFileName = k->second->journalFilePtr_->getFqFileName();
oss << " " << k->first << ": " << fqFileName.substr(fqFileName.rfind('/')+1) << std::endl;
}
oss << " Enqueue Counts: [ ";
@@ -293,7 +315,7 @@ std::string RecoveryManager::toString(const std::string& jid) {
if (l != fileNumberMap_.begin()) {
oss << ", ";
}
- oss << l->second->getEnqueuedRecordCount();
+ oss << l->second->journalFilePtr_->getEnqueuedRecordCount();
}
oss << " ]" << std::endl;
oss << " Journal empty = " << (journalEmptyFlag_ ? "TRUE" : "FALSE") << std::endl;
@@ -330,15 +352,17 @@ std::string RecoveryManager::toLog(const std::string& jid, const int indent) {
<< std::setw(10) << "--------"
<< std::endl;
for (fileNumberMapConstItr_t k=fileNumberMap_.begin(); k!=fileNumberMap_.end(); ++k) {
- std::string fqFileName = k->second->getFqFileName();
+ std::string fqFileName = k->second->journalFilePtr_->getFqFileName();
+ std::ostringstream fid;
+ fid << std::hex << "0x" << k->first;
std::ostringstream fro;
- fro << std::hex << "0x" << k->second->getFirstRecordOffset();
- oss << indentStr << std::setw(7) << k->first
+ fro << std::hex << "0x" << k->second->journalFilePtr_->getFirstRecordOffset();
+ oss << indentStr << std::setw(7) << fid.str()
<< std::setw(43) << fqFileName.substr(fqFileName.rfind('/')+1)
<< std::setw(16) << fro.str()
- << std::setw(12) << k->second->getEnqueuedRecordCount()
- << std::setw(5) << k->second->getEfpIdentity().pn_
- << std::setw(9) << k->second->getEfpIdentity().ds_ << "k"
+ << std::setw(12) << k->second->journalFilePtr_->getEnqueuedRecordCount()
+ << std::setw(5) << k->second->journalFilePtr_->getEfpIdentity().pn_
+ << std::setw(9) << k->second->journalFilePtr_->getEfpIdentity().ds_ << "k"
<< std::endl;
}
oss << indentStr << "First record offset in first file = 0x" << std::hex << firstRecordOffset_ <<
@@ -347,7 +371,7 @@ std::string RecoveryManager::toLog(const std::string& jid, const int indent) {
(endOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)" << std::endl;
oss << indentStr << "Highest rid found = 0x" << std::hex << highestRecordId_ << std::dec << std::endl;
oss << indentStr << "Last file full = " << (lastFileFullFlag_ ? "TRUE" : "FALSE") << std::endl;
- oss << indentStr << "Enqueued records (txn & non-txn):";
+ //oss << indentStr << "Enqueued records (txn & non-txn):"; // TODO: complete report
}
return oss.str();
}
@@ -357,27 +381,28 @@ std::string RecoveryManager::toLog(const std::string& jid, const int indent) {
void RecoveryManager::analyzeJournalFileHeaders(efpIdentity_t& efpIdentity) {
std::string headerQueueName;
::file_hdr_t fileHeader;
- directoryList_t directoryList;
+ stringList_t directoryList;
jdir::read_dir(journalDirectory_, directoryList, false, true, false, true);
- for (directoryListConstItr_t i = directoryList.begin(); i != directoryList.end(); ++i) {
+ for (stringListConstItr_t i = directoryList.begin(); i != directoryList.end(); ++i) {
readJournalFileHeader(*i, fileHeader, headerQueueName);
if (headerQueueName.empty()) {
std::ostringstream oss;
- if (uninitializedJournal_.empty()) {
- oss << "Journal file " << (*i) << " is first uninitialized (not yet written) journal file.";
- journalLogRef_.log(JournalLog::LOG_NOTICE, queueName_, oss.str());
- uninitializedJournal_ = *i;
- } else {
- oss << "Journal file " << (*i) << " is second or greater uninitialized journal file - ignoring";
- journalLogRef_.log(JournalLog::LOG_WARN, queueName_, oss.str());
- }
+ oss << "Journal file " << (*i) << " is uninitialized";
+ journalLogRef_.log(JournalLog::LOG_WARN, queueName_, oss.str());
+ uninitFileList_.push_back(*i);
} else if (headerQueueName.compare(queueName_) != 0) {
std::ostringstream oss;
oss << "Journal file " << (*i) << " belongs to queue \"" << headerQueueName << "\": ignoring";
journalLogRef_.log(JournalLog::LOG_WARN, queueName_, oss.str());
} else {
- JournalFile* jfp = new JournalFile(*i, fileHeader);
- fileNumberMap_[fileHeader._file_number] = jfp;
+ JournalFile* jfp = new JournalFile(*i, fileHeader, queueName_);
+ std::pair<fileNumberMapItr_t, bool> res = fileNumberMap_.insert(
+ std::pair<uint64_t, RecoveredFileData_t*>(fileHeader._file_number, new RecoveredFileData_t(jfp, 0)));
+ if (!res.second) {
+ std::ostringstream oss;
+ oss << "Journal file " << (*i) << " has fid=0x" << std::hex << jfp->getFileSeqNum() << " which already exists for this journal.";
+ throw jexception(oss.str()); // TODO: complete this exception
+ }
if (fileHeader._file_number > highestFileNumber_) {
highestFileNumber_ = fileHeader._file_number;
}
@@ -393,7 +418,7 @@ void RecoveryManager::analyzeJournalFileHeaders(efpIdentity_t& efpIdentity) {
if (fileNumberMap_.empty()) {
journalEmptyFlag_ = true;
} else {
- currentJournalFileConstItr_ = fileNumberMap_.begin();
+ currentJournalFileItr_ = fileNumberMap_.begin();
}
}
@@ -408,7 +433,7 @@ void RecoveryManager::checkFileStreamOk(bool checkEof) {
}
}
-void RecoveryManager::checkJournalAlignment(const std::streampos recordPosition) {
+void RecoveryManager::checkJournalAlignment(const uint64_t start_fid, const std::streampos recordPosition) {
if (recordPosition % QLS_DBLK_SIZE_BYTES != 0) {
std::ostringstream oss;
oss << "Current read pointer not dblk aligned: recordPosition=0x" << std::hex << recordPosition;
@@ -420,12 +445,13 @@ void RecoveryManager::checkJournalAlignment(const std::streampos recordPosition)
if (sblkOffset)
{
std::ostringstream oss1;
- oss1 << std::hex << "Bad record alignment found at fid=0x" << getCurrentFileNumber();
+ oss1 << std::hex << "Bad record alignment found at fid=0x" << start_fid;
oss1 << " offs=0x" << currentPosn << " (likely journal overwrite boundary); " << std::dec;
oss1 << (QLS_SBLK_SIZE_DBLKS - (sblkOffset/QLS_DBLK_SIZE_BYTES)) << " filler record(s) required.";
journalLogRef_.log(JournalLog::LOG_WARN, queueName_, oss1.str());
- std::ofstream outFileStream(getCurrentFileName().c_str(), std::ios_base::in | std::ios_base::out | std::ios_base::binary);
+ fileNumberMapConstItr_t fnmItr = fileNumberMap_.find(start_fid);
+ std::ofstream outFileStream(fnmItr->second->journalFilePtr_->getFqFileName().c_str(), std::ios_base::in | std::ios_base::out | std::ios_base::binary);
if (!outFileStream.good()) {
throw jexception(jerrno::JERR__FILEIO, getCurrentFileName(), "RecoveryManager", "checkJournalAlignment");
}
@@ -447,7 +473,7 @@ void RecoveryManager::checkJournalAlignment(const std::streampos recordPosition)
throw jexception(jerrno::JERR_RCVM_WRITE, "RecoveryManager", "checkJournalAlignment");
}
std::ostringstream oss2;
- oss2 << std::hex << "Recover phase write: Wrote filler record: fid=0x" << getCurrentFileNumber();
+ oss2 << std::hex << "Recover phase write: Wrote filler record: fid=0x" << start_fid;
oss2 << " offs=0x" << currentPosn;
journalLogRef_.log(JournalLog::LOG_NOTICE, queueName_, oss2.str());
currentPosn = outFileStream.tellp();
@@ -456,16 +482,15 @@ void RecoveryManager::checkJournalAlignment(const std::streampos recordPosition)
std::free(writeBuffer);
journalLogRef_.log(JournalLog::LOG_INFO, queueName_, "Bad record alignment fixed.");
}
- endOffset_ = currentPosn;
+ lastRecord(start_fid, currentPosn);
}
bool RecoveryManager::decodeRecord(jrec& record,
std::size_t& cumulativeSizeRead,
::rec_hdr_t& headerRecord,
- std::streampos& fileOffset)
+ const uint64_t start_fid,
+ const std::streampos recordOffset)
{
- std::streampos start_file_offs = fileOffset;
-
if (highestRecordId_ == 0) {
highestRecordId_ = headerRecord._rid;
} else if (headerRecord._rid - highestRecordId_ < 0x8000000000000000ULL) { // RFC 1982 comparison for unsigned 64-bit
@@ -475,7 +500,7 @@ bool RecoveryManager::decodeRecord(jrec& record,
bool done = false;
while (!done) {
try {
- done = record.decode(headerRecord, &inFileStream_, cumulativeSizeRead);
+ done = record.decode(headerRecord, &inFileStream_, cumulativeSizeRead, recordOffset);
}
catch (const jexception& e) {
if (e.err_code() == jerrno::JERR_JREC_BADRECTAIL) {
@@ -485,11 +510,12 @@ bool RecoveryManager::decodeRecord(jrec& record,
} else {
journalLogRef_.log(JournalLog::LOG_INFO, queueName_, e.what());
}
- checkJournalAlignment(start_file_offs);
+ checkJournalAlignment(start_fid, recordOffset);
return false;
}
if (!done && needNextFile()) {
if (!getNextFile(false)) {
+ checkJournalAlignment(start_fid, recordOffset);
return false;
}
}
@@ -498,11 +524,11 @@ bool RecoveryManager::decodeRecord(jrec& record,
}
std::string RecoveryManager::getCurrentFileName() const {
- return currentJournalFileConstItr_->second->getFqFileName();
+ return currentJournalFileItr_->second->journalFilePtr_->getFqFileName();
}
uint64_t RecoveryManager::getCurrentFileNumber() const {
- return currentJournalFileConstItr_->first;
+ return currentJournalFileItr_->first;
}
bool RecoveryManager::getFile(const uint64_t fileNumber, bool jumpToFirstRecordOffsetFlag) {
@@ -511,8 +537,8 @@ bool RecoveryManager::getFile(const uint64_t fileNumber, bool jumpToFirstRecordO
//std::cout << " f=" << getCurrentFileName() << "]" << std::flush; // DEBUG
inFileStream_.clear(); // clear eof flag, req'd for older versions of c++
}
- currentJournalFileConstItr_ = fileNumberMap_.find(fileNumber);
- if (currentJournalFileConstItr_ == fileNumberMap_.end()) {
+ currentJournalFileItr_ = fileNumberMap_.find(fileNumber);
+ if (currentJournalFileItr_ == fileNumberMap_.end()) {
return false;
}
inFileStream_.open(getCurrentFileName().c_str(), std::ios_base::in | std::ios_base::binary);
@@ -536,7 +562,8 @@ bool RecoveryManager::getNextFile(bool jumpToFirstRecordOffsetFlag) {
if (inFileStream_.is_open()) {
inFileStream_.close();
//std::cout << " .f=" << getCurrentFileName() << "]" << std::flush; // DEBUG
- if (++currentJournalFileConstItr_ == fileNumberMap_.end()) {
+ currentJournalFileItr_->second->completedDblkCount_ = efpFileSize_kib_ * 1024 / QLS_DBLK_SIZE_BYTES;
+ if (++currentJournalFileItr_ == fileNumberMap_.end()) {
return false;
}
inFileStream_.clear(); // clear eof flag, req'd for older versions of c++
@@ -562,13 +589,15 @@ bool RecoveryManager::getNextRecordHeader()
rec_hdr_t h;
bool hdr_ok = false;
- std::streampos file_pos;
+ uint64_t file_id = 0;
+ std::streampos file_pos = 0;
while (!hdr_ok) {
if (needNextFile()) {
if (!getNextFile(true)) {
return false;
}
}
+ file_id = currentJournalFileItr_->second->journalFilePtr_->getFileSeqNum();
file_pos = inFileStream_.tellg();
if (file_pos == std::streampos(-1)) {
std::ostringstream oss;
@@ -587,21 +616,21 @@ bool RecoveryManager::getNextRecordHeader()
}
}
+ uint64_t start_fid = getCurrentFileNumber(); // fid may increment in decode() if record folds over file boundary
switch(h._magic) {
case QLS_ENQ_MAGIC:
{
//std::cout << " 0x" << std::hex << file_pos << ".e.0x" << h._rid << std::dec << std::flush; // DEBUG
if (::rec_hdr_check(&h, QLS_ENQ_MAGIC, QLS_JRNL_VERSION, currentSerial_) != 0) {
- endOffset_ = file_pos;
+ lastRecord(file_id, file_pos);
return false;
}
enq_rec er;
- uint64_t start_fid = getCurrentFileNumber(); // fid may increment in decode() if record folds over file boundary
- if (!decodeRecord(er, cum_size_read, h, file_pos)) {
+ if (!decodeRecord(er, cum_size_read, h, start_fid, file_pos)) {
return false;
}
if (!er.is_transient()) { // Ignore transient msgs
- fileNumberMap_[start_fid]->incrEnqueuedRecordCount();
+ fileNumberMap_[start_fid]->journalFilePtr_->incrEnqueuedRecordCount();
if (er.xid_size()) {
er.get_xid(&xidp);
if (xidp == 0) {
@@ -629,12 +658,11 @@ bool RecoveryManager::getNextRecordHeader()
{
//std::cout << " 0x" << std::hex << file_pos << ".d.0x" << h._rid << std::dec << std::flush; // DEBUG
if (::rec_hdr_check(&h, QLS_DEQ_MAGIC, QLS_JRNL_VERSION, currentSerial_) != 0) {
- endOffset_ = file_pos;
+ lastRecord(file_id, file_pos);
return false;
}
deq_rec dr;
- uint16_t start_fid = getCurrentFileNumber(); // fid may increment in decode() if record folds over file boundary
- if (!decodeRecord(dr, cum_size_read, h, file_pos)) {
+ if (!decodeRecord(dr, cum_size_read, h, start_fid, file_pos)) {
return false;
}
if (dr.xid_size()) {
@@ -655,7 +683,7 @@ bool RecoveryManager::getNextRecordHeader()
} else {
uint64_t enq_fid;
if (enqueueMapRef_.get_remove_pfid(dr.deq_rid(), enq_fid, true) == enq_map::EMAP_OK) { // ignore not found error
- fileNumberMap_[enq_fid]->decrEnqueuedRecordCount();
+ fileNumberMap_[enq_fid]->journalFilePtr_->decrEnqueuedRecordCount();
}
}
}
@@ -664,11 +692,11 @@ bool RecoveryManager::getNextRecordHeader()
{
//std::cout << " 0x" << std::hex << file_pos << ".a.0x" << h._rid << std::dec << std::flush; // DEBUG
if (::rec_hdr_check(&h, QLS_TXA_MAGIC, QLS_JRNL_VERSION, currentSerial_) != 0) {
- endOffset_ = file_pos;
+ lastRecord(file_id, file_pos);
return false;
}
txn_rec ar;
- if (!decodeRecord(ar, cum_size_read, h, file_pos)) {
+ if (!decodeRecord(ar, cum_size_read, h, start_fid, file_pos)) {
return false;
}
// Delete this txn from tmap, unlock any locked records in emap
@@ -680,7 +708,7 @@ bool RecoveryManager::getNextRecordHeader()
txn_data_list_t tdl = transactionMapRef_.get_remove_tdata_list(xid); // tdl will be empty if xid not found
for (tdl_itr_t itr = tdl.begin(); itr != tdl.end(); itr++) {
if (itr->enq_flag_) {
- fileNumberMap_[itr->pfid_]->decrEnqueuedRecordCount();
+ fileNumberMap_[itr->pfid_]->journalFilePtr_->decrEnqueuedRecordCount();
} else {
enqueueMapRef_.unlock(itr->drid_); // ignore not found error
}
@@ -691,11 +719,11 @@ bool RecoveryManager::getNextRecordHeader()
{
//std::cout << " 0x" << std::hex << file_pos << ".c.0x" << h._rid << std::dec << std::flush; // DEBUG
if (::rec_hdr_check(&h, QLS_TXC_MAGIC, QLS_JRNL_VERSION, currentSerial_) != 0) {
- endOffset_ = file_pos;
+ lastRecord(file_id, file_pos);
return false;
}
txn_rec cr;
- if (!decodeRecord(cr, cum_size_read, h, file_pos)) {
+ if (!decodeRecord(cr, cum_size_read, h, start_fid, file_pos)) {
return false;
}
// Delete this txn from tmap, process records into emap
@@ -717,7 +745,7 @@ bool RecoveryManager::getNextRecordHeader()
} else { // txn dequeue
uint64_t enq_fid;
if (enqueueMapRef_.get_remove_pfid(itr->drid_, enq_fid, true) == enq_map::EMAP_OK) // ignore not found error
- fileNumberMap_[enq_fid]->decrEnqueuedRecordCount();
+ fileNumberMap_[enq_fid]->journalFilePtr_->decrEnqueuedRecordCount();
}
}
}
@@ -729,7 +757,9 @@ bool RecoveryManager::getNextRecordHeader()
inFileStream_.ignore(rec_dblks * QLS_DBLK_SIZE_BYTES - sizeof(::rec_hdr_t));
checkFileStreamOk(false);
if (needNextFile()) {
+ file_pos += rec_dblks * QLS_DBLK_SIZE_BYTES;
if (!getNextFile(false)) {
+ lastRecord(start_fid, file_pos);
return false;
}
}
@@ -737,17 +767,36 @@ bool RecoveryManager::getNextRecordHeader()
break;
case 0:
//std::cout << " 0x" << std::hex << file_pos << ".0" << std::dec << std::endl << std::flush; // DEBUG
- checkJournalAlignment(file_pos);
+ checkJournalAlignment(getCurrentFileNumber(), file_pos);
return false;
default:
//std::cout << " 0x" << std::hex << file_pos << ".?" << std::dec << std::endl << std::flush; // DEBUG
// Stop as this is the overwrite boundary.
- checkJournalAlignment(file_pos);
+ checkJournalAlignment(getCurrentFileNumber(), file_pos);
return false;
}
return true;
}
+void RecoveryManager::lastRecord(const uint64_t file_id, const std::streamoff endOffset) {
+ endOffset_ = endOffset;
+ initial_fid_ = file_id;
+ fileNumberMap_[file_id]->completedDblkCount_ = endOffset_ / QLS_DBLK_SIZE_BYTES;
+
+ // Remove any files in fileNumberMap_ beyond initial_fid_
+ fileNumberMapItr_t unwantedFirstItr = fileNumberMap_.find(file_id);
+ if (++unwantedFirstItr != fileNumberMap_.end()) {
+ fileNumberMapItr_t itr = unwantedFirstItr;
+ notNeededFilesList_.push_back(unwantedFirstItr->second->journalFilePtr_->getFqFileName());
+ while (++itr != fileNumberMap_.end()) {
+ notNeededFilesList_.push_back(itr->second->journalFilePtr_->getFqFileName());
+ delete itr->second->journalFilePtr_;
+ delete itr->second;
+ }
+ fileNumberMap_.erase(unwantedFirstItr, fileNumberMap_.end());
+ }
+}
+
bool RecoveryManager::needNextFile() {
if (inFileStream_.is_open()) {
return inFileStream_.eof() || inFileStream_.tellg() >= std::streampos(efpFileSize_kib_ * 1024);
@@ -820,7 +869,7 @@ bool RecoveryManager::readFileHeader() {
currentSerial_ = fhdr._rhdr._serial;
} else {
inFileStream_.close();
- if (currentJournalFileConstItr_ == fileNumberMap_.begin()) {
+ if (currentJournalFileItr_ == fileNumberMap_.begin()) {
journalEmptyFlag_ = true;
}
return false;
@@ -855,9 +904,11 @@ void RecoveryManager::readJournalFileHeader(const std::string& journalFileName,
}
void RecoveryManager::removeEmptyFiles(EmptyFilePool* emptyFilePoolPtr) {
- while (fileNumberMap_.begin()->second->getEnqueuedRecordCount() == 0 && fileNumberMap_.size() > 1) {
-//std::cout << "*** File " << i->first << ": " << i->second << " is empty." << std::endl; // DEBUG
- emptyFilePoolPtr->returnEmptyFile(fileNumberMap_.begin()->second->getFqFileName());
+ while (fileNumberMap_.begin()->second->journalFilePtr_->getEnqueuedRecordCount() == 0 && fileNumberMap_.size() > 1) {
+ RecoveredFileData_t* rfdp = fileNumberMap_.begin()->second;
+ emptyFilePoolPtr->returnEmptyFile(rfdp->journalFilePtr_->getFqFileName());
+ delete rfdp->journalFilePtr_;
+ delete rfdp;
fileNumberMap_.erase(fileNumberMap_.begin()->first);
}
}
diff --git a/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.h b/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.h
index 997596938b..e19f92e305 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.h
@@ -51,15 +51,21 @@ struct RecoveredRecordData_t {
RecoveredRecordData_t(const uint64_t rid, const uint64_t fid, const std::streampos foffs, bool ptxn);
};
+struct RecoveredFileData_t {
+ JournalFile* journalFilePtr_;
+ uint32_t completedDblkCount_;
+ RecoveredFileData_t(JournalFile* journalFilePtr, const uint32_t completedDblkCount);
+};
+
bool recordIdListCompare(RecoveredRecordData_t a, RecoveredRecordData_t b);
class RecoveryManager
{
protected:
// Types
- typedef std::vector<std::string> directoryList_t;
- typedef directoryList_t::const_iterator directoryListConstItr_t;
- typedef std::map<uint64_t, JournalFile*> fileNumberMap_t;
+ typedef std::vector<std::string> stringList_t;
+ typedef stringList_t::const_iterator stringListConstItr_t;
+ typedef std::map<uint64_t, RecoveredFileData_t*> fileNumberMap_t;
typedef fileNumberMap_t::iterator fileNumberMapItr_t;
typedef fileNumberMap_t::const_iterator fileNumberMapConstItr_t;
typedef std::vector<RecoveredRecordData_t> recordIdList_t;
@@ -74,18 +80,20 @@ protected:
// Initial journal analysis data
fileNumberMap_t fileNumberMap_; ///< File number - JournalFilePtr map
+ stringList_t notNeededFilesList_; ///< Files not needed and to be returned to EFP
+ stringList_t uninitFileList_; ///< File name of uninitialized journal files found during header analysis
bool journalEmptyFlag_; ///< Journal data files empty
std::streamoff firstRecordOffset_; ///< First record offset in ffid
std::streamoff endOffset_; ///< End offset (first byte past last record)
uint64_t highestRecordId_; ///< Highest rid found
uint64_t highestFileNumber_; ///< Highest file number found
bool lastFileFullFlag_; ///< Last file is full
- std::string uninitializedJournal_; ///< File name of uninitialized journal found during header analysis
+ uint64_t initial_fid_; ///< File id where initial write after recovery will occur
// State for recovery of individual enqueued records
uint64_t currentSerial_;
uint32_t efpFileSize_kib_;
- fileNumberMapConstItr_t currentJournalFileConstItr_;
+ fileNumberMapConstItr_t currentJournalFileItr_;
std::string currentFileName_;
std::ifstream inFileStream_;
recordIdList_t recordIdList_;
@@ -121,16 +129,18 @@ public:
protected:
void analyzeJournalFileHeaders(efpIdentity_t& efpIdentity);
void checkFileStreamOk(bool checkEof);
- void checkJournalAlignment(const std::streampos recordPosition);
+ void checkJournalAlignment(const uint64_t start_fid, const std::streampos recordPosition);
bool decodeRecord(jrec& record,
std::size_t& cumulativeSizeRead,
::rec_hdr_t& recordHeader,
- std::streampos& fileOffset);
+ const uint64_t start_fid,
+ const std::streampos recordOffset);
std::string getCurrentFileName() const;
uint64_t getCurrentFileNumber() const;
bool getFile(const uint64_t fileNumber, bool jumpToFirstRecordOffsetFlag);
bool getNextFile(bool jumpToFirstRecordOffsetFlag);
bool getNextRecordHeader();
+ void lastRecord(const uint64_t file_id, const std::streamoff endOffset);
bool needNextFile();
void prepareRecordList();
bool readFileHeader();
diff --git a/qpid/cpp/src/qpid/linearstore/journal/deq_rec.cpp b/qpid/cpp/src/qpid/linearstore/journal/deq_rec.cpp
index a4882aaa9c..90ca27d082 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/deq_rec.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/deq_rec.cpp
@@ -181,7 +181,7 @@ deq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks, Ch
}
bool
-deq_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs)
+deq_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs, const std::streampos rec_start)
{
if (rec_offs == 0)
{
@@ -228,7 +228,7 @@ deq_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs)
assert(!ifsp->fail() && !ifsp->bad());
return false;
}
- check_rec_tail();
+ check_rec_tail(rec_start);
}
ifsp->ignore(rec_size_dblks() * QLS_DBLK_SIZE_BYTES - rec_size());
assert(!ifsp->fail() && !ifsp->bad());
@@ -274,7 +274,7 @@ deq_rec::rec_size() const
}
void
-deq_rec::check_rec_tail() const {
+deq_rec::check_rec_tail(const std::streampos rec_start) const {
Checksum checksum;
checksum.addData((const unsigned char*)&_deq_hdr, sizeof(::deq_hdr_t));
if (_deq_hdr._xidsize > 0) {
@@ -284,7 +284,7 @@ deq_rec::check_rec_tail() const {
uint16_t res = ::rec_tail_check(&_deq_tail, &_deq_hdr._rhdr, cs);
if (res != 0) {
std::stringstream oss;
- oss << std::hex;
+ oss << std::endl << " Record offset: 0x" << std::hex << rec_start;
if (res & ::REC_TAIL_MAGIC_ERR_MASK) {
oss << std::endl << " Magic: expected 0x" << ~_deq_hdr._rhdr._magic << "; found 0x" << _deq_tail._xmagic;
}
diff --git a/qpid/cpp/src/qpid/linearstore/journal/deq_rec.h b/qpid/cpp/src/qpid/linearstore/journal/deq_rec.h
index ead0eed72a..9f55032e76 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/deq_rec.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/deq_rec.h
@@ -49,7 +49,7 @@ public:
void reset(const uint64_t serial, const uint64_t rid, const uint64_t drid, const void* const xidp,
const std::size_t xidlen, const bool txn_coml_commit);
uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks, Checksum& checksum);
- bool decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs);
+ bool decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs, const std::streampos rec_start);
inline bool is_txn_coml_commit() const { return ::is_txn_coml_commit(&_deq_hdr); }
inline uint64_t rid() const { return _deq_hdr._rhdr._rid; }
@@ -59,7 +59,7 @@ public:
inline std::size_t data_size() const { return 0; } // This record never carries data
std::size_t xid_size() const;
std::size_t rec_size() const;
- void check_rec_tail() const;
+ void check_rec_tail(const std::streampos rec_start) const;
private:
virtual void clean();
diff --git a/qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp b/qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp
index f95a722308..0fecd90cbf 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp
@@ -218,7 +218,7 @@ enq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks, Ch
}
bool
-enq_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs)
+enq_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs, const std::streampos rec_start)
{
if (rec_offs == 0)
{
@@ -291,7 +291,7 @@ enq_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs)
assert(!ifsp->fail() && !ifsp->bad());
return false;
}
- check_rec_tail();
+ check_rec_tail(rec_start);
}
ifsp->ignore(rec_size_dblks() * QLS_DBLK_SIZE_BYTES - rec_size());
assert(!ifsp->fail() && !ifsp->bad());
@@ -352,7 +352,7 @@ enq_rec::rec_size(const std::size_t xidsize, const std::size_t dsize, const bool
}
void
-enq_rec::check_rec_tail() const {
+enq_rec::check_rec_tail(const std::streampos rec_start) const {
Checksum checksum;
checksum.addData((const unsigned char*)&_enq_hdr, sizeof(::enq_hdr_t));
if (_enq_hdr._xidsize > 0) {
@@ -365,7 +365,7 @@ enq_rec::check_rec_tail() const {
uint16_t res = ::rec_tail_check(&_enq_tail, &_enq_hdr._rhdr, cs);
if (res != 0) {
std::stringstream oss;
- oss << std::hex;
+ oss << std::endl << " Record offset: 0x" << std::hex << rec_start;
if (res & ::REC_TAIL_MAGIC_ERR_MASK) {
oss << std::endl << " Magic: expected 0x" << ~_enq_hdr._rhdr._magic << "; found 0x" << _enq_tail._xmagic;
}
diff --git a/qpid/cpp/src/qpid/linearstore/journal/enq_rec.h b/qpid/cpp/src/qpid/linearstore/journal/enq_rec.h
index 1655e2cc4d..d85cde42f5 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/enq_rec.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/enq_rec.h
@@ -51,7 +51,7 @@ public:
void reset(const uint64_t serial, const uint64_t rid, const void* const dbuf, const std::size_t dlen,
const void* const xidp, const std::size_t xidlen, const bool transient, const bool external);
uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks, Checksum& checksum);
- bool decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs);
+ bool decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs, const std::streampos rec_start);
std::size_t get_xid(void** const xidpp);
std::size_t get_data(void** const datapp);
@@ -63,7 +63,7 @@ public:
std::size_t rec_size() const;
static std::size_t rec_size(const std::size_t xidsize, const std::size_t dsize, const bool external);
inline uint64_t rid() const { return _enq_hdr._rhdr._rid; }
- void check_rec_tail() const;
+ void check_rec_tail(const std::streampos rec_start) const;
private:
virtual void clean();
diff --git a/qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp b/qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp
index 55bac4a2e5..ab367754d5 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp
@@ -21,6 +21,7 @@
#include "qpid/linearstore/journal/jcntl.h"
+#include <iomanip>
#include "qpid/linearstore/journal/data_tok.h"
#include "qpid/linearstore/journal/JournalLog.h"
@@ -90,7 +91,7 @@ jcntl::initialize(EmptyFilePool* efpp,
_linearFileController.finalize();
_jdir.clear_dir(); // Clear any existing journal files
_linearFileController.initialize(_jdir.dirname(), efpp, 0ULL);
- _linearFileController.pullEmptyFileFromEfp();
+ _linearFileController.getNextJournalFile();
_wmgr.initialize(cbp, wcache_pgsize_sblks, wcache_num_pages, QLS_WMGR_MAXDTOKPP, QLS_WMGR_MAXWAITUS);
_init_flag = true;
}
@@ -120,6 +121,9 @@ jcntl::recover(EmptyFilePoolManager* efpmp,
_jrnl_log.log(/*LOG_DEBUG*/JournalLog::LOG_INFO, _jid, _recoveryManager.toLog(_jid, 5));
_linearFileController.initialize(_jdir.dirname(), _emptyFilePoolPtr, _recoveryManager.getHighestFileNumber());
_recoveryManager.setLinearFileControllerJournals(&qpid::linearstore::journal::LinearFileController::addJournalFile, &_linearFileController);
+ if (_recoveryManager.isLastFileFull()) {
+ _linearFileController.getNextJournalFile();
+ }
_wmgr.initialize(cbp, wcache_pgsize_sblks, wcache_num_pages, QLS_WMGR_MAXDTOKPP, QLS_WMGR_MAXWAITUS,
(_recoveryManager.isLastFileFull() ? 0 : _recoveryManager.getEndOffset()));
@@ -316,6 +320,20 @@ jcntl::getLinearFileControllerRef() {
return _linearFileController;
}
+// static
+std::string
+jcntl::str2hexnum(const std::string& str) {
+ if (str.empty()) {
+ return "<null>";
+ }
+ std::ostringstream oss;
+ oss << "(" << str.size() << ")0x" << std::hex;
+ for (unsigned i=str.size(); i>0; --i) {
+ oss << std::setfill('0') << std::setw(2) << (uint16_t)(uint8_t)str[i-1];
+ }
+ return oss.str();
+}
+
iores
jcntl::flush(const bool block_till_aio_cmpl)
{
diff --git a/qpid/cpp/src/qpid/linearstore/journal/jcntl.h b/qpid/cpp/src/qpid/linearstore/journal/jcntl.h
index 2db0e707a7..94c00d2fab 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/jcntl.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/jcntl.h
@@ -537,6 +537,8 @@ public:
inline virtual void instr_incr_outstanding_aio_cnt() {}
inline virtual void instr_decr_outstanding_aio_cnt() {}
+ static std::string str2hexnum(const std::string& str);
+
protected:
static bool _init;
static bool init_statics();
diff --git a/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp b/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp
index 01c432d37b..8765396b31 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp
@@ -167,8 +167,8 @@ jerrno::__init()
_err_map[JERR_LFCR_SEQNUMNOTFOUND] = "JERR_LFCR_SEQNUMNOTFOUND: File sequence number not found";
// class jrec, enq_rec, deq_rec, txn_rec
- _err_map[JERR_JREC_BADRECHDR] = "JERR_JREC_BADRECHDR: Invalid data record header.";
- _err_map[JERR_JREC_BADRECTAIL] = "JERR_JREC_BADRECTAIL: Invalid data record tail.";
+ _err_map[JERR_JREC_BADRECHDR] = "JERR_JREC_BADRECHDR: Invalid record header.";
+ _err_map[JERR_JREC_BADRECTAIL] = "JERR_JREC_BADRECTAIL: Invalid record tail.";
// class wmgr
_err_map[JERR_WMGR_BADPGSTATE] = "JERR_WMGR_BADPGSTATE: Page buffer in illegal state for operation.";
diff --git a/qpid/cpp/src/qpid/linearstore/journal/jrec.h b/qpid/cpp/src/qpid/linearstore/journal/jrec.h
index 7645e646f6..cad0e5d7a2 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/jrec.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/jrec.h
@@ -98,7 +98,7 @@ public:
* \returns Number of data-blocks encoded.
*/
virtual uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks, Checksum& checksum) = 0;
- virtual bool decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs) = 0;
+ virtual bool decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs, const std::streampos rec_start) = 0;
virtual std::string& str(std::string& str) const = 0;
virtual std::size_t data_size() const = 0;
diff --git a/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp b/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp
index 1368fd4be2..298ab608b1 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp
@@ -176,7 +176,7 @@ txn_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks, Ch
}
bool
-txn_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs)
+txn_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs, const std::streampos rec_start)
{
if (rec_offs == 0)
{
@@ -218,7 +218,7 @@ txn_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs)
assert(!ifsp->fail() && !ifsp->bad());
return false;
}
- check_rec_tail();
+ check_rec_tail(rec_start);
}
ifsp->ignore(rec_size_dblks() * QLS_DBLK_SIZE_BYTES - rec_size());
assert(!ifsp->fail() && !ifsp->bad());
@@ -266,7 +266,7 @@ txn_rec::rec_size() const
}
void
-txn_rec::check_rec_tail() const {
+txn_rec::check_rec_tail(const std::streampos rec_start) const {
Checksum checksum;
checksum.addData((const unsigned char*)&_txn_hdr, sizeof(::txn_hdr_t));
if (_txn_hdr._xidsize > 0) {
@@ -276,7 +276,7 @@ txn_rec::check_rec_tail() const {
uint16_t res = ::rec_tail_check(&_txn_tail, &_txn_hdr._rhdr, cs);
if (res != 0) {
std::stringstream oss;
- oss << std::hex;
+ oss << std::endl << " Record offset: 0x" << std::hex << rec_start;
if (res & ::REC_TAIL_MAGIC_ERR_MASK) {
oss << std::endl << " Magic: expected 0x" << ~_txn_hdr._rhdr._magic << "; found 0x" << _txn_tail._xmagic;
}
diff --git a/qpid/cpp/src/qpid/linearstore/journal/txn_rec.h b/qpid/cpp/src/qpid/linearstore/journal/txn_rec.h
index daca16d9d4..4552071595 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/txn_rec.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/txn_rec.h
@@ -49,7 +49,7 @@ public:
void reset(const bool commitFlag, const uint64_t serial, const uint64_t rid, const void* const xidp,
const std::size_t xidlen);
uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks, Checksum& checksum);
- bool decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs);
+ bool decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs, const std::streampos rec_start);
std::size_t get_xid(void** const xidpp);
std::string& str(std::string& str) const;
@@ -57,7 +57,7 @@ public:
std::size_t xid_size() const;
std::size_t rec_size() const;
inline uint64_t rid() const { return _txn_hdr._rhdr._rid; }
- void check_rec_tail() const;
+ void check_rec_tail(const std::streampos rec_start) const;
private:
virtual void clean();
diff --git a/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp b/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp
index c246837a8d..7c590713f5 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp
@@ -30,8 +30,6 @@
#include "qpid/linearstore/journal/LinearFileController.h"
#include "qpid/linearstore/journal/utils/file_hdr.h"
-//#include <iostream> // DEBUG
-
namespace qpid {
namespace linearstore {
namespace journal {
@@ -49,7 +47,7 @@ wmgr::wmgr(jcntl* jc,
_deq_busy(false),
_abort_busy(false),
_commit_busy(false),
- _txn_pending_set()
+ _txn_pending_map()
{}
wmgr::wmgr(jcntl* jc,
@@ -67,7 +65,7 @@ wmgr::wmgr(jcntl* jc,
_deq_busy(false),
_abort_busy(false),
_commit_busy(false),
- _txn_pending_set()
+ _txn_pending_map()
{}
wmgr::~wmgr()
@@ -281,6 +279,7 @@ wmgr::dequeue(data_tok* dtokp,
_deq_busy = true;
}
//std::cout << "---+++ wmgr::dequeue() DEQ rid=0x" << std::hex << rid << " drid=0x" << dequeue_rid << " " << std::dec << std::flush; // DEBUG
+ std::string xid((const char*)xid_ptr, xid_len);
bool done = false;
Checksum checksum;
while (!done)
@@ -292,9 +291,27 @@ wmgr::dequeue(data_tok* dtokp,
uint32_t ret = _deq_rec.encode(wptr, data_offs_dblks,
(_cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) - _pg_offset_dblks, checksum);
- // Remember fid which contains the record header in case record is split over several files
if (data_offs_dblks == 0) {
- dtokp->set_fid(_lfc.getCurrentFileSeqNum());
+ uint64_t fid;
+ short eres = _emap.get_pfid(dtokp->dequeue_rid(), fid);
+ if (eres == enq_map::EMAP_OK) {
+ dtokp->set_fid(fid);
+ } else if (xid_len > 0) {
+ txn_data_list_t tdl = _tmap.get_tdata_list(xid);
+ bool found = false;
+ for (tdl_const_itr_t i=tdl.begin(); i!=tdl.end() && !found; ++i) {
+ if (i->rid_ == dtokp->dequeue_rid()) {
+ found = true;
+ dtokp->set_fid(i->pfid_);
+ break;
+ }
+ }
+ if (!found) {
+ throw jexception("rid found in neither emap nor tmap, transactional");
+ }
+ } else {
+ throw jexception("rid not found in emap, non-transactional");
+ }
}
_pg_offset_dblks += ret;
_cached_offset_dblks += ret;
@@ -325,7 +342,7 @@ wmgr::dequeue(data_tok* dtokp,
if (eres == enq_map::EMAP_RID_NOT_FOUND)
{
std::ostringstream oss;
- oss << std::hex << "rid=0x" << rid;
+ oss << std::hex << "emap: rid=0x" << rid;
throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr", "dequeue");
}
if (eres == enq_map::EMAP_LOCKED)
@@ -335,10 +352,6 @@ wmgr::dequeue(data_tok* dtokp,
throw jexception(jerrno::JERR_MAP_LOCKED, oss.str(), "wmgr", "dequeue");
}
}
-//std::cout << "[0x" << std::hex << _lfc.getEnqueuedRecordCount(fid) << std::dec << std::flush; // DEBUG
-//try {
- _lfc.decrEnqueuedRecordCount(fid);
-//} catch (std::exception& e) { std::cout << "***OOPS*** " << e.what() << " cfid=" << _lfc.getCurrentFileSeqNum() << " fid=" << fid << std::flush; throw; }
}
done = true;
@@ -427,14 +440,16 @@ wmgr::abort(data_tok* dtokp,
// Delete this txn from tmap, unlock any locked records in emap
std::string xid((const char*)xid_ptr, xid_len);
txn_data_list_t tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if xid not found
+ fidl_t fidl;
for (tdl_itr_t itr = tdl.begin(); itr != tdl.end(); itr++)
{
if (!itr->enq_flag_)
_emap.unlock(itr->drid_); // ignore rid not found error
- if (itr->enq_flag_)
- _lfc.decrEnqueuedRecordCount(itr->pfid_);
+ if (itr->enq_flag_) {
+ fidl.push_back(itr->pfid_);
+ }
}
- std::pair<std::set<std::string>::iterator, bool> res = _txn_pending_set.insert(xid);
+ std::pair<pending_txn_map_itr_t, bool> res = _txn_pending_map.insert(std::pair<std::string, fidl_t>(xid, fidl));
if (!res.second)
{
std::ostringstream oss;
@@ -526,6 +541,7 @@ wmgr::commit(data_tok* dtokp,
// Delete this txn from tmap, process records into emap
std::string xid((const char*)xid_ptr, xid_len);
txn_data_list_t tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if xid not found
+ fidl_t fidl;
for (tdl_itr_t itr = tdl.begin(); itr != tdl.end(); itr++)
{
if (itr->enq_flag_) // txn enqueue
@@ -547,20 +563,20 @@ wmgr::commit(data_tok* dtokp,
if (eres == enq_map::EMAP_RID_NOT_FOUND)
{
std::ostringstream oss;
- oss << std::hex << "rid=0x" << rid;
- throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr", "dequeue");
+ oss << std::hex << "emap: rid=0x" << itr->drid_;
+ throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr", "commit");
}
if (eres == enq_map::EMAP_LOCKED)
{
std::ostringstream oss;
- oss << std::hex << "rid=0x" << rid;
- throw jexception(jerrno::JERR_MAP_LOCKED, oss.str(), "wmgr", "dequeue");
+ oss << std::hex << "rid=0x" << itr->drid_;
+ throw jexception(jerrno::JERR_MAP_LOCKED, oss.str(), "wmgr", "commit");
}
}
- _lfc.decrEnqueuedRecordCount(fid);
+ fidl.push_back(fid);
}
}
- std::pair<std::set<std::string>::iterator, bool> res = _txn_pending_set.insert(xid);
+ std::pair<pending_txn_map_itr_t, bool> res = _txn_pending_map.insert(std::pair<std::string, fidl_t>(xid, fidl));
if (!res.second)
{
std::ostringstream oss;
@@ -695,7 +711,7 @@ wmgr::get_next_file()
{
_pg_cntr = 0;
//std::cout << "&&&&& wmgr::get_next_file(): " << status_str() << std::flush << std::endl; // DEBUG
- _lfc.pullEmptyFileFromEfp();
+ _lfc.getNextJournalFile();
}
int32_t
@@ -757,7 +773,7 @@ wmgr::get_events(timespec* const timeout,
data_tok* dtokp = pcbp->_pdtokl->at(k);
if (dtokp->decr_pg_cnt() == 0)
{
- std::set<std::string>::iterator it;
+ pending_txn_map_itr_t it;
switch (dtokp->wstate())
{
case data_tok::ENQ_SUBM:
@@ -770,6 +786,9 @@ wmgr::get_events(timespec* const timeout,
_tmap.set_aio_compl(dtokp->xid(), dtokp->rid());
break;
case data_tok::DEQ_SUBM:
+ if (!dtokp->has_xid()) {
+ _lfc.decrEnqueuedRecordCount(dtokp->fid());
+ }
dtokl.push_back(dtokp);
tot_data_toks++;
dtokp->set_wstate(data_tok::DEQ);
@@ -781,31 +800,35 @@ wmgr::get_events(timespec* const timeout,
dtokl.push_back(dtokp);
tot_data_toks++;
dtokp->set_wstate(data_tok::ABORTED);
- it = _txn_pending_set.find(dtokp->xid());
- if (it == _txn_pending_set.end())
+ it = _txn_pending_map.find(dtokp->xid());
+ if (it == _txn_pending_map.end())
{
std::ostringstream oss;
- oss << std::hex << "_txn_pending_set: abort xid=\"";
- oss << dtokp->xid() << "\"";
- throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr",
- "get_events");
+ oss << std::hex << "_txn_pending_set: abort xid=\""
+ << qpid::linearstore::journal::jcntl::str2hexnum(dtokp->xid()) << "\"";
+ throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr", "get_events");
+ }
+ for (fidl_itr_t i=it->second.begin(); i!=it->second.end(); ++i) {
+ _lfc.decrEnqueuedRecordCount(*i);
}
- _txn_pending_set.erase(it);
+ _txn_pending_map.erase(it);
break;
case data_tok::COMMIT_SUBM:
dtokl.push_back(dtokp);
tot_data_toks++;
dtokp->set_wstate(data_tok::COMMITTED);
- it = _txn_pending_set.find(dtokp->xid());
- if (it == _txn_pending_set.end())
+ it = _txn_pending_map.find(dtokp->xid());
+ if (it == _txn_pending_map.end())
{
std::ostringstream oss;
- oss << std::hex << "_txn_pending_set: commit xid=\"";
- oss << dtokp->xid() << "\"";
- throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr",
- "get_events");
+ oss << std::hex << "_txn_pending_set: commit xid=\""
+ << qpid::linearstore::journal::jcntl::str2hexnum(dtokp->xid()) << "\"";
+ throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr", "get_events");
+ }
+ for (fidl_itr_t i=it->second.begin(); i!=it->second.end(); ++i) {
+ _lfc.decrEnqueuedRecordCount(*i);
}
- _txn_pending_set.erase(it);
+ _txn_pending_map.erase(it);
break;
case data_tok::ENQ_PART:
case data_tok::DEQ_PART:
@@ -858,8 +881,8 @@ wmgr::is_txn_synced(const std::string& xid)
if (_tmap.is_txn_synced(xid) == txn_map::TMAP_NOT_SYNCED)
return false;
// Check for outstanding commit/aborts
- std::set<std::string>::iterator it = _txn_pending_set.find(xid);
- return it == _txn_pending_set.end();
+ pending_txn_map_itr_t it = _txn_pending_map.find(xid);
+ return it == _txn_pending_map.end();
}
void
@@ -871,7 +894,6 @@ wmgr::initialize(aio_callback* const cbp,
pmgr::initialize(cbp, wcache_pgsize_sblks, wcache_num_pages);
wmgr::clean();
_page_cb_arr[0]._state = IN_USE;
- _ddtokl.clear();
_cached_offset_dblks = 0;
_enq_busy = false;
}
diff --git a/qpid/cpp/src/qpid/linearstore/journal/wmgr.h b/qpid/cpp/src/qpid/linearstore/journal/wmgr.h
index 8837e51e97..8eaa2364ad 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/wmgr.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/wmgr.h
@@ -22,9 +22,11 @@
#ifndef QPID_LINEARSTORE_JOURNAL_WMGR_H
#define QPID_LINEARSTORE_JOURNAL_WMGR_H
+#include <deque>
+#include <map>
#include "qpid/linearstore/journal/enums.h"
#include "qpid/linearstore/journal/pmgr.h"
-#include <set>
+#include <vector>
namespace qpid {
namespace linearstore {
@@ -51,11 +53,15 @@ class LinearFileController;
class wmgr : public pmgr
{
private:
+ typedef std::vector<uint64_t> fidl_t;
+ typedef fidl_t::iterator fidl_itr_t;
+ typedef std::map<std::string, fidl_t> pending_txn_map_t;
+ typedef pending_txn_map_t::iterator pending_txn_map_itr_t;
+
LinearFileController& _lfc; ///< Linear File Controller ref
uint32_t _max_dtokpp; ///< Max data writes per page
uint32_t _max_io_wait_us; ///< Max wait in microseconds till submit
uint32_t _cached_offset_dblks; ///< Amount of unwritten data in page (dblocks)
- std::deque<data_tok*> _ddtokl; ///< Deferred dequeue data_tok list
// TODO: Convert _enq_busy etc into a proper threadsafe lock
// TODO: Convert to enum? Are these encodes mutually exclusive?
@@ -70,7 +76,7 @@ private:
enq_rec _enq_rec; ///< Enqueue record used for encoding/decoding
deq_rec _deq_rec; ///< Dequeue record used for encoding/decoding
txn_rec _txn_rec; ///< Transaction record used for encoding/decoding
- std::set<std::string> _txn_pending_set; ///< Set containing xids of pending commits/aborts
+ pending_txn_map_t _txn_pending_map; ///< Set containing xids of pending commits/aborts
public:
wmgr(jcntl* jc,