summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
-rw-r--r-- qpid/cpp/src/qpid/linearstore/ISSUES 28
-rw-r--r-- qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp 13
-rw-r--r-- qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h 2
-rw-r--r-- qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp 4
-rw-r--r-- qpid/cpp/src/qpid/linearstore/journal/JournalFile.cpp 68
-rw-r--r-- qpid/cpp/src/qpid/linearstore/journal/JournalFile.h 13
-rw-r--r-- qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp 82
-rw-r--r-- qpid/cpp/src/qpid/linearstore/journal/LinearFileController.h 13
-rw-r--r-- qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp 195
-rw-r--r-- qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.h 24
-rw-r--r-- qpid/cpp/src/qpid/linearstore/journal/deq_rec.cpp 8
-rw-r--r-- qpid/cpp/src/qpid/linearstore/journal/deq_rec.h 4
-rw-r--r-- qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp 8
-rw-r--r-- qpid/cpp/src/qpid/linearstore/journal/enq_rec.h 4
-rw-r--r-- qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp 20
-rw-r--r-- qpid/cpp/src/qpid/linearstore/journal/jcntl.h 2
-rw-r--r-- qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp 4
-rw-r--r-- qpid/cpp/src/qpid/linearstore/journal/jrec.h 2
-rw-r--r-- qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp 8
-rw-r--r-- qpid/cpp/src/qpid/linearstore/journal/txn_rec.h 4
-rw-r--r-- qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp 100
-rw-r--r-- qpid/cpp/src/qpid/linearstore/journal/wmgr.h 12
22 files changed, 371 insertions, 247 deletions
diff --git a/qpid/cpp/src/qpid/linearstore/ISSUES b/qpid/cpp/src/qpid/linearstore/ISSUES
index 8c5b08bb61..3aec281858 100644
--- a/qpid/cpp/src/qpid/linearstore/ISSUES
+++ b/qpid/cpp/src/qpid/linearstore/ISSUES
@@ -34,29 +34,36 @@ Current/pending:
** Basic performance tests
5362 - Linearstore: No store tools exist for examining the journals
svn r.1558888 2014-01-09: WIP checkin for linearstore version of qpid_qls_analyze. Needs testing and tidy-up.
+ svn r.1560530 2014-01-22: Bugfixes for qpid_qls_analyze
+ svn r.1561848 2014-01-27: Bugfixes and enhancements for qpid_qls_analyze
+ svn r.1564808 2014-02-05: Bugfixes and enhancements for qpid_qls_analyze
* Store analysis and status
* Recovery/reading of message content
* Empty file pool status and management
5464 - [linearstore] Incompletely created journal files accumulate in EFP
- 5479 1053701 [linearstore] Using recovered store results in "JERR_JNLF_FILEOFFSOVFL: Attempted to increase submitted offset past file size. (JournalFile::submittedDblkCount)" error message
- * Probablilty: 2 of 600 (0.3%) using tx-test-soak.sh
- 5480 1053749 [linearstore] Recovery of store failure with "JERR_MAP_NOTFOUND: Key not found in map." error message
+# 5479 1053701 [linearstore] Using recovered store results in "JERR_JNLF_FILEOFFSOVFL: Attempted to increase submitted offset past file size. (JournalFile::submittedDblkCount)" error message
+ * Probability: 2 of 600 (0.3%) using tx-test-soak.sh
+# 5480 1053749 [linearstore] Recovery of store failure with "JERR_MAP_NOTFOUND: Key not found in map." error message
* Probability: 6 of 600 (1.0%) using tx-test-soak.sh
* If broker is started a second time after failure, it starts correctly and test completes ok.
+ * Problem: File is being recycled to EFP with still-locked enqueues in it (ie dequeued transactionally).
+ * Problem: Record alignment check writes filler records to wrong file when decoding bad record moves across a file boundary
5484 1035843 Slow performance for producers
svn r.1558592 fixes an issue with using /dev/random as a source of random numbers for Journal serial numbers.
- - 1036026 [LinearStore] Qpid linear store unable to create durable queue - framing-error: Queue <q-name>: create() failed: jexception 0x0000
+ svn r.1558913 replaces use of /dev/urandom with several calls to rand() to construct a 64-bit random number.
+ * Recommend rebuilding and testing for performance again with these two fixes.
+# - 1036026 [LinearStore] Qpid linear store unable to create durable queue - framing-error: Queue <q-name>: create() failed: jexception 0x0000
UNABLE TO REPRODUCE - but Frantizek has additional info
- 1039522 Qpid crashes while recovering from linear store around apid::linearstore::journal::JournalFile::getFqFileName() including enq_rec::decode() threw JERR_JREC_BAD_RECTAIL
* Possible dup of 1039525
- * May be fixed by QPID-5483 - waiting for needinfo
+ * May be fixed by QPID-5483 - waiting for needinfo, recommend rebuilding with QPID-5483 fix and re-testing
- 1039525 Qpid crashes while recovering from linear store around apid::linearstore::journal::jexception::format including enq_rec::decode() threw JERR_JREC_BAD_REC_TAIL
* Possible dup of 1039522
- * May be fixed by QPID-5483 - waiting for needinfo
- 5487 - [linearstore] Replace use of /dev/urandom with c random generator calls
+ * May be fixed by QPID-5483 - waiting for needinfo, recommend rebuilding with QPID-5483 fix and re-testing
+# - 1049870 [LinearStore] auto-delete property does not survive restart
-Fixed/closed:
-=============
+Fixed/closed (in commit order):
+===============================
Q-JIRA RHBZ Description / Comments
------ ------- ----------------------
5357 1052518 Linearstore: Empty file recycling not functional
@@ -85,6 +92,8 @@ NO-JIRA - Added missing Apache copyright/license text
svn r.1558589 2014-01-15: Proposed fix
* May be linked to RHBZ 1039522 - waiting for needinfo
* May be linked to RHBZ 1039525 - waiting for needinfo
+ 5487 1054448 [linearstore] Replace use of /dev/urandom with c random generator calls
+ svn r.1558913 2014-01-16: Proposed fix
Future:
=======
@@ -101,3 +110,4 @@ Code tidy-up
* Member names: xxx_
* Rename classes, functions and variables to camel-case
* Add Doxygen docs to classes
+* Make fid's consistent in name (fid, file_id, pfid) and format (hex vs decimal)
diff --git a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
index 483b494c2c..ff5b41b962 100644
--- a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
+++ b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
@@ -593,7 +593,7 @@ void MessageStoreImpl::recover(qpid::broker::RecoveryManager& registry_)
std::ostringstream oss;
oss << "Recovered transaction prepared list:";
for (txn_list::iterator i = prepared.begin(); i != prepared.end(); i++) {
- oss << std::endl << " " << str2hexnum(i->xid);
+ oss << std::endl << " " << qpid::linearstore::journal::jcntl::str2hexnum(i->xid);
}
QLS_LOG(debug, oss.str());
@@ -1292,7 +1292,7 @@ void MessageStoreImpl::completed(TxnCtxt& txn_,
mgmtObject->inc_tplTxnAborts();
}
} catch (const std::exception& e) {
- QLS_LOG(error, "Error completing xid " << txn_.getXid() << ": " << e.what());
+ QLS_LOG(error, "Error completing xid " << qpid::linearstore::journal::jcntl::str2hexnum(txn_.getXid()) << ": " << e.what());
throw;
}
}
@@ -1516,15 +1516,6 @@ void MessageStoreImpl::journalDeleted(JournalImpl& j_) {
journalList.erase(j_.id());
}
-std::string MessageStoreImpl::str2hexnum(const std::string& str) {
- std::ostringstream oss;
- oss << "(" << str.size() << ")0x" << std::hex;
- for (unsigned i=str.size(); i>0; --i) {
- oss << std::setfill('0') << std::setw(2) << (uint16_t)(uint8_t)str[i-1];
- }
- return oss.str();
-}
-
MessageStoreImpl::StoreOptions::StoreOptions(const std::string& name_) :
qpid::Options(name_),
truncateFlag(defTruncateFlag),
diff --git a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h
index 3157b9be9d..c2eb0deab0 100644
--- a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h
+++ b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h
@@ -235,8 +235,6 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem
}
void chkTplStoreInit();
- static std::string str2hexnum(const std::string& str);
-
public:
typedef boost::shared_ptr<MessageStoreImpl> shared_ptr;
diff --git a/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp b/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp
index 743d12989a..df2e7a442d 100644
--- a/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp
+++ b/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp
@@ -52,7 +52,9 @@ void TxnCtxt::commitTxn(JournalImpl* jc, bool commit) {
jc->txn_abort(dtokp.get(), getXid());
}
} catch (const qpid::linearstore::journal::jexception& e) {
- THROW_STORE_EXCEPTION(std::string("Error commit") + e.what());
+ std::ostringstream oss;
+ oss << "Error during " << (commit ? "commit" : "abort") << ": " << e.what();
+ THROW_STORE_EXCEPTION(oss.str());
}
}
}
diff --git a/qpid/cpp/src/qpid/linearstore/journal/JournalFile.cpp b/qpid/cpp/src/qpid/linearstore/journal/JournalFile.cpp
index 1b2025bd5a..fc6ced4fd2 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/JournalFile.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/JournalFile.cpp
@@ -27,18 +27,18 @@
#include "qpid/linearstore/journal/utils/file_hdr.h"
#include <unistd.h>
-//#include <iostream> // DEBUG
-
namespace qpid {
namespace linearstore {
namespace journal {
JournalFile::JournalFile(const std::string& fqFileName,
const efpIdentity_t& efpIdentity,
- const uint64_t fileSeqNum) :
+ const uint64_t fileSeqNum,
+ const std::string queueName) :
efpIdentity_(efpIdentity),
fqFileName_(fqFileName),
fileSeqNum_(fileSeqNum),
+ queueName_(queueName),
serial_(getRandom64()),
firstRecordOffset_(0ULL),
fileHandle_(-1),
@@ -47,6 +47,7 @@ JournalFile::JournalFile(const std::string& fqFileName,
fileHeaderPtr_(0),
aioControlBlockPtr_(0),
fileSize_dblks_(((efpIdentity.ds_ * 1024) + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_BYTES)) / QLS_DBLK_SIZE_BYTES),
+ initializedFlag_(false),
enqueuedRecordCount_("JournalFile::enqueuedRecordCount", 0),
submittedDblkCount_("JournalFile::submittedDblkCount", 0),
completedDblkCount_("JournalFile::completedDblkCount", 0),
@@ -54,10 +55,12 @@ JournalFile::JournalFile(const std::string& fqFileName,
{}
JournalFile::JournalFile(const std::string& fqFileName,
- const ::file_hdr_t& fileHeader) :
+ const ::file_hdr_t& fileHeader,
+ const std::string queueName) :
efpIdentity_(fileHeader._efp_partition, fileHeader._data_size_kib),
fqFileName_(fqFileName),
fileSeqNum_(fileHeader._file_number),
+ queueName_(queueName),
serial_(fileHeader._rhdr._serial),
firstRecordOffset_(fileHeader._fro),
fileHandle_(-1),
@@ -66,6 +69,7 @@ JournalFile::JournalFile(const std::string& fqFileName,
fileHeaderPtr_(0),
aioControlBlockPtr_(0),
fileSize_dblks_(((fileHeader._data_size_kib * 1024) + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_BYTES)) / QLS_DBLK_SIZE_BYTES),
+ initializedFlag_(false),
enqueuedRecordCount_("JournalFile::enqueuedRecordCount", 0),
submittedDblkCount_("JournalFile::submittedDblkCount", 0),
completedDblkCount_("JournalFile::completedDblkCount", 0),
@@ -78,18 +82,21 @@ JournalFile::~JournalFile() {
void
JournalFile::initialize(const uint32_t completedDblkCount) {
- if (::posix_memalign(&fileHeaderBasePtr_, QLS_AIO_ALIGN_BOUNDARY_BYTES, QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024))
- {
- std::ostringstream oss;
- oss << "posix_memalign(): blksize=" << QLS_AIO_ALIGN_BOUNDARY_BYTES << " size=" << (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024);
- oss << FORMAT_SYSERR(errno);
- throw jexception(jerrno::JERR__MALLOC, oss.str(), "JournalFile", "initialize");
+ if (!initializedFlag_) {
+ if (::posix_memalign(&fileHeaderBasePtr_, QLS_AIO_ALIGN_BOUNDARY_BYTES, QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024))
+ {
+ std::ostringstream oss;
+ oss << "posix_memalign(): blksize=" << QLS_AIO_ALIGN_BOUNDARY_BYTES << " size=" << (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024);
+ oss << FORMAT_SYSERR(errno);
+ throw jexception(jerrno::JERR__MALLOC, oss.str(), "JournalFile", "initialize");
+ }
+ fileHeaderPtr_ = (::file_hdr_t*)fileHeaderBasePtr_;
+ aioControlBlockPtr_ = new aio_cb;
+ initializedFlag_ = true;
}
- fileHeaderPtr_ = (::file_hdr_t*)fileHeaderBasePtr_;
- aioControlBlockPtr_ = new aio_cb;
if (completedDblkCount > 0UL) {
- submittedDblkCount_.add(completedDblkCount);
- completedDblkCount_.add(completedDblkCount);
+ submittedDblkCount_.set(completedDblkCount);
+ completedDblkCount_.set(completedDblkCount);
}
}
@@ -149,8 +156,7 @@ void JournalFile::asyncFileHeaderWrite(io_context_t ioContextPtr,
const efpDataSize_kib_t efpDataSize_kib,
const uint16_t userFlags,
const uint64_t recordId,
- const uint64_t firstRecordOffset,
- const std::string queueName) {
+ const uint64_t firstRecordOffset) {
firstRecordOffset_ = firstRecordOffset;
::file_hdr_create(fileHeaderPtr_, QLS_FILE_MAGIC, QLS_JRNL_VERSION, QLS_JRNL_FHDR_RES_SIZE_SBLKS, efpPartitionNumber, efpDataSize_kib);
::file_hdr_init(fileHeaderBasePtr_,
@@ -160,15 +166,15 @@ void JournalFile::asyncFileHeaderWrite(io_context_t ioContextPtr,
recordId,
firstRecordOffset,
fileSeqNum_,
- queueName.size(),
- queueName.data());
- aio::prep_pwrite(aioControlBlockPtr_,
- fileHandle_,
- (void*)fileHeaderBasePtr_,
- QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024,
- 0UL);
- if (aio::submit(ioContextPtr, 1, &aioControlBlockPtr_) < 0)
- throw jexception(jerrno::JERR__AIO, "JournalFile", "asyncPageWrite");
+ queueName_.size(),
+ queueName_.data());
+ const std::size_t wr_size = QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024;
+ aio::prep_pwrite(aioControlBlockPtr_, fileHandle_, (void*)fileHeaderBasePtr_, wr_size, 0UL);
+ if (aio::submit(ioContextPtr, 1, &aioControlBlockPtr_) < 0) {
+ std::ostringstream oss;
+ oss << "queue=\"" << queueName_ << "\" fid=0x" << std::hex << fileSeqNum_ << " wr_size=0x" << wr_size << " foffs=0x0";
+ throw jexception(jerrno::JERR__AIO, oss.str(), "JournalFile", "asyncFileHeaderWrite");
+ }
addSubmittedDblkCount(QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_DBLKS);
incrOutstandingAioOperationCount();
}
@@ -177,16 +183,16 @@ void JournalFile::asyncPageWrite(io_context_t ioContextPtr,
aio_cb* aioControlBlockPtr,
void* data,
uint32_t dataSize_dblks) {
- aio::prep_pwrite_2(aioControlBlockPtr,
- fileHandle_,
- data,
- dataSize_dblks * QLS_DBLK_SIZE_BYTES,
- submittedDblkCount_.get() * QLS_DBLK_SIZE_BYTES);
+ const std::size_t wr_size = dataSize_dblks * QLS_DBLK_SIZE_BYTES;
+ const uint64_t foffs = submittedDblkCount_.get() * QLS_DBLK_SIZE_BYTES;
+ aio::prep_pwrite_2(aioControlBlockPtr, fileHandle_, data, wr_size, foffs);
pmgr::page_cb* pcbp = (pmgr::page_cb*)(aioControlBlockPtr->data); // This page's control block (pcb)
pcbp->_wdblks = dataSize_dblks;
pcbp->_jfp = this;
if (aio::submit(ioContextPtr, 1, &aioControlBlockPtr) < 0) {
- throw jexception(jerrno::JERR__AIO, "JournalFile", "asyncPageWrite"); // TODO: complete exception details
+ std::ostringstream oss;
+ oss << "queue=\"" << queueName_ << "\" fid=0x" << std::hex << fileSeqNum_ << " wr_size=0x" << wr_size << " foffs=0x" << foffs;
+ throw jexception(jerrno::JERR__AIO, oss.str(), "JournalFile", "asyncPageWrite");
}
addSubmittedDblkCount(dataSize_dblks);
incrOutstandingAioOperationCount();
diff --git a/qpid/cpp/src/qpid/linearstore/journal/JournalFile.h b/qpid/cpp/src/qpid/linearstore/journal/JournalFile.h
index f0ad432fd8..e33830ef7f 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/JournalFile.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/JournalFile.h
@@ -38,6 +38,7 @@ protected:
const efpIdentity_t efpIdentity_;
const std::string fqFileName_;
const uint64_t fileSeqNum_;
+ const std::string queueName_;
const uint64_t serial_;
uint64_t firstRecordOffset_;
int fileHandle_;
@@ -46,6 +47,7 @@ protected:
::file_hdr_t* fileHeaderPtr_;
aio_cb* aioControlBlockPtr_;
uint32_t fileSize_dblks_; ///< File size in data blocks, including file header
+ bool initializedFlag_;
AtomicCounter<uint32_t> enqueuedRecordCount_; ///< Count of enqueued records
AtomicCounter<uint32_t> submittedDblkCount_; ///< Write file count (data blocks) for submitted AIO
@@ -56,10 +58,12 @@ public:
// Constructor for creating new file with known fileSeqNum and random serial
JournalFile(const std::string& fqFileName,
const efpIdentity_t& efpIdentity,
- const uint64_t fileSeqNum);
+ const uint64_t fileSeqNum,
+ const std::string queueName);
// Constructor for recovery in which fileSeqNum and serial are recovered from fileHeader param
JournalFile(const std::string& fqFileName,
- const ::file_hdr_t& fileHeader);
+ const ::file_hdr_t& fileHeader,
+ const std::string queueName);
virtual ~JournalFile();
void initialize(const uint32_t completedDblkCount);
@@ -76,13 +80,13 @@ public:
const efpDataSize_kib_t efpDataSize_kib,
const uint16_t userFlags,
const uint64_t recordId,
- const uint64_t firstRecordOffset,
- const std::string queueName);
+ const uint64_t firstRecordOffset);
void asyncPageWrite(io_context_t ioContextPtr,
aio_cb* aioControlBlockPtr,
void* data,
uint32_t dataSize_dblks);
+ uint32_t getSubmittedDblkCount() const;
uint32_t getEnqueuedRecordCount() const;
uint32_t incrEnqueuedRecordCount();
uint32_t decrEnqueuedRecordCount();
@@ -109,7 +113,6 @@ protected:
static uint64_t getRandom64();
bool isOpen() const;
- uint32_t getSubmittedDblkCount() const;
uint32_t addSubmittedDblkCount(const uint32_t a);
uint32_t getCompletedDblkCount() const;
diff --git a/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp b/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp
index 5483f3bb94..86d1b0e93c 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp
@@ -25,8 +25,6 @@
#include "qpid/linearstore/journal/jcntl.h"
#include "qpid/linearstore/journal/JournalFile.h"
-//#include <iostream> // DEBUG
-
namespace qpid {
namespace linearstore {
namespace journal {
@@ -34,10 +32,10 @@ namespace journal {
LinearFileController::LinearFileController(jcntl& jcntlRef) :
jcntlRef_(jcntlRef),
emptyFilePoolPtr_(0),
- currentJournalFilePtr_(0),
fileSeqCounter_("LinearFileController::fileSeqCounter", 0),
recordIdCounter_("LinearFileController::recordIdCounter", 0),
- decrCounter_("LinearFileController::decrCounter", 0)
+ decrCounter_("LinearFileController::decrCounter", 0),
+ currentJournalFilePtr_(0)
{}
LinearFileController::~LinearFileController() {}
@@ -53,7 +51,7 @@ void LinearFileController::initialize(const std::string& journalDirectory,
void LinearFileController::finalize() {
if (currentJournalFilePtr_) {
currentJournalFilePtr_->close();
- currentJournalFilePtr_ = NULL;
+ currentJournalFilePtr_ = 0;
}
while (!journalFileList_.empty()) {
delete journalFileList_.front();
@@ -62,17 +60,21 @@ void LinearFileController::finalize() {
}
void LinearFileController::addJournalFile(JournalFile* journalFilePtr,
- const uint32_t completedDblkCount) {
- if (currentJournalFilePtr_) {
+ const uint32_t completedDblkCount,
+ const bool makeCurrentFlag) {
+ if (makeCurrentFlag && currentJournalFilePtr_) {
currentJournalFilePtr_->close();
+ currentJournalFilePtr_ = 0;
}
journalFilePtr->initialize(completedDblkCount);
- currentJournalFilePtr_ = journalFilePtr;
{
slock l(journalFileListMutex_);
- journalFileList_.push_back(currentJournalFilePtr_);
+ journalFileList_.push_back(journalFilePtr);
+ }
+ if (makeCurrentFlag) {
+ currentJournalFilePtr_ = journalFilePtr;
+ currentJournalFilePtr_->open();
}
- currentJournalFilePtr_->open();
}
efpDataSize_sblks_t LinearFileController::dataSize_sblks() const {
@@ -83,16 +85,20 @@ efpFileSize_sblks_t LinearFileController::fileSize_sblks() const {
return emptyFilePoolPtr_->fileSize_sblks();
}
+void LinearFileController::getNextJournalFile() {
+ if (currentJournalFilePtr_)
+ currentJournalFilePtr_->close();
+ pullEmptyFileFromEfp();
+}
+
uint64_t LinearFileController::getNextRecordId() {
return recordIdCounter_.increment();
}
-void LinearFileController::pullEmptyFileFromEfp() {
- if (currentJournalFilePtr_)
- currentJournalFilePtr_->close();
- std::string ef = emptyFilePoolPtr_->takeEmptyFile(journalDirectory_); // Moves file from EFP only, returns new file name
-//std::cout << "*** LinearFileController::pullEmptyFileFromEfp() qn=" << jcntlRef.id() << " ef=" << ef << std::endl; // DEBUG
- addJournalFile(ef, emptyFilePoolPtr_->getIdentity(), getNextFileSeqNum(), 0);
+void LinearFileController::removeFileToEfp(const std::string& fileName) {
+ if (emptyFilePoolPtr_) {
+ emptyFilePoolPtr_->returnEmptyFile(fileName);
+ }
}
void LinearFileController::restoreEmptyFile(const std::string& fileName) {
@@ -101,7 +107,11 @@ void LinearFileController::restoreEmptyFile(const std::string& fileName) {
void LinearFileController::purgeEmptyFilesToEfp() {
slock l(journalFileListMutex_);
- purgeEmptyFilesToEfpNoLock();
+ while (journalFileList_.front()->isNoEnqueuedRecordsRemaining() && journalFileList_.size() > 1) { // Can't purge last file, even if it has no enqueued records
+ emptyFilePoolPtr_->returnEmptyFile(journalFileList_.front()->getFqFileName());
+ delete journalFileList_.front();
+ journalFileList_.pop_front();
+ }
}
uint32_t LinearFileController::getEnqueuedRecordCount(const efpFileCount_t fileSeqNumber) {
@@ -113,7 +123,6 @@ uint32_t LinearFileController::incrEnqueuedRecordCount(const efpFileCount_t file
}
uint32_t LinearFileController::decrEnqueuedRecordCount(const efpFileCount_t fileSeqNumber) {
- slock l(journalFileListMutex_);
uint32_t r = find(fileSeqNumber)->decrEnqueuedRecordCount();
// TODO: Re-evaluate after testing and profiling
@@ -122,18 +131,16 @@ uint32_t LinearFileController::decrEnqueuedRecordCount(const efpFileCount_t file
// records). We need to check this rather simple scheme works for outlying scenarios (large and tiny data
// records) without impacting performance or performing badly (leaving excessive empty files in the journals).
if (decrCounter_.increment() % 100ULL == 0ULL) {
- purgeEmptyFilesToEfpNoLock();
+ purgeEmptyFilesToEfp();
}
return r;
}
uint32_t LinearFileController::addWriteCompletedDblkCount(const efpFileCount_t fileSeqNumber, const uint32_t a) {
- slock l(journalFileListMutex_);
return find(fileSeqNumber)->addCompletedDblkCount(a);
}
uint16_t LinearFileController::decrOutstandingAioOperationCount(const efpFileCount_t fileSeqNumber) {
- slock l(journalFileListMutex_);
return find(fileSeqNumber)->decrOutstandingAioOperationCount();
}
@@ -142,12 +149,11 @@ void LinearFileController::asyncFileHeaderWrite(io_context_t ioContextPtr,
const uint64_t recordId,
const uint64_t firstRecordOffset) {
currentJournalFilePtr_->asyncFileHeaderWrite(ioContextPtr,
- emptyFilePoolPtr_->getPartitionNumber(),
- emptyFilePoolPtr_->dataSize_kib(),
- userFlags,
- recordId,
- firstRecordOffset,
- jcntlRef_.id());
+ emptyFilePoolPtr_->getPartitionNumber(),
+ emptyFilePoolPtr_->dataSize_kib(),
+ userFlags,
+ recordId,
+ firstRecordOffset);
}
void LinearFileController::asyncPageWrite(io_context_t ioContextPtr,
@@ -195,8 +201,8 @@ void LinearFileController::addJournalFile(const std::string& fileName,
const efpIdentity_t& efpIdentity,
const uint64_t fileNumber,
const uint32_t completedDblkCount) {
- JournalFile* jfp = new JournalFile(fileName, efpIdentity, fileNumber);
- addJournalFile(jfp, completedDblkCount);
+ JournalFile* jfp = new JournalFile(fileName, efpIdentity, fileNumber, jcntlRef_.id());
+ addJournalFile(jfp, completedDblkCount, true);
}
void LinearFileController::assertCurrentJournalFileValid(const char* const functionName) const {
@@ -209,15 +215,17 @@ bool LinearFileController::checkCurrentJournalFileValid() const {
return currentJournalFilePtr_ != 0;
}
-// NOTE: NOT THREAD SAFE - journalFileList is accessed by multiple threads - use under external lock
JournalFile* LinearFileController::find(const efpFileCount_t fileSeqNumber) {
- if (currentJournalFilePtr_ != 0 && currentJournalFilePtr_->getFileSeqNum() == fileSeqNumber)
+ if (currentJournalFilePtr_ && currentJournalFilePtr_->getFileSeqNum() == fileSeqNumber)
return currentJournalFilePtr_;
+
+ slock l(journalFileListMutex_);
for (JournalFileListItr_t i=journalFileList_.begin(); i!=journalFileList_.end(); ++i) {
if ((*i)->getFileSeqNum() == fileSeqNumber) {
return *i;
}
}
+
std::ostringstream oss;
oss << "fileSeqNumber=" << fileSeqNumber;
throw jexception(jerrno::JERR_LFCR_SEQNUMNOTFOUND, oss.str(), "LinearFileController", "find");
@@ -227,15 +235,9 @@ uint64_t LinearFileController::getNextFileSeqNum() {
return fileSeqCounter_.increment();
}
-void LinearFileController::purgeEmptyFilesToEfpNoLock() {
-//std::cout << " >P n=" << journalFileList_.size() << " e=" << (journalFileList_.front()->isNoEnqueuedRecordsRemaining()?"T":"F") << std::flush; // DEBUG
- while (journalFileList_.front()->isNoEnqueuedRecordsRemaining() &&
- journalFileList_.size() > 1) { // Can't purge last file, even if it has no enqueued records
-//std::cout << " *f=" << journalFileList_.front()->getFqFileName() << std::flush; // DEBUG
- emptyFilePoolPtr_->returnEmptyFile(journalFileList_.front()->getFqFileName());
- delete journalFileList_.front();
- journalFileList_.pop_front();
- }
+void LinearFileController::pullEmptyFileFromEfp() {
+ std::string efn = emptyFilePoolPtr_->takeEmptyFile(journalDirectory_); // Moves file from EFP only (ie no file init), returns new file name
+ addJournalFile(efn, emptyFilePoolPtr_->getIdentity(), getNextFileSeqNum(), 0);
}
}}}
diff --git a/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.h b/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.h
index 933b9792a4..05f08144b9 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.h
@@ -44,12 +44,12 @@ protected:
jcntl& jcntlRef_;
std::string journalDirectory_;
EmptyFilePool* emptyFilePoolPtr_;
- JournalFile* currentJournalFilePtr_;
AtomicCounter<uint64_t> fileSeqCounter_;
AtomicCounter<uint64_t> recordIdCounter_;
AtomicCounter<uint64_t> decrCounter_;
JournalFileList_t journalFileList_;
+ JournalFile* currentJournalFilePtr_;
smutex journalFileListMutex_;
public:
@@ -62,12 +62,14 @@ public:
void finalize();
void addJournalFile(JournalFile* journalFilePtr,
- const uint32_t completedDblkCount);
+ const uint32_t completedDblkCount,
+ const bool makeCurrentFlag);
efpDataSize_sblks_t dataSize_sblks() const;
efpFileSize_sblks_t fileSize_sblks() const;
+ void getNextJournalFile();
uint64_t getNextRecordId();
- void pullEmptyFileFromEfp();
+ void removeFileToEfp(const std::string& fileName);
void restoreEmptyFile(const std::string& fileName);
void purgeEmptyFilesToEfp();
@@ -105,11 +107,12 @@ protected:
bool checkCurrentJournalFileValid() const;
JournalFile* find(const efpFileCount_t fileSeqNumber);
uint64_t getNextFileSeqNum();
- void purgeEmptyFilesToEfpNoLock();
+ void pullEmptyFileFromEfp();
};
typedef void (LinearFileController::*lfcAddJournalFileFn)(JournalFile* journalFilePtr,
- const uint32_t completedDblkCount);
+ const uint32_t completedDblkCount,
+ const bool makeCurrentFlag);
}}}
diff --git a/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp b/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp
index 72308cc929..a1cec53ca1 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp
@@ -56,11 +56,15 @@ RecoveredRecordData_t::RecoveredRecordData_t(const uint64_t rid, const uint64_t
pendingTransaction_(ptxn)
{}
-
bool recordIdListCompare(RecoveredRecordData_t a, RecoveredRecordData_t b) {
return a.recordId_ < b.recordId_;
}
+RecoveredFileData_t::RecoveredFileData_t(JournalFile* journalFilePtr, const uint32_t completedDblkCount) :
+ journalFilePtr_(journalFilePtr),
+ completedDblkCount_(completedDblkCount)
+{}
+
RecoveryManager::RecoveryManager(const std::string& journalDirectory,
const std::string& queuename,
enq_map& enqueueMapRef,
@@ -77,11 +81,17 @@ RecoveryManager::RecoveryManager(const std::string& journalDirectory,
highestRecordId_(0ULL),
highestFileNumber_(0ULL),
lastFileFullFlag_(false),
+ initial_fid_(0),
currentSerial_(0),
efpFileSize_kib_(0)
{}
-RecoveryManager::~RecoveryManager() {}
+RecoveryManager::~RecoveryManager() {
+ for (fileNumberMapItr_t i = fileNumberMap_.begin(); i != fileNumberMap_.end(); ++i) {
+ delete i->second;
+ }
+ fileNumberMap_.clear();
+}
void RecoveryManager::analyzeJournals(const std::vector<std::string>* preparedTransactionListPtr,
EmptyFilePoolManager* emptyFilePoolManager,
@@ -92,9 +102,6 @@ void RecoveryManager::analyzeJournals(const std::vector<std::string>* preparedTr
*emptyFilePoolPtrPtr = emptyFilePoolManager->getEmptyFilePool(efpIdentity);
efpFileSize_kib_ = (*emptyFilePoolPtrPtr)->fileSize_kib();
- // Check for file full condition
- lastFileFullFlag_ = endOffset_ == (std::streamoff)(*emptyFilePoolPtrPtr)->fileSize_kib() * 1024;
-
if (!journalEmptyFlag_) {
// Read all records, establish remaining enqueued records
@@ -106,6 +113,9 @@ void RecoveryManager::analyzeJournals(const std::vector<std::string>* preparedTr
inFileStream_.close();
}
+ // Check for file full condition
+ lastFileFullFlag_ = endOffset_ == (std::streamoff)(*emptyFilePoolPtrPtr)->fileSize_kib() * 1024;
+
// Remove leading files which have no enqueued records
removeEmptyFiles(*emptyFilePoolPtrPtr);
@@ -121,7 +131,7 @@ void RecoveryManager::analyzeJournals(const std::vector<std::string>* preparedTr
// Unlock any affected enqueues in emap
for (tdl_itr_t i=tdl.begin(); i<tdl.end(); i++) {
if (i->enq_flag_) { // enq op - decrement enqueue count
- fileNumberMap_[i->pfid_]->decrEnqueuedRecordCount();
+ fileNumberMap_[i->pfid_]->journalFilePtr_->decrEnqueuedRecordCount();
} else if (enqueueMapRef_.is_enqueued(i->drid_, true)) { // deq op - unlock enq record
if (enqueueMapRef_.unlock(i->drid_) < enq_map::EMAP_OK) { // fail
// enq_map::unlock()'s only error is enq_map::EMAP_RID_NOT_FOUND
@@ -174,7 +184,7 @@ bool RecoveryManager::readNextRemainingRecord(void** const dataPtrPtr,
}
} while (!foundRecord);
- if (!inFileStream_.is_open() || currentJournalFileConstItr_->first != recordIdListConstItr_->fileId_) {
+ if (!inFileStream_.is_open() || currentJournalFileItr_->first != recordIdListConstItr_->fileId_) {
if (!getFile(recordIdListConstItr_->fileId_, false)) {
std::ostringstream oss;
oss << "Failed to open file with file-id=" << recordIdListConstItr_->fileId_;
@@ -231,7 +241,6 @@ bool RecoveryManager::readNextRemainingRecord(void** const dataPtrPtr,
::rec_tail_t enqueueTail;
inFileStream_.read((char*)&enqueueTail, sizeof(::rec_tail_t));
uint32_t cs = checksum.getChecksum();
-//std::cout << std::hex << "### rid=0x" << enqueueHeader._rhdr._rid << " rtcs=0x" << enqueueTail._checksum << " cs=0x" << cs << std::dec << std::endl; // DEBUG
uint16_t res = ::rec_tail_check(&enqueueTail, &enqueueHeader._rhdr, cs);
if (res != 0) {
std::stringstream oss;
@@ -266,17 +275,30 @@ bool RecoveryManager::readNextRemainingRecord(void** const dataPtrPtr,
void RecoveryManager::setLinearFileControllerJournals(lfcAddJournalFileFn fnPtr,
LinearFileController* lfcPtr) {
if (journalEmptyFlag_) {
- if (uninitializedJournal_.size() > 0) {
- lfcPtr->restoreEmptyFile(uninitializedJournal_);
+ if (uninitFileList_.size() > 0) {
+ std::string uninitFile = uninitFileList_.back();
+ uninitFileList_.pop_back();
+ lfcPtr->restoreEmptyFile(uninitFile);
}
} else {
for (fileNumberMapConstItr_t i = fileNumberMap_.begin(); i != fileNumberMap_.end(); ++i) {
- uint32_t fileDblkCount = i->first == highestFileNumber_ ? // Is this this last file?
- endOffset_ / QLS_DBLK_SIZE_BYTES : // Last file uses _endOffset
- efpFileSize_kib_ * 1024 / QLS_DBLK_SIZE_BYTES; // All others use file size to make them full
- (lfcPtr->*fnPtr)(i->second, fileDblkCount);
+ (lfcPtr->*fnPtr)(i->second->journalFilePtr_, i->second->completedDblkCount_, i->first == initial_fid_);
}
}
+
+ std::ostringstream oss;
+ bool logFlag = !notNeededFilesList_.empty();
+ if (logFlag) {
+ oss << "Files removed from head of journal: prior truncation during recovery:";
+ }
+ while (!notNeededFilesList_.empty()) {
+ lfcPtr->removeFileToEfp(notNeededFilesList_.back());
+ oss << std::endl << " * " << notNeededFilesList_.back();
+ notNeededFilesList_.pop_back();
+ }
+ if (logFlag) {
+ journalLogRef_.log(JournalLog::LOG_NOTICE, queueName_, oss.str());
+ }
}
std::string RecoveryManager::toString(const std::string& jid) {
@@ -285,7 +307,7 @@ std::string RecoveryManager::toString(const std::string& jid) {
oss << " Number of journal files = " << fileNumberMap_.size() << std::endl;
oss << " Journal File List:" << std::endl;
for (fileNumberMapConstItr_t k=fileNumberMap_.begin(); k!=fileNumberMap_.end(); ++k) {
- std::string fqFileName = k->second->getFqFileName();
+ std::string fqFileName = k->second->journalFilePtr_->getFqFileName();
oss << " " << k->first << ": " << fqFileName.substr(fqFileName.rfind('/')+1) << std::endl;
}
oss << " Enqueue Counts: [ ";
@@ -293,7 +315,7 @@ std::string RecoveryManager::toString(const std::string& jid) {
if (l != fileNumberMap_.begin()) {
oss << ", ";
}
- oss << l->second->getEnqueuedRecordCount();
+ oss << l->second->journalFilePtr_->getEnqueuedRecordCount();
}
oss << " ]" << std::endl;
oss << " Journal empty = " << (journalEmptyFlag_ ? "TRUE" : "FALSE") << std::endl;
@@ -330,15 +352,17 @@ std::string RecoveryManager::toLog(const std::string& jid, const int indent) {
<< std::setw(10) << "--------"
<< std::endl;
for (fileNumberMapConstItr_t k=fileNumberMap_.begin(); k!=fileNumberMap_.end(); ++k) {
- std::string fqFileName = k->second->getFqFileName();
+ std::string fqFileName = k->second->journalFilePtr_->getFqFileName();
+ std::ostringstream fid;
+ fid << std::hex << "0x" << k->first;
std::ostringstream fro;
- fro << std::hex << "0x" << k->second->getFirstRecordOffset();
- oss << indentStr << std::setw(7) << k->first
+ fro << std::hex << "0x" << k->second->journalFilePtr_->getFirstRecordOffset();
+ oss << indentStr << std::setw(7) << fid.str()
<< std::setw(43) << fqFileName.substr(fqFileName.rfind('/')+1)
<< std::setw(16) << fro.str()
- << std::setw(12) << k->second->getEnqueuedRecordCount()
- << std::setw(5) << k->second->getEfpIdentity().pn_
- << std::setw(9) << k->second->getEfpIdentity().ds_ << "k"
+ << std::setw(12) << k->second->journalFilePtr_->getEnqueuedRecordCount()
+ << std::setw(5) << k->second->journalFilePtr_->getEfpIdentity().pn_
+ << std::setw(9) << k->second->journalFilePtr_->getEfpIdentity().ds_ << "k"
<< std::endl;
}
oss << indentStr << "First record offset in first file = 0x" << std::hex << firstRecordOffset_ <<
@@ -347,7 +371,7 @@ std::string RecoveryManager::toLog(const std::string& jid, const int indent) {
(endOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)" << std::endl;
oss << indentStr << "Highest rid found = 0x" << std::hex << highestRecordId_ << std::dec << std::endl;
oss << indentStr << "Last file full = " << (lastFileFullFlag_ ? "TRUE" : "FALSE") << std::endl;
- oss << indentStr << "Enqueued records (txn & non-txn):";
+ //oss << indentStr << "Enqueued records (txn & non-txn):"; // TODO: complete report
}
return oss.str();
}
@@ -357,27 +381,28 @@ std::string RecoveryManager::toLog(const std::string& jid, const int indent) {
void RecoveryManager::analyzeJournalFileHeaders(efpIdentity_t& efpIdentity) {
std::string headerQueueName;
::file_hdr_t fileHeader;
- directoryList_t directoryList;
+ stringList_t directoryList;
jdir::read_dir(journalDirectory_, directoryList, false, true, false, true);
- for (directoryListConstItr_t i = directoryList.begin(); i != directoryList.end(); ++i) {
+ for (stringListConstItr_t i = directoryList.begin(); i != directoryList.end(); ++i) {
readJournalFileHeader(*i, fileHeader, headerQueueName);
if (headerQueueName.empty()) {
std::ostringstream oss;
- if (uninitializedJournal_.empty()) {
- oss << "Journal file " << (*i) << " is first uninitialized (not yet written) journal file.";
- journalLogRef_.log(JournalLog::LOG_NOTICE, queueName_, oss.str());
- uninitializedJournal_ = *i;
- } else {
- oss << "Journal file " << (*i) << " is second or greater uninitialized journal file - ignoring";
- journalLogRef_.log(JournalLog::LOG_WARN, queueName_, oss.str());
- }
+ oss << "Journal file " << (*i) << " is uninitialized";
+ journalLogRef_.log(JournalLog::LOG_WARN, queueName_, oss.str());
+ uninitFileList_.push_back(*i);
} else if (headerQueueName.compare(queueName_) != 0) {
std::ostringstream oss;
oss << "Journal file " << (*i) << " belongs to queue \"" << headerQueueName << "\": ignoring";
journalLogRef_.log(JournalLog::LOG_WARN, queueName_, oss.str());
} else {
- JournalFile* jfp = new JournalFile(*i, fileHeader);
- fileNumberMap_[fileHeader._file_number] = jfp;
+ JournalFile* jfp = new JournalFile(*i, fileHeader, queueName_);
+ std::pair<fileNumberMapItr_t, bool> res = fileNumberMap_.insert(
+ std::pair<uint64_t, RecoveredFileData_t*>(fileHeader._file_number, new RecoveredFileData_t(jfp, 0)));
+ if (!res.second) {
+ std::ostringstream oss;
+ oss << "Journal file " << (*i) << " has fid=0x" << std::hex << jfp->getFileSeqNum() << " which already exists for this journal.";
+ throw jexception(oss.str()); // TODO: complete this exception
+ }
if (fileHeader._file_number > highestFileNumber_) {
highestFileNumber_ = fileHeader._file_number;
}
@@ -393,7 +418,7 @@ void RecoveryManager::analyzeJournalFileHeaders(efpIdentity_t& efpIdentity) {
if (fileNumberMap_.empty()) {
journalEmptyFlag_ = true;
} else {
- currentJournalFileConstItr_ = fileNumberMap_.begin();
+ currentJournalFileItr_ = fileNumberMap_.begin();
}
}
@@ -408,7 +433,7 @@ void RecoveryManager::checkFileStreamOk(bool checkEof) {
}
}
-void RecoveryManager::checkJournalAlignment(const std::streampos recordPosition) {
+void RecoveryManager::checkJournalAlignment(const uint64_t start_fid, const std::streampos recordPosition) {
if (recordPosition % QLS_DBLK_SIZE_BYTES != 0) {
std::ostringstream oss;
oss << "Current read pointer not dblk aligned: recordPosition=0x" << std::hex << recordPosition;
@@ -420,12 +445,13 @@ void RecoveryManager::checkJournalAlignment(const std::streampos recordPosition)
if (sblkOffset)
{
std::ostringstream oss1;
- oss1 << std::hex << "Bad record alignment found at fid=0x" << getCurrentFileNumber();
+ oss1 << std::hex << "Bad record alignment found at fid=0x" << start_fid;
oss1 << " offs=0x" << currentPosn << " (likely journal overwrite boundary); " << std::dec;
oss1 << (QLS_SBLK_SIZE_DBLKS - (sblkOffset/QLS_DBLK_SIZE_BYTES)) << " filler record(s) required.";
journalLogRef_.log(JournalLog::LOG_WARN, queueName_, oss1.str());
- std::ofstream outFileStream(getCurrentFileName().c_str(), std::ios_base::in | std::ios_base::out | std::ios_base::binary);
+ fileNumberMapConstItr_t fnmItr = fileNumberMap_.find(start_fid);
+ std::ofstream outFileStream(fnmItr->second->journalFilePtr_->getFqFileName().c_str(), std::ios_base::in | std::ios_base::out | std::ios_base::binary);
if (!outFileStream.good()) {
throw jexception(jerrno::JERR__FILEIO, getCurrentFileName(), "RecoveryManager", "checkJournalAlignment");
}
@@ -447,7 +473,7 @@ void RecoveryManager::checkJournalAlignment(const std::streampos recordPosition)
throw jexception(jerrno::JERR_RCVM_WRITE, "RecoveryManager", "checkJournalAlignment");
}
std::ostringstream oss2;
- oss2 << std::hex << "Recover phase write: Wrote filler record: fid=0x" << getCurrentFileNumber();
+ oss2 << std::hex << "Recover phase write: Wrote filler record: fid=0x" << start_fid;
oss2 << " offs=0x" << currentPosn;
journalLogRef_.log(JournalLog::LOG_NOTICE, queueName_, oss2.str());
currentPosn = outFileStream.tellp();
@@ -456,16 +482,15 @@ void RecoveryManager::checkJournalAlignment(const std::streampos recordPosition)
std::free(writeBuffer);
journalLogRef_.log(JournalLog::LOG_INFO, queueName_, "Bad record alignment fixed.");
}
- endOffset_ = currentPosn;
+ lastRecord(start_fid, currentPosn);
}
bool RecoveryManager::decodeRecord(jrec& record,
std::size_t& cumulativeSizeRead,
::rec_hdr_t& headerRecord,
- std::streampos& fileOffset)
+ const uint64_t start_fid,
+ const std::streampos recordOffset)
{
- std::streampos start_file_offs = fileOffset;
-
if (highestRecordId_ == 0) {
highestRecordId_ = headerRecord._rid;
} else if (headerRecord._rid - highestRecordId_ < 0x8000000000000000ULL) { // RFC 1982 comparison for unsigned 64-bit
@@ -475,7 +500,7 @@ bool RecoveryManager::decodeRecord(jrec& record,
bool done = false;
while (!done) {
try {
- done = record.decode(headerRecord, &inFileStream_, cumulativeSizeRead);
+ done = record.decode(headerRecord, &inFileStream_, cumulativeSizeRead, recordOffset);
}
catch (const jexception& e) {
if (e.err_code() == jerrno::JERR_JREC_BADRECTAIL) {
@@ -485,11 +510,12 @@ bool RecoveryManager::decodeRecord(jrec& record,
} else {
journalLogRef_.log(JournalLog::LOG_INFO, queueName_, e.what());
}
- checkJournalAlignment(start_file_offs);
+ checkJournalAlignment(start_fid, recordOffset);
return false;
}
if (!done && needNextFile()) {
if (!getNextFile(false)) {
+ checkJournalAlignment(start_fid, recordOffset);
return false;
}
}
@@ -498,11 +524,11 @@ bool RecoveryManager::decodeRecord(jrec& record,
}
std::string RecoveryManager::getCurrentFileName() const {
- return currentJournalFileConstItr_->second->getFqFileName();
+ return currentJournalFileItr_->second->journalFilePtr_->getFqFileName();
}
uint64_t RecoveryManager::getCurrentFileNumber() const {
- return currentJournalFileConstItr_->first;
+ return currentJournalFileItr_->first;
}
bool RecoveryManager::getFile(const uint64_t fileNumber, bool jumpToFirstRecordOffsetFlag) {
@@ -511,8 +537,8 @@ bool RecoveryManager::getFile(const uint64_t fileNumber, bool jumpToFirstRecordO
//std::cout << " f=" << getCurrentFileName() << "]" << std::flush; // DEBUG
inFileStream_.clear(); // clear eof flag, req'd for older versions of c++
}
- currentJournalFileConstItr_ = fileNumberMap_.find(fileNumber);
- if (currentJournalFileConstItr_ == fileNumberMap_.end()) {
+ currentJournalFileItr_ = fileNumberMap_.find(fileNumber);
+ if (currentJournalFileItr_ == fileNumberMap_.end()) {
return false;
}
inFileStream_.open(getCurrentFileName().c_str(), std::ios_base::in | std::ios_base::binary);
@@ -536,7 +562,8 @@ bool RecoveryManager::getNextFile(bool jumpToFirstRecordOffsetFlag) {
if (inFileStream_.is_open()) {
inFileStream_.close();
//std::cout << " .f=" << getCurrentFileName() << "]" << std::flush; // DEBUG
- if (++currentJournalFileConstItr_ == fileNumberMap_.end()) {
+ currentJournalFileItr_->second->completedDblkCount_ = efpFileSize_kib_ * 1024 / QLS_DBLK_SIZE_BYTES;
+ if (++currentJournalFileItr_ == fileNumberMap_.end()) {
return false;
}
inFileStream_.clear(); // clear eof flag, req'd for older versions of c++
@@ -562,13 +589,15 @@ bool RecoveryManager::getNextRecordHeader()
rec_hdr_t h;
bool hdr_ok = false;
- std::streampos file_pos;
+ uint64_t file_id = 0;
+ std::streampos file_pos = 0;
while (!hdr_ok) {
if (needNextFile()) {
if (!getNextFile(true)) {
return false;
}
}
+ file_id = currentJournalFileItr_->second->journalFilePtr_->getFileSeqNum();
file_pos = inFileStream_.tellg();
if (file_pos == std::streampos(-1)) {
std::ostringstream oss;
@@ -587,21 +616,21 @@ bool RecoveryManager::getNextRecordHeader()
}
}
+ uint64_t start_fid = getCurrentFileNumber(); // fid may increment in decode() if record folds over file boundary
switch(h._magic) {
case QLS_ENQ_MAGIC:
{
//std::cout << " 0x" << std::hex << file_pos << ".e.0x" << h._rid << std::dec << std::flush; // DEBUG
if (::rec_hdr_check(&h, QLS_ENQ_MAGIC, QLS_JRNL_VERSION, currentSerial_) != 0) {
- endOffset_ = file_pos;
+ lastRecord(file_id, file_pos);
return false;
}
enq_rec er;
- uint64_t start_fid = getCurrentFileNumber(); // fid may increment in decode() if record folds over file boundary
- if (!decodeRecord(er, cum_size_read, h, file_pos)) {
+ if (!decodeRecord(er, cum_size_read, h, start_fid, file_pos)) {
return false;
}
if (!er.is_transient()) { // Ignore transient msgs
- fileNumberMap_[start_fid]->incrEnqueuedRecordCount();
+ fileNumberMap_[start_fid]->journalFilePtr_->incrEnqueuedRecordCount();
if (er.xid_size()) {
er.get_xid(&xidp);
if (xidp == 0) {
@@ -629,12 +658,11 @@ bool RecoveryManager::getNextRecordHeader()
{
//std::cout << " 0x" << std::hex << file_pos << ".d.0x" << h._rid << std::dec << std::flush; // DEBUG
if (::rec_hdr_check(&h, QLS_DEQ_MAGIC, QLS_JRNL_VERSION, currentSerial_) != 0) {
- endOffset_ = file_pos;
+ lastRecord(file_id, file_pos);
return false;
}
deq_rec dr;
- uint16_t start_fid = getCurrentFileNumber(); // fid may increment in decode() if record folds over file boundary
- if (!decodeRecord(dr, cum_size_read, h, file_pos)) {
+ if (!decodeRecord(dr, cum_size_read, h, start_fid, file_pos)) {
return false;
}
if (dr.xid_size()) {
@@ -655,7 +683,7 @@ bool RecoveryManager::getNextRecordHeader()
} else {
uint64_t enq_fid;
if (enqueueMapRef_.get_remove_pfid(dr.deq_rid(), enq_fid, true) == enq_map::EMAP_OK) { // ignore not found error
- fileNumberMap_[enq_fid]->decrEnqueuedRecordCount();
+ fileNumberMap_[enq_fid]->journalFilePtr_->decrEnqueuedRecordCount();
}
}
}
@@ -664,11 +692,11 @@ bool RecoveryManager::getNextRecordHeader()
{
//std::cout << " 0x" << std::hex << file_pos << ".a.0x" << h._rid << std::dec << std::flush; // DEBUG
if (::rec_hdr_check(&h, QLS_TXA_MAGIC, QLS_JRNL_VERSION, currentSerial_) != 0) {
- endOffset_ = file_pos;
+ lastRecord(file_id, file_pos);
return false;
}
txn_rec ar;
- if (!decodeRecord(ar, cum_size_read, h, file_pos)) {
+ if (!decodeRecord(ar, cum_size_read, h, start_fid, file_pos)) {
return false;
}
// Delete this txn from tmap, unlock any locked records in emap
@@ -680,7 +708,7 @@ bool RecoveryManager::getNextRecordHeader()
txn_data_list_t tdl = transactionMapRef_.get_remove_tdata_list(xid); // tdl will be empty if xid not found
for (tdl_itr_t itr = tdl.begin(); itr != tdl.end(); itr++) {
if (itr->enq_flag_) {
- fileNumberMap_[itr->pfid_]->decrEnqueuedRecordCount();
+ fileNumberMap_[itr->pfid_]->journalFilePtr_->decrEnqueuedRecordCount();
} else {
enqueueMapRef_.unlock(itr->drid_); // ignore not found error
}
@@ -691,11 +719,11 @@ bool RecoveryManager::getNextRecordHeader()
{
//std::cout << " 0x" << std::hex << file_pos << ".c.0x" << h._rid << std::dec << std::flush; // DEBUG
if (::rec_hdr_check(&h, QLS_TXC_MAGIC, QLS_JRNL_VERSION, currentSerial_) != 0) {
- endOffset_ = file_pos;
+ lastRecord(file_id, file_pos);
return false;
}
txn_rec cr;
- if (!decodeRecord(cr, cum_size_read, h, file_pos)) {
+ if (!decodeRecord(cr, cum_size_read, h, start_fid, file_pos)) {
return false;
}
// Delete this txn from tmap, process records into emap
@@ -717,7 +745,7 @@ bool RecoveryManager::getNextRecordHeader()
} else { // txn dequeue
uint64_t enq_fid;
if (enqueueMapRef_.get_remove_pfid(itr->drid_, enq_fid, true) == enq_map::EMAP_OK) // ignore not found error
- fileNumberMap_[enq_fid]->decrEnqueuedRecordCount();
+ fileNumberMap_[enq_fid]->journalFilePtr_->decrEnqueuedRecordCount();
}
}
}
@@ -729,7 +757,9 @@ bool RecoveryManager::getNextRecordHeader()
inFileStream_.ignore(rec_dblks * QLS_DBLK_SIZE_BYTES - sizeof(::rec_hdr_t));
checkFileStreamOk(false);
if (needNextFile()) {
+ file_pos += rec_dblks * QLS_DBLK_SIZE_BYTES;
if (!getNextFile(false)) {
+ lastRecord(start_fid, file_pos);
return false;
}
}
@@ -737,17 +767,36 @@ bool RecoveryManager::getNextRecordHeader()
break;
case 0:
//std::cout << " 0x" << std::hex << file_pos << ".0" << std::dec << std::endl << std::flush; // DEBUG
- checkJournalAlignment(file_pos);
+ checkJournalAlignment(getCurrentFileNumber(), file_pos);
return false;
default:
//std::cout << " 0x" << std::hex << file_pos << ".?" << std::dec << std::endl << std::flush; // DEBUG
// Stop as this is the overwrite boundary.
- checkJournalAlignment(file_pos);
+ checkJournalAlignment(getCurrentFileNumber(), file_pos);
return false;
}
return true;
}
+// Records file_id/endOffset as the end of recovered data and discards all later journal files.
+void RecoveryManager::lastRecord(const uint64_t file_id, const std::streamoff endOffset) {
+    endOffset_ = endOffset;
+    initial_fid_ = file_id;
+    fileNumberMap_[file_id]->completedDblkCount_ = endOffset_ / QLS_DBLK_SIZE_BYTES;
+
+    // Remove any files in fileNumberMap_ beyond initial_fid_: note each file name in
+    // notNeededFilesList_, then free its objects. Every discarded entry is freed here,
+    // including the first one after file_id, so nothing is leaked when the range is erased.
+    fileNumberMapItr_t unwantedFirstItr = fileNumberMap_.find(file_id);
+    if (++unwantedFirstItr != fileNumberMap_.end()) {
+        for (fileNumberMapItr_t itr = unwantedFirstItr; itr != fileNumberMap_.end(); ++itr) {
+            notNeededFilesList_.push_back(itr->second->journalFilePtr_->getFqFileName());
+            delete itr->second->journalFilePtr_;
+            delete itr->second;
+        }
+        fileNumberMap_.erase(unwantedFirstItr, fileNumberMap_.end());
+    }
+}
+
bool RecoveryManager::needNextFile() {
if (inFileStream_.is_open()) {
return inFileStream_.eof() || inFileStream_.tellg() >= std::streampos(efpFileSize_kib_ * 1024);
@@ -820,7 +869,7 @@ bool RecoveryManager::readFileHeader() {
currentSerial_ = fhdr._rhdr._serial;
} else {
inFileStream_.close();
- if (currentJournalFileConstItr_ == fileNumberMap_.begin()) {
+ if (currentJournalFileItr_ == fileNumberMap_.begin()) {
journalEmptyFlag_ = true;
}
return false;
@@ -855,9 +904,11 @@ void RecoveryManager::readJournalFileHeader(const std::string& journalFileName,
}
void RecoveryManager::removeEmptyFiles(EmptyFilePool* emptyFilePoolPtr) {
- while (fileNumberMap_.begin()->second->getEnqueuedRecordCount() == 0 && fileNumberMap_.size() > 1) {
-//std::cout << "*** File " << i->first << ": " << i->second << " is empty." << std::endl; // DEBUG
- emptyFilePoolPtr->returnEmptyFile(fileNumberMap_.begin()->second->getFqFileName());
+ while (fileNumberMap_.size() > 1 && fileNumberMap_.begin()->second->journalFilePtr_->getEnqueuedRecordCount() == 0) { // size test first: begin() is not dereferenceable on an empty map
+ RecoveredFileData_t* rfdp = fileNumberMap_.begin()->second;
+ emptyFilePoolPtr->returnEmptyFile(rfdp->journalFilePtr_->getFqFileName());
+ delete rfdp->journalFilePtr_;
+ delete rfdp;
fileNumberMap_.erase(fileNumberMap_.begin()->first);
}
}
diff --git a/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.h b/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.h
index 997596938b..e19f92e305 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.h
@@ -51,15 +51,21 @@ struct RecoveredRecordData_t {
RecoveredRecordData_t(const uint64_t rid, const uint64_t fid, const std::streampos foffs, bool ptxn);
};
+struct RecoveredFileData_t {
+ JournalFile* journalFilePtr_; ///< Journal file object for this file; RecoveryManager deletes it explicitly when the file is dropped
+ uint32_t completedDblkCount_; ///< Dblk count set during recovery: full file size in dblks once read past, or end offset in dblks for the last file
+ RecoveredFileData_t(JournalFile* journalFilePtr, const uint32_t completedDblkCount);
+};
+
bool recordIdListCompare(RecoveredRecordData_t a, RecoveredRecordData_t b);
class RecoveryManager
{
protected:
// Types
- typedef std::vector<std::string> directoryList_t;
- typedef directoryList_t::const_iterator directoryListConstItr_t;
- typedef std::map<uint64_t, JournalFile*> fileNumberMap_t;
+ typedef std::vector<std::string> stringList_t;
+ typedef stringList_t::const_iterator stringListConstItr_t;
+ typedef std::map<uint64_t, RecoveredFileData_t*> fileNumberMap_t;
typedef fileNumberMap_t::iterator fileNumberMapItr_t;
typedef fileNumberMap_t::const_iterator fileNumberMapConstItr_t;
typedef std::vector<RecoveredRecordData_t> recordIdList_t;
@@ -74,18 +80,20 @@ protected:
// Initial journal analysis data
fileNumberMap_t fileNumberMap_; ///< File number - JournalFilePtr map
+ stringList_t notNeededFilesList_; ///< Files not needed and to be returned to EFP
+ stringList_t uninitFileList_; ///< File name of uninitialized journal files found during header analysis
bool journalEmptyFlag_; ///< Journal data files empty
std::streamoff firstRecordOffset_; ///< First record offset in ffid
std::streamoff endOffset_; ///< End offset (first byte past last record)
uint64_t highestRecordId_; ///< Highest rid found
uint64_t highestFileNumber_; ///< Highest file number found
bool lastFileFullFlag_; ///< Last file is full
- std::string uninitializedJournal_; ///< File name of uninitialized journal found during header analysis
+ uint64_t initial_fid_; ///< File id where initial write after recovery will occur
// State for recovery of individual enqueued records
uint64_t currentSerial_;
uint32_t efpFileSize_kib_;
- fileNumberMapConstItr_t currentJournalFileConstItr_;
+ fileNumberMapConstItr_t currentJournalFileItr_;
std::string currentFileName_;
std::ifstream inFileStream_;
recordIdList_t recordIdList_;
@@ -121,16 +129,18 @@ public:
protected:
void analyzeJournalFileHeaders(efpIdentity_t& efpIdentity);
void checkFileStreamOk(bool checkEof);
- void checkJournalAlignment(const std::streampos recordPosition);
+ void checkJournalAlignment(const uint64_t start_fid, const std::streampos recordPosition);
bool decodeRecord(jrec& record,
std::size_t& cumulativeSizeRead,
::rec_hdr_t& recordHeader,
- std::streampos& fileOffset);
+ const uint64_t start_fid,
+ const std::streampos recordOffset);
std::string getCurrentFileName() const;
uint64_t getCurrentFileNumber() const;
bool getFile(const uint64_t fileNumber, bool jumpToFirstRecordOffsetFlag);
bool getNextFile(bool jumpToFirstRecordOffsetFlag);
bool getNextRecordHeader();
+ void lastRecord(const uint64_t file_id, const std::streamoff endOffset);
bool needNextFile();
void prepareRecordList();
bool readFileHeader();
diff --git a/qpid/cpp/src/qpid/linearstore/journal/deq_rec.cpp b/qpid/cpp/src/qpid/linearstore/journal/deq_rec.cpp
index a4882aaa9c..90ca27d082 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/deq_rec.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/deq_rec.cpp
@@ -181,7 +181,7 @@ deq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks, Ch
}
bool
-deq_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs)
+deq_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs, const std::streampos rec_start)
{
if (rec_offs == 0)
{
@@ -228,7 +228,7 @@ deq_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs)
assert(!ifsp->fail() && !ifsp->bad());
return false;
}
- check_rec_tail();
+ check_rec_tail(rec_start);
}
ifsp->ignore(rec_size_dblks() * QLS_DBLK_SIZE_BYTES - rec_size());
assert(!ifsp->fail() && !ifsp->bad());
@@ -274,7 +274,7 @@ deq_rec::rec_size() const
}
void
-deq_rec::check_rec_tail() const {
+deq_rec::check_rec_tail(const std::streampos rec_start) const {
Checksum checksum;
checksum.addData((const unsigned char*)&_deq_hdr, sizeof(::deq_hdr_t));
if (_deq_hdr._xidsize > 0) {
@@ -284,7 +284,7 @@ deq_rec::check_rec_tail() const {
uint16_t res = ::rec_tail_check(&_deq_tail, &_deq_hdr._rhdr, cs);
if (res != 0) {
std::stringstream oss;
- oss << std::hex;
+ oss << std::endl << " Record offset: 0x" << std::hex << rec_start;
if (res & ::REC_TAIL_MAGIC_ERR_MASK) {
oss << std::endl << " Magic: expected 0x" << ~_deq_hdr._rhdr._magic << "; found 0x" << _deq_tail._xmagic;
}
diff --git a/qpid/cpp/src/qpid/linearstore/journal/deq_rec.h b/qpid/cpp/src/qpid/linearstore/journal/deq_rec.h
index ead0eed72a..9f55032e76 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/deq_rec.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/deq_rec.h
@@ -49,7 +49,7 @@ public:
void reset(const uint64_t serial, const uint64_t rid, const uint64_t drid, const void* const xidp,
const std::size_t xidlen, const bool txn_coml_commit);
uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks, Checksum& checksum);
- bool decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs);
+ bool decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs, const std::streampos rec_start);
inline bool is_txn_coml_commit() const { return ::is_txn_coml_commit(&_deq_hdr); }
inline uint64_t rid() const { return _deq_hdr._rhdr._rid; }
@@ -59,7 +59,7 @@ public:
inline std::size_t data_size() const { return 0; } // This record never carries data
std::size_t xid_size() const;
std::size_t rec_size() const;
- void check_rec_tail() const;
+ void check_rec_tail(const std::streampos rec_start) const;
private:
virtual void clean();
diff --git a/qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp b/qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp
index f95a722308..0fecd90cbf 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp
@@ -218,7 +218,7 @@ enq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks, Ch
}
bool
-enq_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs)
+enq_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs, const std::streampos rec_start)
{
if (rec_offs == 0)
{
@@ -291,7 +291,7 @@ enq_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs)
assert(!ifsp->fail() && !ifsp->bad());
return false;
}
- check_rec_tail();
+ check_rec_tail(rec_start);
}
ifsp->ignore(rec_size_dblks() * QLS_DBLK_SIZE_BYTES - rec_size());
assert(!ifsp->fail() && !ifsp->bad());
@@ -352,7 +352,7 @@ enq_rec::rec_size(const std::size_t xidsize, const std::size_t dsize, const bool
}
void
-enq_rec::check_rec_tail() const {
+enq_rec::check_rec_tail(const std::streampos rec_start) const {
Checksum checksum;
checksum.addData((const unsigned char*)&_enq_hdr, sizeof(::enq_hdr_t));
if (_enq_hdr._xidsize > 0) {
@@ -365,7 +365,7 @@ enq_rec::check_rec_tail() const {
uint16_t res = ::rec_tail_check(&_enq_tail, &_enq_hdr._rhdr, cs);
if (res != 0) {
std::stringstream oss;
- oss << std::hex;
+ oss << std::endl << " Record offset: 0x" << std::hex << rec_start;
if (res & ::REC_TAIL_MAGIC_ERR_MASK) {
oss << std::endl << " Magic: expected 0x" << ~_enq_hdr._rhdr._magic << "; found 0x" << _enq_tail._xmagic;
}
diff --git a/qpid/cpp/src/qpid/linearstore/journal/enq_rec.h b/qpid/cpp/src/qpid/linearstore/journal/enq_rec.h
index 1655e2cc4d..d85cde42f5 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/enq_rec.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/enq_rec.h
@@ -51,7 +51,7 @@ public:
void reset(const uint64_t serial, const uint64_t rid, const void* const dbuf, const std::size_t dlen,
const void* const xidp, const std::size_t xidlen, const bool transient, const bool external);
uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks, Checksum& checksum);
- bool decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs);
+ bool decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs, const std::streampos rec_start);
std::size_t get_xid(void** const xidpp);
std::size_t get_data(void** const datapp);
@@ -63,7 +63,7 @@ public:
std::size_t rec_size() const;
static std::size_t rec_size(const std::size_t xidsize, const std::size_t dsize, const bool external);
inline uint64_t rid() const { return _enq_hdr._rhdr._rid; }
- void check_rec_tail() const;
+ void check_rec_tail(const std::streampos rec_start) const;
private:
virtual void clean();
diff --git a/qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp b/qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp
index 55bac4a2e5..ab367754d5 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp
@@ -21,6 +21,7 @@
#include "qpid/linearstore/journal/jcntl.h"
+#include <iomanip>
#include "qpid/linearstore/journal/data_tok.h"
#include "qpid/linearstore/journal/JournalLog.h"
@@ -90,7 +91,7 @@ jcntl::initialize(EmptyFilePool* efpp,
_linearFileController.finalize();
_jdir.clear_dir(); // Clear any existing journal files
_linearFileController.initialize(_jdir.dirname(), efpp, 0ULL);
- _linearFileController.pullEmptyFileFromEfp();
+ _linearFileController.getNextJournalFile();
_wmgr.initialize(cbp, wcache_pgsize_sblks, wcache_num_pages, QLS_WMGR_MAXDTOKPP, QLS_WMGR_MAXWAITUS);
_init_flag = true;
}
@@ -120,6 +121,9 @@ jcntl::recover(EmptyFilePoolManager* efpmp,
_jrnl_log.log(/*LOG_DEBUG*/JournalLog::LOG_INFO, _jid, _recoveryManager.toLog(_jid, 5));
_linearFileController.initialize(_jdir.dirname(), _emptyFilePoolPtr, _recoveryManager.getHighestFileNumber());
_recoveryManager.setLinearFileControllerJournals(&qpid::linearstore::journal::LinearFileController::addJournalFile, &_linearFileController);
+ if (_recoveryManager.isLastFileFull()) {
+ _linearFileController.getNextJournalFile();
+ }
_wmgr.initialize(cbp, wcache_pgsize_sblks, wcache_num_pages, QLS_WMGR_MAXDTOKPP, QLS_WMGR_MAXWAITUS,
(_recoveryManager.isLastFileFull() ? 0 : _recoveryManager.getEndOffset()));
@@ -316,6 +320,20 @@ jcntl::getLinearFileControllerRef() {
return _linearFileController;
}
+// static: renders str as "(<size>)0x<hex digits>", last byte first; "<null>" when str is empty
+std::string
+jcntl::str2hexnum(const std::string& str) {
+    if (str.empty()) {
+        return "<null>";
+    }
+    std::ostringstream hexStream;
+    hexStream << "(" << str.size() << ")0x" << std::hex << std::setfill('0');
+    for (std::string::const_reverse_iterator c = str.rbegin(); c != str.rend(); ++c) {
+        hexStream << std::setw(2) << (uint16_t)(uint8_t)(*c);
+    }
+    return hexStream.str();
+}
+
iores
jcntl::flush(const bool block_till_aio_cmpl)
{
diff --git a/qpid/cpp/src/qpid/linearstore/journal/jcntl.h b/qpid/cpp/src/qpid/linearstore/journal/jcntl.h
index 2db0e707a7..94c00d2fab 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/jcntl.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/jcntl.h
@@ -537,6 +537,8 @@ public:
inline virtual void instr_incr_outstanding_aio_cnt() {}
inline virtual void instr_decr_outstanding_aio_cnt() {}
+ static std::string str2hexnum(const std::string& str);
+
protected:
static bool _init;
static bool init_statics();
diff --git a/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp b/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp
index 01c432d37b..8765396b31 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp
@@ -167,8 +167,8 @@ jerrno::__init()
_err_map[JERR_LFCR_SEQNUMNOTFOUND] = "JERR_LFCR_SEQNUMNOTFOUND: File sequence number not found";
// class jrec, enq_rec, deq_rec, txn_rec
- _err_map[JERR_JREC_BADRECHDR] = "JERR_JREC_BADRECHDR: Invalid data record header.";
- _err_map[JERR_JREC_BADRECTAIL] = "JERR_JREC_BADRECTAIL: Invalid data record tail.";
+ _err_map[JERR_JREC_BADRECHDR] = "JERR_JREC_BADRECHDR: Invalid record header.";
+ _err_map[JERR_JREC_BADRECTAIL] = "JERR_JREC_BADRECTAIL: Invalid record tail.";
// class wmgr
_err_map[JERR_WMGR_BADPGSTATE] = "JERR_WMGR_BADPGSTATE: Page buffer in illegal state for operation.";
diff --git a/qpid/cpp/src/qpid/linearstore/journal/jrec.h b/qpid/cpp/src/qpid/linearstore/journal/jrec.h
index 7645e646f6..cad0e5d7a2 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/jrec.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/jrec.h
@@ -98,7 +98,7 @@ public:
* \returns Number of data-blocks encoded.
*/
virtual uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks, Checksum& checksum) = 0;
- virtual bool decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs) = 0;
+ virtual bool decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs, const std::streampos rec_start) = 0;
virtual std::string& str(std::string& str) const = 0;
virtual std::size_t data_size() const = 0;
diff --git a/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp b/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp
index 1368fd4be2..298ab608b1 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp
@@ -176,7 +176,7 @@ txn_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks, Ch
}
bool
-txn_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs)
+txn_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs, const std::streampos rec_start)
{
if (rec_offs == 0)
{
@@ -218,7 +218,7 @@ txn_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs)
assert(!ifsp->fail() && !ifsp->bad());
return false;
}
- check_rec_tail();
+ check_rec_tail(rec_start);
}
ifsp->ignore(rec_size_dblks() * QLS_DBLK_SIZE_BYTES - rec_size());
assert(!ifsp->fail() && !ifsp->bad());
@@ -266,7 +266,7 @@ txn_rec::rec_size() const
}
void
-txn_rec::check_rec_tail() const {
+txn_rec::check_rec_tail(const std::streampos rec_start) const {
Checksum checksum;
checksum.addData((const unsigned char*)&_txn_hdr, sizeof(::txn_hdr_t));
if (_txn_hdr._xidsize > 0) {
@@ -276,7 +276,7 @@ txn_rec::check_rec_tail() const {
uint16_t res = ::rec_tail_check(&_txn_tail, &_txn_hdr._rhdr, cs);
if (res != 0) {
std::stringstream oss;
- oss << std::hex;
+ oss << std::endl << " Record offset: 0x" << std::hex << rec_start;
if (res & ::REC_TAIL_MAGIC_ERR_MASK) {
oss << std::endl << " Magic: expected 0x" << ~_txn_hdr._rhdr._magic << "; found 0x" << _txn_tail._xmagic;
}
diff --git a/qpid/cpp/src/qpid/linearstore/journal/txn_rec.h b/qpid/cpp/src/qpid/linearstore/journal/txn_rec.h
index daca16d9d4..4552071595 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/txn_rec.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/txn_rec.h
@@ -49,7 +49,7 @@ public:
void reset(const bool commitFlag, const uint64_t serial, const uint64_t rid, const void* const xidp,
const std::size_t xidlen);
uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks, Checksum& checksum);
- bool decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs);
+ bool decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs, const std::streampos rec_start);
std::size_t get_xid(void** const xidpp);
std::string& str(std::string& str) const;
@@ -57,7 +57,7 @@ public:
std::size_t xid_size() const;
std::size_t rec_size() const;
inline uint64_t rid() const { return _txn_hdr._rhdr._rid; }
- void check_rec_tail() const;
+ void check_rec_tail(const std::streampos rec_start) const;
private:
virtual void clean();
diff --git a/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp b/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp
index c246837a8d..7c590713f5 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp
@@ -30,8 +30,6 @@
#include "qpid/linearstore/journal/LinearFileController.h"
#include "qpid/linearstore/journal/utils/file_hdr.h"
-//#include <iostream> // DEBUG
-
namespace qpid {
namespace linearstore {
namespace journal {
@@ -49,7 +47,7 @@ wmgr::wmgr(jcntl* jc,
_deq_busy(false),
_abort_busy(false),
_commit_busy(false),
- _txn_pending_set()
+ _txn_pending_map()
{}
wmgr::wmgr(jcntl* jc,
@@ -67,7 +65,7 @@ wmgr::wmgr(jcntl* jc,
_deq_busy(false),
_abort_busy(false),
_commit_busy(false),
- _txn_pending_set()
+ _txn_pending_map()
{}
wmgr::~wmgr()
@@ -281,6 +279,7 @@ wmgr::dequeue(data_tok* dtokp,
_deq_busy = true;
}
//std::cout << "---+++ wmgr::dequeue() DEQ rid=0x" << std::hex << rid << " drid=0x" << dequeue_rid << " " << std::dec << std::flush; // DEBUG
+ std::string xid((const char*)xid_ptr, xid_len);
bool done = false;
Checksum checksum;
while (!done)
@@ -292,9 +291,27 @@ wmgr::dequeue(data_tok* dtokp,
uint32_t ret = _deq_rec.encode(wptr, data_offs_dblks,
(_cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) - _pg_offset_dblks, checksum);
- // Remember fid which contains the record header in case record is split over several files
if (data_offs_dblks == 0) {
- dtokp->set_fid(_lfc.getCurrentFileSeqNum());
+ uint64_t fid;
+ short eres = _emap.get_pfid(dtokp->dequeue_rid(), fid);
+ if (eres == enq_map::EMAP_OK) {
+ dtokp->set_fid(fid);
+ } else if (xid_len > 0) {
+ txn_data_list_t tdl = _tmap.get_tdata_list(xid);
+ bool found = false;
+ for (tdl_const_itr_t i=tdl.begin(); i!=tdl.end() && !found; ++i) {
+ if (i->rid_ == dtokp->dequeue_rid()) {
+ found = true;
+ dtokp->set_fid(i->pfid_);
+ break;
+ }
+ }
+ if (!found) {
+ throw jexception("rid found in neither emap nor tmap, transactional");
+ }
+ } else {
+ throw jexception("rid not found in emap, non-transactional");
+ }
}
_pg_offset_dblks += ret;
_cached_offset_dblks += ret;
@@ -325,7 +342,7 @@ wmgr::dequeue(data_tok* dtokp,
if (eres == enq_map::EMAP_RID_NOT_FOUND)
{
std::ostringstream oss;
- oss << std::hex << "rid=0x" << rid;
+ oss << std::hex << "emap: rid=0x" << rid;
throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr", "dequeue");
}
if (eres == enq_map::EMAP_LOCKED)
@@ -335,10 +352,6 @@ wmgr::dequeue(data_tok* dtokp,
throw jexception(jerrno::JERR_MAP_LOCKED, oss.str(), "wmgr", "dequeue");
}
}
-//std::cout << "[0x" << std::hex << _lfc.getEnqueuedRecordCount(fid) << std::dec << std::flush; // DEBUG
-//try {
- _lfc.decrEnqueuedRecordCount(fid);
-//} catch (std::exception& e) { std::cout << "***OOPS*** " << e.what() << " cfid=" << _lfc.getCurrentFileSeqNum() << " fid=" << fid << std::flush; throw; }
}
done = true;
@@ -427,14 +440,16 @@ wmgr::abort(data_tok* dtokp,
// Delete this txn from tmap, unlock any locked records in emap
std::string xid((const char*)xid_ptr, xid_len);
txn_data_list_t tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if xid not found
+ fidl_t fidl;
for (tdl_itr_t itr = tdl.begin(); itr != tdl.end(); itr++)
{
if (!itr->enq_flag_)
_emap.unlock(itr->drid_); // ignore rid not found error
- if (itr->enq_flag_)
- _lfc.decrEnqueuedRecordCount(itr->pfid_);
+ if (itr->enq_flag_) {
+ fidl.push_back(itr->pfid_);
+ }
}
- std::pair<std::set<std::string>::iterator, bool> res = _txn_pending_set.insert(xid);
+ std::pair<pending_txn_map_itr_t, bool> res = _txn_pending_map.insert(std::pair<std::string, fidl_t>(xid, fidl));
if (!res.second)
{
std::ostringstream oss;
@@ -526,6 +541,7 @@ wmgr::commit(data_tok* dtokp,
// Delete this txn from tmap, process records into emap
std::string xid((const char*)xid_ptr, xid_len);
txn_data_list_t tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if xid not found
+ fidl_t fidl;
for (tdl_itr_t itr = tdl.begin(); itr != tdl.end(); itr++)
{
if (itr->enq_flag_) // txn enqueue
@@ -547,20 +563,20 @@ wmgr::commit(data_tok* dtokp,
if (eres == enq_map::EMAP_RID_NOT_FOUND)
{
std::ostringstream oss;
- oss << std::hex << "rid=0x" << rid;
- throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr", "dequeue");
+ oss << std::hex << "emap: rid=0x" << itr->drid_;
+ throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr", "commit");
}
if (eres == enq_map::EMAP_LOCKED)
{
std::ostringstream oss;
- oss << std::hex << "rid=0x" << rid;
- throw jexception(jerrno::JERR_MAP_LOCKED, oss.str(), "wmgr", "dequeue");
+ oss << std::hex << "rid=0x" << itr->drid_;
+ throw jexception(jerrno::JERR_MAP_LOCKED, oss.str(), "wmgr", "commit");
}
}
- _lfc.decrEnqueuedRecordCount(fid);
+ fidl.push_back(fid);
}
}
- std::pair<std::set<std::string>::iterator, bool> res = _txn_pending_set.insert(xid);
+ std::pair<pending_txn_map_itr_t, bool> res = _txn_pending_map.insert(std::pair<std::string, fidl_t>(xid, fidl));
if (!res.second)
{
std::ostringstream oss;
@@ -695,7 +711,7 @@ wmgr::get_next_file()
{
_pg_cntr = 0;
//std::cout << "&&&&& wmgr::get_next_file(): " << status_str() << std::flush << std::endl; // DEBUG
- _lfc.pullEmptyFileFromEfp();
+ _lfc.getNextJournalFile();
}
int32_t
@@ -757,7 +773,7 @@ wmgr::get_events(timespec* const timeout,
data_tok* dtokp = pcbp->_pdtokl->at(k);
if (dtokp->decr_pg_cnt() == 0)
{
- std::set<std::string>::iterator it;
+ pending_txn_map_itr_t it;
switch (dtokp->wstate())
{
case data_tok::ENQ_SUBM:
@@ -770,6 +786,9 @@ wmgr::get_events(timespec* const timeout,
_tmap.set_aio_compl(dtokp->xid(), dtokp->rid());
break;
case data_tok::DEQ_SUBM:
+ if (!dtokp->has_xid()) {
+ _lfc.decrEnqueuedRecordCount(dtokp->fid());
+ }
dtokl.push_back(dtokp);
tot_data_toks++;
dtokp->set_wstate(data_tok::DEQ);
@@ -781,31 +800,35 @@ wmgr::get_events(timespec* const timeout,
dtokl.push_back(dtokp);
tot_data_toks++;
dtokp->set_wstate(data_tok::ABORTED);
- it = _txn_pending_set.find(dtokp->xid());
- if (it == _txn_pending_set.end())
+ it = _txn_pending_map.find(dtokp->xid());
+ if (it == _txn_pending_map.end())
{
std::ostringstream oss;
- oss << std::hex << "_txn_pending_set: abort xid=\"";
- oss << dtokp->xid() << "\"";
- throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr",
- "get_events");
+ oss << std::hex << "_txn_pending_set: abort xid=\""
+ << qpid::linearstore::journal::jcntl::str2hexnum(dtokp->xid()) << "\"";
+ throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr", "get_events");
+ }
+ for (fidl_itr_t i=it->second.begin(); i!=it->second.end(); ++i) {
+ _lfc.decrEnqueuedRecordCount(*i);
}
- _txn_pending_set.erase(it);
+ _txn_pending_map.erase(it);
break;
case data_tok::COMMIT_SUBM:
dtokl.push_back(dtokp);
tot_data_toks++;
dtokp->set_wstate(data_tok::COMMITTED);
- it = _txn_pending_set.find(dtokp->xid());
- if (it == _txn_pending_set.end())
+ it = _txn_pending_map.find(dtokp->xid());
+ if (it == _txn_pending_map.end())
{
std::ostringstream oss;
- oss << std::hex << "_txn_pending_set: commit xid=\"";
- oss << dtokp->xid() << "\"";
- throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr",
- "get_events");
+ oss << std::hex << "_txn_pending_set: commit xid=\""
+ << qpid::linearstore::journal::jcntl::str2hexnum(dtokp->xid()) << "\"";
+ throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr", "get_events");
+ }
+ for (fidl_itr_t i=it->second.begin(); i!=it->second.end(); ++i) {
+ _lfc.decrEnqueuedRecordCount(*i);
}
- _txn_pending_set.erase(it);
+ _txn_pending_map.erase(it);
break;
case data_tok::ENQ_PART:
case data_tok::DEQ_PART:
@@ -858,8 +881,8 @@ wmgr::is_txn_synced(const std::string& xid)
if (_tmap.is_txn_synced(xid) == txn_map::TMAP_NOT_SYNCED)
return false;
// Check for outstanding commit/aborts
- std::set<std::string>::iterator it = _txn_pending_set.find(xid);
- return it == _txn_pending_set.end();
+ pending_txn_map_itr_t it = _txn_pending_map.find(xid);
+ return it == _txn_pending_map.end();
}
void
@@ -871,7 +894,6 @@ wmgr::initialize(aio_callback* const cbp,
pmgr::initialize(cbp, wcache_pgsize_sblks, wcache_num_pages);
wmgr::clean();
_page_cb_arr[0]._state = IN_USE;
- _ddtokl.clear();
_cached_offset_dblks = 0;
_enq_busy = false;
}
diff --git a/qpid/cpp/src/qpid/linearstore/journal/wmgr.h b/qpid/cpp/src/qpid/linearstore/journal/wmgr.h
index 8837e51e97..8eaa2364ad 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/wmgr.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/wmgr.h
@@ -22,9 +22,11 @@
#ifndef QPID_LINEARSTORE_JOURNAL_WMGR_H
#define QPID_LINEARSTORE_JOURNAL_WMGR_H
+#include <deque>
+#include <map>
#include "qpid/linearstore/journal/enums.h"
#include "qpid/linearstore/journal/pmgr.h"
-#include <set>
+#include <vector>
namespace qpid {
namespace linearstore {
@@ -51,11 +53,15 @@ class LinearFileController;
class wmgr : public pmgr
{
private:
+ typedef std::vector<uint64_t> fidl_t;
+ typedef fidl_t::iterator fidl_itr_t;
+ typedef std::map<std::string, fidl_t> pending_txn_map_t;
+ typedef pending_txn_map_t::iterator pending_txn_map_itr_t;
+
LinearFileController& _lfc; ///< Linear File Controller ref
uint32_t _max_dtokpp; ///< Max data writes per page
uint32_t _max_io_wait_us; ///< Max wait in microseconds till submit
uint32_t _cached_offset_dblks; ///< Amount of unwritten data in page (dblocks)
- std::deque<data_tok*> _ddtokl; ///< Deferred dequeue data_tok list
// TODO: Convert _enq_busy etc into a proper threadsafe lock
// TODO: Convert to enum? Are these encodes mutually exclusive?
@@ -70,7 +76,7 @@ private:
enq_rec _enq_rec; ///< Enqueue record used for encoding/decoding
deq_rec _deq_rec; ///< Dequeue record used for encoding/decoding
txn_rec _txn_rec; ///< Transaction record used for encoding/decoding
- std::set<std::string> _txn_pending_set; ///< Set containing xids of pending commits/aborts
+ pending_txn_map_t _txn_pending_map; ///< Map of pending commit/abort xids to the list of fids whose enqueued record counts are decremented on completion
public:
wmgr(jcntl* jc,