summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/linearstore/MessageStoreImpl.cpp')
-rw-r--r--cpp/src/qpid/linearstore/MessageStoreImpl.cpp115
1 files changed, 40 insertions, 75 deletions
diff --git a/cpp/src/qpid/linearstore/MessageStoreImpl.cpp b/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
index ca71d1b780..3df194d479 100644
--- a/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
+++ b/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
@@ -341,7 +341,7 @@ void MessageStoreImpl::chkTplStoreInit()
qpid::sys::Mutex::ScopedLock sl(tplInitLock);
if (!tplStorePtr->is_ready()) {
qpid::linearstore::journal::jdir::create_dir(getTplBaseDir());
- tplStorePtr->initialize(/*tplNumJrnlFiles, false, 0, tplJrnlFsizeSblks,*/ getEmptyFilePool(defaultEfpPartitionNumber, defaultEfpFileSize_kib), tplWCacheNumPages, tplWCachePgSizeSblks);
+ tplStorePtr->initialize(getEmptyFilePool(defaultEfpPartitionNumber, defaultEfpFileSize_kib), tplWCacheNumPages, tplWCachePgSizeSblks);
if (mgmtObject.get() != 0) mgmtObject->set_tplIsInitialized(true);
}
}
@@ -584,6 +584,13 @@ void MessageStoreImpl::recover(qpid::broker::RecoveryManager& registry_)
txn_list prepared;
recoverLockedMappings(prepared);
+ std::ostringstream oss;
+ oss << "Recovered transaction prepared list:";
+ for (txn_list::iterator i = prepared.begin(); i != prepared.end(); i++) {
+ oss << std::endl << " " << str2hexnum(i->xid);
+ }
+ QLS_LOG(debug, oss.str());
+
queue_index queues;//id->queue
exchange_index exchanges;//id->exchange
message_index messages;//id->message
@@ -620,39 +627,20 @@ void MessageStoreImpl::recover(qpid::broker::RecoveryManager& registry_)
}
std::string xid = pt.xid;
- qpid::linearstore::journal::txn_data_list tdl = txn_map_ref.get_tdata_list(xid);
+ qpid::linearstore::journal::txn_data_list_t tdl = txn_map_ref.get_tdata_list(xid);
if (tdl.size() == 0) THROW_STORE_EXCEPTION("XID not found in txn_map");
- uint16_t enqCnt = 0UL;
- uint16_t deqCnt = 0UL;
- uint16_t tpcCnt = 0UL;
- uint16_t abortCnt = 0UL;
- uint64_t rid = 0ULL;
- for (qpid::linearstore::journal::tdl_itr j=tdl.begin(); j!=tdl.end(); ++j) {
- if (j->enq_flag_) {
- ++enqCnt;
- rid = j->rid_;
- } else {
- ++deqCnt;
- }
- if (!j->commit_flag_) {
- ++abortCnt;
- }
- if (j->tpc_flag_) {
- ++tpcCnt;
- }
- }
- if (tpcCnt > 0 && tpcCnt != tdl.size()) THROW_STORE_EXCEPTION("MessageStoreImpl::recover(): Inconsistent TPL 2PC count");
- bool commitFlag = abortCnt == 0;
+ qpid::linearstore::journal::txn_op_stats_t txn_op_stats(tdl);
+ bool commitFlag = txn_op_stats.abortCnt == 0;
// If a record is found that is dequeued but not committed/aborted from tplStore, then a complete() call
// was interrupted part way through committing/aborting the impacted queues. Complete this process.
- bool incomplTplTxnFlag = deqCnt > 0;
+ bool incomplTplTxnFlag = txn_op_stats.deqCnt > 0;
- if (tpcCnt > 0) {
+ if (txn_op_stats.tpcCnt > 0) {
// Dtx (2PC) transaction
TPCTxnCtxt* tpcc = new TPCTxnCtxt(xid, &messageIdSequence);
std::auto_ptr<qpid::broker::TPCTransactionContext> txn(tpcc);
- tpcc->recoverDtok(rid, xid);
+ tpcc->recoverDtok(txn_op_stats.rid, xid);
tpcc->prepare(tplStorePtr.get());
qpid::broker::RecoverableTransaction::shared_ptr dtx;
@@ -676,7 +664,7 @@ void MessageStoreImpl::recover(qpid::broker::RecoveryManager& registry_)
} else {
// Local (1PC) transaction
boost::shared_ptr<TxnCtxt> opcc(new TxnCtxt(xid, &messageIdSequence));
- opcc->recoverDtok(rid, xid);
+ opcc->recoverDtok(txn_op_stats.rid, xid);
opcc->prepare(tplStorePtr.get());
if (pt.enqueues.get()) {
@@ -919,7 +907,7 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/,
msg = getExternMessage(recovery, dtok.rid(), headerSize); // large message external to jrnl
} else {
headerSize = qpid::framing::Buffer(data, preambleLength).getLong();
- qpid::framing::Buffer headerBuff(data+ preambleLength, headerSize); /// do we want read size or header size ????
+ qpid::framing::Buffer headerBuff(data+ preambleLength, headerSize);
msg = recovery.recoverMessage(headerBuff);
}
msg->setPersistenceId(dtok.rid());
@@ -944,45 +932,30 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/,
} else {
uint64_t rid = dtok.rid();
std::string xid(i->xid);
- qpid::linearstore::journal::txn_data_list tdl = txn_map_ref.get_tdata_list(xid);
+ qpid::linearstore::journal::txn_data_list_t tdl = txn_map_ref.get_tdata_list(xid);
if (tdl.size() == 0) THROW_STORE_EXCEPTION("XID not found in txn_map");
- uint16_t enqCnt = 0UL;
- uint16_t deqCnt = 0UL;
- uint16_t tpcCnt = 0UL;
- uint16_t abortCnt = 0UL;
- for (qpid::linearstore::journal::tdl_itr j=tdl.begin(); j!=tdl.end(); ++j) {
- if (j->enq_flag_) {
- ++enqCnt;
- } else {
- ++deqCnt;
- }
- if (!j->commit_flag_) {
- ++abortCnt;
- }
- if (j->tpc_flag_) {
- ++tpcCnt;
- }
- }
- if (tpcCnt > 0 && tpcCnt != tdl.size()) THROW_STORE_EXCEPTION("MessageStoreImpl::recoverMessages(): Inconsistent TPL 2PC count");
- if (deqCnt > 0 || tpcCnt == 0) {
+ qpid::linearstore::journal::txn_op_stats_t txn_op_stats(tdl);
+ if (txn_op_stats.deqCnt > 0 || txn_op_stats.tpcCnt == 0) {
if (jc->is_enqueued(rid, true)) {
// Enqueue is non-tx, dequeue tx
assert(jc->is_locked(rid)); // This record MUST be locked by a txn dequeue
- if (abortCnt > 0) {
+ if (txn_op_stats.abortCnt > 0) {
rcnt++;
queue->recover(msg); // recover message in abort case only
}
} else {
// Enqueue and/or dequeue tx
qpid::linearstore::journal::txn_map& tmap = jc->get_txn_map();
- qpid::linearstore::journal::txn_data_list txnList = tmap.get_tdata_list(xid); // txnList will be empty if xid not found
+ qpid::linearstore::journal::txn_data_list_t txnList = tmap.get_tdata_list(xid); // txnList will be empty if xid not found
bool enq = false;
bool deq = false;
- for (qpid::linearstore::journal::tdl_itr j = txnList.begin(); j<txnList.end(); j++) {
- if (j->enq_flag_ && j->rid_ == rid) enq = true;
- else if (!j->enq_flag_ && j->drid_ == rid) deq = true;
+ for (qpid::linearstore::journal::tdl_itr_t j = txnList.begin(); j<txnList.end(); j++) {
+ if (j->enq_flag_ && j->rid_ == rid)
+ enq = true;
+ else if (!j->enq_flag_ && j->drid_ == rid)
+ deq = true;
}
- if (enq && !deq && abortCnt == 0) {
+ if (enq && !deq && txn_op_stats.abortCnt == 0) {
rcnt++;
queue->recover(msg); // recover txn message in commit case only
}
@@ -1101,27 +1074,10 @@ void MessageStoreImpl::collectPreparedXids(std::set<std::string>& xids)
std::vector<std::string> xidList;
tplStorePtr->get_txn_map().xid_list(xidList);
for (std::vector<std::string>::const_iterator i=xidList.begin(); i!=xidList.end(); ++i) {
- qpid::linearstore::journal::txn_data_list tdl = tplStorePtr->get_txn_map().get_tdata_list(*i);
- uint16_t enqCnt = 0UL;
- uint16_t deqCnt = 0UL;
- uint16_t tpcCnt = 0UL;
- uint16_t abortCnt = 0UL;
- for (qpid::linearstore::journal::tdl_itr j=tdl.begin(); j!=tdl.end(); ++j) {
- if (j->enq_flag_) {
- ++enqCnt;
- } else {
- ++deqCnt;
- }
- if (!j->commit_flag_) {
- ++abortCnt;
- }
- if (j->tpc_flag_) {
- ++tpcCnt;
- }
- }
- if (tpcCnt > 0) {
- if (tpcCnt != tdl.size()) THROW_STORE_EXCEPTION("MessageStoreImpl::collectPreparedXids: Inconsistent TPL 2PC count");
- if (enqCnt - deqCnt > 0) {
+ qpid::linearstore::journal::txn_data_list_t tdl = tplStorePtr->get_txn_map().get_tdata_list(*i);
+ qpid::linearstore::journal::txn_op_stats_t txn_op_stats(tdl);
+ if (txn_op_stats.tpcCnt > 0) {
+ if (txn_op_stats.enqCnt - txn_op_stats.deqCnt > 0) {
xids.insert(*i);
}
}
@@ -1554,6 +1510,15 @@ void MessageStoreImpl::journalDeleted(JournalImpl& j_) {
journalList.erase(j_.id());
}
+std::string MessageStoreImpl::str2hexnum(const std::string& str) {
+ std::ostringstream oss;
+ oss << "(" << str.size() << ")0x" << std::hex;
+ for (unsigned i=str.size(); i>0; --i) {
+ oss << std::setfill('0') << std::setw(2) << (uint16_t)(uint8_t)str[i-1];
+ }
+ return oss.str();
+}
+
MessageStoreImpl::StoreOptions::StoreOptions(const std::string& name_) :
qpid::Options(name_),
truncateFlag(defTruncateFlag),