diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2013-12-20 18:07:31 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2013-12-20 18:07:31 +0000 |
| commit | 03c59b8e046edd380485af253bf3962f3feff39d (patch) | |
| tree | 999f331622be3485e2f6a09e4c75108d519b20ed /cpp/src/qpid/linearstore/MessageStoreImpl.cpp | |
| parent | 44297255a2a408dbb9114395467da6384b9bc012 (diff) | |
| download | qpid-python-03c59b8e046edd380485af253bf3962f3feff39d.tar.gz | |
QPID-5422: DTX test failure, and some tidying up of code in JournalImpl.cpp/h
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1552772 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/linearstore/MessageStoreImpl.cpp')
| -rw-r--r-- | cpp/src/qpid/linearstore/MessageStoreImpl.cpp | 236 |
1 files changed, 109 insertions, 127 deletions
diff --git a/cpp/src/qpid/linearstore/MessageStoreImpl.cpp b/cpp/src/qpid/linearstore/MessageStoreImpl.cpp index 593d2bd4b7..dd4198c6fd 100644 --- a/cpp/src/qpid/linearstore/MessageStoreImpl.cpp +++ b/cpp/src/qpid/linearstore/MessageStoreImpl.cpp @@ -51,16 +51,6 @@ qpid::sys::Duration MessageStoreImpl::defJournalGetEventsTimeout(1 * qpid::sys:: qpid::sys::Duration MessageStoreImpl::defJournalFlushTimeout(500 * qpid::sys::TIME_MSEC); // 0.5s qpid::sys::Mutex TxnCtxt::globalSerialiser; -MessageStoreImpl::TplRecoverStruct::TplRecoverStruct(const uint64_t rid_, - const bool deq_flag_, - const bool commit_flag_, - const bool tpc_flag_) : - rid(rid_), - deq_flag(deq_flag_), - commit_flag(commit_flag_), - tpc_flag(tpc_flag_) -{} - MessageStoreImpl::MessageStoreImpl(qpid::broker::Broker* broker_, const char* envpath_) : defaultEfpPartitionNumber(0), defaultEfpFileSize_kib(0), @@ -601,7 +591,7 @@ void MessageStoreImpl::recover(qpid::broker::RecoveryManager& registry_) TxnCtxt txn; txn.begin(dbenv.get(), false); try { - //read all queues, calls recoversMessages + //read all queues, calls recoversMessages for each queue recoverQueues(txn, registry_, queues, prepared, messages); //recover exchange & bindings: @@ -621,6 +611,7 @@ void MessageStoreImpl::recover(qpid::broker::RecoveryManager& registry_) } //recover transactions: + qpid::linearstore::journal::txn_map& txn_map_ref = tplStorePtr->get_txn_map(); for (txn_list::iterator i = prepared.begin(); i != prepared.end(); i++) { const PreparedTransaction pt = *i; if (mgmtObject.get() != 0) { @@ -629,20 +620,39 @@ void MessageStoreImpl::recover(qpid::broker::RecoveryManager& registry_) } std::string xid = pt.xid; - - // Restore data token state in TxnCtxt - TplRecoverMapCitr citr = tplRecoverMap.find(xid); - if (citr == tplRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in tplRecoverMap"); + qpid::linearstore::journal::txn_data_list tdl = txn_map_ref.get_tdata_list(xid); + if (tdl.size() == 0) THROW_STORE_EXCEPTION("XID not found in txn_map"); + uint16_t enqCnt = 0UL; + uint16_t deqCnt = 0UL; + uint16_t tpcCnt = 0UL; + uint16_t abortCnt = 0UL; + uint64_t rid = 0ULL; + for (qpid::linearstore::journal::tdl_itr j=tdl.begin(); j!=tdl.end(); ++j) { + if (j->enq_flag_) { + ++enqCnt; + rid = j->rid_; + } else { + ++deqCnt; + } + if (!j->commit_flag_) { + ++abortCnt; + } + if (j->tpc_flag_) { + ++tpcCnt; + } + } + if (tpcCnt > 0 && tpcCnt != tdl.size()) THROW_STORE_EXCEPTION("Inconsistent TPL 2PC count"); + bool commitFlag = abortCnt == 0; // If a record is found that is dequeued but not committed/aborted from tplStore, then a complete() call // was interrupted part way through committing/aborting the impacted queues. Complete this process. - bool incomplTplTxnFlag = citr->second.deq_flag; + bool incomplTplTxnFlag = deqCnt > 0; - if (citr->second.tpc_flag) { + if (tpcCnt > 0) { // Dtx (2PC) transaction TPCTxnCtxt* tpcc = new TPCTxnCtxt(xid, &messageIdSequence); std::auto_ptr<qpid::broker::TPCTransactionContext> txn(tpcc); - tpcc->recoverDtok(citr->second.rid, xid); + tpcc->recoverDtok(rid, xid); tpcc->prepare(tplStorePtr.get()); qpid::broker::RecoverableTransaction::shared_ptr dtx; @@ -661,12 +671,12 @@ void MessageStoreImpl::recover(qpid::broker::RecoveryManager& registry_) } if (incomplTplTxnFlag) { - tpcc->complete(citr->second.commit_flag); + tpcc->complete(commitFlag); } } else { // Local (1PC) transaction boost::shared_ptr<TxnCtxt> opcc(new TxnCtxt(xid, &messageIdSequence)); - opcc->recoverDtok(citr->second.rid, xid); + opcc->recoverDtok(rid, xid); opcc->prepare(tplStorePtr.get()); if (pt.enqueues.get()) { @@ -680,11 +690,12 @@ void MessageStoreImpl::recover(qpid::broker::RecoveryManager& registry_) } } if (incomplTplTxnFlag) { - opcc->complete(citr->second.commit_flag); + opcc->complete(commitFlag); } else { - completed(*opcc.get(), citr->second.commit_flag); + completed(*opcc.get(), commitFlag); } } + } registry_.recoveryComplete(); } @@ -888,12 +899,13 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/, bool externalFlag = false; DataTokenImpl dtok; dtok.set_wstate(DataTokenImpl::NONE); + qpid::linearstore::journal::txn_map& txn_map_ref = tplStorePtr->get_txn_map(); // Read the message from the Journal. try { unsigned aio_sleep_cnt = 0; while (read) { - qpid::linearstore::journal::iores res = jc->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtok); + qpid::linearstore::journal::iores res = jc->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtok, false); switch (res) { @@ -932,16 +944,31 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/, } else { uint64_t rid = dtok.rid(); std::string xid(i->xid); - TplRecoverMapCitr citr = tplRecoverMap.find(xid); - if (citr == tplRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in tplRecoverMap"); - - // deq present in prepared list: this xid is part of incomplete txn commit/abort - // or this is a 1PC txn that must be rolled forward - if (citr->second.deq_flag || !citr->second.tpc_flag) { + qpid::linearstore::journal::txn_data_list tdl = txn_map_ref.get_tdata_list(xid); + if (tdl.size() == 0) THROW_STORE_EXCEPTION("XID not found in txn_map"); + uint16_t enqCnt = 0UL; + uint16_t deqCnt = 0UL; + uint16_t tpcCnt = 0UL; + uint16_t abortCnt = 0UL; + for (qpid::linearstore::journal::tdl_itr j=tdl.begin(); j!=tdl.end(); ++j) { + if (j->enq_flag_) { + ++enqCnt; + } else { + ++deqCnt; + } + if (!j->commit_flag_) { + ++abortCnt; + } + if (j->tpc_flag_) { + ++tpcCnt; + } + } + if (tpcCnt > 0 && tpcCnt != tdl.size()) THROW_STORE_EXCEPTION("Inconsistent TPL 2PC count"); + if (deqCnt > 0 || tpcCnt == 0) { if (jc->is_enqueued(rid, true)) { // Enqueue is non-tx, dequeue tx assert(jc->is_locked(rid)); // This record MUST be locked by a txn dequeue - if (!citr->second.commit_flag) { + if (abortCnt > 0) { rcnt++; queue->recover(msg); // recover message in abort case only } @@ -955,7 +982,7 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/, if (j->enq_flag_ && j->rid_ == rid) enq = true; else if (!j->enq_flag_ && j->drid_ == rid) deq = true; } - if (enq && !deq && citr->second.commit_flag) { + if (enq && !deq && abortCnt == 0) { rcnt++; queue->recover(msg); // recover txn message in commit case only } @@ -969,10 +996,14 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/, dtok.reset(); dtok.set_wstate(DataTokenImpl::NONE); - if (xidbuff) + if (xidbuff) { ::free(xidbuff); - else if (dbuff) + xidbuff = NULL; + } + if (dbuff) { ::free(dbuff); + dbuff = NULL; + } aio_sleep_cnt = 0; break; } @@ -1033,77 +1064,6 @@ int MessageStoreImpl::enqueueMessage(TxnCtxt& txn_, return count; } -void MessageStoreImpl::readTplStore() -{ - tplRecoverMap.clear(); - qpid::linearstore::journal::txn_map& tmap = tplStorePtr->get_txn_map(); - DataTokenImpl dtok; - void* dbuff = NULL; size_t dbuffSize = 0; - void* xidbuff = NULL; size_t xidbuffSize = 0; - bool transientFlag = false; - bool externalFlag = false; - bool done = false; - try { - unsigned aio_sleep_cnt = 0; - while (!done) { - dtok.reset(); - dtok.set_wstate(DataTokenImpl::ENQ); - qpid::linearstore::journal::iores res = tplStorePtr->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtok); - switch (res) { - case qpid::linearstore::journal::RHM_IORES_SUCCESS: { - // Every TPL record contains both data and an XID - assert(dbuffSize>0); - assert(xidbuffSize>0); - std::string xid(static_cast<const char*>(xidbuff), xidbuffSize); - bool is2PC = *(static_cast<char*>(dbuff)) != 0; - - // Check transaction details; add to recover map - qpid::linearstore::journal::txn_data_list txnList = tmap.get_tdata_list(xid); // txnList will be empty if xid not found - if (!txnList.empty()) { // xid found in tmap - unsigned enqCnt = 0; - unsigned deqCnt = 0; - uint64_t rid = 0; - - // Assume commit (roll forward) in cases where only prepare has been called - ie only enqueue record exists. - // Note: will apply to both 1PC and 2PC transactions. - bool commitFlag = true; - - for (qpid::linearstore::journal::tdl_itr j = txnList.begin(); j<txnList.end(); j++) { - if (j->enq_flag_) { - rid = j->rid_; - enqCnt++; - } else { - commitFlag = j->commit_flag_; - deqCnt++; - } - } - assert(enqCnt == 1); - assert(deqCnt <= 1); - tplRecoverMap.insert(TplRecoverMapPair(xid, TplRecoverStruct(rid, deqCnt == 1, commitFlag, is2PC))); - } - - ::free(xidbuff); - aio_sleep_cnt = 0; - break; - } - case qpid::linearstore::journal::RHM_IORES_PAGE_AIOWAIT: - if (++aio_sleep_cnt > MAX_AIO_SLEEPS) - THROW_STORE_EXCEPTION("Timeout waiting for AIO in MessageStoreImpl::recoverTplStore()"); - ::usleep(AIO_SLEEP_TIME_US); - break; - case qpid::linearstore::journal::RHM_IORES_EMPTY: - done = true; - break; // done with all messages. (add call in jrnl to test that _emap is empty.) - default: - std::ostringstream oss; - oss << "readTplStore(): Unexpected result from journal read: " << qpid::linearstore::journal::iores_str(res); - THROW_STORE_EXCEPTION(oss.str()); - } // switch - } - } catch (const qpid::linearstore::journal::jexception& e) { - THROW_STORE_EXCEPTION(std::string("TPL recoverTplStore() failed: ") + e.what()); - } -} void MessageStoreImpl::recoverTplStore() { @@ -1114,11 +1074,7 @@ void MessageStoreImpl::recoverTplStore() highestRid = thisHighestRid; else if (thisHighestRid - highestRid < 0x8000000000000000ULL) // RFC 1982 comparison for unsigned 64-bit highestRid = thisHighestRid; - - // Load tplRecoverMap by reading the TPL store - readTplStore(); - - tplStorePtr->recover_complete(); // start journal. + tplStorePtr->recover_complete(); // start TPL } } @@ -1126,28 +1082,49 @@ void MessageStoreImpl::recoverLockedMappings(txn_list& txns) { if (!tplStorePtr->is_ready()) recoverTplStore(); - - // Abort unprepared xids and populate the locked maps - for (TplRecoverMapCitr i = tplRecoverMap.begin(); i != tplRecoverMap.end(); i++) { + std::vector<std::string> xidList; + tplStorePtr->get_txn_map().xid_list(xidList); + for (std::vector<std::string>::const_iterator i=xidList.begin(); i!=xidList.end(); ++i) { LockedMappings::shared_ptr enq_ptr; enq_ptr.reset(new LockedMappings); LockedMappings::shared_ptr deq_ptr; deq_ptr.reset(new LockedMappings); - txns.push_back(new PreparedTransaction(i->first, enq_ptr, deq_ptr)); + txns.push_back(new PreparedTransaction(*i, enq_ptr, deq_ptr)); } } void MessageStoreImpl::collectPreparedXids(std::set<std::string>& xids) { - if (tplStorePtr->is_ready()) { - readTplStore(); - } else { + if (!tplStorePtr->is_ready()) { recoverTplStore(); } - for (TplRecoverMapCitr i = tplRecoverMap.begin(); i != tplRecoverMap.end(); i++) { - // Discard all txns that are to be rolled forward/back and 1PC transactions - if (!i->second.deq_flag && i->second.tpc_flag) - xids.insert(i->first); + std::vector<std::string> xidList; + tplStorePtr->get_txn_map().xid_list(xidList); + for (std::vector<std::string>::const_iterator i=xidList.begin(); i!=xidList.end(); ++i) { + qpid::linearstore::journal::txn_data_list tdl = tplStorePtr->get_txn_map().get_tdata_list(*i); + uint16_t enqCnt = 0UL; + uint16_t deqCnt = 0UL; + uint16_t tpcCnt = 0UL; + uint16_t abortCnt = 0UL; + for (qpid::linearstore::journal::tdl_itr j=tdl.begin(); j!=tdl.end(); ++j) { + if (j->enq_flag_) { + ++enqCnt; + } else { + ++deqCnt; + } + if (!j->commit_flag_) { + ++abortCnt; + } + if (j->tpc_flag_) { + ++tpcCnt; + } + } + if (tpcCnt > 0) { + if (tpcCnt != tdl.size()) THROW_STORE_EXCEPTION("Inconsistent TPL 2PC count"); + if (enqCnt - deqCnt > 0) { + xids.insert(*i); + } + } } } @@ -1186,7 +1163,7 @@ void MessageStoreImpl::flush(const qpid::broker::PersistableQueue& queue_) JournalImpl* jc = static_cast<JournalImpl*>(queue_.getExternalQueueStore()); if (jc) { // TODO: check if this result should be used... - /*mrg::journal::iores res =*/ jc->flush(); + /*mrg::journal::iores res =*/ jc->flush(false); } } catch (const qpid::linearstore::journal::jexception& e) { THROW_STORE_EXCEPTION(std::string("Queue ") + qn + ": flush() failed: " + e.what() ); @@ -1258,7 +1235,7 @@ void MessageStoreImpl::store(const qpid::broker::PersistableQueue* queue_, if (txn_->getXid().empty()) { jc->enqueue_data_record(&buff[0], size, size, dtokp.get(), !message_->isPersistent()); } else { - jc->enqueue_txn_data_record(&buff[0], size, size, dtokp.get(), txn_->getXid(), !message_->isPersistent()); + jc->enqueue_txn_data_record(&buff[0], size, size, dtokp.get(), txn_->getXid(), txn_->isTPC(), !message_->isPersistent()); } } else { THROW_STORE_EXCEPTION(std::string("MessageStoreImpl::store() failed: queue NULL.")); @@ -1309,9 +1286,10 @@ void MessageStoreImpl::async_dequeue(qpid::broker::TransactionContext* ctxt_, ddtokp->set_rid(messageIdSequence.next()); ddtokp->set_dequeue_rid(msg_->getPersistenceId()); ddtokp->set_wstate(DataTokenImpl::ENQ); + TxnCtxt* txn = 0; std::string tid; if (ctxt_) { - TxnCtxt* txn = check(ctxt_); + txn = check(ctxt_); tid = txn->getXid(); } // Manually increase the ref count, as raw pointers are used beyond this point @@ -1319,9 +1297,9 @@ void MessageStoreImpl::async_dequeue(qpid::broker::TransactionContext* ctxt_, try { JournalImpl* jc = static_cast<JournalImpl*>(queue_.getExternalQueueStore()); if (tid.empty()) { - jc->dequeue_data_record(ddtokp.get()); + jc->dequeue_data_record(ddtokp.get(), false); } else { - jc->dequeue_txn_data_record(ddtokp.get(), tid); + jc->dequeue_txn_data_record(ddtokp.get(), tid, txn?txn->isTPC():false, false); } } catch (const qpid::linearstore::journal::jexception& e) { ddtokp->release(); @@ -1341,7 +1319,7 @@ void MessageStoreImpl::completed(TxnCtxt& txn_, DataTokenImpl* dtokp = txn_.getDtok(); dtokp->set_dequeue_rid(dtokp->rid()); dtokp->set_rid(messageIdSequence.next()); - tplStorePtr->dequeue_txn_data_record(txn_.getDtok(), txn_.getXid(), commit_); + tplStorePtr->dequeue_txn_data_record(txn_.getDtok(), txn_.getXid(), txn_.isTPC(), commit_); } txn_.complete(commit_); if (mgmtObject.get() != 0) { @@ -1376,12 +1354,16 @@ void MessageStoreImpl::prepare(qpid::broker::TPCTransactionContext& ctxt_) { checkInit(); TxnCtxt* txn = dynamic_cast<TxnCtxt*>(&ctxt_); +//std::string xid=txn->getXid(); std::cout << "*** MessageStoreImpl::prepare() xid=" << std::hex; +//for (unsigned i=0; i<xid.length(); ++i) std::cout << "\\" << (int)xid.at(i); std::cout << " ***" << std::dec << std::endl; if(!txn) throw qpid::broker::InvalidTransactionContextException(); localPrepare(txn); } void MessageStoreImpl::localPrepare(TxnCtxt* ctxt_) { +//std::string xid=ctxt_->getXid(); std::cout << "*** MessageStoreImpl::localPrepare() xid=" << std::hex; +//for (unsigned i=0; i<xid.length(); ++i) std::cout << "\\" << (int)xid.at(i); std::cout << " ***" << std::dec << std::endl; try { chkTplStoreInit(); // Late initialize (if needed) @@ -1394,7 +1376,7 @@ void MessageStoreImpl::localPrepare(TxnCtxt* ctxt_) dtokp->set_external_rid(true); dtokp->set_rid(messageIdSequence.next()); char tpcFlag = static_cast<char>(ctxt_->isTPC()); - tplStorePtr->enqueue_txn_data_record(&tpcFlag, sizeof(char), sizeof(char), dtokp, ctxt_->getXid(), false); + tplStorePtr->enqueue_txn_data_record(&tpcFlag, sizeof(char), sizeof(char), dtokp, ctxt_->getXid(), tpcFlag != 0, false); ctxt_->prepare(tplStorePtr.get()); // make sure all the data is written to disk before returning ctxt_->sync(); |
