summary refs log tree commit diff
path: root/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
diff options
context:
space:
mode:
author Kim van der Riet <kpvdr@apache.org> 2013-12-20 18:07:31 +0000
committer Kim van der Riet <kpvdr@apache.org> 2013-12-20 18:07:31 +0000
commit 03c59b8e046edd380485af253bf3962f3feff39d (patch)
tree 999f331622be3485e2f6a09e4c75108d519b20ed /cpp/src/qpid/linearstore/MessageStoreImpl.cpp
parent 44297255a2a408dbb9114395467da6384b9bc012 (diff)
download qpid-python-03c59b8e046edd380485af253bf3962f3feff39d.tar.gz
QPID-5422: DTX test failure, and some tidying up of code in JournalImpl.cpp/h
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1552772 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/linearstore/MessageStoreImpl.cpp')
-rw-r--r-- cpp/src/qpid/linearstore/MessageStoreImpl.cpp 236
1 files changed, 109 insertions, 127 deletions
diff --git a/cpp/src/qpid/linearstore/MessageStoreImpl.cpp b/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
index 593d2bd4b7..dd4198c6fd 100644
--- a/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
+++ b/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
@@ -51,16 +51,6 @@ qpid::sys::Duration MessageStoreImpl::defJournalGetEventsTimeout(1 * qpid::sys::
qpid::sys::Duration MessageStoreImpl::defJournalFlushTimeout(500 * qpid::sys::TIME_MSEC); // 0.5s
qpid::sys::Mutex TxnCtxt::globalSerialiser;
-MessageStoreImpl::TplRecoverStruct::TplRecoverStruct(const uint64_t rid_,
- const bool deq_flag_,
- const bool commit_flag_,
- const bool tpc_flag_) :
- rid(rid_),
- deq_flag(deq_flag_),
- commit_flag(commit_flag_),
- tpc_flag(tpc_flag_)
-{}
-
MessageStoreImpl::MessageStoreImpl(qpid::broker::Broker* broker_, const char* envpath_) :
defaultEfpPartitionNumber(0),
defaultEfpFileSize_kib(0),
@@ -601,7 +591,7 @@ void MessageStoreImpl::recover(qpid::broker::RecoveryManager& registry_)
TxnCtxt txn;
txn.begin(dbenv.get(), false);
try {
- //read all queues, calls recoversMessages
+ //read all queues, calls recoversMessages for each queue
recoverQueues(txn, registry_, queues, prepared, messages);
//recover exchange & bindings:
@@ -621,6 +611,7 @@ void MessageStoreImpl::recover(qpid::broker::RecoveryManager& registry_)
}
//recover transactions:
+ qpid::linearstore::journal::txn_map& txn_map_ref = tplStorePtr->get_txn_map();
for (txn_list::iterator i = prepared.begin(); i != prepared.end(); i++) {
const PreparedTransaction pt = *i;
if (mgmtObject.get() != 0) {
@@ -629,20 +620,39 @@ void MessageStoreImpl::recover(qpid::broker::RecoveryManager& registry_)
}
std::string xid = pt.xid;
-
- // Restore data token state in TxnCtxt
- TplRecoverMapCitr citr = tplRecoverMap.find(xid);
- if (citr == tplRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in tplRecoverMap");
+ qpid::linearstore::journal::txn_data_list tdl = txn_map_ref.get_tdata_list(xid);
+ if (tdl.size() == 0) THROW_STORE_EXCEPTION("XID not found in txn_map");
+ uint16_t enqCnt = 0UL;
+ uint16_t deqCnt = 0UL;
+ uint16_t tpcCnt = 0UL;
+ uint16_t abortCnt = 0UL;
+ uint64_t rid = 0ULL;
+ for (qpid::linearstore::journal::tdl_itr j=tdl.begin(); j!=tdl.end(); ++j) {
+ if (j->enq_flag_) {
+ ++enqCnt;
+ rid = j->rid_;
+ } else {
+ ++deqCnt;
+ }
+ if (!j->commit_flag_) {
+ ++abortCnt;
+ }
+ if (j->tpc_flag_) {
+ ++tpcCnt;
+ }
+ }
+ if (tpcCnt > 0 && tpcCnt != tdl.size()) THROW_STORE_EXCEPTION("Inconsistent TPL 2PC count");
+ bool commitFlag = abortCnt == 0;
// If a record is found that is dequeued but not committed/aborted from tplStore, then a complete() call
// was interrupted part way through committing/aborting the impacted queues. Complete this process.
- bool incomplTplTxnFlag = citr->second.deq_flag;
+ bool incomplTplTxnFlag = deqCnt > 0;
- if (citr->second.tpc_flag) {
+ if (tpcCnt > 0) {
// Dtx (2PC) transaction
TPCTxnCtxt* tpcc = new TPCTxnCtxt(xid, &messageIdSequence);
std::auto_ptr<qpid::broker::TPCTransactionContext> txn(tpcc);
- tpcc->recoverDtok(citr->second.rid, xid);
+ tpcc->recoverDtok(rid, xid);
tpcc->prepare(tplStorePtr.get());
qpid::broker::RecoverableTransaction::shared_ptr dtx;
@@ -661,12 +671,12 @@ void MessageStoreImpl::recover(qpid::broker::RecoveryManager& registry_)
}
if (incomplTplTxnFlag) {
- tpcc->complete(citr->second.commit_flag);
+ tpcc->complete(commitFlag);
}
} else {
// Local (1PC) transaction
boost::shared_ptr<TxnCtxt> opcc(new TxnCtxt(xid, &messageIdSequence));
- opcc->recoverDtok(citr->second.rid, xid);
+ opcc->recoverDtok(rid, xid);
opcc->prepare(tplStorePtr.get());
if (pt.enqueues.get()) {
@@ -680,11 +690,12 @@ void MessageStoreImpl::recover(qpid::broker::RecoveryManager& registry_)
}
}
if (incomplTplTxnFlag) {
- opcc->complete(citr->second.commit_flag);
+ opcc->complete(commitFlag);
} else {
- completed(*opcc.get(), citr->second.commit_flag);
+ completed(*opcc.get(), commitFlag);
}
}
+
}
registry_.recoveryComplete();
}
@@ -888,12 +899,13 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/,
bool externalFlag = false;
DataTokenImpl dtok;
dtok.set_wstate(DataTokenImpl::NONE);
+ qpid::linearstore::journal::txn_map& txn_map_ref = tplStorePtr->get_txn_map();
// Read the message from the Journal.
try {
unsigned aio_sleep_cnt = 0;
while (read) {
- qpid::linearstore::journal::iores res = jc->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtok);
+ qpid::linearstore::journal::iores res = jc->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtok, false);
switch (res)
{
@@ -932,16 +944,31 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/,
} else {
uint64_t rid = dtok.rid();
std::string xid(i->xid);
- TplRecoverMapCitr citr = tplRecoverMap.find(xid);
- if (citr == tplRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in tplRecoverMap");
-
- // deq present in prepared list: this xid is part of incomplete txn commit/abort
- // or this is a 1PC txn that must be rolled forward
- if (citr->second.deq_flag || !citr->second.tpc_flag) {
+ qpid::linearstore::journal::txn_data_list tdl = txn_map_ref.get_tdata_list(xid);
+ if (tdl.size() == 0) THROW_STORE_EXCEPTION("XID not found in txn_map");
+ uint16_t enqCnt = 0UL;
+ uint16_t deqCnt = 0UL;
+ uint16_t tpcCnt = 0UL;
+ uint16_t abortCnt = 0UL;
+ for (qpid::linearstore::journal::tdl_itr j=tdl.begin(); j!=tdl.end(); ++j) {
+ if (j->enq_flag_) {
+ ++enqCnt;
+ } else {
+ ++deqCnt;
+ }
+ if (!j->commit_flag_) {
+ ++abortCnt;
+ }
+ if (j->tpc_flag_) {
+ ++tpcCnt;
+ }
+ }
+ if (tpcCnt > 0 && tpcCnt != tdl.size()) THROW_STORE_EXCEPTION("Inconsistent TPL 2PC count");
+ if (deqCnt > 0 || tpcCnt == 0) {
if (jc->is_enqueued(rid, true)) {
// Enqueue is non-tx, dequeue tx
assert(jc->is_locked(rid)); // This record MUST be locked by a txn dequeue
- if (!citr->second.commit_flag) {
+ if (abortCnt > 0) {
rcnt++;
queue->recover(msg); // recover message in abort case only
}
@@ -955,7 +982,7 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/,
if (j->enq_flag_ && j->rid_ == rid) enq = true;
else if (!j->enq_flag_ && j->drid_ == rid) deq = true;
}
- if (enq && !deq && citr->second.commit_flag) {
+ if (enq && !deq && abortCnt == 0) {
rcnt++;
queue->recover(msg); // recover txn message in commit case only
}
@@ -969,10 +996,14 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/,
dtok.reset();
dtok.set_wstate(DataTokenImpl::NONE);
- if (xidbuff)
+ if (xidbuff) {
::free(xidbuff);
- else if (dbuff)
+ xidbuff = NULL;
+ }
+ if (dbuff) {
::free(dbuff);
+ dbuff = NULL;
+ }
aio_sleep_cnt = 0;
break;
}
@@ -1033,77 +1064,6 @@ int MessageStoreImpl::enqueueMessage(TxnCtxt& txn_,
return count;
}
-void MessageStoreImpl::readTplStore()
-{
- tplRecoverMap.clear();
- qpid::linearstore::journal::txn_map& tmap = tplStorePtr->get_txn_map();
- DataTokenImpl dtok;
- void* dbuff = NULL; size_t dbuffSize = 0;
- void* xidbuff = NULL; size_t xidbuffSize = 0;
- bool transientFlag = false;
- bool externalFlag = false;
- bool done = false;
- try {
- unsigned aio_sleep_cnt = 0;
- while (!done) {
- dtok.reset();
- dtok.set_wstate(DataTokenImpl::ENQ);
- qpid::linearstore::journal::iores res = tplStorePtr->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtok);
- switch (res) {
- case qpid::linearstore::journal::RHM_IORES_SUCCESS: {
- // Every TPL record contains both data and an XID
- assert(dbuffSize>0);
- assert(xidbuffSize>0);
- std::string xid(static_cast<const char*>(xidbuff), xidbuffSize);
- bool is2PC = *(static_cast<char*>(dbuff)) != 0;
-
- // Check transaction details; add to recover map
- qpid::linearstore::journal::txn_data_list txnList = tmap.get_tdata_list(xid); // txnList will be empty if xid not found
- if (!txnList.empty()) { // xid found in tmap
- unsigned enqCnt = 0;
- unsigned deqCnt = 0;
- uint64_t rid = 0;
-
- // Assume commit (roll forward) in cases where only prepare has been called - ie only enqueue record exists.
- // Note: will apply to both 1PC and 2PC transactions.
- bool commitFlag = true;
-
- for (qpid::linearstore::journal::tdl_itr j = txnList.begin(); j<txnList.end(); j++) {
- if (j->enq_flag_) {
- rid = j->rid_;
- enqCnt++;
- } else {
- commitFlag = j->commit_flag_;
- deqCnt++;
- }
- }
- assert(enqCnt == 1);
- assert(deqCnt <= 1);
- tplRecoverMap.insert(TplRecoverMapPair(xid, TplRecoverStruct(rid, deqCnt == 1, commitFlag, is2PC)));
- }
-
- ::free(xidbuff);
- aio_sleep_cnt = 0;
- break;
- }
- case qpid::linearstore::journal::RHM_IORES_PAGE_AIOWAIT:
- if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
- THROW_STORE_EXCEPTION("Timeout waiting for AIO in MessageStoreImpl::recoverTplStore()");
- ::usleep(AIO_SLEEP_TIME_US);
- break;
- case qpid::linearstore::journal::RHM_IORES_EMPTY:
- done = true;
- break; // done with all messages. (add call in jrnl to test that _emap is empty.)
- default:
- std::ostringstream oss;
- oss << "readTplStore(): Unexpected result from journal read: " << qpid::linearstore::journal::iores_str(res);
- THROW_STORE_EXCEPTION(oss.str());
- } // switch
- }
- } catch (const qpid::linearstore::journal::jexception& e) {
- THROW_STORE_EXCEPTION(std::string("TPL recoverTplStore() failed: ") + e.what());
- }
-}
void MessageStoreImpl::recoverTplStore()
{
@@ -1114,11 +1074,7 @@ void MessageStoreImpl::recoverTplStore()
highestRid = thisHighestRid;
else if (thisHighestRid - highestRid < 0x8000000000000000ULL) // RFC 1982 comparison for unsigned 64-bit
highestRid = thisHighestRid;
-
- // Load tplRecoverMap by reading the TPL store
- readTplStore();
-
- tplStorePtr->recover_complete(); // start journal.
+ tplStorePtr->recover_complete(); // start TPL
}
}
@@ -1126,28 +1082,49 @@ void MessageStoreImpl::recoverLockedMappings(txn_list& txns)
{
if (!tplStorePtr->is_ready())
recoverTplStore();
-
- // Abort unprepared xids and populate the locked maps
- for (TplRecoverMapCitr i = tplRecoverMap.begin(); i != tplRecoverMap.end(); i++) {
+ std::vector<std::string> xidList;
+ tplStorePtr->get_txn_map().xid_list(xidList);
+ for (std::vector<std::string>::const_iterator i=xidList.begin(); i!=xidList.end(); ++i) {
LockedMappings::shared_ptr enq_ptr;
enq_ptr.reset(new LockedMappings);
LockedMappings::shared_ptr deq_ptr;
deq_ptr.reset(new LockedMappings);
- txns.push_back(new PreparedTransaction(i->first, enq_ptr, deq_ptr));
+ txns.push_back(new PreparedTransaction(*i, enq_ptr, deq_ptr));
}
}
void MessageStoreImpl::collectPreparedXids(std::set<std::string>& xids)
{
- if (tplStorePtr->is_ready()) {
- readTplStore();
- } else {
+ if (!tplStorePtr->is_ready()) {
recoverTplStore();
}
- for (TplRecoverMapCitr i = tplRecoverMap.begin(); i != tplRecoverMap.end(); i++) {
- // Discard all txns that are to be rolled forward/back and 1PC transactions
- if (!i->second.deq_flag && i->second.tpc_flag)
- xids.insert(i->first);
+ std::vector<std::string> xidList;
+ tplStorePtr->get_txn_map().xid_list(xidList);
+ for (std::vector<std::string>::const_iterator i=xidList.begin(); i!=xidList.end(); ++i) {
+ qpid::linearstore::journal::txn_data_list tdl = tplStorePtr->get_txn_map().get_tdata_list(*i);
+ uint16_t enqCnt = 0UL;
+ uint16_t deqCnt = 0UL;
+ uint16_t tpcCnt = 0UL;
+ uint16_t abortCnt = 0UL;
+ for (qpid::linearstore::journal::tdl_itr j=tdl.begin(); j!=tdl.end(); ++j) {
+ if (j->enq_flag_) {
+ ++enqCnt;
+ } else {
+ ++deqCnt;
+ }
+ if (!j->commit_flag_) {
+ ++abortCnt;
+ }
+ if (j->tpc_flag_) {
+ ++tpcCnt;
+ }
+ }
+ if (tpcCnt > 0) {
+ if (tpcCnt != tdl.size()) THROW_STORE_EXCEPTION("Inconsistent TPL 2PC count");
+ if (enqCnt - deqCnt > 0) {
+ xids.insert(*i);
+ }
+ }
}
}
@@ -1186,7 +1163,7 @@ void MessageStoreImpl::flush(const qpid::broker::PersistableQueue& queue_)
JournalImpl* jc = static_cast<JournalImpl*>(queue_.getExternalQueueStore());
if (jc) {
// TODO: check if this result should be used...
- /*mrg::journal::iores res =*/ jc->flush();
+ /*mrg::journal::iores res =*/ jc->flush(false);
}
} catch (const qpid::linearstore::journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("Queue ") + qn + ": flush() failed: " + e.what() );
@@ -1258,7 +1235,7 @@ void MessageStoreImpl::store(const qpid::broker::PersistableQueue* queue_,
if (txn_->getXid().empty()) {
jc->enqueue_data_record(&buff[0], size, size, dtokp.get(), !message_->isPersistent());
} else {
- jc->enqueue_txn_data_record(&buff[0], size, size, dtokp.get(), txn_->getXid(), !message_->isPersistent());
+ jc->enqueue_txn_data_record(&buff[0], size, size, dtokp.get(), txn_->getXid(), txn_->isTPC(), !message_->isPersistent());
}
} else {
THROW_STORE_EXCEPTION(std::string("MessageStoreImpl::store() failed: queue NULL."));
@@ -1309,9 +1286,10 @@ void MessageStoreImpl::async_dequeue(qpid::broker::TransactionContext* ctxt_,
ddtokp->set_rid(messageIdSequence.next());
ddtokp->set_dequeue_rid(msg_->getPersistenceId());
ddtokp->set_wstate(DataTokenImpl::ENQ);
+ TxnCtxt* txn = 0;
std::string tid;
if (ctxt_) {
- TxnCtxt* txn = check(ctxt_);
+ txn = check(ctxt_);
tid = txn->getXid();
}
// Manually increase the ref count, as raw pointers are used beyond this point
@@ -1319,9 +1297,9 @@ void MessageStoreImpl::async_dequeue(qpid::broker::TransactionContext* ctxt_,
try {
JournalImpl* jc = static_cast<JournalImpl*>(queue_.getExternalQueueStore());
if (tid.empty()) {
- jc->dequeue_data_record(ddtokp.get());
+ jc->dequeue_data_record(ddtokp.get(), false);
} else {
- jc->dequeue_txn_data_record(ddtokp.get(), tid);
+ jc->dequeue_txn_data_record(ddtokp.get(), tid, txn?txn->isTPC():false, false);
}
} catch (const qpid::linearstore::journal::jexception& e) {
ddtokp->release();
@@ -1341,7 +1319,7 @@ void MessageStoreImpl::completed(TxnCtxt& txn_,
DataTokenImpl* dtokp = txn_.getDtok();
dtokp->set_dequeue_rid(dtokp->rid());
dtokp->set_rid(messageIdSequence.next());
- tplStorePtr->dequeue_txn_data_record(txn_.getDtok(), txn_.getXid(), commit_);
+ tplStorePtr->dequeue_txn_data_record(txn_.getDtok(), txn_.getXid(), txn_.isTPC(), commit_);
}
txn_.complete(commit_);
if (mgmtObject.get() != 0) {
@@ -1376,12 +1354,16 @@ void MessageStoreImpl::prepare(qpid::broker::TPCTransactionContext& ctxt_)
{
checkInit();
TxnCtxt* txn = dynamic_cast<TxnCtxt*>(&ctxt_);
+//std::string xid=txn->getXid(); std::cout << "*** MessageStoreImpl::prepare() xid=" << std::hex;
+//for (unsigned i=0; i<xid.length(); ++i) std::cout << "\\" << (int)xid.at(i); std::cout << " ***" << std::dec << std::endl;
if(!txn) throw qpid::broker::InvalidTransactionContextException();
localPrepare(txn);
}
void MessageStoreImpl::localPrepare(TxnCtxt* ctxt_)
{
+//std::string xid=ctxt_->getXid(); std::cout << "*** MessageStoreImpl::localPrepare() xid=" << std::hex;
+//for (unsigned i=0; i<xid.length(); ++i) std::cout << "\\" << (int)xid.at(i); std::cout << " ***" << std::dec << std::endl;
try {
chkTplStoreInit(); // Late initialize (if needed)
@@ -1394,7 +1376,7 @@ void MessageStoreImpl::localPrepare(TxnCtxt* ctxt_)
dtokp->set_external_rid(true);
dtokp->set_rid(messageIdSequence.next());
char tpcFlag = static_cast<char>(ctxt_->isTPC());
- tplStorePtr->enqueue_txn_data_record(&tpcFlag, sizeof(char), sizeof(char), dtokp, ctxt_->getXid(), false);
+ tplStorePtr->enqueue_txn_data_record(&tpcFlag, sizeof(char), sizeof(char), dtokp, ctxt_->getXid(), tpcFlag != 0, false);
ctxt_->prepare(tplStorePtr.get());
// make sure all the data is written to disk before returning
ctxt_->sync();