diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2014-01-09 17:26:10 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2014-01-09 17:26:10 +0000 |
| commit | 27a33cae759d317658a46fa5afe22adaa4aed594 (patch) | |
| tree | e5bdc059a322b4f73a9c171e82fefe14dd7e0497 /cpp/src/qpid/linearstore/MessageStoreImpl.cpp | |
| parent | 01eb49eba283d02dd7291eff5b276e2d3a638521 (diff) | |
| download | qpid-python-27a33cae759d317658a46fa5afe22adaa4aed594.tar.gz | |
QPID-5460: [linearstore] Recovery of store which contains prepared but incomplete transactions results in message loss
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1556892 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/linearstore/MessageStoreImpl.cpp')
| -rw-r--r-- | cpp/src/qpid/linearstore/MessageStoreImpl.cpp | 115 |
1 files changed, 40 insertions, 75 deletions
diff --git a/cpp/src/qpid/linearstore/MessageStoreImpl.cpp b/cpp/src/qpid/linearstore/MessageStoreImpl.cpp index ca71d1b780..3df194d479 100644 --- a/cpp/src/qpid/linearstore/MessageStoreImpl.cpp +++ b/cpp/src/qpid/linearstore/MessageStoreImpl.cpp @@ -341,7 +341,7 @@ void MessageStoreImpl::chkTplStoreInit() qpid::sys::Mutex::ScopedLock sl(tplInitLock); if (!tplStorePtr->is_ready()) { qpid::linearstore::journal::jdir::create_dir(getTplBaseDir()); - tplStorePtr->initialize(/*tplNumJrnlFiles, false, 0, tplJrnlFsizeSblks,*/ getEmptyFilePool(defaultEfpPartitionNumber, defaultEfpFileSize_kib), tplWCacheNumPages, tplWCachePgSizeSblks); + tplStorePtr->initialize(getEmptyFilePool(defaultEfpPartitionNumber, defaultEfpFileSize_kib), tplWCacheNumPages, tplWCachePgSizeSblks); if (mgmtObject.get() != 0) mgmtObject->set_tplIsInitialized(true); } } @@ -584,6 +584,13 @@ void MessageStoreImpl::recover(qpid::broker::RecoveryManager& registry_) txn_list prepared; recoverLockedMappings(prepared); + std::ostringstream oss; + oss << "Recovered transaction prepared list:"; + for (txn_list::iterator i = prepared.begin(); i != prepared.end(); i++) { + oss << std::endl << " " << str2hexnum(i->xid); + } + QLS_LOG(debug, oss.str()); + queue_index queues;//id->queue exchange_index exchanges;//id->exchange message_index messages;//id->message @@ -620,39 +627,20 @@ void MessageStoreImpl::recover(qpid::broker::RecoveryManager& registry_) } std::string xid = pt.xid; - qpid::linearstore::journal::txn_data_list tdl = txn_map_ref.get_tdata_list(xid); + qpid::linearstore::journal::txn_data_list_t tdl = txn_map_ref.get_tdata_list(xid); if (tdl.size() == 0) THROW_STORE_EXCEPTION("XID not found in txn_map"); - uint16_t enqCnt = 0UL; - uint16_t deqCnt = 0UL; - uint16_t tpcCnt = 0UL; - uint16_t abortCnt = 0UL; - uint64_t rid = 0ULL; - for (qpid::linearstore::journal::tdl_itr j=tdl.begin(); j!=tdl.end(); ++j) { - if (j->enq_flag_) { - ++enqCnt; - rid = j->rid_; - } else { - ++deqCnt; - } - if (!j->commit_flag_) { - ++abortCnt; - } - if (j->tpc_flag_) { - ++tpcCnt; - } - } - if (tpcCnt > 0 && tpcCnt != tdl.size()) THROW_STORE_EXCEPTION("MessageStoreImpl::recover(): Inconsistent TPL 2PC count"); - bool commitFlag = abortCnt == 0; + qpid::linearstore::journal::txn_op_stats_t txn_op_stats(tdl); + bool commitFlag = txn_op_stats.abortCnt == 0; // If a record is found that is dequeued but not committed/aborted from tplStore, then a complete() call // was interrupted part way through committing/aborting the impacted queues. Complete this process. - bool incomplTplTxnFlag = deqCnt > 0; + bool incomplTplTxnFlag = txn_op_stats.deqCnt > 0; - if (tpcCnt > 0) { + if (txn_op_stats.tpcCnt > 0) { // Dtx (2PC) transaction TPCTxnCtxt* tpcc = new TPCTxnCtxt(xid, &messageIdSequence); std::auto_ptr<qpid::broker::TPCTransactionContext> txn(tpcc); - tpcc->recoverDtok(rid, xid); + tpcc->recoverDtok(txn_op_stats.rid, xid); tpcc->prepare(tplStorePtr.get()); qpid::broker::RecoverableTransaction::shared_ptr dtx; @@ -676,7 +664,7 @@ void MessageStoreImpl::recover(qpid::broker::RecoveryManager& registry_) } else { // Local (1PC) transaction boost::shared_ptr<TxnCtxt> opcc(new TxnCtxt(xid, &messageIdSequence)); - opcc->recoverDtok(rid, xid); + opcc->recoverDtok(txn_op_stats.rid, xid); opcc->prepare(tplStorePtr.get()); if (pt.enqueues.get()) { @@ -919,7 +907,7 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/, msg = getExternMessage(recovery, dtok.rid(), headerSize); // large message external to jrnl } else { headerSize = qpid::framing::Buffer(data, preambleLength).getLong(); - qpid::framing::Buffer headerBuff(data+ preambleLength, headerSize); /// do we want read size or header size ???? + qpid::framing::Buffer headerBuff(data+ preambleLength, headerSize); msg = recovery.recoverMessage(headerBuff); } msg->setPersistenceId(dtok.rid()); @@ -944,45 +932,30 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/, } else { uint64_t rid = dtok.rid(); std::string xid(i->xid); - qpid::linearstore::journal::txn_data_list tdl = txn_map_ref.get_tdata_list(xid); + qpid::linearstore::journal::txn_data_list_t tdl = txn_map_ref.get_tdata_list(xid); if (tdl.size() == 0) THROW_STORE_EXCEPTION("XID not found in txn_map"); - uint16_t enqCnt = 0UL; - uint16_t deqCnt = 0UL; - uint16_t tpcCnt = 0UL; - uint16_t abortCnt = 0UL; - for (qpid::linearstore::journal::tdl_itr j=tdl.begin(); j!=tdl.end(); ++j) { - if (j->enq_flag_) { - ++enqCnt; - } else { - ++deqCnt; - } - if (!j->commit_flag_) { - ++abortCnt; - } - if (j->tpc_flag_) { - ++tpcCnt; - } - } - if (tpcCnt > 0 && tpcCnt != tdl.size()) THROW_STORE_EXCEPTION("MessageStoreImpl::recoverMessages(): Inconsistent TPL 2PC count"); - if (deqCnt > 0 || tpcCnt == 0) { + qpid::linearstore::journal::txn_op_stats_t txn_op_stats(tdl); + if (txn_op_stats.deqCnt > 0 || txn_op_stats.tpcCnt == 0) { if (jc->is_enqueued(rid, true)) { // Enqueue is non-tx, dequeue tx assert(jc->is_locked(rid)); // This record MUST be locked by a txn dequeue - if (abortCnt > 0) { + if (txn_op_stats.abortCnt > 0) { rcnt++; queue->recover(msg); // recover message in abort case only } } else { // Enqueue and/or dequeue tx qpid::linearstore::journal::txn_map& tmap = jc->get_txn_map(); - qpid::linearstore::journal::txn_data_list txnList = tmap.get_tdata_list(xid); // txnList will be empty if xid not found + qpid::linearstore::journal::txn_data_list_t txnList = tmap.get_tdata_list(xid); // txnList will be empty if xid not found bool enq = false; bool deq = false; - for (qpid::linearstore::journal::tdl_itr j = txnList.begin(); j<txnList.end(); j++) { - if (j->enq_flag_ && j->rid_ == rid) enq = true; - else if (!j->enq_flag_ && j->drid_ == rid) deq = true; + for (qpid::linearstore::journal::tdl_itr_t j = txnList.begin(); j<txnList.end(); j++) { + if (j->enq_flag_ && j->rid_ == rid) + enq = true; + else if (!j->enq_flag_ && j->drid_ == rid) + deq = true; } - if (enq && !deq && abortCnt == 0) { + if (enq && !deq && txn_op_stats.abortCnt == 0) { rcnt++; queue->recover(msg); // recover txn message in commit case only } @@ -1101,27 +1074,10 @@ void MessageStoreImpl::collectPreparedXids(std::set<std::string>& xids) std::vector<std::string> xidList; tplStorePtr->get_txn_map().xid_list(xidList); for (std::vector<std::string>::const_iterator i=xidList.begin(); i!=xidList.end(); ++i) { - qpid::linearstore::journal::txn_data_list tdl = tplStorePtr->get_txn_map().get_tdata_list(*i); - uint16_t enqCnt = 0UL; - uint16_t deqCnt = 0UL; - uint16_t tpcCnt = 0UL; - uint16_t abortCnt = 0UL; - for (qpid::linearstore::journal::tdl_itr j=tdl.begin(); j!=tdl.end(); ++j) { - if (j->enq_flag_) { - ++enqCnt; - } else { - ++deqCnt; - } - if (!j->commit_flag_) { - ++abortCnt; - } - if (j->tpc_flag_) { - ++tpcCnt; - } - } - if (tpcCnt > 0) { - if (tpcCnt != tdl.size()) THROW_STORE_EXCEPTION("MessageStoreImpl::collectPreparedXids: Inconsistent TPL 2PC count"); - if (enqCnt - deqCnt > 0) { + qpid::linearstore::journal::txn_data_list_t tdl = tplStorePtr->get_txn_map().get_tdata_list(*i); + qpid::linearstore::journal::txn_op_stats_t txn_op_stats(tdl); + if (txn_op_stats.tpcCnt > 0) { + if (txn_op_stats.enqCnt - txn_op_stats.deqCnt > 0) { xids.insert(*i); } } @@ -1554,6 +1510,15 @@ void MessageStoreImpl::journalDeleted(JournalImpl& j_) { journalList.erase(j_.id()); } +std::string MessageStoreImpl::str2hexnum(const std::string& str) { + std::ostringstream oss; + oss << "(" << str.size() << ")0x" << std::hex; + for (unsigned i=str.size(); i>0; --i) { + oss << std::setfill('0') << std::setw(2) << (uint16_t)(uint8_t)str[i-1]; + } + return oss.str(); +} + MessageStoreImpl::StoreOptions::StoreOptions(const std::string& name_) : qpid::Options(name_), truncateFlag(defTruncateFlag), |
