summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPavel Moravec <pmoravec@apache.org>2015-05-21 12:48:30 +0000
committerPavel Moravec <pmoravec@apache.org>2015-05-21 12:48:30 +0000
commitc75685031b2d46bb522e55da24ebaf37bc3b369c (patch)
tree33ca1cc3f493c4fb5a63126556a96652318ab372
parent0f3ce19450aa3bc96d42d0a58fa74813b4b9f428 (diff)
downloadqpid-python-c75685031b2d46bb522e55da24ebaf37bc3b369c.tar.gz
QPID-6551: [C++ broker]: linearstore raising JERR_LFCR_SEQNUMNOTFOUND after sending many DTX transactions
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1680861 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp16
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/LinearFileController.h14
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp12
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/txn_map.cpp4
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/txn_map.h4
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp8
6 files changed, 29 insertions, 29 deletions
diff --git a/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp b/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp
index e6e07dc8e2..08d565ca2e 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp
@@ -114,15 +114,15 @@ void LinearFileController::purgeEmptyFilesToEfp() {
}
}
-uint32_t LinearFileController::getEnqueuedRecordCount(const efpFileCount_t fileSeqNumber) {
+uint32_t LinearFileController::getEnqueuedRecordCount(const uint64_t fileSeqNumber) {
return find(fileSeqNumber)->getEnqueuedRecordCount();
}
-uint32_t LinearFileController::incrEnqueuedRecordCount(const efpFileCount_t fileSeqNumber) {
+uint32_t LinearFileController::incrEnqueuedRecordCount(const uint64_t fileSeqNumber) {
return find(fileSeqNumber)->incrEnqueuedRecordCount();
}
-uint32_t LinearFileController::decrEnqueuedRecordCount(const efpFileCount_t fileSeqNumber) {
+uint32_t LinearFileController::decrEnqueuedRecordCount(const uint64_t fileSeqNumber) {
uint32_t r = find(fileSeqNumber)->decrEnqueuedRecordCount();
// TODO: Re-evaluate after testing and profiling
@@ -136,11 +136,11 @@ uint32_t LinearFileController::decrEnqueuedRecordCount(const efpFileCount_t file
return r;
}
-uint32_t LinearFileController::addWriteCompletedDblkCount(const efpFileCount_t fileSeqNumber, const uint32_t a) {
+uint32_t LinearFileController::addWriteCompletedDblkCount(const uint64_t fileSeqNumber, const uint32_t a) {
return find(fileSeqNumber)->addCompletedDblkCount(a);
}
-uint16_t LinearFileController::decrOutstandingAioOperationCount(const efpFileCount_t fileSeqNumber) {
+uint16_t LinearFileController::decrOutstandingAioOperationCount(const uint64_t fileSeqNumber) {
return find(fileSeqNumber)->decrOutstandingAioOperationCount();
}
@@ -199,9 +199,9 @@ const std::string LinearFileController::status(const uint8_t indentDepth) const
void LinearFileController::addJournalFile(const std::string& fileName,
const efpIdentity_t& efpIdentity,
- const uint64_t fileNumber,
+ const uint64_t fileSeqNumber,
const uint32_t completedDblkCount) {
- JournalFile* jfp = new JournalFile(fileName, efpIdentity, fileNumber, jcntlRef_.id());
+ JournalFile* jfp = new JournalFile(fileName, efpIdentity, fileSeqNumber, jcntlRef_.id());
addJournalFile(jfp, completedDblkCount, true);
}
@@ -215,7 +215,7 @@ bool LinearFileController::checkCurrentJournalFileValid() const {
return currentJournalFilePtr_ != 0;
}
-JournalFile* LinearFileController::find(const efpFileCount_t fileSeqNumber) {
+JournalFile* LinearFileController::find(const uint64_t fileSeqNumber) {
if (currentJournalFilePtr_ && currentJournalFilePtr_->getFileSeqNum() == fileSeqNumber)
return currentJournalFilePtr_;
diff --git a/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.h b/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.h
index 05f08144b9..3cdfb72a37 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.h
@@ -74,12 +74,12 @@ public:
void purgeEmptyFilesToEfp();
// Functions for manipulating counts of non-current JournalFile instances in journalFileList_
- uint32_t getEnqueuedRecordCount(const efpFileCount_t fileSeqNumber);
- uint32_t incrEnqueuedRecordCount(const efpFileCount_t fileSeqNumber);
- uint32_t decrEnqueuedRecordCount(const efpFileCount_t fileSeqNumber);
- uint32_t addWriteCompletedDblkCount(const efpFileCount_t fileSeqNumber,
+ uint32_t getEnqueuedRecordCount(const uint64_t fileSeqNumber);
+ uint32_t incrEnqueuedRecordCount(const uint64_t fileSeqNumber);
+ uint32_t decrEnqueuedRecordCount(const uint64_t fileSeqNumber);
+ uint32_t addWriteCompletedDblkCount(const uint64_t fileSeqNumber,
const uint32_t a);
- uint16_t decrOutstandingAioOperationCount(const efpFileCount_t fileSeqNumber);
+ uint16_t decrOutstandingAioOperationCount(const uint64_t fileSeqNumber);
// Pass-through functions for current JournalFile class
void asyncFileHeaderWrite(io_context_t ioContextPtr,
@@ -101,11 +101,11 @@ public:
protected:
void addJournalFile(const std::string& fileName,
const efpIdentity_t& efpIdentity,
- const uint64_t fileNumber,
+ const uint64_t fileSeqNumber,
const uint32_t completedDblkCount);
void assertCurrentJournalFileValid(const char* const functionName) const;
bool checkCurrentJournalFileValid() const;
- JournalFile* find(const efpFileCount_t fileSeqNumber);
+ JournalFile* find(const uint64_t fileSeqNumber);
uint64_t getNextFileSeqNum();
void pullEmptyFileFromEfp();
};
diff --git a/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp b/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp
index 73a16f01b7..254566e824 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp
@@ -148,7 +148,7 @@ void RecoveryManager::analyzeJournals(const std::vector<std::string>* preparedTr
// Unlock any affected enqueues in emap
for (tdl_itr_t i=tdl.begin(); i<tdl.end(); i++) {
if (i->enq_flag_) { // enq op - decrement enqueue count
- fileNumberMap_[i->pfid_]->journalFilePtr_->decrEnqueuedRecordCount();
+ fileNumberMap_[i->fid_]->journalFilePtr_->decrEnqueuedRecordCount();
} else if (enqueueMapRef_.is_enqueued(i->drid_, true)) { // deq op - unlock enq record
if (enqueueMapRef_.unlock(i->drid_) < enq_map::EMAP_OK) { // fail
// enq_map::unlock()'s only error is enq_map::EMAP_RID_NOT_FOUND
@@ -738,7 +738,7 @@ bool RecoveryManager::getNextRecordHeader()
txn_data_list_t tdl = transactionMapRef_.get_remove_tdata_list(xid); // tdl will be empty if xid not found
for (tdl_itr_t itr = tdl.begin(); itr != tdl.end(); itr++) {
if (itr->enq_flag_) {
- fileNumberMap_[itr->pfid_]->journalFilePtr_->decrEnqueuedRecordCount();
+ fileNumberMap_[itr->fid_]->journalFilePtr_->decrEnqueuedRecordCount();
} else {
enqueueMapRef_.unlock(itr->drid_); // ignore not found error
}
@@ -765,11 +765,11 @@ bool RecoveryManager::getNextRecordHeader()
txn_data_list_t tdl = transactionMapRef_.get_remove_tdata_list(xid); // tdl will be empty if xid not found
for (tdl_itr_t itr = tdl.begin(); itr != tdl.end(); itr++) {
if (itr->enq_flag_) { // txn enqueue
-//std::cout << "[rid=0x" << std::hex << itr->rid_ << std::dec << " fid=" << itr->pfid_ << " fpos=0x" << std::hex << itr->foffs_ << "]" << std::dec << std::flush; // DEBUG
- if (enqueueMapRef_.insert_pfid(itr->rid_, itr->pfid_, itr->foffs_) < enq_map::EMAP_OK) { // fail
+//std::cout << "[rid=0x" << std::hex << itr->rid_ << std::dec << " fid=" << itr->fid_ << " fpos=0x" << std::hex << itr->foffs_ << "]" << std::dec << std::flush; // DEBUG
+ if (enqueueMapRef_.insert_pfid(itr->rid_, itr->fid_, itr->foffs_) < enq_map::EMAP_OK) { // fail
// The only error code emap::insert_pfid() returns is enq_map::EMAP_DUP_RID.
std::ostringstream oss;
- oss << std::hex << "rid=0x" << itr->rid_ << " _pfid=0x" << itr->pfid_;
+ oss << std::hex << "rid=0x" << itr->rid_ << " _pfid=0x" << itr->fid_;
throw jexception(jerrno::JERR_MAP_DUPLICATE, oss.str(), "RecoveryManager", "getNextRecordHeader");
}
} else { // txn dequeue
@@ -854,7 +854,7 @@ void RecoveryManager::prepareRecordList() {
qpid::linearstore::journal::txn_data_list_t tdsl = transactionMapRef_.get_tdata_list(*j);
for (qpid::linearstore::journal::tdl_itr_t k=tdsl.begin(); k!=tdsl.end(); ++k) {
if (k->enq_flag_) {
- recordIdList_.push_back(RecoveredRecordData_t(k->rid_, k->pfid_, k->foffs_, true));
+ recordIdList_.push_back(RecoveredRecordData_t(k->rid_, k->fid_, k->foffs_, true));
}
}
}
diff --git a/qpid/cpp/src/qpid/linearstore/journal/txn_map.cpp b/qpid/cpp/src/qpid/linearstore/journal/txn_map.cpp
index 3ada3d2504..8336d36b80 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/txn_map.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/txn_map.cpp
@@ -36,14 +36,14 @@ int16_t txn_map::TMAP_SYNCED = 1;
txn_data_t::txn_data_t(const uint64_t rid,
const uint64_t drid,
- const uint16_t pfid,
+ const uint64_t fid,
const uint64_t foffs,
const bool enq_flag,
const bool tpc_flag,
const bool commit_flag):
rid_(rid),
drid_(drid),
- pfid_(pfid),
+ fid_(fid),
foffs_(foffs),
enq_flag_(enq_flag),
tpc_flag_(tpc_flag),
diff --git a/qpid/cpp/src/qpid/linearstore/journal/txn_map.h b/qpid/cpp/src/qpid/linearstore/journal/txn_map.h
index 996d54bdac..e79c0522d8 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/txn_map.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/txn_map.h
@@ -39,7 +39,7 @@ namespace journal {
{
uint64_t rid_; ///< Record id for this operation
uint64_t drid_; ///< Dequeue record id for this operation
- uint16_t pfid_; ///< Physical file id, to be used when transferring to emap on commit
+ uint64_t fid_; ///< File seq number, to be used when transferring to emap on commit
uint64_t foffs_; ///< Offset in file for this record
bool enq_flag_; ///< If true, enq op, otherwise deq op
bool tpc_flag_; ///< 2PC transaction if true
@@ -47,7 +47,7 @@ namespace journal {
bool aio_compl_; ///< Initially false, set to true when record AIO returns
txn_data_t(const uint64_t rid,
const uint64_t drid,
- const uint16_t pfid,
+ const uint64_t fid,
const uint64_t foffs,
const bool enq_flag,
const bool tpc_flag,
diff --git a/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp b/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp
index 7dc8f1d416..1ff18da663 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp
@@ -307,7 +307,7 @@ wmgr::dequeue(data_tok* dtokp,
for (tdl_const_itr_t i=tdl.begin(); i!=tdl.end() && !found; ++i) {
if (i->rid_ == dtokp->dequeue_rid()) {
found = true;
- dtokp->set_fid(i->pfid_);
+ dtokp->set_fid(i->fid_);
break;
}
}
@@ -451,7 +451,7 @@ wmgr::abort(data_tok* dtokp,
if (!itr->enq_flag_)
_emap.unlock(itr->drid_); // ignore rid not found error
if (itr->enq_flag_) {
- fidl.push_back(itr->pfid_);
+ fidl.push_back(itr->fid_);
}
}
std::pair<pending_txn_map_itr_t, bool> res = _txn_pending_map.insert(std::pair<std::string, fidl_t>(xid, fidl));
@@ -551,11 +551,11 @@ wmgr::commit(data_tok* dtokp,
{
if (itr->enq_flag_) // txn enqueue
{
- if (_emap.insert_pfid(itr->rid_, itr->pfid_, 0) < enq_map::EMAP_OK) // fail
+ if (_emap.insert_pfid(itr->rid_, itr->fid_, 0) < enq_map::EMAP_OK) // fail
{
// The only error code emap::insert_pfid() returns is enq_map::EMAP_DUP_RID.
std::ostringstream oss;
- oss << std::hex << "rid=0x" << itr->rid_ << " _pfid=0x" << itr->pfid_;
+ oss << std::hex << "rid=0x" << itr->rid_ << " _pfid=0x" << itr->fid_;
throw jexception(jerrno::JERR_MAP_DUPLICATE, oss.str(), "wmgr", "commit");
}
}