diff options
author | Kim van der Riet <kpvdr@apache.org> | 2013-12-20 18:07:31 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2013-12-20 18:07:31 +0000 |
commit | 5d28e44df2fd9d627d52fda08faeba72815c2ed4 (patch) | |
tree | 1868335dc0cd71d81614e50fe1fa0c5645e6af51 /qpid/cpp | |
parent | 829b44f73e0825f838099af4b683b1f744c0e2aa (diff) | |
download | qpid-python-5d28e44df2fd9d627d52fda08faeba72815c2ed4.tar.gz |
QPID-5422: DTX test failure, and some tidying up of code in JournalImpl.cpp/h
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1552772 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
18 files changed, 407 insertions, 303 deletions
diff --git a/qpid/cpp/src/qpid/linearstore/ISSUES b/qpid/cpp/src/qpid/linearstore/ISSUES index 97a9e3fb2a..deeefb31fe 100644 --- a/qpid/cpp/src/qpid/linearstore/ISSUES +++ b/qpid/cpp/src/qpid/linearstore/ISSUES @@ -59,7 +59,7 @@ Current bugs and performance issues: 5. (UNABLE TO REPRODUCE) BZ 1038599 - Abort when deleting used queue after restart - may be dup of QPID-5387 (BZ 1036071) 6. BZ 1039522 - Crash during recovery - JournalFile::getFqFileName() -JERR_JREC_BADRECTAIL 6. BZ 1039525 - Crash during recovery - journal::jexception - JERR_JREC_BADRECTAIL -7. BZ 1039949 - DTX test failure - missing XIDs +7. QPID-5442 (BZ 1039949) - DTX test failure - missing XIDs Code tidy-up ------------ diff --git a/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp b/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp index 081fab190d..a71db3d384 100644 --- a/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp +++ b/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp @@ -30,23 +30,36 @@ namespace qpid { namespace linearstore { -InactivityFireEvent::InactivityFireEvent(JournalImpl* p, const qpid::sys::Duration timeout): - qpid::sys::TimerTask(timeout, "JournalInactive:"+p->id()), _parent(p) {} - -void InactivityFireEvent::fire() { qpid::sys::Mutex::ScopedLock sl(_ife_lock); if (_parent) _parent->flushFire(); } +InactivityFireEvent::InactivityFireEvent(JournalImpl* p, + const ::qpid::sys::Duration timeout): + ::qpid::sys::TimerTask(timeout, "JournalInactive:"+p->id()), _parent(p) {} + +void InactivityFireEvent::fire() { + ::qpid::sys::Mutex::ScopedLock sl(_ife_lock); + if (_parent) { + _parent->flushFire(); + } +} -GetEventsFireEvent::GetEventsFireEvent(JournalImpl* p, const qpid::sys::Duration timeout): - qpid::sys::TimerTask(timeout, "JournalGetEvents:"+p->id()), _parent(p) {} +GetEventsFireEvent::GetEventsFireEvent(JournalImpl* p, + const ::qpid::sys::Duration timeout): + ::qpid::sys::TimerTask(timeout, "JournalGetEvents:"+p->id()), _parent(p) +{} -void GetEventsFireEvent::fire() { qpid::sys::Mutex::ScopedLock sl(_gefe_lock); if (_parent) _parent->getEventsFire(); } +void GetEventsFireEvent::fire() { + ::qpid::sys::Mutex::ScopedLock sl(_gefe_lock); + if (_parent) { + _parent->getEventsFire(); + } +} -JournalImpl::JournalImpl(qpid::sys::Timer& timer_, +JournalImpl::JournalImpl(::qpid::sys::Timer& timer_, const std::string& journalId, const std::string& journalDirectory, JournalLogImpl& journalLogRef, - const qpid::sys::Duration getEventsTimeout, - const qpid::sys::Duration flushTimeout, - qpid::management::ManagementAgent* a, + const ::qpid::sys::Duration getEventsTimeout, + const ::qpid::sys::Duration flushTimeout, + ::qpid::management::ManagementAgent* a, DeleteCallback onDelete): jcntl(journalId, journalDirectory, journalLogRef), timer(timer_), @@ -76,7 +89,7 @@ JournalImpl::~JournalImpl() if (deleteCallback) deleteCallback(*this); if (_init_flag && !_stop_flag){ try { stop(true); } // NOTE: This will *block* until all outstanding disk aio calls are complete! - catch (const qpid::linearstore::journal::jexception& e) { QLS_LOG2(error, _jid, e.what()); } + catch (const ::qpid::linearstore::journal::jexception& e) { QLS_LOG2(error, _jid, e.what()); } } getEventsFireEventsPtr->cancel(); inactivityFireEventPtr->cancel(); @@ -90,7 +103,7 @@ JournalImpl::~JournalImpl() } void -JournalImpl::initManagement(qpid::management::ManagementAgent* a) +JournalImpl::initManagement(::qpid::management::ManagementAgent* a) { _agent = a; if (_agent != 0) @@ -117,10 +130,10 @@ JournalImpl::initManagement(qpid::management::ManagementAgent* a) void -JournalImpl::initialize(qpid::linearstore::journal::EmptyFilePool* efpp_, +JournalImpl::initialize(::qpid::linearstore::journal::EmptyFilePool* efpp_, const uint16_t wcache_num_pages, const uint32_t wcache_pgsize_sblks, - qpid::linearstore::journal::aio_callback* const cbp) + ::qpid::linearstore::journal::aio_callback* const cbp) { // efpp->createJournal(_jdir); // QLS_LOG2(notice, _jid, "Initialized"); @@ -152,10 +165,10 @@ JournalImpl::initialize(qpid::linearstore::journal::EmptyFilePool* efpp_, } void -JournalImpl::recover(boost::shared_ptr<qpid::linearstore::journal::EmptyFilePoolManager> efpm, +JournalImpl::recover(boost::shared_ptr< ::qpid::linearstore::journal::EmptyFilePoolManager> efpm, const uint16_t wcache_num_pages, const uint32_t wcache_pgsize_sblks, - qpid::linearstore::journal::aio_callback* const cbp, + ::qpid::linearstore::journal::aio_callback* const cbp, boost::ptr_list<PreparedTransaction>* prep_tx_list_ptr, uint64_t& highest_rid, uint64_t queue_id) @@ -196,8 +209,8 @@ JournalImpl::recover(boost::shared_ptr<qpid::linearstore::journal::EmptyFilePool if (prep_tx_list_ptr) { for (PreparedTransaction::list::iterator i = prep_tx_list_ptr->begin(); i != prep_tx_list_ptr->end(); i++) { - qpid::linearstore::journal::txn_data_list tdl = _tmap.get_tdata_list(i->xid); // tdl will be empty if xid not found - for (qpid::linearstore::journal::tdl_itr tdl_itr = tdl.begin(); tdl_itr < tdl.end(); tdl_itr++) { + ::qpid::linearstore::journal::txn_data_list tdl = _tmap.get_tdata_list(i->xid); // tdl will be empty if xid not found + for (::qpid::linearstore::journal::tdl_itr tdl_itr = tdl.begin(); tdl_itr < tdl.end(); tdl_itr++) { if (tdl_itr->enq_flag_) { // enqueue op i->enqueues->add(queue_id, tdl_itr->rid_); } else { // dequeue op @@ -237,8 +250,11 @@ JournalImpl::recover_complete() void -JournalImpl::enqueue_data_record(const void* const data_buff, const size_t tot_data_len, - const size_t this_data_len, qpid::linearstore::journal::data_tok* dtokp, const bool transient) +JournalImpl::enqueue_data_record(const void* const data_buff, + const size_t tot_data_len, + const size_t this_data_len, + ::qpid::linearstore::journal::data_tok* dtokp, + const bool transient) { handleIoResult(jcntl::enqueue_data_record(data_buff, tot_data_len, this_data_len, dtokp, transient)); @@ -250,8 +266,9 @@ JournalImpl::enqueue_data_record(const void* const data_buff, const size_t tot_d } void -JournalImpl::enqueue_extern_data_record(const size_t tot_data_len, qpid::linearstore::journal::data_tok* dtokp, - const bool transient) +JournalImpl::enqueue_extern_data_record(const size_t tot_data_len, + ::qpid::linearstore::journal::data_tok* dtokp, + const bool transient) { handleIoResult(jcntl::enqueue_extern_data_record(tot_data_len, dtokp, transient)); @@ -263,12 +280,17 @@ JournalImpl::enqueue_extern_data_record(const size_t tot_data_len, qpid::linears } void -JournalImpl::enqueue_txn_data_record(const void* const data_buff, const size_t tot_data_len, - const size_t this_data_len, qpid::linearstore::journal::data_tok* dtokp, const std::string& xid, const bool transient) +JournalImpl::enqueue_txn_data_record(const void* const data_buff, + const size_t tot_data_len, + const size_t this_data_len, + ::qpid::linearstore::journal::data_tok* dtokp, + const std::string& xid, + const bool tpc_flag, + const bool transient) { bool txn_incr = _mgmtObject.get() != 0 ? _tmap.in_map(xid) : false; - handleIoResult(jcntl::enqueue_txn_data_record(data_buff, tot_data_len, this_data_len, dtokp, xid, transient)); + handleIoResult(jcntl::enqueue_txn_data_record(data_buff, tot_data_len, this_data_len, dtokp, xid, tpc_flag, transient)); if (_mgmtObject.get() != 0) { @@ -281,12 +303,15 @@ JournalImpl::enqueue_txn_data_record(const void* const data_buff, const size_t t } void -JournalImpl::enqueue_extern_txn_data_record(const size_t tot_data_len, qpid::linearstore::journal::data_tok* dtokp, - const std::string& xid, const bool transient) +JournalImpl::enqueue_extern_txn_data_record(const size_t tot_data_len, + ::qpid::linearstore::journal::data_tok* dtokp, + const std::string& xid, + const bool tpc_flag, + const bool transient) { bool txn_incr = _mgmtObject.get() != 0 ? _tmap.in_map(xid) : false; - handleIoResult(jcntl::enqueue_extern_txn_data_record(tot_data_len, dtokp, xid, transient)); + handleIoResult(jcntl::enqueue_extern_txn_data_record(tot_data_len, dtokp, xid, tpc_flag, transient)); if (_mgmtObject.get() != 0) { @@ -299,7 +324,8 @@ JournalImpl::enqueue_extern_txn_data_record(const size_t tot_data_len, qpid::lin } void -JournalImpl::dequeue_data_record(qpid::linearstore::journal::data_tok* const dtokp, const bool txn_coml_commit) +JournalImpl::dequeue_data_record(::qpid::linearstore::journal::data_tok* const dtokp, + const bool txn_coml_commit) { handleIoResult(jcntl::dequeue_data_record(dtokp, txn_coml_commit)); @@ -312,11 +338,14 @@ JournalImpl::dequeue_data_record(qpid::linearstore::journal::data_tok* const dto } void -JournalImpl::dequeue_txn_data_record(qpid::linearstore::journal::data_tok* const dtokp, const std::string& xid, const bool txn_coml_commit) +JournalImpl::dequeue_txn_data_record(::qpid::linearstore::journal::data_tok* const dtokp, + const std::string& xid, + const bool tpc_flag, + const bool txn_coml_commit) { bool txn_incr = _mgmtObject.get() != 0 ? _tmap.in_map(xid) : false; - handleIoResult(jcntl::dequeue_txn_data_record(dtokp, xid, txn_coml_commit)); + handleIoResult(jcntl::dequeue_txn_data_record(dtokp, xid, tpc_flag, txn_coml_commit)); if (_mgmtObject.get() != 0) { @@ -329,7 +358,8 @@ JournalImpl::dequeue_txn_data_record(qpid::linearstore::journal::data_tok* const } void -JournalImpl::txn_abort(qpid::linearstore::journal::data_tok* const dtokp, const std::string& xid) +JournalImpl::txn_abort(::qpid::linearstore::journal::data_tok* const dtokp, + const std::string& xid) { handleIoResult(jcntl::txn_abort(dtokp, xid)); @@ -341,7 +371,8 @@ JournalImpl::txn_abort(qpid::linearstore::journal::data_tok* const dtokp, const } void -JournalImpl::txn_commit(qpid::linearstore::journal::data_tok* const dtokp, const std::string& xid) +JournalImpl::txn_commit(::qpid::linearstore::journal::data_tok* const dtokp, + const std::string& xid) { handleIoResult(jcntl::txn_commit(dtokp, xid)); @@ -366,12 +397,12 @@ JournalImpl::stop(bool block_till_aio_cmpl) } } -qpid::linearstore::journal::iores +::qpid::linearstore::journal::iores JournalImpl::flush(const bool block_till_aio_cmpl) { - const qpid::linearstore::journal::iores res = jcntl::flush(block_till_aio_cmpl); + const ::qpid::linearstore::journal::iores res = jcntl::flush(block_till_aio_cmpl); { - qpid::sys::Mutex::ScopedLock sl(_getf_lock); + ::qpid::sys::Mutex::ScopedLock sl(_getf_lock); if (_wmgr.get_aio_evt_rem() && !getEventsTimerSetFlag) { setGetEventTimer(); } } return res; @@ -380,7 +411,7 @@ JournalImpl::flush(const bool block_till_aio_cmpl) void JournalImpl::getEventsFire() { - qpid::sys::Mutex::ScopedLock sl(_getf_lock); + ::qpid::sys::Mutex::ScopedLock sl(_getf_lock); getEventsTimerSetFlag = false; if (_wmgr.get_aio_evt_rem()) { jcntl::get_wr_events(0); } if (_wmgr.get_aio_evt_rem()) { setGetEventTimer(); } @@ -394,7 +425,7 @@ JournalImpl::flushFire() flushTriggeredFlag = false; } else { if (!flushTriggeredFlag) { - flush(); + flush(false); flushTriggeredFlag = true; } } @@ -405,20 +436,20 @@ JournalImpl::flushFire() } void -JournalImpl::wr_aio_cb(std::vector<qpid::linearstore::journal::data_tok*>& dtokl) +JournalImpl::wr_aio_cb(std::vector< ::qpid::linearstore::journal::data_tok*>& dtokl) { - for (std::vector<qpid::linearstore::journal::data_tok*>::const_iterator i=dtokl.begin(); i!=dtokl.end(); i++) + for (std::vector< ::qpid::linearstore::journal::data_tok*>::const_iterator i=dtokl.begin(); i!=dtokl.end(); i++) { DataTokenImpl* dtokp = static_cast<DataTokenImpl*>(*i); if (/*!is_stopped() &&*/ dtokp->getSourceMessage()) { switch (dtokp->wstate()) { - case qpid::linearstore::journal::data_tok::ENQ: + case ::qpid::linearstore::journal::data_tok::ENQ: //std::cout << "<<<>>> JournalImpl::wr_aio_cb() ENQ dtokp rid=0x" << std::hex << dtokp->rid() << std::dec << std::endl << std::flush; // DEBUG dtokp->getSourceMessage()->enqueueComplete(); break; - case qpid::linearstore::journal::data_tok::DEQ: + case ::qpid::linearstore::journal::data_tok::DEQ: //std::cout << "<<<>>> JournalImpl::wr_aio_cb() DEQ dtokp rid=0x" << std::hex << dtokp->rid() << std::dec << std::endl << std::flush; // DEBUG /* Don't need to signal until we have a way to ack completion of dequeue in AMQP dtokp->getSourceMessage()->dequeueComplete(); @@ -443,25 +474,25 @@ JournalImpl::createStore() { } void -JournalImpl::handleIoResult(const qpid::linearstore::journal::iores r) +JournalImpl::handleIoResult(const ::qpid::linearstore::journal::iores r) { writeActivityFlag = true; switch (r) { - case qpid::linearstore::journal::RHM_IORES_SUCCESS: + case ::qpid::linearstore::journal::RHM_IORES_SUCCESS: return; default: { std::ostringstream oss; - oss << "Unexpected I/O response (" << qpid::linearstore::journal::iores_str(r) << ")."; + oss << "Unexpected I/O response (" << ::qpid::linearstore::journal::iores_str(r) << ")."; QLS_LOG2(error, _jid, oss.str()); THROW_STORE_FULL_EXCEPTION(oss.str()); } } } -qpid::management::Manageable::status_t JournalImpl::ManagementMethod (uint32_t /*methodId*/, - qpid::management::Args& /*args*/, +::qpid::management::Manageable::status_t JournalImpl::ManagementMethod (uint32_t /*methodId*/, + ::qpid::management::Args& /*args*/, std::string& /*text*/) { Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; diff --git a/qpid/cpp/src/qpid/linearstore/JournalImpl.h b/qpid/cpp/src/qpid/linearstore/JournalImpl.h index 763089f3d0..667579253e 100644 --- a/qpid/cpp/src/qpid/linearstore/JournalImpl.h +++ b/qpid/cpp/src/qpid/linearstore/JournalImpl.h @@ -44,86 +44,90 @@ namespace journal { class JournalImpl; class JournalLogImpl; -class InactivityFireEvent : public qpid::sys::TimerTask +class InactivityFireEvent : public ::qpid::sys::TimerTask { JournalImpl* _parent; - qpid::sys::Mutex _ife_lock; + ::qpid::sys::Mutex _ife_lock; public: - InactivityFireEvent(JournalImpl* p, const qpid::sys::Duration timeout); + InactivityFireEvent(JournalImpl* p, + const ::qpid::sys::Duration timeout); virtual ~InactivityFireEvent() {} void fire(); - inline void cancel() { qpid::sys::Mutex::ScopedLock sl(_ife_lock); _parent = 0; } + inline void cancel() { ::qpid::sys::Mutex::ScopedLock sl(_ife_lock); _parent = 0; } }; -class GetEventsFireEvent : public qpid::sys::TimerTask +class GetEventsFireEvent : public ::qpid::sys::TimerTask { JournalImpl* _parent; - qpid::sys::Mutex _gefe_lock; + ::qpid::sys::Mutex _gefe_lock; public: - GetEventsFireEvent(JournalImpl* p, const qpid::sys::Duration timeout); + GetEventsFireEvent(JournalImpl* p, + const ::qpid::sys::Duration timeout); virtual ~GetEventsFireEvent() {} void fire(); - inline void cancel() { qpid::sys::Mutex::ScopedLock sl(_gefe_lock); _parent = 0; } + inline void cancel() { ::qpid::sys::Mutex::ScopedLock sl(_gefe_lock); _parent = 0; } }; -class JournalImpl : public qpid::broker::ExternalQueueStore, public qpid::linearstore::journal::jcntl, public qpid::linearstore::journal::aio_callback +class JournalImpl : public ::qpid::broker::ExternalQueueStore, + public ::qpid::linearstore::journal::jcntl, + public ::qpid::linearstore::journal::aio_callback { public: typedef boost::function<void (JournalImpl&)> DeleteCallback; protected: - qpid::sys::Timer& timer; + ::qpid::sys::Timer& timer; JournalLogImpl& _journalLogRef; bool getEventsTimerSetFlag; - boost::intrusive_ptr<qpid::sys::TimerTask> getEventsFireEventsPtr; - qpid::sys::Mutex _getf_lock; - qpid::sys::Mutex _read_lock; + boost::intrusive_ptr< ::qpid::sys::TimerTask> getEventsFireEventsPtr; + ::qpid::sys::Mutex _getf_lock; + ::qpid::sys::Mutex _read_lock; bool writeActivityFlag; bool flushTriggeredFlag; - boost::intrusive_ptr<qpid::sys::TimerTask> inactivityFireEventPtr; + boost::intrusive_ptr< ::qpid::sys::TimerTask> inactivityFireEventPtr; - qpid::management::ManagementAgent* _agent; - qmf::org::apache::qpid::linearstore::Journal::shared_ptr _mgmtObject; + ::qpid::management::ManagementAgent* _agent; + ::qmf::org::apache::qpid::linearstore::Journal::shared_ptr _mgmtObject; DeleteCallback deleteCallback; public: - JournalImpl(qpid::sys::Timer& timer, + JournalImpl(::qpid::sys::Timer& timer, const std::string& journalId, const std::string& journalDirectory, JournalLogImpl& journalLogRef, - const qpid::sys::Duration getEventsTimeout, - const qpid::sys::Duration flushTimeout, - qpid::management::ManagementAgent* agent, + const ::qpid::sys::Duration getEventsTimeout, + const ::qpid::sys::Duration flushTimeout, + ::qpid::management::ManagementAgent* agent, DeleteCallback deleteCallback=DeleteCallback() ); virtual ~JournalImpl(); - void initManagement(qpid::management::ManagementAgent* agent); + void initManagement(::qpid::management::ManagementAgent* agent); - void initialize(qpid::linearstore::journal::EmptyFilePool* efp, + void initialize(::qpid::linearstore::journal::EmptyFilePool* efp, const uint16_t wcache_num_pages, const uint32_t wcache_pgsize_sblks, - qpid::linearstore::journal::aio_callback* const cbp); + ::qpid::linearstore::journal::aio_callback* const cbp); - inline void initialize(qpid::linearstore::journal::EmptyFilePool* efpp, + inline void initialize(::qpid::linearstore::journal::EmptyFilePool* efpp, const uint16_t wcache_num_pages, const uint32_t wcache_pgsize_sblks) { initialize(efpp, wcache_num_pages, wcache_pgsize_sblks, this); } - void recover(boost::shared_ptr<qpid::linearstore::journal::EmptyFilePoolManager> efpm, + void recover(boost::shared_ptr< ::qpid::linearstore::journal::EmptyFilePoolManager> efpm, const uint16_t wcache_num_pages, const uint32_t wcache_pgsize_sblks, - qpid::linearstore::journal::aio_callback* const cbp, + ::qpid::linearstore::journal::aio_callback* const cbp, boost::ptr_list<PreparedTransaction>* prep_tx_list_ptr, uint64_t& highest_rid, uint64_t queue_id); - inline void recover(boost::shared_ptr<qpid::linearstore::journal::EmptyFilePoolManager> efpm, + inline void recover(boost::shared_ptr< ::qpid::linearstore::journal::EmptyFilePoolManager> efpm, const uint16_t wcache_num_pages, const uint32_t wcache_pgsize_sblks, boost::ptr_list<PreparedTransaction>* prep_tx_list_ptr, @@ -135,47 +139,62 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public qpid::linear void recover_complete(); // Overrides for write inactivity timer - void enqueue_data_record(const void* const data_buff, const size_t tot_data_len, - const size_t this_data_len, qpid::linearstore::journal::data_tok* dtokp, - const bool transient = false); - - void enqueue_extern_data_record(const size_t tot_data_len, qpid::linearstore::journal::data_tok* dtokp, - const bool transient = false); - - void enqueue_txn_data_record(const void* const data_buff, const size_t tot_data_len, - const size_t this_data_len, qpid::linearstore::journal::data_tok* dtokp, const std::string& xid, - const bool transient = false); - - void enqueue_extern_txn_data_record(const size_t tot_data_len, qpid::linearstore::journal::data_tok* dtokp, - const std::string& xid, const bool transient = false); - - void dequeue_data_record(qpid::linearstore::journal::data_tok* const dtokp, const bool txn_coml_commit = false); - - void dequeue_txn_data_record(qpid::linearstore::journal::data_tok* const dtokp, const std::string& xid, const bool txn_coml_commit = false); - - void txn_abort(qpid::linearstore::journal::data_tok* const dtokp, const std::string& xid); - - void txn_commit(qpid::linearstore::journal::data_tok* const dtokp, const std::string& xid); + void enqueue_data_record(const void* const data_buff, + const size_t tot_data_len, + const size_t this_data_len, + ::qpid::linearstore::journal::data_tok* dtokp, + const bool transient); + + void enqueue_extern_data_record(const size_t tot_data_len, + ::qpid::linearstore::journal::data_tok* dtokp, + const bool transient); + + void enqueue_txn_data_record(const void* const data_buff, + const size_t tot_data_len, + const size_t this_data_len, + ::qpid::linearstore::journal::data_tok* dtokp, + const std::string& xid, + const bool tpc_flag, + const bool transient); + + void enqueue_extern_txn_data_record(const size_t tot_data_len, + ::qpid::linearstore::journal::data_tok* dtokp, + const std::string& xid, + const bool tpc_flag, + const bool transient); + + void dequeue_data_record(::qpid::linearstore::journal::data_tok* + const dtokp, + const bool txn_coml_commit); + + void dequeue_txn_data_record(::qpid::linearstore::journal::data_tok* const dtokp, + const std::string& xid, + const bool tpc_flag, + const bool txn_coml_commit); + + void txn_abort(::qpid::linearstore::journal::data_tok* const dtokp, const std::string& xid); + + void txn_commit(::qpid::linearstore::journal::data_tok* const dtokp, const std::string& xid); void stop(bool block_till_aio_cmpl = false); // Overrides for get_events timer - qpid::linearstore::journal::iores flush(const bool block_till_aio_cmpl = false); + ::qpid::linearstore::journal::iores flush(const bool block_till_aio_cmpl); // TimerTask callback void getEventsFire(); void flushFire(); // AIO callbacks - virtual void wr_aio_cb(std::vector<qpid::linearstore::journal::data_tok*>& dtokl); + virtual void wr_aio_cb(std::vector< ::qpid::linearstore::journal::data_tok*>& dtokl); virtual void rd_aio_cb(std::vector<uint16_t>& pil); - qpid::management::ManagementObject::shared_ptr GetManagementObject (void) const + ::qpid::management::ManagementObject::shared_ptr GetManagementObject (void) const { return _mgmtObject; } - qpid::management::Manageable::status_t ManagementMethod (uint32_t, - qpid::management::Args&, - std::string&); + ::qpid::management::Manageable::status_t ManagementMethod(uint32_t, + ::qpid::management::Args&, + std::string&); void resetDeleteCallback() { deleteCallback = DeleteCallback(); } @@ -188,7 +207,7 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public qpid::linear timer.add(getEventsFireEventsPtr); getEventsTimerSetFlag = true; } - void handleIoResult(const qpid::linearstore::journal::iores r); + void handleIoResult(const ::qpid::linearstore::journal::iores r); // Management instrumentation callbacks overridden from jcntl inline void instr_incr_outstanding_aio_cnt() { @@ -203,23 +222,27 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public qpid::linear class TplJournalImpl : public JournalImpl { public: - TplJournalImpl(qpid::sys::Timer& timer, + TplJournalImpl(::qpid::sys::Timer& timer, const std::string& journalId, const std::string& journalDirectory, JournalLogImpl& journalLogRef, - const qpid::sys::Duration getEventsTimeout, - const qpid::sys::Duration flushTimeout, - qpid::management::ManagementAgent* agent) : + const ::qpid::sys::Duration getEventsTimeout, + const ::qpid::sys::Duration flushTimeout, + ::qpid::management::ManagementAgent* agent) : JournalImpl(timer, journalId, journalDirectory, journalLogRef, getEventsTimeout, flushTimeout, agent) {} virtual ~TplJournalImpl() {} // Special version of read_data_record that ignores transactions - needed when reading the TPL - inline qpid::linearstore::journal::iores read_data_record(void** const datapp, std::size_t& dsize, - void** const xidpp, std::size_t& xidsize, bool& transient, bool& external, - qpid::linearstore::journal::data_tok* const dtokp) { - return JournalImpl::read_data_record(datapp, dsize, xidpp, xidsize, transient, external, dtokp, true); + inline ::qpid::linearstore::journal::iores read_data_record(void** const datapp, + std::size_t& dsize, + void** const xidpp, + std::size_t& xidsize, + bool& transient, + bool& external, + ::qpid::linearstore::journal::data_tok* const dtokp) { + return JournalImpl::read_data_record(datapp, dsize, xidpp, xidsize, transient, external, dtokp, false); } }; // class TplJournalImpl diff --git a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp index 593d2bd4b7..dd4198c6fd 100644 --- a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp +++ b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp @@ -51,16 +51,6 @@ qpid::sys::Duration MessageStoreImpl::defJournalGetEventsTimeout(1 * qpid::sys:: qpid::sys::Duration MessageStoreImpl::defJournalFlushTimeout(500 * qpid::sys::TIME_MSEC); // 0.5s qpid::sys::Mutex TxnCtxt::globalSerialiser; -MessageStoreImpl::TplRecoverStruct::TplRecoverStruct(const uint64_t rid_, - const bool deq_flag_, - const bool commit_flag_, - const bool tpc_flag_) : - rid(rid_), - deq_flag(deq_flag_), - commit_flag(commit_flag_), - tpc_flag(tpc_flag_) -{} - MessageStoreImpl::MessageStoreImpl(qpid::broker::Broker* broker_, const char* envpath_) : defaultEfpPartitionNumber(0), defaultEfpFileSize_kib(0), @@ -601,7 +591,7 @@ void MessageStoreImpl::recover(qpid::broker::RecoveryManager& registry_) TxnCtxt txn; txn.begin(dbenv.get(), false); try { - //read all queues, calls recoversMessages + //read all queues, calls recoversMessages for each queue recoverQueues(txn, registry_, queues, prepared, messages); //recover exchange & bindings: @@ -621,6 +611,7 @@ void MessageStoreImpl::recover(qpid::broker::RecoveryManager& registry_) } //recover transactions: + qpid::linearstore::journal::txn_map& txn_map_ref = tplStorePtr->get_txn_map(); for (txn_list::iterator i = prepared.begin(); i != prepared.end(); i++) { const PreparedTransaction pt = *i; if (mgmtObject.get() != 0) { @@ -629,20 +620,39 @@ void MessageStoreImpl::recover(qpid::broker::RecoveryManager& registry_) } std::string xid = pt.xid; - - // Restore data token state in TxnCtxt - TplRecoverMapCitr citr = tplRecoverMap.find(xid); - if (citr == tplRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in tplRecoverMap"); + qpid::linearstore::journal::txn_data_list tdl = txn_map_ref.get_tdata_list(xid); + if (tdl.size() == 0) THROW_STORE_EXCEPTION("XID not found in txn_map"); + uint16_t enqCnt = 0UL; + uint16_t deqCnt = 0UL; + uint16_t tpcCnt = 0UL; + uint16_t abortCnt = 0UL; + uint64_t rid = 0ULL; + for (qpid::linearstore::journal::tdl_itr j=tdl.begin(); j!=tdl.end(); ++j) { + if (j->enq_flag_) { + ++enqCnt; + rid = j->rid_; + } else { + ++deqCnt; + } + if (!j->commit_flag_) { + ++abortCnt; + } + if (j->tpc_flag_) { + ++tpcCnt; + } + } + if (tpcCnt > 0 && tpcCnt != tdl.size()) THROW_STORE_EXCEPTION("Inconsistent TPL 2PC count"); + bool commitFlag = abortCnt == 0; // If a record is found that is dequeued but not committed/aborted from tplStore, then a complete() call // was interrupted part way through committing/aborting the impacted queues. Complete this process. - bool incomplTplTxnFlag = citr->second.deq_flag; + bool incomplTplTxnFlag = deqCnt > 0; - if (citr->second.tpc_flag) { + if (tpcCnt > 0) { // Dtx (2PC) transaction TPCTxnCtxt* tpcc = new TPCTxnCtxt(xid, &messageIdSequence); std::auto_ptr<qpid::broker::TPCTransactionContext> txn(tpcc); - tpcc->recoverDtok(citr->second.rid, xid); + tpcc->recoverDtok(rid, xid); tpcc->prepare(tplStorePtr.get()); qpid::broker::RecoverableTransaction::shared_ptr dtx; @@ -661,12 +671,12 @@ void MessageStoreImpl::recover(qpid::broker::RecoveryManager& registry_) } if (incomplTplTxnFlag) { - tpcc->complete(citr->second.commit_flag); + tpcc->complete(commitFlag); } } else { // Local (1PC) transaction boost::shared_ptr<TxnCtxt> opcc(new TxnCtxt(xid, &messageIdSequence)); - opcc->recoverDtok(citr->second.rid, xid); + opcc->recoverDtok(rid, xid); opcc->prepare(tplStorePtr.get()); if (pt.enqueues.get()) { @@ -680,11 +690,12 @@ void MessageStoreImpl::recover(qpid::broker::RecoveryManager& registry_) } } if (incomplTplTxnFlag) { - opcc->complete(citr->second.commit_flag); + opcc->complete(commitFlag); } else { - completed(*opcc.get(), citr->second.commit_flag); + completed(*opcc.get(), commitFlag); } } + } registry_.recoveryComplete(); } @@ -888,12 +899,13 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/, bool externalFlag = false; DataTokenImpl dtok; dtok.set_wstate(DataTokenImpl::NONE); + qpid::linearstore::journal::txn_map& txn_map_ref = tplStorePtr->get_txn_map(); // Read the message from the Journal. try { unsigned aio_sleep_cnt = 0; while (read) { - qpid::linearstore::journal::iores res = jc->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtok); + qpid::linearstore::journal::iores res = jc->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtok, false); switch (res) { @@ -932,16 +944,31 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/, } else { uint64_t rid = dtok.rid(); std::string xid(i->xid); - TplRecoverMapCitr citr = tplRecoverMap.find(xid); - if (citr == tplRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in tplRecoverMap"); - - // deq present in prepared list: this xid is part of incomplete txn commit/abort - // or this is a 1PC txn that must be rolled forward - if (citr->second.deq_flag || !citr->second.tpc_flag) { + qpid::linearstore::journal::txn_data_list tdl = txn_map_ref.get_tdata_list(xid); + if (tdl.size() == 0) THROW_STORE_EXCEPTION("XID not found in txn_map"); + uint16_t enqCnt = 0UL; + uint16_t deqCnt = 0UL; + uint16_t tpcCnt = 0UL; + uint16_t abortCnt = 0UL; + for (qpid::linearstore::journal::tdl_itr j=tdl.begin(); j!=tdl.end(); ++j) { + if (j->enq_flag_) { + ++enqCnt; + } else { + ++deqCnt; + } + if (!j->commit_flag_) { + ++abortCnt; + } + if (j->tpc_flag_) { + ++tpcCnt; + } + } + if (tpcCnt > 0 && tpcCnt != tdl.size()) THROW_STORE_EXCEPTION("Inconsistent TPL 2PC count"); + if (deqCnt > 0 || tpcCnt == 0) { if (jc->is_enqueued(rid, true)) { // Enqueue is non-tx, dequeue tx assert(jc->is_locked(rid)); // This record MUST be locked by a txn dequeue - if (!citr->second.commit_flag) { + if (abortCnt > 0) { rcnt++; queue->recover(msg); // recover message in abort case only } @@ -955,7 +982,7 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/, if (j->enq_flag_ && j->rid_ == rid) enq = true; else if (!j->enq_flag_ && j->drid_ == rid) deq = true; } - if (enq && !deq && citr->second.commit_flag) { + if (enq && !deq && abortCnt == 0) { rcnt++; queue->recover(msg); // recover txn message in commit case only } @@ -969,10 +996,14 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/, dtok.reset(); dtok.set_wstate(DataTokenImpl::NONE); - if (xidbuff) + if (xidbuff) { ::free(xidbuff); - else if (dbuff) + xidbuff = NULL; + } + if (dbuff) { ::free(dbuff); + dbuff = NULL; + } aio_sleep_cnt = 0; break; } @@ -1033,77 +1064,6 @@ int MessageStoreImpl::enqueueMessage(TxnCtxt& txn_, return count; } -void MessageStoreImpl::readTplStore() -{ - tplRecoverMap.clear(); - qpid::linearstore::journal::txn_map& tmap = tplStorePtr->get_txn_map(); - DataTokenImpl dtok; - void* dbuff = NULL; size_t dbuffSize = 0; - void* xidbuff = NULL; size_t xidbuffSize = 0; - bool transientFlag = false; - bool externalFlag = false; - bool done = false; - try { - unsigned aio_sleep_cnt = 0; - while (!done) { - dtok.reset(); - dtok.set_wstate(DataTokenImpl::ENQ); - qpid::linearstore::journal::iores res = tplStorePtr->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtok); - switch (res) { - case qpid::linearstore::journal::RHM_IORES_SUCCESS: { - // Every TPL record contains both data and an XID - assert(dbuffSize>0); - assert(xidbuffSize>0); - std::string xid(static_cast<const char*>(xidbuff), xidbuffSize); - bool is2PC = *(static_cast<char*>(dbuff)) != 0; - - // Check transaction details; add to recover map - qpid::linearstore::journal::txn_data_list txnList = tmap.get_tdata_list(xid); // txnList will be empty if xid not found - if (!txnList.empty()) { // xid found in tmap - unsigned enqCnt = 0; - unsigned deqCnt = 0; - uint64_t rid = 0; - - // Assume commit (roll forward) in cases where only prepare has been called - ie only enqueue record exists. - // Note: will apply to both 1PC and 2PC transactions. - bool commitFlag = true; - - for (qpid::linearstore::journal::tdl_itr j = txnList.begin(); j<txnList.end(); j++) { - if (j->enq_flag_) { - rid = j->rid_; - enqCnt++; - } else { - commitFlag = j->commit_flag_; - deqCnt++; - } - } - assert(enqCnt == 1); - assert(deqCnt <= 1); - tplRecoverMap.insert(TplRecoverMapPair(xid, TplRecoverStruct(rid, deqCnt == 1, commitFlag, is2PC))); - } - - ::free(xidbuff); - aio_sleep_cnt = 0; - break; - } - case qpid::linearstore::journal::RHM_IORES_PAGE_AIOWAIT: - if (++aio_sleep_cnt > MAX_AIO_SLEEPS) - THROW_STORE_EXCEPTION("Timeout waiting for AIO in MessageStoreImpl::recoverTplStore()"); - ::usleep(AIO_SLEEP_TIME_US); - break; - case qpid::linearstore::journal::RHM_IORES_EMPTY: - done = true; - break; // done with all messages. (add call in jrnl to test that _emap is empty.) - default: - std::ostringstream oss; - oss << "readTplStore(): Unexpected result from journal read: " << qpid::linearstore::journal::iores_str(res); - THROW_STORE_EXCEPTION(oss.str()); - } // switch - } - } catch (const qpid::linearstore::journal::jexception& e) { - THROW_STORE_EXCEPTION(std::string("TPL recoverTplStore() failed: ") + e.what()); - } -} void MessageStoreImpl::recoverTplStore() { @@ -1114,11 +1074,7 @@ void MessageStoreImpl::recoverTplStore() highestRid = thisHighestRid; else if (thisHighestRid - highestRid < 0x8000000000000000ULL) // RFC 1982 comparison for unsigned 64-bit highestRid = thisHighestRid; - - // Load tplRecoverMap by reading the TPL store - readTplStore(); - - tplStorePtr->recover_complete(); // start journal. + tplStorePtr->recover_complete(); // start TPL } } @@ -1126,28 +1082,49 @@ void MessageStoreImpl::recoverLockedMappings(txn_list& txns) { if (!tplStorePtr->is_ready()) recoverTplStore(); - - // Abort unprepared xids and populate the locked maps - for (TplRecoverMapCitr i = tplRecoverMap.begin(); i != tplRecoverMap.end(); i++) { + std::vector<std::string> xidList; + tplStorePtr->get_txn_map().xid_list(xidList); + for (std::vector<std::string>::const_iterator i=xidList.begin(); i!=xidList.end(); ++i) { LockedMappings::shared_ptr enq_ptr; enq_ptr.reset(new LockedMappings); LockedMappings::shared_ptr deq_ptr; deq_ptr.reset(new LockedMappings); - txns.push_back(new PreparedTransaction(i->first, enq_ptr, deq_ptr)); + txns.push_back(new PreparedTransaction(*i, enq_ptr, deq_ptr)); } } void MessageStoreImpl::collectPreparedXids(std::set<std::string>& xids) { - if (tplStorePtr->is_ready()) { - readTplStore(); - } else { + if (!tplStorePtr->is_ready()) { recoverTplStore(); } - for (TplRecoverMapCitr i = tplRecoverMap.begin(); i != tplRecoverMap.end(); i++) { - // Discard all txns that are to be rolled forward/back and 1PC transactions - if (!i->second.deq_flag && i->second.tpc_flag) - xids.insert(i->first); + std::vector<std::string> xidList; + tplStorePtr->get_txn_map().xid_list(xidList); + for (std::vector<std::string>::const_iterator i=xidList.begin(); i!=xidList.end(); ++i) { + qpid::linearstore::journal::txn_data_list tdl = tplStorePtr->get_txn_map().get_tdata_list(*i); + uint16_t enqCnt = 0UL; + uint16_t deqCnt = 0UL; + uint16_t tpcCnt = 0UL; + uint16_t abortCnt = 0UL; + for (qpid::linearstore::journal::tdl_itr j=tdl.begin(); j!=tdl.end(); ++j) { + if (j->enq_flag_) { + ++enqCnt; + } else { + ++deqCnt; + } + if (!j->commit_flag_) { + ++abortCnt; + } + if (j->tpc_flag_) { + ++tpcCnt; + } + } + if (tpcCnt > 0) { + if (tpcCnt != tdl.size()) THROW_STORE_EXCEPTION("Inconsistent TPL 2PC count"); + if (enqCnt - deqCnt > 0) { + xids.insert(*i); + } + } } } @@ -1186,7 +1163,7 @@ void MessageStoreImpl::flush(const qpid::broker::PersistableQueue& queue_) JournalImpl* jc = static_cast<JournalImpl*>(queue_.getExternalQueueStore()); if (jc) { // TODO: check if this result should be used... - /*mrg::journal::iores res =*/ jc->flush(); + /*mrg::journal::iores res =*/ jc->flush(false); } } catch (const qpid::linearstore::journal::jexception& e) { THROW_STORE_EXCEPTION(std::string("Queue ") + qn + ": flush() failed: " + e.what() ); @@ -1258,7 +1235,7 @@ void MessageStoreImpl::store(const qpid::broker::PersistableQueue* queue_, if (txn_->getXid().empty()) { jc->enqueue_data_record(&buff[0], size, size, dtokp.get(), !message_->isPersistent()); } else { - jc->enqueue_txn_data_record(&buff[0], size, size, dtokp.get(), txn_->getXid(), !message_->isPersistent()); + jc->enqueue_txn_data_record(&buff[0], size, size, dtokp.get(), txn_->getXid(), txn_->isTPC(), !message_->isPersistent()); } } else { THROW_STORE_EXCEPTION(std::string("MessageStoreImpl::store() failed: queue NULL.")); @@ -1309,9 +1286,10 @@ void MessageStoreImpl::async_dequeue(qpid::broker::TransactionContext* ctxt_, ddtokp->set_rid(messageIdSequence.next()); ddtokp->set_dequeue_rid(msg_->getPersistenceId()); ddtokp->set_wstate(DataTokenImpl::ENQ); + TxnCtxt* txn = 0; std::string tid; if (ctxt_) { - TxnCtxt* txn = check(ctxt_); + txn = check(ctxt_); tid = txn->getXid(); } // Manually increase the ref count, as raw pointers are used beyond this point @@ -1319,9 +1297,9 @@ void MessageStoreImpl::async_dequeue(qpid::broker::TransactionContext* ctxt_, try { JournalImpl* jc = static_cast<JournalImpl*>(queue_.getExternalQueueStore()); if (tid.empty()) { - jc->dequeue_data_record(ddtokp.get()); + jc->dequeue_data_record(ddtokp.get(), false); } else { - jc->dequeue_txn_data_record(ddtokp.get(), tid); + jc->dequeue_txn_data_record(ddtokp.get(), tid, txn?txn->isTPC():false, false); } } catch (const qpid::linearstore::journal::jexception& e) { ddtokp->release(); @@ -1341,7 +1319,7 @@ void MessageStoreImpl::completed(TxnCtxt& txn_, DataTokenImpl* dtokp = txn_.getDtok(); dtokp->set_dequeue_rid(dtokp->rid()); dtokp->set_rid(messageIdSequence.next()); - tplStorePtr->dequeue_txn_data_record(txn_.getDtok(), txn_.getXid(), commit_); + tplStorePtr->dequeue_txn_data_record(txn_.getDtok(), txn_.getXid(), txn_.isTPC(), commit_); } txn_.complete(commit_); if (mgmtObject.get() != 0) { @@ -1376,12 +1354,16 @@ void MessageStoreImpl::prepare(qpid::broker::TPCTransactionContext& ctxt_) { checkInit(); TxnCtxt* txn = dynamic_cast<TxnCtxt*>(&ctxt_); +//std::string xid=txn->getXid(); std::cout << "*** MessageStoreImpl::prepare() xid=" << std::hex; +//for (unsigned i=0; i<xid.length(); ++i) std::cout << "\\" << (int)xid.at(i); std::cout << " ***" << std::dec << std::endl; if(!txn) throw qpid::broker::InvalidTransactionContextException(); localPrepare(txn); } void MessageStoreImpl::localPrepare(TxnCtxt* ctxt_) { +//std::string xid=ctxt_->getXid(); std::cout << "*** MessageStoreImpl::localPrepare() xid=" << std::hex; +//for (unsigned i=0; i<xid.length(); ++i) std::cout << "\\" << (int)xid.at(i); std::cout << " ***" << std::dec << std::endl; try { chkTplStoreInit(); // Late initialize (if needed) @@ -1394,7 +1376,7 @@ void MessageStoreImpl::localPrepare(TxnCtxt* ctxt_) dtokp->set_external_rid(true); dtokp->set_rid(messageIdSequence.next()); char tpcFlag = static_cast<char>(ctxt_->isTPC()); - tplStorePtr->enqueue_txn_data_record(&tpcFlag, sizeof(char), sizeof(char), dtokp, ctxt_->getXid(), false); + tplStorePtr->enqueue_txn_data_record(&tpcFlag, sizeof(char), sizeof(char), dtokp, ctxt_->getXid(), tpcFlag != 0, false); ctxt_->prepare(tplStorePtr.get()); // make sure all the data is written to disk before returning ctxt_->sync(); diff --git a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h index e995237d99..95667be82e 100644 --- a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h +++ b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h @@ -89,19 +89,6 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem typedef LockedMappings::map txn_lock_map; typedef boost::ptr_list<PreparedTransaction> txn_list; - // Structs for Transaction Recover List (TPL) recover state - struct TplRecoverStruct { - uint64_t rid; // rid of TPL record - bool deq_flag; - bool commit_flag; - bool tpc_flag; - TplRecoverStruct(const uint64_t _rid, const bool _deq_flag, const bool _commit_flag, const bool _tpc_flag); - }; - typedef TplRecoverStruct TplRecover; - typedef std::pair<std::string, TplRecover> TplRecoverMapPair; - typedef std::map<std::string, TplRecover> TplRecoverMap; - typedef TplRecoverMap::const_iterator TplRecoverMapCitr; - typedef std::map<std::string, JournalImpl*> JournalListMap; typedef JournalListMap::iterator JournalListMapItr; @@ -127,7 +114,6 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem // Pointer to Transaction Prepared List (TPL) journal instance boost::shared_ptr<TplJournalImpl> tplStorePtr; - TplRecoverMap tplRecoverMap; qpid::sys::Mutex tplInitLock; JournalListMap journalList; qpid::sys::Mutex journalListLock; @@ -202,7 +188,6 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem queue_index& index, txn_list& locked, message_index& prepared); - void readTplStore(); void recoverTplStore(); void recoverLockedMappings(txn_list& txns); TxnCtxt* check(qpid::broker::TransactionContext* ctxt); diff --git a/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp b/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp index 50ef58aafa..743d12989a 100644 --- a/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp +++ b/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp @@ -114,7 +114,7 @@ void TxnCtxt::sync() { void TxnCtxt::jrnl_flush(JournalImpl* jc) { if (jc && !(jc->is_txn_synced(getXid()))) - jc->flush(); + jc->flush(false); } void TxnCtxt::jrnl_sync(JournalImpl* jc, timespec* timeout) { diff --git a/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp b/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp index f962976daf..5483f3bb94 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp @@ -51,6 +51,10 @@ void LinearFileController::initialize(const std::string& journalDirectory, } void LinearFileController::finalize() { + if (currentJournalFilePtr_) { + currentJournalFilePtr_->close(); + currentJournalFilePtr_ = NULL; + } while (!journalFileList_.empty()) { delete journalFileList_.front(); journalFileList_.pop_front(); diff --git a/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp b/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp index 66ac7c3a2d..16ca1e0994 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp @@ -49,6 +49,18 @@ namespace qpid { namespace linearstore { namespace journal { +RecoveredRecordData_t::RecoveredRecordData_t(const uint64_t rid, const uint64_t fid, const std::streampos foffs, bool ptxn) : + recordId_(rid), + fileId_(fid), + fileOffset_(foffs), + pendingTransaction_(ptxn) +{} + + +bool recordIdListCompare(RecoveredRecordData_t a, RecoveredRecordData_t b) { + return a.recordId_ < b.recordId_; +} + RecoveryManager::RecoveryManager(const std::string& journalDirectory, const std::string& queuename, enq_map& enqueueMapRef, @@ -86,6 +98,9 @@ void RecoveryManager::analyzeJournals(const std::vector<std::string>* preparedTr if (!journalEmptyFlag_) { // Read all records, establish remaining enqueued records + if (inFileStream_.is_open()) { + inFileStream_.close(); + } while (getNextRecordHeader()) {} if (inFileStream_.is_open()) { inFileStream_.close(); @@ -120,11 +135,7 @@ void RecoveryManager::analyzeJournals(const std::vector<std::string>* preparedTr } } } - - // Set up recordIdList_ from enqueue map - enqueueMapRef_.rid_list(recordIdList_); - - recordIdListConstItr_ = recordIdList_.begin(); + prepareRecordList(); } } @@ -151,37 +162,44 @@ bool RecoveryManager::readNextRemainingRecord(void** const dataPtrPtr, bool& transient, bool& external, data_tok* const dtokp, - bool /*ignore_pending_txns*/) { - if (recordIdListConstItr_ == recordIdList_.end()) { - return false; - } - enq_map::emap_data_struct_t eds; - enqueueMapRef_.get_data(*recordIdListConstItr_, eds); - if (!inFileStream_.is_open() || currentJournalFileConstItr_->first != eds._pfid) { - getFile(eds._pfid, false); - } -//std::cout << " " << eds._pfid << std::hex << ",0x" << eds._file_posn << std::flush; // DEBUG + bool ignore_pending_txns) { + bool foundRecord = false; + do { + if (recordIdListConstItr_ == recordIdList_.end()) { + return false; + } + if (recordIdListConstItr_->pendingTransaction_ && ignore_pending_txns) { // Pending transaction + ++recordIdListConstItr_; // ignore, go to next record + } else { + foundRecord = true; + } + } while (!foundRecord); - inFileStream_.seekg(eds._file_posn, std::ifstream::beg); + if (!inFileStream_.is_open() || currentJournalFileConstItr_->first != recordIdListConstItr_->fileId_) { + if (!getFile(recordIdListConstItr_->fileId_, false)) { + std::ostringstream oss; + oss << "Failed to open file with file-id=" << recordIdListConstItr_->fileId_; + throw jexception(jerrno::JERR__FILEIO, oss.str(), "RecoveryManager", "readNextRemainingRecord"); + } + } + inFileStream_.seekg(recordIdListConstItr_->fileOffset_, std::ifstream::beg); if (!inFileStream_.good()) { std::ostringstream oss; - oss << "Could not find offset 0x" << std::hex << eds._file_posn << " in file " << getCurrentFileName(); + oss << "Could not find offset 0x" << std::hex << recordIdListConstItr_->fileOffset_ << " in file " << getCurrentFileName(); throw jexception(jerrno::JERR__FILEIO, oss.str(), "RecoveryManager", "readNextRemainingRecord"); } + ::enq_hdr_t enqueueHeader; inFileStream_.read((char*)&enqueueHeader, sizeof(::enq_hdr_t)); if (inFileStream_.gcount() != sizeof(::enq_hdr_t)) { std::ostringstream oss; - oss << "Could not read enqueue header from file " << getCurrentFileName() << " at offset 0x" << std::hex << eds._file_posn; + oss << "Could not read enqueue header from file " << getCurrentFileName() << " at offset 0x" << std::hex << recordIdListConstItr_->fileOffset_; throw jexception(jerrno::JERR__FILEIO, oss.str(), "RecoveryManager", "readNextRemainingRecord"); } // check flags transient = ::is_enq_transient(&enqueueHeader); external = ::is_enq_external(&enqueueHeader); -//char magicBuff[5]; // DEBUG -//::memcpy(magicBuff, &enqueueHeader, 4); // DEBUG -//magicBuff[4] = 0; // DEBUG -//std::cout << std::hex << ":" << (char*)magicBuff << ",rid=0x" << enqueueHeader._rhdr._rid << ",xs=0x" << enqueueHeader._xidsize << ",ds=0x" << enqueueHeader._dsize << std::dec << std::flush; // DEBUG + // read xid xidSize = enqueueHeader._xidsize; *xidPtrPtr = ::malloc(xidSize); @@ -386,6 +404,12 @@ void RecoveryManager::checkFileStreamOk(bool checkEof) { } void RecoveryManager::checkJournalAlignment(const std::streampos recordPosition) { + if (recordPosition % QLS_DBLK_SIZE_BYTES != 0) { + std::ostringstream oss; + oss << "Current read pointer not dblk aligned: recordPosition=0x" << std::hex << recordPosition; + oss << " (dblk alignment offset = 0x" << (recordPosition % QLS_DBLK_SIZE_BYTES); + throw jexception(jerrno::JERR_RCVM_NOTDBLKALIGNED, oss.str(), "RecoveryManager", "checkJournalAlignment"); + } std::streampos currentPosn = recordPosition; unsigned sblkOffset = currentPosn % QLS_SBLK_SIZE_BYTES; if (sblkOffset) @@ -574,7 +598,7 @@ bool RecoveryManager::getNextRecordHeader() throw jexception(jerrno::JERR_RCVM_NULLXID, "ENQ", "RecoveryManager", "getNextRecordHeader"); } std::string xid((char*)xidp, er.xid_size()); - transactionMapRef_.insert_txn_data(xid, txn_data_t(h._rid, 0, start_fid, file_pos, true)); + transactionMapRef_.insert_txn_data(xid, txn_data_t(h._rid, 0, start_fid, file_pos, true, false /*tpcFlag*/)); if (transactionMapRef_.set_aio_compl(xid, h._rid) < txn_map::TMAP_OK) { // fail - xid or rid not found std::ostringstream oss; oss << std::hex << "_tmap.set_aio_compl: txn_enq xid=\"" << xid << "\" rid=0x" << h._rid; @@ -725,6 +749,35 @@ bool RecoveryManager::needNextFile() { return true; } +void RecoveryManager::prepareRecordList() { + // Set up recordIdList_ from enqueue map and transaction map + recordIdList_.clear(); + + // Extract records from enqueue list + std::vector<uint64_t> ridList; + enqueueMapRef_.rid_list(ridList); + qpid::linearstore::journal::enq_map::emap_data_struct_t eds; + for (std::vector<uint64_t>::const_iterator i=ridList.begin(); i!=ridList.end(); ++i) { + enqueueMapRef_.get_data(*i, eds); + recordIdList_.push_back(RecoveredRecordData_t(*i, eds._pfid, eds._file_posn, false)); + } + + // Extract records from pending transaction enqueues + std::vector<std::string> xidList; + transactionMapRef_.xid_list(xidList); + for (std::vector<std::string>::const_iterator j=xidList.begin(); j!=xidList.end(); ++j) { + qpid::linearstore::journal::txn_data_list tdsl = transactionMapRef_.get_tdata_list(*j); + for (qpid::linearstore::journal::tdl_itr k=tdsl.begin(); k!=tdsl.end(); ++k) { + if (k->enq_flag_) { + recordIdList_.push_back(RecoveredRecordData_t(k->rid_, k->pfid_, k->foffs_, true)); + } + } + } + + std::sort(recordIdList_.begin(), recordIdList_.end(), recordIdListCompare); + recordIdListConstItr_ = recordIdList_.begin(); +} + void RecoveryManager::readJournalData(char* target, const std::streamsize readSize) { std::streamoff bytesRead = 0; diff --git a/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.h b/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.h index a32fdc1853..997596938b 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.h +++ b/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.h @@ -22,7 +22,6 @@ #ifndef QPID_LINEARSTORE_JOURNAL_RECOVERYSTATE_H_ #define QPID_LINEARSTORE_JOURNAL_RECOVERYSTATE_H_ -#include <deque> #include <fstream> #include <map> #include "qpid/linearstore/journal/LinearFileController.h" @@ -44,6 +43,16 @@ class JournalLog; class jrec; class txn_map; +struct RecoveredRecordData_t { + uint64_t recordId_; + uint64_t fileId_; + std::streampos fileOffset_; + bool pendingTransaction_; + RecoveredRecordData_t(const uint64_t rid, const uint64_t fid, const std::streampos foffs, bool ptxn); +}; + +bool recordIdListCompare(RecoveredRecordData_t a, RecoveredRecordData_t b); + class RecoveryManager { protected: @@ -53,7 +62,7 @@ protected: typedef std::map<uint64_t, JournalFile*> fileNumberMap_t; typedef fileNumberMap_t::iterator fileNumberMapItr_t; typedef fileNumberMap_t::const_iterator fileNumberMapConstItr_t; - typedef std::vector<uint64_t> recordIdList_t; + typedef std::vector<RecoveredRecordData_t> recordIdList_t; typedef recordIdList_t::const_iterator recordIdListConstItr_t; // Location and identity @@ -123,6 +132,7 @@ protected: bool getNextFile(bool jumpToFirstRecordOffsetFlag); bool getNextRecordHeader(); bool needNextFile(); + void prepareRecordList(); bool readFileHeader(); void readJournalData(char* target, const std::streamsize size); void removeEmptyFiles(EmptyFilePool* emptyFilePoolPtr); diff --git a/qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp b/qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp index cb03978696..55bac4a2e5 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp @@ -155,7 +155,7 @@ jcntl::enqueue_data_record(const void* const data_buff, check_wstatus("enqueue_data_record"); { slock s(_wr_mutex); - while (handle_aio_wait(_wmgr.enqueue(data_buff, tot_data_len, this_data_len, dtokp, 0, 0, transient, false), r, + while (handle_aio_wait(_wmgr.enqueue(data_buff, tot_data_len, this_data_len, dtokp, 0, 0, false, transient, false), r, dtokp)) ; } return r; @@ -170,7 +170,7 @@ jcntl::enqueue_extern_data_record(const std::size_t tot_data_len, check_wstatus("enqueue_extern_data_record"); { slock s(_wr_mutex); - while (handle_aio_wait(_wmgr.enqueue(0, tot_data_len, 0, dtokp, 0, 0, transient, true), r, dtokp)) ; + while (handle_aio_wait(_wmgr.enqueue(0, tot_data_len, 0, dtokp, 0, 0, false, transient, true), r, dtokp)) ; } return r; } @@ -181,6 +181,7 @@ jcntl::enqueue_txn_data_record(const void* const data_buff, const std::size_t this_data_len, data_tok* dtokp, const std::string& xid, + const bool tpc_flag, const bool transient) { iores r; @@ -188,7 +189,7 @@ jcntl::enqueue_txn_data_record(const void* const data_buff, { slock s(_wr_mutex); while (handle_aio_wait(_wmgr.enqueue(data_buff, tot_data_len, this_data_len, dtokp, xid.data(), xid.size(), - transient, false), r, dtokp)) ; + tpc_flag, transient, false), r, dtokp)) ; } return r; } @@ -197,14 +198,15 @@ iores jcntl::enqueue_extern_txn_data_record(const std::size_t tot_data_len, data_tok* dtokp, const std::string& xid, + const bool tpc_flag, const bool transient) { iores r; check_wstatus("enqueue_extern_txn_data_record"); { slock s(_wr_mutex); - while (handle_aio_wait(_wmgr.enqueue(0, tot_data_len, 0, dtokp, xid.data(), xid.size(), transient, true), r, - dtokp)) ; + while (handle_aio_wait(_wmgr.enqueue(0, tot_data_len, 0, dtokp, xid.data(), xid.size(), tpc_flag, transient, + true), r, dtokp)) ; } return r; } @@ -234,7 +236,7 @@ jcntl::dequeue_data_record(data_tok* const dtokp, check_wstatus("dequeue_data"); { slock s(_wr_mutex); - while (handle_aio_wait(_wmgr.dequeue(dtokp, 0, 0, txn_coml_commit), r, dtokp)) ; + while (handle_aio_wait(_wmgr.dequeue(dtokp, 0, 0, false, txn_coml_commit), r, dtokp)) ; } return r; } @@ -242,13 +244,14 @@ jcntl::dequeue_data_record(data_tok* const dtokp, iores jcntl::dequeue_txn_data_record(data_tok* const dtokp, const std::string& xid, + const bool tpc_flag, const bool txn_coml_commit) { iores r; check_wstatus("dequeue_data"); { slock s(_wr_mutex); - while (handle_aio_wait(_wmgr.dequeue(dtokp, xid.data(), xid.size(), txn_coml_commit), r, dtokp)) ; + while (handle_aio_wait(_wmgr.dequeue(dtokp, xid.data(), xid.size(), tpc_flag, txn_coml_commit), r, dtokp)) ; } return r; } diff --git a/qpid/cpp/src/qpid/linearstore/journal/jcntl.h b/qpid/cpp/src/qpid/linearstore/journal/jcntl.h index c12c8afbc9..2db0e707a7 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/jcntl.h +++ b/qpid/cpp/src/qpid/linearstore/journal/jcntl.h @@ -270,11 +270,11 @@ public: const std::size_t tot_data_len, const std::size_t this_data_len, data_tok* dtokp, - const bool transient = false); + const bool transient); iores enqueue_extern_data_record(const std::size_t tot_data_len, data_tok* dtokp, - const bool transient = false); + const bool transient); /** * \brief Enqueue data. @@ -294,12 +294,14 @@ public: const std::size_t this_data_len, data_tok* dtokp, const std::string& xid, - const bool transient = false); + const bool tpc_flag, + const bool transient); iores enqueue_extern_txn_data_record(const std::size_t tot_data_len, data_tok* dtokp, const std::string& xid, - const bool transient = false); + const bool tpc_flag, + const bool transient); /** * \brief Reads data from the journal. It is the responsibility of the reader to free @@ -350,7 +352,7 @@ public: bool& transient, bool& external, data_tok* const dtokp, - bool ignore_pending_txns = false); + bool ignore_pending_txns); /** * \brief Dequeues (marks as no longer needed) data record in journal. @@ -370,7 +372,7 @@ public: * \exception TODO */ iores dequeue_data_record(data_tok* const dtokp, - const bool txn_coml_commit = false); + const bool txn_coml_commit); /** * \brief Dequeues (marks as no longer needed) data record in journal. @@ -393,7 +395,8 @@ public: */ iores dequeue_txn_data_record(data_tok* const dtokp, const std::string& xid, - const bool txn_coml_commit = false); + const bool tpc_flag, + const bool txn_coml_commit); /** * \brief Abort the transaction for all records enqueued or dequeued with the matching xid. @@ -458,12 +461,12 @@ public: * \param block_till_aio_cmpl If true, will block the thread while waiting for all * outstanding AIO operations to complete. */ - void stop(const bool block_till_aio_cmpl = false); + void stop(const bool block_till_aio_cmpl); /** * \brief Force a flush of the write page cache, creating a single AIO write operation. */ - iores flush(const bool block_till_aio_cmpl = false); + iores flush(const bool block_till_aio_cmpl); inline uint32_t get_enq_cnt() const { return _emap.size(); } // TODO: _emap: Thread safe? @@ -480,7 +483,7 @@ public: * false if the rid is transactionally enqueued and is not committed, or if it is * locked (i.e. transactionally dequeued, but the dequeue has not been committed). */ - inline bool is_enqueued(const uint64_t rid, bool ignore_lock = false) { return _emap.is_enqueued(rid, ignore_lock); } + inline bool is_enqueued(const uint64_t rid, bool ignore_lock) { return _emap.is_enqueued(rid, ignore_lock); } inline bool is_locked(const uint64_t rid) { if (_emap.is_enqueued(rid, true) < enq_map::EMAP_OK) diff --git a/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp b/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp index 51e90c6bb8..01c432d37b 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp @@ -92,6 +92,8 @@ const uint32_t jerrno::JERR_RCVM_STREAMBAD = 0x0901; ///< Read/write const uint32_t jerrno::JERR_RCVM_READ = 0x0902; ///< Read error: no or insufficient data to read const uint32_t jerrno::JERR_RCVM_WRITE = 0x0903; ///< Write error const uint32_t jerrno::JERR_RCVM_NULLXID = 0x0904; ///< Null XID when XID length non-null in header +const uint32_t jerrno::JERR_RCVM_NOTDBLKALIGNED = 0x0905; ///< Offset is not data block (dblk)-aligned + // class data_tok const uint32_t jerrno::JERR_DTOK_ILLEGALSTATE = 0x0a00; @@ -182,6 +184,7 @@ jerrno::__init() _err_map[JERR_RCVM_READ] = "JERR_RCVM_READ: Read error: no or insufficient data to read"; _err_map[JERR_RCVM_WRITE] = "JERR_RCVM_WRITE: Write error"; _err_map[JERR_RCVM_NULLXID] = "JERR_RCVM_NULLXID: Null XID when XID length non-null in header"; + _err_map[JERR_RCVM_NOTDBLKALIGNED] = "JERR_RCVM_NOTDBLKALIGNED: Offset is not data block (dblk)-aligned"; // class data_tok _err_map[JERR_DTOK_ILLEGALSTATE] = "JERR_MTOK_ILLEGALSTATE: Attempted to change to illegal state."; diff --git a/qpid/cpp/src/qpid/linearstore/journal/jerrno.h b/qpid/cpp/src/qpid/linearstore/journal/jerrno.h index 5af0a7ada0..62f18c1878 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/jerrno.h +++ b/qpid/cpp/src/qpid/linearstore/journal/jerrno.h @@ -110,6 +110,7 @@ namespace journal { static const uint32_t JERR_RCVM_READ; ///< Read error: no or insufficient data to read static const uint32_t JERR_RCVM_WRITE; ///< Write error static const uint32_t JERR_RCVM_NULLXID; ///< Null XID when XID length non-null in header + static const uint32_t JERR_RCVM_NOTDBLKALIGNED; ///< Offset is not data block (dblk)-aligned // class data_tok static const uint32_t JERR_DTOK_ILLEGALSTATE; ///< Attempted to change to illegal state diff --git a/qpid/cpp/src/qpid/linearstore/journal/txn_map.cpp b/qpid/cpp/src/qpid/linearstore/journal/txn_map.cpp index 5e0a28814d..304ab0f6ee 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/txn_map.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/txn_map.cpp @@ -39,12 +39,14 @@ txn_data_t::txn_data_t(const uint64_t rid, const uint16_t pfid, const uint64_t foffs, const bool enq_flag, + const bool tpc_flag, const bool commit_flag): rid_(rid), drid_(drid), pfid_(pfid), foffs_(foffs), enq_flag_(enq_flag), + tpc_flag_(tpc_flag), commit_flag_(commit_flag), aio_compl_(false) {} diff --git a/qpid/cpp/src/qpid/linearstore/journal/txn_map.h b/qpid/cpp/src/qpid/linearstore/journal/txn_map.h index 0f8cd5f3d7..e2df6c7d0e 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/txn_map.h +++ b/qpid/cpp/src/qpid/linearstore/journal/txn_map.h @@ -42,13 +42,15 @@ namespace journal { uint16_t pfid_; ///< Physical file id, to be used when transferring to emap on commit uint64_t foffs_; ///< Offset in file for this record bool enq_flag_; ///< If true, enq op, otherwise deq op - bool commit_flag_; ///< (2PC transactions) Records 2PC complete c/a mode + bool tpc_flag_; ///< 2PC transaction if true + bool commit_flag_; ///< TPL only: (2PC transactions) Records 2PC complete c/a mode bool aio_compl_; ///< Initially false, set to true when record AIO returns txn_data_t(const uint64_t rid, const uint64_t drid, const uint16_t pfid, const uint64_t foffs, const bool enq_flag, + const bool tpc_flag, const bool commit_flag = false); } txn_data_t; typedef std::vector<txn_data_t> txn_data_list; diff --git a/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp b/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp index b5d2e51ec0..fa5f83cd24 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp @@ -220,11 +220,9 @@ txn_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs) } } ifsp->ignore(rec_size_dblks() * QLS_DBLK_SIZE_BYTES - rec_size()); - if (::rec_tail_check(&_txn_tail, &_txn_hdr._rhdr, 0)) { // TODO: add checksum - throw jexception(jerrno::JERR_JREC_BADRECTAIL); // TODO: complete exception detail - } assert(!ifsp->fail() && !ifsp->bad()); assert(_txn_hdr._xidsize > 0); + Checksum checksum; checksum.addData((unsigned char*)&_txn_hdr, sizeof(_txn_hdr)); checksum.addData((unsigned char*)_buff, _txn_hdr._xidsize); diff --git a/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp b/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp index 9b94c7959d..403ad68922 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp @@ -108,6 +108,7 @@ wmgr::enqueue(const void* const data_buff, data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_len, + const bool tpc_flag, const bool transient, const bool external) { @@ -196,7 +197,7 @@ wmgr::enqueue(const void* const data_buff, if (xid_len) // If part of transaction, add to transaction map { std::string xid((const char*)xid_ptr, xid_len); - _tmap.insert_txn_data(xid, txn_data_t(rid, 0, dtokp->fid(), 0, true)); + _tmap.insert_txn_data(xid, txn_data_t(rid, 0, dtokp->fid(), 0, true, tpc_flag)); } else { @@ -228,6 +229,7 @@ iores wmgr::dequeue(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_len, + const bool tpc_flag, const bool txn_coml_commit) { if (xid_len) @@ -312,7 +314,7 @@ wmgr::dequeue(data_tok* dtokp, // If the enqueue is part of a pending txn, it will not yet be in emap _emap.lock(dequeue_rid); // ignore rid not found error std::string xid((const char*)xid_ptr, xid_len); - _tmap.insert_txn_data(xid, txn_data_t(rid, dequeue_rid, dtokp->fid(), 0, false)); + _tmap.insert_txn_data(xid, txn_data_t(rid, dequeue_rid, dtokp->fid(), 0, false, tpc_flag)); } else { diff --git a/qpid/cpp/src/qpid/linearstore/journal/wmgr.h b/qpid/cpp/src/qpid/linearstore/journal/wmgr.h index 0aa21ca545..8837e51e97 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/wmgr.h +++ b/qpid/cpp/src/qpid/linearstore/journal/wmgr.h @@ -97,11 +97,13 @@ public: data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_len, + const bool tpc_flag, const bool transient, const bool external); iores dequeue(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_len, + const bool tpc_flag, const bool txn_coml_commit); iores abort(data_tok* dtokp, const void* const xid_ptr, |