summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2013-12-20 18:07:31 +0000
committerKim van der Riet <kpvdr@apache.org>2013-12-20 18:07:31 +0000
commit5d28e44df2fd9d627d52fda08faeba72815c2ed4 (patch)
tree1868335dc0cd71d81614e50fe1fa0c5645e6af51 /qpid/cpp
parent829b44f73e0825f838099af4b683b1f744c0e2aa (diff)
downloadqpid-python-5d28e44df2fd9d627d52fda08faeba72815c2ed4.tar.gz
QPID-5422: DTX test failure, and some tidying up of code in JournalImpl.cpp/h
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1552772 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
-rw-r--r--qpid/cpp/src/qpid/linearstore/ISSUES2
-rw-r--r--qpid/cpp/src/qpid/linearstore/JournalImpl.cpp127
-rw-r--r--qpid/cpp/src/qpid/linearstore/JournalImpl.h149
-rw-r--r--qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp236
-rw-r--r--qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h15
-rw-r--r--qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp2
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp4
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp99
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.h14
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp17
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/jcntl.h23
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp3
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/jerrno.h1
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/txn_map.cpp2
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/txn_map.h4
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp4
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp6
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/wmgr.h2
18 files changed, 407 insertions, 303 deletions
diff --git a/qpid/cpp/src/qpid/linearstore/ISSUES b/qpid/cpp/src/qpid/linearstore/ISSUES
index 97a9e3fb2a..deeefb31fe 100644
--- a/qpid/cpp/src/qpid/linearstore/ISSUES
+++ b/qpid/cpp/src/qpid/linearstore/ISSUES
@@ -59,7 +59,7 @@ Current bugs and performance issues:
5. (UNABLE TO REPRODUCE) BZ 1038599 - Abort when deleting used queue after restart - may be dup of QPID-5387 (BZ 1036071)
6. BZ 1039522 - Crash during recovery - JournalFile::getFqFileName() -JERR_JREC_BADRECTAIL
6. BZ 1039525 - Crash during recovery - journal::jexception - JERR_JREC_BADRECTAIL
-7. BZ 1039949 - DTX test failure - missing XIDs
+7. QPID-5442 (BZ 1039949) - DTX test failure - missing XIDs
Code tidy-up
------------
diff --git a/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp b/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp
index 081fab190d..a71db3d384 100644
--- a/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp
+++ b/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp
@@ -30,23 +30,36 @@
namespace qpid {
namespace linearstore {
-InactivityFireEvent::InactivityFireEvent(JournalImpl* p, const qpid::sys::Duration timeout):
- qpid::sys::TimerTask(timeout, "JournalInactive:"+p->id()), _parent(p) {}
-
-void InactivityFireEvent::fire() { qpid::sys::Mutex::ScopedLock sl(_ife_lock); if (_parent) _parent->flushFire(); }
+InactivityFireEvent::InactivityFireEvent(JournalImpl* p,
+ const ::qpid::sys::Duration timeout):
+ ::qpid::sys::TimerTask(timeout, "JournalInactive:"+p->id()), _parent(p) {}
+
+void InactivityFireEvent::fire() {
+ ::qpid::sys::Mutex::ScopedLock sl(_ife_lock);
+ if (_parent) {
+ _parent->flushFire();
+ }
+}
-GetEventsFireEvent::GetEventsFireEvent(JournalImpl* p, const qpid::sys::Duration timeout):
- qpid::sys::TimerTask(timeout, "JournalGetEvents:"+p->id()), _parent(p) {}
+GetEventsFireEvent::GetEventsFireEvent(JournalImpl* p,
+ const ::qpid::sys::Duration timeout):
+ ::qpid::sys::TimerTask(timeout, "JournalGetEvents:"+p->id()), _parent(p)
+{}
-void GetEventsFireEvent::fire() { qpid::sys::Mutex::ScopedLock sl(_gefe_lock); if (_parent) _parent->getEventsFire(); }
+void GetEventsFireEvent::fire() {
+ ::qpid::sys::Mutex::ScopedLock sl(_gefe_lock);
+ if (_parent) {
+ _parent->getEventsFire();
+ }
+}
-JournalImpl::JournalImpl(qpid::sys::Timer& timer_,
+JournalImpl::JournalImpl(::qpid::sys::Timer& timer_,
const std::string& journalId,
const std::string& journalDirectory,
JournalLogImpl& journalLogRef,
- const qpid::sys::Duration getEventsTimeout,
- const qpid::sys::Duration flushTimeout,
- qpid::management::ManagementAgent* a,
+ const ::qpid::sys::Duration getEventsTimeout,
+ const ::qpid::sys::Duration flushTimeout,
+ ::qpid::management::ManagementAgent* a,
DeleteCallback onDelete):
jcntl(journalId, journalDirectory, journalLogRef),
timer(timer_),
@@ -76,7 +89,7 @@ JournalImpl::~JournalImpl()
if (deleteCallback) deleteCallback(*this);
if (_init_flag && !_stop_flag){
try { stop(true); } // NOTE: This will *block* until all outstanding disk aio calls are complete!
- catch (const qpid::linearstore::journal::jexception& e) { QLS_LOG2(error, _jid, e.what()); }
+ catch (const ::qpid::linearstore::journal::jexception& e) { QLS_LOG2(error, _jid, e.what()); }
}
getEventsFireEventsPtr->cancel();
inactivityFireEventPtr->cancel();
@@ -90,7 +103,7 @@ JournalImpl::~JournalImpl()
}
void
-JournalImpl::initManagement(qpid::management::ManagementAgent* a)
+JournalImpl::initManagement(::qpid::management::ManagementAgent* a)
{
_agent = a;
if (_agent != 0)
@@ -117,10 +130,10 @@ JournalImpl::initManagement(qpid::management::ManagementAgent* a)
void
-JournalImpl::initialize(qpid::linearstore::journal::EmptyFilePool* efpp_,
+JournalImpl::initialize(::qpid::linearstore::journal::EmptyFilePool* efpp_,
const uint16_t wcache_num_pages,
const uint32_t wcache_pgsize_sblks,
- qpid::linearstore::journal::aio_callback* const cbp)
+ ::qpid::linearstore::journal::aio_callback* const cbp)
{
// efpp->createJournal(_jdir);
// QLS_LOG2(notice, _jid, "Initialized");
@@ -152,10 +165,10 @@ JournalImpl::initialize(qpid::linearstore::journal::EmptyFilePool* efpp_,
}
void
-JournalImpl::recover(boost::shared_ptr<qpid::linearstore::journal::EmptyFilePoolManager> efpm,
+JournalImpl::recover(boost::shared_ptr< ::qpid::linearstore::journal::EmptyFilePoolManager> efpm,
const uint16_t wcache_num_pages,
const uint32_t wcache_pgsize_sblks,
- qpid::linearstore::journal::aio_callback* const cbp,
+ ::qpid::linearstore::journal::aio_callback* const cbp,
boost::ptr_list<PreparedTransaction>* prep_tx_list_ptr,
uint64_t& highest_rid,
uint64_t queue_id)
@@ -196,8 +209,8 @@ JournalImpl::recover(boost::shared_ptr<qpid::linearstore::journal::EmptyFilePool
if (prep_tx_list_ptr)
{
for (PreparedTransaction::list::iterator i = prep_tx_list_ptr->begin(); i != prep_tx_list_ptr->end(); i++) {
- qpid::linearstore::journal::txn_data_list tdl = _tmap.get_tdata_list(i->xid); // tdl will be empty if xid not found
- for (qpid::linearstore::journal::tdl_itr tdl_itr = tdl.begin(); tdl_itr < tdl.end(); tdl_itr++) {
+ ::qpid::linearstore::journal::txn_data_list tdl = _tmap.get_tdata_list(i->xid); // tdl will be empty if xid not found
+ for (::qpid::linearstore::journal::tdl_itr tdl_itr = tdl.begin(); tdl_itr < tdl.end(); tdl_itr++) {
if (tdl_itr->enq_flag_) { // enqueue op
i->enqueues->add(queue_id, tdl_itr->rid_);
} else { // dequeue op
@@ -237,8 +250,11 @@ JournalImpl::recover_complete()
void
-JournalImpl::enqueue_data_record(const void* const data_buff, const size_t tot_data_len,
- const size_t this_data_len, qpid::linearstore::journal::data_tok* dtokp, const bool transient)
+JournalImpl::enqueue_data_record(const void* const data_buff,
+ const size_t tot_data_len,
+ const size_t this_data_len,
+ ::qpid::linearstore::journal::data_tok* dtokp,
+ const bool transient)
{
handleIoResult(jcntl::enqueue_data_record(data_buff, tot_data_len, this_data_len, dtokp, transient));
@@ -250,8 +266,9 @@ JournalImpl::enqueue_data_record(const void* const data_buff, const size_t tot_d
}
void
-JournalImpl::enqueue_extern_data_record(const size_t tot_data_len, qpid::linearstore::journal::data_tok* dtokp,
- const bool transient)
+JournalImpl::enqueue_extern_data_record(const size_t tot_data_len,
+ ::qpid::linearstore::journal::data_tok* dtokp,
+ const bool transient)
{
handleIoResult(jcntl::enqueue_extern_data_record(tot_data_len, dtokp, transient));
@@ -263,12 +280,17 @@ JournalImpl::enqueue_extern_data_record(const size_t tot_data_len, qpid::linears
}
void
-JournalImpl::enqueue_txn_data_record(const void* const data_buff, const size_t tot_data_len,
- const size_t this_data_len, qpid::linearstore::journal::data_tok* dtokp, const std::string& xid, const bool transient)
+JournalImpl::enqueue_txn_data_record(const void* const data_buff,
+ const size_t tot_data_len,
+ const size_t this_data_len,
+ ::qpid::linearstore::journal::data_tok* dtokp,
+ const std::string& xid,
+ const bool tpc_flag,
+ const bool transient)
{
bool txn_incr = _mgmtObject.get() != 0 ? _tmap.in_map(xid) : false;
- handleIoResult(jcntl::enqueue_txn_data_record(data_buff, tot_data_len, this_data_len, dtokp, xid, transient));
+ handleIoResult(jcntl::enqueue_txn_data_record(data_buff, tot_data_len, this_data_len, dtokp, xid, tpc_flag, transient));
if (_mgmtObject.get() != 0)
{
@@ -281,12 +303,15 @@ JournalImpl::enqueue_txn_data_record(const void* const data_buff, const size_t t
}
void
-JournalImpl::enqueue_extern_txn_data_record(const size_t tot_data_len, qpid::linearstore::journal::data_tok* dtokp,
- const std::string& xid, const bool transient)
+JournalImpl::enqueue_extern_txn_data_record(const size_t tot_data_len,
+ ::qpid::linearstore::journal::data_tok* dtokp,
+ const std::string& xid,
+ const bool tpc_flag,
+ const bool transient)
{
bool txn_incr = _mgmtObject.get() != 0 ? _tmap.in_map(xid) : false;
- handleIoResult(jcntl::enqueue_extern_txn_data_record(tot_data_len, dtokp, xid, transient));
+ handleIoResult(jcntl::enqueue_extern_txn_data_record(tot_data_len, dtokp, xid, tpc_flag, transient));
if (_mgmtObject.get() != 0)
{
@@ -299,7 +324,8 @@ JournalImpl::enqueue_extern_txn_data_record(const size_t tot_data_len, qpid::lin
}
void
-JournalImpl::dequeue_data_record(qpid::linearstore::journal::data_tok* const dtokp, const bool txn_coml_commit)
+JournalImpl::dequeue_data_record(::qpid::linearstore::journal::data_tok* const dtokp,
+ const bool txn_coml_commit)
{
handleIoResult(jcntl::dequeue_data_record(dtokp, txn_coml_commit));
@@ -312,11 +338,14 @@ JournalImpl::dequeue_data_record(qpid::linearstore::journal::data_tok* const dto
}
void
-JournalImpl::dequeue_txn_data_record(qpid::linearstore::journal::data_tok* const dtokp, const std::string& xid, const bool txn_coml_commit)
+JournalImpl::dequeue_txn_data_record(::qpid::linearstore::journal::data_tok* const dtokp,
+ const std::string& xid,
+ const bool tpc_flag,
+ const bool txn_coml_commit)
{
bool txn_incr = _mgmtObject.get() != 0 ? _tmap.in_map(xid) : false;
- handleIoResult(jcntl::dequeue_txn_data_record(dtokp, xid, txn_coml_commit));
+ handleIoResult(jcntl::dequeue_txn_data_record(dtokp, xid, tpc_flag, txn_coml_commit));
if (_mgmtObject.get() != 0)
{
@@ -329,7 +358,8 @@ JournalImpl::dequeue_txn_data_record(qpid::linearstore::journal::data_tok* const
}
void
-JournalImpl::txn_abort(qpid::linearstore::journal::data_tok* const dtokp, const std::string& xid)
+JournalImpl::txn_abort(::qpid::linearstore::journal::data_tok* const dtokp,
+ const std::string& xid)
{
handleIoResult(jcntl::txn_abort(dtokp, xid));
@@ -341,7 +371,8 @@ JournalImpl::txn_abort(qpid::linearstore::journal::data_tok* const dtokp, const
}
void
-JournalImpl::txn_commit(qpid::linearstore::journal::data_tok* const dtokp, const std::string& xid)
+JournalImpl::txn_commit(::qpid::linearstore::journal::data_tok* const dtokp,
+ const std::string& xid)
{
handleIoResult(jcntl::txn_commit(dtokp, xid));
@@ -366,12 +397,12 @@ JournalImpl::stop(bool block_till_aio_cmpl)
}
}
-qpid::linearstore::journal::iores
+::qpid::linearstore::journal::iores
JournalImpl::flush(const bool block_till_aio_cmpl)
{
- const qpid::linearstore::journal::iores res = jcntl::flush(block_till_aio_cmpl);
+ const ::qpid::linearstore::journal::iores res = jcntl::flush(block_till_aio_cmpl);
{
- qpid::sys::Mutex::ScopedLock sl(_getf_lock);
+ ::qpid::sys::Mutex::ScopedLock sl(_getf_lock);
if (_wmgr.get_aio_evt_rem() && !getEventsTimerSetFlag) { setGetEventTimer(); }
}
return res;
@@ -380,7 +411,7 @@ JournalImpl::flush(const bool block_till_aio_cmpl)
void
JournalImpl::getEventsFire()
{
- qpid::sys::Mutex::ScopedLock sl(_getf_lock);
+ ::qpid::sys::Mutex::ScopedLock sl(_getf_lock);
getEventsTimerSetFlag = false;
if (_wmgr.get_aio_evt_rem()) { jcntl::get_wr_events(0); }
if (_wmgr.get_aio_evt_rem()) { setGetEventTimer(); }
@@ -394,7 +425,7 @@ JournalImpl::flushFire()
flushTriggeredFlag = false;
} else {
if (!flushTriggeredFlag) {
- flush();
+ flush(false);
flushTriggeredFlag = true;
}
}
@@ -405,20 +436,20 @@ JournalImpl::flushFire()
}
void
-JournalImpl::wr_aio_cb(std::vector<qpid::linearstore::journal::data_tok*>& dtokl)
+JournalImpl::wr_aio_cb(std::vector< ::qpid::linearstore::journal::data_tok*>& dtokl)
{
- for (std::vector<qpid::linearstore::journal::data_tok*>::const_iterator i=dtokl.begin(); i!=dtokl.end(); i++)
+ for (std::vector< ::qpid::linearstore::journal::data_tok*>::const_iterator i=dtokl.begin(); i!=dtokl.end(); i++)
{
DataTokenImpl* dtokp = static_cast<DataTokenImpl*>(*i);
if (/*!is_stopped() &&*/ dtokp->getSourceMessage())
{
switch (dtokp->wstate())
{
- case qpid::linearstore::journal::data_tok::ENQ:
+ case ::qpid::linearstore::journal::data_tok::ENQ:
//std::cout << "<<<>>> JournalImpl::wr_aio_cb() ENQ dtokp rid=0x" << std::hex << dtokp->rid() << std::dec << std::endl << std::flush; // DEBUG
dtokp->getSourceMessage()->enqueueComplete();
break;
- case qpid::linearstore::journal::data_tok::DEQ:
+ case ::qpid::linearstore::journal::data_tok::DEQ:
//std::cout << "<<<>>> JournalImpl::wr_aio_cb() DEQ dtokp rid=0x" << std::hex << dtokp->rid() << std::dec << std::endl << std::flush; // DEBUG
/* Don't need to signal until we have a way to ack completion of dequeue in AMQP
dtokp->getSourceMessage()->dequeueComplete();
@@ -443,25 +474,25 @@ JournalImpl::createStore() {
}
void
-JournalImpl::handleIoResult(const qpid::linearstore::journal::iores r)
+JournalImpl::handleIoResult(const ::qpid::linearstore::journal::iores r)
{
writeActivityFlag = true;
switch (r)
{
- case qpid::linearstore::journal::RHM_IORES_SUCCESS:
+ case ::qpid::linearstore::journal::RHM_IORES_SUCCESS:
return;
default:
{
std::ostringstream oss;
- oss << "Unexpected I/O response (" << qpid::linearstore::journal::iores_str(r) << ").";
+ oss << "Unexpected I/O response (" << ::qpid::linearstore::journal::iores_str(r) << ").";
QLS_LOG2(error, _jid, oss.str());
THROW_STORE_FULL_EXCEPTION(oss.str());
}
}
}
-qpid::management::Manageable::status_t JournalImpl::ManagementMethod (uint32_t /*methodId*/,
- qpid::management::Args& /*args*/,
+::qpid::management::Manageable::status_t JournalImpl::ManagementMethod (uint32_t /*methodId*/,
+ ::qpid::management::Args& /*args*/,
std::string& /*text*/)
{
Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
diff --git a/qpid/cpp/src/qpid/linearstore/JournalImpl.h b/qpid/cpp/src/qpid/linearstore/JournalImpl.h
index 763089f3d0..667579253e 100644
--- a/qpid/cpp/src/qpid/linearstore/JournalImpl.h
+++ b/qpid/cpp/src/qpid/linearstore/JournalImpl.h
@@ -44,86 +44,90 @@ namespace journal {
class JournalImpl;
class JournalLogImpl;
-class InactivityFireEvent : public qpid::sys::TimerTask
+class InactivityFireEvent : public ::qpid::sys::TimerTask
{
JournalImpl* _parent;
- qpid::sys::Mutex _ife_lock;
+ ::qpid::sys::Mutex _ife_lock;
public:
- InactivityFireEvent(JournalImpl* p, const qpid::sys::Duration timeout);
+ InactivityFireEvent(JournalImpl* p,
+ const ::qpid::sys::Duration timeout);
virtual ~InactivityFireEvent() {}
void fire();
- inline void cancel() { qpid::sys::Mutex::ScopedLock sl(_ife_lock); _parent = 0; }
+ inline void cancel() { ::qpid::sys::Mutex::ScopedLock sl(_ife_lock); _parent = 0; }
};
-class GetEventsFireEvent : public qpid::sys::TimerTask
+class GetEventsFireEvent : public ::qpid::sys::TimerTask
{
JournalImpl* _parent;
- qpid::sys::Mutex _gefe_lock;
+ ::qpid::sys::Mutex _gefe_lock;
public:
- GetEventsFireEvent(JournalImpl* p, const qpid::sys::Duration timeout);
+ GetEventsFireEvent(JournalImpl* p,
+ const ::qpid::sys::Duration timeout);
virtual ~GetEventsFireEvent() {}
void fire();
- inline void cancel() { qpid::sys::Mutex::ScopedLock sl(_gefe_lock); _parent = 0; }
+ inline void cancel() { ::qpid::sys::Mutex::ScopedLock sl(_gefe_lock); _parent = 0; }
};
-class JournalImpl : public qpid::broker::ExternalQueueStore, public qpid::linearstore::journal::jcntl, public qpid::linearstore::journal::aio_callback
+class JournalImpl : public ::qpid::broker::ExternalQueueStore,
+ public ::qpid::linearstore::journal::jcntl,
+ public ::qpid::linearstore::journal::aio_callback
{
public:
typedef boost::function<void (JournalImpl&)> DeleteCallback;
protected:
- qpid::sys::Timer& timer;
+ ::qpid::sys::Timer& timer;
JournalLogImpl& _journalLogRef;
bool getEventsTimerSetFlag;
- boost::intrusive_ptr<qpid::sys::TimerTask> getEventsFireEventsPtr;
- qpid::sys::Mutex _getf_lock;
- qpid::sys::Mutex _read_lock;
+ boost::intrusive_ptr< ::qpid::sys::TimerTask> getEventsFireEventsPtr;
+ ::qpid::sys::Mutex _getf_lock;
+ ::qpid::sys::Mutex _read_lock;
bool writeActivityFlag;
bool flushTriggeredFlag;
- boost::intrusive_ptr<qpid::sys::TimerTask> inactivityFireEventPtr;
+ boost::intrusive_ptr< ::qpid::sys::TimerTask> inactivityFireEventPtr;
- qpid::management::ManagementAgent* _agent;
- qmf::org::apache::qpid::linearstore::Journal::shared_ptr _mgmtObject;
+ ::qpid::management::ManagementAgent* _agent;
+ ::qmf::org::apache::qpid::linearstore::Journal::shared_ptr _mgmtObject;
DeleteCallback deleteCallback;
public:
- JournalImpl(qpid::sys::Timer& timer,
+ JournalImpl(::qpid::sys::Timer& timer,
const std::string& journalId,
const std::string& journalDirectory,
JournalLogImpl& journalLogRef,
- const qpid::sys::Duration getEventsTimeout,
- const qpid::sys::Duration flushTimeout,
- qpid::management::ManagementAgent* agent,
+ const ::qpid::sys::Duration getEventsTimeout,
+ const ::qpid::sys::Duration flushTimeout,
+ ::qpid::management::ManagementAgent* agent,
DeleteCallback deleteCallback=DeleteCallback() );
virtual ~JournalImpl();
- void initManagement(qpid::management::ManagementAgent* agent);
+ void initManagement(::qpid::management::ManagementAgent* agent);
- void initialize(qpid::linearstore::journal::EmptyFilePool* efp,
+ void initialize(::qpid::linearstore::journal::EmptyFilePool* efp,
const uint16_t wcache_num_pages,
const uint32_t wcache_pgsize_sblks,
- qpid::linearstore::journal::aio_callback* const cbp);
+ ::qpid::linearstore::journal::aio_callback* const cbp);
- inline void initialize(qpid::linearstore::journal::EmptyFilePool* efpp,
+ inline void initialize(::qpid::linearstore::journal::EmptyFilePool* efpp,
const uint16_t wcache_num_pages,
const uint32_t wcache_pgsize_sblks) {
initialize(efpp, wcache_num_pages, wcache_pgsize_sblks, this);
}
- void recover(boost::shared_ptr<qpid::linearstore::journal::EmptyFilePoolManager> efpm,
+ void recover(boost::shared_ptr< ::qpid::linearstore::journal::EmptyFilePoolManager> efpm,
const uint16_t wcache_num_pages,
const uint32_t wcache_pgsize_sblks,
- qpid::linearstore::journal::aio_callback* const cbp,
+ ::qpid::linearstore::journal::aio_callback* const cbp,
boost::ptr_list<PreparedTransaction>* prep_tx_list_ptr,
uint64_t& highest_rid,
uint64_t queue_id);
- inline void recover(boost::shared_ptr<qpid::linearstore::journal::EmptyFilePoolManager> efpm,
+ inline void recover(boost::shared_ptr< ::qpid::linearstore::journal::EmptyFilePoolManager> efpm,
const uint16_t wcache_num_pages,
const uint32_t wcache_pgsize_sblks,
boost::ptr_list<PreparedTransaction>* prep_tx_list_ptr,
@@ -135,47 +139,62 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public qpid::linear
void recover_complete();
// Overrides for write inactivity timer
- void enqueue_data_record(const void* const data_buff, const size_t tot_data_len,
- const size_t this_data_len, qpid::linearstore::journal::data_tok* dtokp,
- const bool transient = false);
-
- void enqueue_extern_data_record(const size_t tot_data_len, qpid::linearstore::journal::data_tok* dtokp,
- const bool transient = false);
-
- void enqueue_txn_data_record(const void* const data_buff, const size_t tot_data_len,
- const size_t this_data_len, qpid::linearstore::journal::data_tok* dtokp, const std::string& xid,
- const bool transient = false);
-
- void enqueue_extern_txn_data_record(const size_t tot_data_len, qpid::linearstore::journal::data_tok* dtokp,
- const std::string& xid, const bool transient = false);
-
- void dequeue_data_record(qpid::linearstore::journal::data_tok* const dtokp, const bool txn_coml_commit = false);
-
- void dequeue_txn_data_record(qpid::linearstore::journal::data_tok* const dtokp, const std::string& xid, const bool txn_coml_commit = false);
-
- void txn_abort(qpid::linearstore::journal::data_tok* const dtokp, const std::string& xid);
-
- void txn_commit(qpid::linearstore::journal::data_tok* const dtokp, const std::string& xid);
+ void enqueue_data_record(const void* const data_buff,
+ const size_t tot_data_len,
+ const size_t this_data_len,
+ ::qpid::linearstore::journal::data_tok* dtokp,
+ const bool transient);
+
+ void enqueue_extern_data_record(const size_t tot_data_len,
+ ::qpid::linearstore::journal::data_tok* dtokp,
+ const bool transient);
+
+ void enqueue_txn_data_record(const void* const data_buff,
+ const size_t tot_data_len,
+ const size_t this_data_len,
+ ::qpid::linearstore::journal::data_tok* dtokp,
+ const std::string& xid,
+ const bool tpc_flag,
+ const bool transient);
+
+ void enqueue_extern_txn_data_record(const size_t tot_data_len,
+ ::qpid::linearstore::journal::data_tok* dtokp,
+ const std::string& xid,
+ const bool tpc_flag,
+ const bool transient);
+
+ void dequeue_data_record(::qpid::linearstore::journal::data_tok*
+ const dtokp,
+ const bool txn_coml_commit);
+
+ void dequeue_txn_data_record(::qpid::linearstore::journal::data_tok* const dtokp,
+ const std::string& xid,
+ const bool tpc_flag,
+ const bool txn_coml_commit);
+
+ void txn_abort(::qpid::linearstore::journal::data_tok* const dtokp, const std::string& xid);
+
+ void txn_commit(::qpid::linearstore::journal::data_tok* const dtokp, const std::string& xid);
void stop(bool block_till_aio_cmpl = false);
// Overrides for get_events timer
- qpid::linearstore::journal::iores flush(const bool block_till_aio_cmpl = false);
+ ::qpid::linearstore::journal::iores flush(const bool block_till_aio_cmpl);
// TimerTask callback
void getEventsFire();
void flushFire();
// AIO callbacks
- virtual void wr_aio_cb(std::vector<qpid::linearstore::journal::data_tok*>& dtokl);
+ virtual void wr_aio_cb(std::vector< ::qpid::linearstore::journal::data_tok*>& dtokl);
virtual void rd_aio_cb(std::vector<uint16_t>& pil);
- qpid::management::ManagementObject::shared_ptr GetManagementObject (void) const
+ ::qpid::management::ManagementObject::shared_ptr GetManagementObject (void) const
{ return _mgmtObject; }
- qpid::management::Manageable::status_t ManagementMethod (uint32_t,
- qpid::management::Args&,
- std::string&);
+ ::qpid::management::Manageable::status_t ManagementMethod(uint32_t,
+ ::qpid::management::Args&,
+ std::string&);
void resetDeleteCallback() { deleteCallback = DeleteCallback(); }
@@ -188,7 +207,7 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public qpid::linear
timer.add(getEventsFireEventsPtr);
getEventsTimerSetFlag = true;
}
- void handleIoResult(const qpid::linearstore::journal::iores r);
+ void handleIoResult(const ::qpid::linearstore::journal::iores r);
// Management instrumentation callbacks overridden from jcntl
inline void instr_incr_outstanding_aio_cnt() {
@@ -203,23 +222,27 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public qpid::linear
class TplJournalImpl : public JournalImpl
{
public:
- TplJournalImpl(qpid::sys::Timer& timer,
+ TplJournalImpl(::qpid::sys::Timer& timer,
const std::string& journalId,
const std::string& journalDirectory,
JournalLogImpl& journalLogRef,
- const qpid::sys::Duration getEventsTimeout,
- const qpid::sys::Duration flushTimeout,
- qpid::management::ManagementAgent* agent) :
+ const ::qpid::sys::Duration getEventsTimeout,
+ const ::qpid::sys::Duration flushTimeout,
+ ::qpid::management::ManagementAgent* agent) :
JournalImpl(timer, journalId, journalDirectory, journalLogRef, getEventsTimeout, flushTimeout, agent)
{}
virtual ~TplJournalImpl() {}
// Special version of read_data_record that ignores transactions - needed when reading the TPL
- inline qpid::linearstore::journal::iores read_data_record(void** const datapp, std::size_t& dsize,
- void** const xidpp, std::size_t& xidsize, bool& transient, bool& external,
- qpid::linearstore::journal::data_tok* const dtokp) {
- return JournalImpl::read_data_record(datapp, dsize, xidpp, xidsize, transient, external, dtokp, true);
+ inline ::qpid::linearstore::journal::iores read_data_record(void** const datapp,
+ std::size_t& dsize,
+ void** const xidpp,
+ std::size_t& xidsize,
+ bool& transient,
+ bool& external,
+ ::qpid::linearstore::journal::data_tok* const dtokp) {
+ return JournalImpl::read_data_record(datapp, dsize, xidpp, xidsize, transient, external, dtokp, false);
}
}; // class TplJournalImpl
diff --git a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
index 593d2bd4b7..dd4198c6fd 100644
--- a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
+++ b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
@@ -51,16 +51,6 @@ qpid::sys::Duration MessageStoreImpl::defJournalGetEventsTimeout(1 * qpid::sys::
qpid::sys::Duration MessageStoreImpl::defJournalFlushTimeout(500 * qpid::sys::TIME_MSEC); // 0.5s
qpid::sys::Mutex TxnCtxt::globalSerialiser;
-MessageStoreImpl::TplRecoverStruct::TplRecoverStruct(const uint64_t rid_,
- const bool deq_flag_,
- const bool commit_flag_,
- const bool tpc_flag_) :
- rid(rid_),
- deq_flag(deq_flag_),
- commit_flag(commit_flag_),
- tpc_flag(tpc_flag_)
-{}
-
MessageStoreImpl::MessageStoreImpl(qpid::broker::Broker* broker_, const char* envpath_) :
defaultEfpPartitionNumber(0),
defaultEfpFileSize_kib(0),
@@ -601,7 +591,7 @@ void MessageStoreImpl::recover(qpid::broker::RecoveryManager& registry_)
TxnCtxt txn;
txn.begin(dbenv.get(), false);
try {
- //read all queues, calls recoversMessages
+ //read all queues, calls recoversMessages for each queue
recoverQueues(txn, registry_, queues, prepared, messages);
//recover exchange & bindings:
@@ -621,6 +611,7 @@ void MessageStoreImpl::recover(qpid::broker::RecoveryManager& registry_)
}
//recover transactions:
+ qpid::linearstore::journal::txn_map& txn_map_ref = tplStorePtr->get_txn_map();
for (txn_list::iterator i = prepared.begin(); i != prepared.end(); i++) {
const PreparedTransaction pt = *i;
if (mgmtObject.get() != 0) {
@@ -629,20 +620,39 @@ void MessageStoreImpl::recover(qpid::broker::RecoveryManager& registry_)
}
std::string xid = pt.xid;
-
- // Restore data token state in TxnCtxt
- TplRecoverMapCitr citr = tplRecoverMap.find(xid);
- if (citr == tplRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in tplRecoverMap");
+ qpid::linearstore::journal::txn_data_list tdl = txn_map_ref.get_tdata_list(xid);
+ if (tdl.size() == 0) THROW_STORE_EXCEPTION("XID not found in txn_map");
+ uint16_t enqCnt = 0UL;
+ uint16_t deqCnt = 0UL;
+ uint16_t tpcCnt = 0UL;
+ uint16_t abortCnt = 0UL;
+ uint64_t rid = 0ULL;
+ for (qpid::linearstore::journal::tdl_itr j=tdl.begin(); j!=tdl.end(); ++j) {
+ if (j->enq_flag_) {
+ ++enqCnt;
+ rid = j->rid_;
+ } else {
+ ++deqCnt;
+ }
+ if (!j->commit_flag_) {
+ ++abortCnt;
+ }
+ if (j->tpc_flag_) {
+ ++tpcCnt;
+ }
+ }
+ if (tpcCnt > 0 && tpcCnt != tdl.size()) THROW_STORE_EXCEPTION("Inconsistent TPL 2PC count");
+ bool commitFlag = abortCnt == 0;
// If a record is found that is dequeued but not committed/aborted from tplStore, then a complete() call
// was interrupted part way through committing/aborting the impacted queues. Complete this process.
- bool incomplTplTxnFlag = citr->second.deq_flag;
+ bool incomplTplTxnFlag = deqCnt > 0;
- if (citr->second.tpc_flag) {
+ if (tpcCnt > 0) {
// Dtx (2PC) transaction
TPCTxnCtxt* tpcc = new TPCTxnCtxt(xid, &messageIdSequence);
std::auto_ptr<qpid::broker::TPCTransactionContext> txn(tpcc);
- tpcc->recoverDtok(citr->second.rid, xid);
+ tpcc->recoverDtok(rid, xid);
tpcc->prepare(tplStorePtr.get());
qpid::broker::RecoverableTransaction::shared_ptr dtx;
@@ -661,12 +671,12 @@ void MessageStoreImpl::recover(qpid::broker::RecoveryManager& registry_)
}
if (incomplTplTxnFlag) {
- tpcc->complete(citr->second.commit_flag);
+ tpcc->complete(commitFlag);
}
} else {
// Local (1PC) transaction
boost::shared_ptr<TxnCtxt> opcc(new TxnCtxt(xid, &messageIdSequence));
- opcc->recoverDtok(citr->second.rid, xid);
+ opcc->recoverDtok(rid, xid);
opcc->prepare(tplStorePtr.get());
if (pt.enqueues.get()) {
@@ -680,11 +690,12 @@ void MessageStoreImpl::recover(qpid::broker::RecoveryManager& registry_)
}
}
if (incomplTplTxnFlag) {
- opcc->complete(citr->second.commit_flag);
+ opcc->complete(commitFlag);
} else {
- completed(*opcc.get(), citr->second.commit_flag);
+ completed(*opcc.get(), commitFlag);
}
}
+
}
registry_.recoveryComplete();
}
@@ -888,12 +899,13 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/,
bool externalFlag = false;
DataTokenImpl dtok;
dtok.set_wstate(DataTokenImpl::NONE);
+ qpid::linearstore::journal::txn_map& txn_map_ref = tplStorePtr->get_txn_map();
// Read the message from the Journal.
try {
unsigned aio_sleep_cnt = 0;
while (read) {
- qpid::linearstore::journal::iores res = jc->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtok);
+ qpid::linearstore::journal::iores res = jc->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtok, false);
switch (res)
{
@@ -932,16 +944,31 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/,
} else {
uint64_t rid = dtok.rid();
std::string xid(i->xid);
- TplRecoverMapCitr citr = tplRecoverMap.find(xid);
- if (citr == tplRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in tplRecoverMap");
-
- // deq present in prepared list: this xid is part of incomplete txn commit/abort
- // or this is a 1PC txn that must be rolled forward
- if (citr->second.deq_flag || !citr->second.tpc_flag) {
+ qpid::linearstore::journal::txn_data_list tdl = txn_map_ref.get_tdata_list(xid);
+ if (tdl.size() == 0) THROW_STORE_EXCEPTION("XID not found in txn_map");
+ uint16_t enqCnt = 0UL;
+ uint16_t deqCnt = 0UL;
+ uint16_t tpcCnt = 0UL;
+ uint16_t abortCnt = 0UL;
+ for (qpid::linearstore::journal::tdl_itr j=tdl.begin(); j!=tdl.end(); ++j) {
+ if (j->enq_flag_) {
+ ++enqCnt;
+ } else {
+ ++deqCnt;
+ }
+ if (!j->commit_flag_) {
+ ++abortCnt;
+ }
+ if (j->tpc_flag_) {
+ ++tpcCnt;
+ }
+ }
+ if (tpcCnt > 0 && tpcCnt != tdl.size()) THROW_STORE_EXCEPTION("Inconsistent TPL 2PC count");
+ if (deqCnt > 0 || tpcCnt == 0) {
if (jc->is_enqueued(rid, true)) {
// Enqueue is non-tx, dequeue tx
assert(jc->is_locked(rid)); // This record MUST be locked by a txn dequeue
- if (!citr->second.commit_flag) {
+ if (abortCnt > 0) {
rcnt++;
queue->recover(msg); // recover message in abort case only
}
@@ -955,7 +982,7 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/,
if (j->enq_flag_ && j->rid_ == rid) enq = true;
else if (!j->enq_flag_ && j->drid_ == rid) deq = true;
}
- if (enq && !deq && citr->second.commit_flag) {
+ if (enq && !deq && abortCnt == 0) {
rcnt++;
queue->recover(msg); // recover txn message in commit case only
}
@@ -969,10 +996,14 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/,
dtok.reset();
dtok.set_wstate(DataTokenImpl::NONE);
- if (xidbuff)
+ if (xidbuff) {
::free(xidbuff);
- else if (dbuff)
+ xidbuff = NULL;
+ }
+ if (dbuff) {
::free(dbuff);
+ dbuff = NULL;
+ }
aio_sleep_cnt = 0;
break;
}
@@ -1033,77 +1064,6 @@ int MessageStoreImpl::enqueueMessage(TxnCtxt& txn_,
return count;
}
-void MessageStoreImpl::readTplStore()
-{
- tplRecoverMap.clear();
- qpid::linearstore::journal::txn_map& tmap = tplStorePtr->get_txn_map();
- DataTokenImpl dtok;
- void* dbuff = NULL; size_t dbuffSize = 0;
- void* xidbuff = NULL; size_t xidbuffSize = 0;
- bool transientFlag = false;
- bool externalFlag = false;
- bool done = false;
- try {
- unsigned aio_sleep_cnt = 0;
- while (!done) {
- dtok.reset();
- dtok.set_wstate(DataTokenImpl::ENQ);
- qpid::linearstore::journal::iores res = tplStorePtr->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtok);
- switch (res) {
- case qpid::linearstore::journal::RHM_IORES_SUCCESS: {
- // Every TPL record contains both data and an XID
- assert(dbuffSize>0);
- assert(xidbuffSize>0);
- std::string xid(static_cast<const char*>(xidbuff), xidbuffSize);
- bool is2PC = *(static_cast<char*>(dbuff)) != 0;
-
- // Check transaction details; add to recover map
- qpid::linearstore::journal::txn_data_list txnList = tmap.get_tdata_list(xid); // txnList will be empty if xid not found
- if (!txnList.empty()) { // xid found in tmap
- unsigned enqCnt = 0;
- unsigned deqCnt = 0;
- uint64_t rid = 0;
-
- // Assume commit (roll forward) in cases where only prepare has been called - ie only enqueue record exists.
- // Note: will apply to both 1PC and 2PC transactions.
- bool commitFlag = true;
-
- for (qpid::linearstore::journal::tdl_itr j = txnList.begin(); j<txnList.end(); j++) {
- if (j->enq_flag_) {
- rid = j->rid_;
- enqCnt++;
- } else {
- commitFlag = j->commit_flag_;
- deqCnt++;
- }
- }
- assert(enqCnt == 1);
- assert(deqCnt <= 1);
- tplRecoverMap.insert(TplRecoverMapPair(xid, TplRecoverStruct(rid, deqCnt == 1, commitFlag, is2PC)));
- }
-
- ::free(xidbuff);
- aio_sleep_cnt = 0;
- break;
- }
- case qpid::linearstore::journal::RHM_IORES_PAGE_AIOWAIT:
- if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
- THROW_STORE_EXCEPTION("Timeout waiting for AIO in MessageStoreImpl::recoverTplStore()");
- ::usleep(AIO_SLEEP_TIME_US);
- break;
- case qpid::linearstore::journal::RHM_IORES_EMPTY:
- done = true;
- break; // done with all messages. (add call in jrnl to test that _emap is empty.)
- default:
- std::ostringstream oss;
- oss << "readTplStore(): Unexpected result from journal read: " << qpid::linearstore::journal::iores_str(res);
- THROW_STORE_EXCEPTION(oss.str());
- } // switch
- }
- } catch (const qpid::linearstore::journal::jexception& e) {
- THROW_STORE_EXCEPTION(std::string("TPL recoverTplStore() failed: ") + e.what());
- }
-}
void MessageStoreImpl::recoverTplStore()
{
@@ -1114,11 +1074,7 @@ void MessageStoreImpl::recoverTplStore()
highestRid = thisHighestRid;
else if (thisHighestRid - highestRid < 0x8000000000000000ULL) // RFC 1982 comparison for unsigned 64-bit
highestRid = thisHighestRid;
-
- // Load tplRecoverMap by reading the TPL store
- readTplStore();
-
- tplStorePtr->recover_complete(); // start journal.
+ tplStorePtr->recover_complete(); // start TPL
}
}
@@ -1126,28 +1082,49 @@ void MessageStoreImpl::recoverLockedMappings(txn_list& txns)
{
if (!tplStorePtr->is_ready())
recoverTplStore();
-
- // Abort unprepared xids and populate the locked maps
- for (TplRecoverMapCitr i = tplRecoverMap.begin(); i != tplRecoverMap.end(); i++) {
+ std::vector<std::string> xidList;
+ tplStorePtr->get_txn_map().xid_list(xidList);
+ for (std::vector<std::string>::const_iterator i=xidList.begin(); i!=xidList.end(); ++i) {
LockedMappings::shared_ptr enq_ptr;
enq_ptr.reset(new LockedMappings);
LockedMappings::shared_ptr deq_ptr;
deq_ptr.reset(new LockedMappings);
- txns.push_back(new PreparedTransaction(i->first, enq_ptr, deq_ptr));
+ txns.push_back(new PreparedTransaction(*i, enq_ptr, deq_ptr));
}
}
void MessageStoreImpl::collectPreparedXids(std::set<std::string>& xids)
{
- if (tplStorePtr->is_ready()) {
- readTplStore();
- } else {
+ if (!tplStorePtr->is_ready()) {
recoverTplStore();
}
- for (TplRecoverMapCitr i = tplRecoverMap.begin(); i != tplRecoverMap.end(); i++) {
- // Discard all txns that are to be rolled forward/back and 1PC transactions
- if (!i->second.deq_flag && i->second.tpc_flag)
- xids.insert(i->first);
+ std::vector<std::string> xidList;
+ tplStorePtr->get_txn_map().xid_list(xidList);
+ for (std::vector<std::string>::const_iterator i=xidList.begin(); i!=xidList.end(); ++i) {
+ qpid::linearstore::journal::txn_data_list tdl = tplStorePtr->get_txn_map().get_tdata_list(*i);
+ uint16_t enqCnt = 0UL;
+ uint16_t deqCnt = 0UL;
+ uint16_t tpcCnt = 0UL;
+ uint16_t abortCnt = 0UL;
+ for (qpid::linearstore::journal::tdl_itr j=tdl.begin(); j!=tdl.end(); ++j) {
+ if (j->enq_flag_) {
+ ++enqCnt;
+ } else {
+ ++deqCnt;
+ }
+ if (!j->commit_flag_) {
+ ++abortCnt;
+ }
+ if (j->tpc_flag_) {
+ ++tpcCnt;
+ }
+ }
+ if (tpcCnt > 0) {
+ if (tpcCnt != tdl.size()) THROW_STORE_EXCEPTION("Inconsistent TPL 2PC count");
+ if (enqCnt - deqCnt > 0) {
+ xids.insert(*i);
+ }
+ }
}
}
@@ -1186,7 +1163,7 @@ void MessageStoreImpl::flush(const qpid::broker::PersistableQueue& queue_)
JournalImpl* jc = static_cast<JournalImpl*>(queue_.getExternalQueueStore());
if (jc) {
// TODO: check if this result should be used...
- /*mrg::journal::iores res =*/ jc->flush();
+ /*mrg::journal::iores res =*/ jc->flush(false);
}
} catch (const qpid::linearstore::journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("Queue ") + qn + ": flush() failed: " + e.what() );
@@ -1258,7 +1235,7 @@ void MessageStoreImpl::store(const qpid::broker::PersistableQueue* queue_,
if (txn_->getXid().empty()) {
jc->enqueue_data_record(&buff[0], size, size, dtokp.get(), !message_->isPersistent());
} else {
- jc->enqueue_txn_data_record(&buff[0], size, size, dtokp.get(), txn_->getXid(), !message_->isPersistent());
+ jc->enqueue_txn_data_record(&buff[0], size, size, dtokp.get(), txn_->getXid(), txn_->isTPC(), !message_->isPersistent());
}
} else {
THROW_STORE_EXCEPTION(std::string("MessageStoreImpl::store() failed: queue NULL."));
@@ -1309,9 +1286,10 @@ void MessageStoreImpl::async_dequeue(qpid::broker::TransactionContext* ctxt_,
ddtokp->set_rid(messageIdSequence.next());
ddtokp->set_dequeue_rid(msg_->getPersistenceId());
ddtokp->set_wstate(DataTokenImpl::ENQ);
+ TxnCtxt* txn = 0;
std::string tid;
if (ctxt_) {
- TxnCtxt* txn = check(ctxt_);
+ txn = check(ctxt_);
tid = txn->getXid();
}
// Manually increase the ref count, as raw pointers are used beyond this point
@@ -1319,9 +1297,9 @@ void MessageStoreImpl::async_dequeue(qpid::broker::TransactionContext* ctxt_,
try {
JournalImpl* jc = static_cast<JournalImpl*>(queue_.getExternalQueueStore());
if (tid.empty()) {
- jc->dequeue_data_record(ddtokp.get());
+ jc->dequeue_data_record(ddtokp.get(), false);
} else {
- jc->dequeue_txn_data_record(ddtokp.get(), tid);
+ jc->dequeue_txn_data_record(ddtokp.get(), tid, txn?txn->isTPC():false, false);
}
} catch (const qpid::linearstore::journal::jexception& e) {
ddtokp->release();
@@ -1341,7 +1319,7 @@ void MessageStoreImpl::completed(TxnCtxt& txn_,
DataTokenImpl* dtokp = txn_.getDtok();
dtokp->set_dequeue_rid(dtokp->rid());
dtokp->set_rid(messageIdSequence.next());
- tplStorePtr->dequeue_txn_data_record(txn_.getDtok(), txn_.getXid(), commit_);
+ tplStorePtr->dequeue_txn_data_record(txn_.getDtok(), txn_.getXid(), txn_.isTPC(), commit_);
}
txn_.complete(commit_);
if (mgmtObject.get() != 0) {
@@ -1376,12 +1354,16 @@ void MessageStoreImpl::prepare(qpid::broker::TPCTransactionContext& ctxt_)
{
checkInit();
TxnCtxt* txn = dynamic_cast<TxnCtxt*>(&ctxt_);
+//std::string xid=txn->getXid(); std::cout << "*** MessageStoreImpl::prepare() xid=" << std::hex;
+//for (unsigned i=0; i<xid.length(); ++i) std::cout << "\\" << (int)xid.at(i); std::cout << " ***" << std::dec << std::endl;
if(!txn) throw qpid::broker::InvalidTransactionContextException();
localPrepare(txn);
}
void MessageStoreImpl::localPrepare(TxnCtxt* ctxt_)
{
+//std::string xid=ctxt_->getXid(); std::cout << "*** MessageStoreImpl::localPrepare() xid=" << std::hex;
+//for (unsigned i=0; i<xid.length(); ++i) std::cout << "\\" << (int)xid.at(i); std::cout << " ***" << std::dec << std::endl;
try {
chkTplStoreInit(); // Late initialize (if needed)
@@ -1394,7 +1376,7 @@ void MessageStoreImpl::localPrepare(TxnCtxt* ctxt_)
dtokp->set_external_rid(true);
dtokp->set_rid(messageIdSequence.next());
char tpcFlag = static_cast<char>(ctxt_->isTPC());
- tplStorePtr->enqueue_txn_data_record(&tpcFlag, sizeof(char), sizeof(char), dtokp, ctxt_->getXid(), false);
+ tplStorePtr->enqueue_txn_data_record(&tpcFlag, sizeof(char), sizeof(char), dtokp, ctxt_->getXid(), tpcFlag != 0, false);
ctxt_->prepare(tplStorePtr.get());
// make sure all the data is written to disk before returning
ctxt_->sync();
diff --git a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h
index e995237d99..95667be82e 100644
--- a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h
+++ b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h
@@ -89,19 +89,6 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem
typedef LockedMappings::map txn_lock_map;
typedef boost::ptr_list<PreparedTransaction> txn_list;
- // Structs for Transaction Recover List (TPL) recover state
- struct TplRecoverStruct {
- uint64_t rid; // rid of TPL record
- bool deq_flag;
- bool commit_flag;
- bool tpc_flag;
- TplRecoverStruct(const uint64_t _rid, const bool _deq_flag, const bool _commit_flag, const bool _tpc_flag);
- };
- typedef TplRecoverStruct TplRecover;
- typedef std::pair<std::string, TplRecover> TplRecoverMapPair;
- typedef std::map<std::string, TplRecover> TplRecoverMap;
- typedef TplRecoverMap::const_iterator TplRecoverMapCitr;
-
typedef std::map<std::string, JournalImpl*> JournalListMap;
typedef JournalListMap::iterator JournalListMapItr;
@@ -127,7 +114,6 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem
// Pointer to Transaction Prepared List (TPL) journal instance
boost::shared_ptr<TplJournalImpl> tplStorePtr;
- TplRecoverMap tplRecoverMap;
qpid::sys::Mutex tplInitLock;
JournalListMap journalList;
qpid::sys::Mutex journalListLock;
@@ -202,7 +188,6 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem
queue_index& index,
txn_list& locked,
message_index& prepared);
- void readTplStore();
void recoverTplStore();
void recoverLockedMappings(txn_list& txns);
TxnCtxt* check(qpid::broker::TransactionContext* ctxt);
diff --git a/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp b/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp
index 50ef58aafa..743d12989a 100644
--- a/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp
+++ b/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp
@@ -114,7 +114,7 @@ void TxnCtxt::sync() {
void TxnCtxt::jrnl_flush(JournalImpl* jc) {
if (jc && !(jc->is_txn_synced(getXid())))
- jc->flush();
+ jc->flush(false);
}
void TxnCtxt::jrnl_sync(JournalImpl* jc, timespec* timeout) {
diff --git a/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp b/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp
index f962976daf..5483f3bb94 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp
@@ -51,6 +51,10 @@ void LinearFileController::initialize(const std::string& journalDirectory,
}
void LinearFileController::finalize() {
+ if (currentJournalFilePtr_) {
+ currentJournalFilePtr_->close();
+ currentJournalFilePtr_ = NULL;
+ }
while (!journalFileList_.empty()) {
delete journalFileList_.front();
journalFileList_.pop_front();
diff --git a/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp b/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp
index 66ac7c3a2d..16ca1e0994 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp
@@ -49,6 +49,18 @@ namespace qpid {
namespace linearstore {
namespace journal {
+RecoveredRecordData_t::RecoveredRecordData_t(const uint64_t rid, const uint64_t fid, const std::streampos foffs, bool ptxn) :
+ recordId_(rid),
+ fileId_(fid),
+ fileOffset_(foffs),
+ pendingTransaction_(ptxn)
+{}
+
+
+bool recordIdListCompare(RecoveredRecordData_t a, RecoveredRecordData_t b) {
+ return a.recordId_ < b.recordId_;
+}
+
RecoveryManager::RecoveryManager(const std::string& journalDirectory,
const std::string& queuename,
enq_map& enqueueMapRef,
@@ -86,6 +98,9 @@ void RecoveryManager::analyzeJournals(const std::vector<std::string>* preparedTr
if (!journalEmptyFlag_) {
// Read all records, establish remaining enqueued records
+ if (inFileStream_.is_open()) {
+ inFileStream_.close();
+ }
while (getNextRecordHeader()) {}
if (inFileStream_.is_open()) {
inFileStream_.close();
@@ -120,11 +135,7 @@ void RecoveryManager::analyzeJournals(const std::vector<std::string>* preparedTr
}
}
}
-
- // Set up recordIdList_ from enqueue map
- enqueueMapRef_.rid_list(recordIdList_);
-
- recordIdListConstItr_ = recordIdList_.begin();
+ prepareRecordList();
}
}
@@ -151,37 +162,44 @@ bool RecoveryManager::readNextRemainingRecord(void** const dataPtrPtr,
bool& transient,
bool& external,
data_tok* const dtokp,
- bool /*ignore_pending_txns*/) {
- if (recordIdListConstItr_ == recordIdList_.end()) {
- return false;
- }
- enq_map::emap_data_struct_t eds;
- enqueueMapRef_.get_data(*recordIdListConstItr_, eds);
- if (!inFileStream_.is_open() || currentJournalFileConstItr_->first != eds._pfid) {
- getFile(eds._pfid, false);
- }
-//std::cout << " " << eds._pfid << std::hex << ",0x" << eds._file_posn << std::flush; // DEBUG
+ bool ignore_pending_txns) {
+ bool foundRecord = false;
+ do {
+ if (recordIdListConstItr_ == recordIdList_.end()) {
+ return false;
+ }
+ if (recordIdListConstItr_->pendingTransaction_ && ignore_pending_txns) { // Pending transaction
+ ++recordIdListConstItr_; // ignore, go to next record
+ } else {
+ foundRecord = true;
+ }
+ } while (!foundRecord);
- inFileStream_.seekg(eds._file_posn, std::ifstream::beg);
+ if (!inFileStream_.is_open() || currentJournalFileConstItr_->first != recordIdListConstItr_->fileId_) {
+ if (!getFile(recordIdListConstItr_->fileId_, false)) {
+ std::ostringstream oss;
+ oss << "Failed to open file with file-id=" << recordIdListConstItr_->fileId_;
+ throw jexception(jerrno::JERR__FILEIO, oss.str(), "RecoveryManager", "readNextRemainingRecord");
+ }
+ }
+ inFileStream_.seekg(recordIdListConstItr_->fileOffset_, std::ifstream::beg);
if (!inFileStream_.good()) {
std::ostringstream oss;
- oss << "Could not find offset 0x" << std::hex << eds._file_posn << " in file " << getCurrentFileName();
+ oss << "Could not find offset 0x" << std::hex << recordIdListConstItr_->fileOffset_ << " in file " << getCurrentFileName();
throw jexception(jerrno::JERR__FILEIO, oss.str(), "RecoveryManager", "readNextRemainingRecord");
}
+
::enq_hdr_t enqueueHeader;
inFileStream_.read((char*)&enqueueHeader, sizeof(::enq_hdr_t));
if (inFileStream_.gcount() != sizeof(::enq_hdr_t)) {
std::ostringstream oss;
- oss << "Could not read enqueue header from file " << getCurrentFileName() << " at offset 0x" << std::hex << eds._file_posn;
+ oss << "Could not read enqueue header from file " << getCurrentFileName() << " at offset 0x" << std::hex << recordIdListConstItr_->fileOffset_;
throw jexception(jerrno::JERR__FILEIO, oss.str(), "RecoveryManager", "readNextRemainingRecord");
}
// check flags
transient = ::is_enq_transient(&enqueueHeader);
external = ::is_enq_external(&enqueueHeader);
-//char magicBuff[5]; // DEBUG
-//::memcpy(magicBuff, &enqueueHeader, 4); // DEBUG
-//magicBuff[4] = 0; // DEBUG
-//std::cout << std::hex << ":" << (char*)magicBuff << ",rid=0x" << enqueueHeader._rhdr._rid << ",xs=0x" << enqueueHeader._xidsize << ",ds=0x" << enqueueHeader._dsize << std::dec << std::flush; // DEBUG
+
// read xid
xidSize = enqueueHeader._xidsize;
*xidPtrPtr = ::malloc(xidSize);
@@ -386,6 +404,12 @@ void RecoveryManager::checkFileStreamOk(bool checkEof) {
}
void RecoveryManager::checkJournalAlignment(const std::streampos recordPosition) {
+ if (recordPosition % QLS_DBLK_SIZE_BYTES != 0) {
+ std::ostringstream oss;
+ oss << "Current read pointer not dblk aligned: recordPosition=0x" << std::hex << recordPosition;
+ oss << " (dblk alignment offset = 0x" << (recordPosition % QLS_DBLK_SIZE_BYTES);
+ throw jexception(jerrno::JERR_RCVM_NOTDBLKALIGNED, oss.str(), "RecoveryManager", "checkJournalAlignment");
+ }
std::streampos currentPosn = recordPosition;
unsigned sblkOffset = currentPosn % QLS_SBLK_SIZE_BYTES;
if (sblkOffset)
@@ -574,7 +598,7 @@ bool RecoveryManager::getNextRecordHeader()
throw jexception(jerrno::JERR_RCVM_NULLXID, "ENQ", "RecoveryManager", "getNextRecordHeader");
}
std::string xid((char*)xidp, er.xid_size());
- transactionMapRef_.insert_txn_data(xid, txn_data_t(h._rid, 0, start_fid, file_pos, true));
+ transactionMapRef_.insert_txn_data(xid, txn_data_t(h._rid, 0, start_fid, file_pos, true, false /*tpcFlag*/));
if (transactionMapRef_.set_aio_compl(xid, h._rid) < txn_map::TMAP_OK) { // fail - xid or rid not found
std::ostringstream oss;
oss << std::hex << "_tmap.set_aio_compl: txn_enq xid=\"" << xid << "\" rid=0x" << h._rid;
@@ -725,6 +749,35 @@ bool RecoveryManager::needNextFile() {
return true;
}
+void RecoveryManager::prepareRecordList() {
+ // Set up recordIdList_ from enqueue map and transaction map
+ recordIdList_.clear();
+
+ // Extract records from enqueue list
+ std::vector<uint64_t> ridList;
+ enqueueMapRef_.rid_list(ridList);
+ qpid::linearstore::journal::enq_map::emap_data_struct_t eds;
+ for (std::vector<uint64_t>::const_iterator i=ridList.begin(); i!=ridList.end(); ++i) {
+ enqueueMapRef_.get_data(*i, eds);
+ recordIdList_.push_back(RecoveredRecordData_t(*i, eds._pfid, eds._file_posn, false));
+ }
+
+ // Extract records from pending transaction enqueues
+ std::vector<std::string> xidList;
+ transactionMapRef_.xid_list(xidList);
+ for (std::vector<std::string>::const_iterator j=xidList.begin(); j!=xidList.end(); ++j) {
+ qpid::linearstore::journal::txn_data_list tdsl = transactionMapRef_.get_tdata_list(*j);
+ for (qpid::linearstore::journal::tdl_itr k=tdsl.begin(); k!=tdsl.end(); ++k) {
+ if (k->enq_flag_) {
+ recordIdList_.push_back(RecoveredRecordData_t(k->rid_, k->pfid_, k->foffs_, true));
+ }
+ }
+ }
+
+ std::sort(recordIdList_.begin(), recordIdList_.end(), recordIdListCompare);
+ recordIdListConstItr_ = recordIdList_.begin();
+}
+
void RecoveryManager::readJournalData(char* target,
const std::streamsize readSize) {
std::streamoff bytesRead = 0;
diff --git a/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.h b/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.h
index a32fdc1853..997596938b 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.h
@@ -22,7 +22,6 @@
#ifndef QPID_LINEARSTORE_JOURNAL_RECOVERYSTATE_H_
#define QPID_LINEARSTORE_JOURNAL_RECOVERYSTATE_H_
-#include <deque>
#include <fstream>
#include <map>
#include "qpid/linearstore/journal/LinearFileController.h"
@@ -44,6 +43,16 @@ class JournalLog;
class jrec;
class txn_map;
+struct RecoveredRecordData_t {
+ uint64_t recordId_;
+ uint64_t fileId_;
+ std::streampos fileOffset_;
+ bool pendingTransaction_;
+ RecoveredRecordData_t(const uint64_t rid, const uint64_t fid, const std::streampos foffs, bool ptxn);
+};
+
+bool recordIdListCompare(RecoveredRecordData_t a, RecoveredRecordData_t b);
+
class RecoveryManager
{
protected:
@@ -53,7 +62,7 @@ protected:
typedef std::map<uint64_t, JournalFile*> fileNumberMap_t;
typedef fileNumberMap_t::iterator fileNumberMapItr_t;
typedef fileNumberMap_t::const_iterator fileNumberMapConstItr_t;
- typedef std::vector<uint64_t> recordIdList_t;
+ typedef std::vector<RecoveredRecordData_t> recordIdList_t;
typedef recordIdList_t::const_iterator recordIdListConstItr_t;
// Location and identity
@@ -123,6 +132,7 @@ protected:
bool getNextFile(bool jumpToFirstRecordOffsetFlag);
bool getNextRecordHeader();
bool needNextFile();
+ void prepareRecordList();
bool readFileHeader();
void readJournalData(char* target, const std::streamsize size);
void removeEmptyFiles(EmptyFilePool* emptyFilePoolPtr);
diff --git a/qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp b/qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp
index cb03978696..55bac4a2e5 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp
@@ -155,7 +155,7 @@ jcntl::enqueue_data_record(const void* const data_buff,
check_wstatus("enqueue_data_record");
{
slock s(_wr_mutex);
- while (handle_aio_wait(_wmgr.enqueue(data_buff, tot_data_len, this_data_len, dtokp, 0, 0, transient, false), r,
+ while (handle_aio_wait(_wmgr.enqueue(data_buff, tot_data_len, this_data_len, dtokp, 0, 0, false, transient, false), r,
dtokp)) ;
}
return r;
@@ -170,7 +170,7 @@ jcntl::enqueue_extern_data_record(const std::size_t tot_data_len,
check_wstatus("enqueue_extern_data_record");
{
slock s(_wr_mutex);
- while (handle_aio_wait(_wmgr.enqueue(0, tot_data_len, 0, dtokp, 0, 0, transient, true), r, dtokp)) ;
+ while (handle_aio_wait(_wmgr.enqueue(0, tot_data_len, 0, dtokp, 0, 0, false, transient, true), r, dtokp)) ;
}
return r;
}
@@ -181,6 +181,7 @@ jcntl::enqueue_txn_data_record(const void* const data_buff,
const std::size_t this_data_len,
data_tok* dtokp,
const std::string& xid,
+ const bool tpc_flag,
const bool transient)
{
iores r;
@@ -188,7 +189,7 @@ jcntl::enqueue_txn_data_record(const void* const data_buff,
{
slock s(_wr_mutex);
while (handle_aio_wait(_wmgr.enqueue(data_buff, tot_data_len, this_data_len, dtokp, xid.data(), xid.size(),
- transient, false), r, dtokp)) ;
+ tpc_flag, transient, false), r, dtokp)) ;
}
return r;
}
@@ -197,14 +198,15 @@ iores
jcntl::enqueue_extern_txn_data_record(const std::size_t tot_data_len,
data_tok* dtokp,
const std::string& xid,
+ const bool tpc_flag,
const bool transient)
{
iores r;
check_wstatus("enqueue_extern_txn_data_record");
{
slock s(_wr_mutex);
- while (handle_aio_wait(_wmgr.enqueue(0, tot_data_len, 0, dtokp, xid.data(), xid.size(), transient, true), r,
- dtokp)) ;
+ while (handle_aio_wait(_wmgr.enqueue(0, tot_data_len, 0, dtokp, xid.data(), xid.size(), tpc_flag, transient,
+ true), r, dtokp)) ;
}
return r;
}
@@ -234,7 +236,7 @@ jcntl::dequeue_data_record(data_tok* const dtokp,
check_wstatus("dequeue_data");
{
slock s(_wr_mutex);
- while (handle_aio_wait(_wmgr.dequeue(dtokp, 0, 0, txn_coml_commit), r, dtokp)) ;
+ while (handle_aio_wait(_wmgr.dequeue(dtokp, 0, 0, false, txn_coml_commit), r, dtokp)) ;
}
return r;
}
@@ -242,13 +244,14 @@ jcntl::dequeue_data_record(data_tok* const dtokp,
iores
jcntl::dequeue_txn_data_record(data_tok* const dtokp,
const std::string& xid,
+ const bool tpc_flag,
const bool txn_coml_commit)
{
iores r;
check_wstatus("dequeue_data");
{
slock s(_wr_mutex);
- while (handle_aio_wait(_wmgr.dequeue(dtokp, xid.data(), xid.size(), txn_coml_commit), r, dtokp)) ;
+ while (handle_aio_wait(_wmgr.dequeue(dtokp, xid.data(), xid.size(), tpc_flag, txn_coml_commit), r, dtokp)) ;
}
return r;
}
diff --git a/qpid/cpp/src/qpid/linearstore/journal/jcntl.h b/qpid/cpp/src/qpid/linearstore/journal/jcntl.h
index c12c8afbc9..2db0e707a7 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/jcntl.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/jcntl.h
@@ -270,11 +270,11 @@ public:
const std::size_t tot_data_len,
const std::size_t this_data_len,
data_tok* dtokp,
- const bool transient = false);
+ const bool transient);
iores enqueue_extern_data_record(const std::size_t tot_data_len,
data_tok* dtokp,
- const bool transient = false);
+ const bool transient);
/**
* \brief Enqueue data.
@@ -294,12 +294,14 @@ public:
const std::size_t this_data_len,
data_tok* dtokp,
const std::string& xid,
- const bool transient = false);
+ const bool tpc_flag,
+ const bool transient);
iores enqueue_extern_txn_data_record(const std::size_t tot_data_len,
data_tok* dtokp,
const std::string& xid,
- const bool transient = false);
+ const bool tpc_flag,
+ const bool transient);
/**
* \brief Reads data from the journal. It is the responsibility of the reader to free
@@ -350,7 +352,7 @@ public:
bool& transient,
bool& external,
data_tok* const dtokp,
- bool ignore_pending_txns = false);
+ bool ignore_pending_txns);
/**
* \brief Dequeues (marks as no longer needed) data record in journal.
@@ -370,7 +372,7 @@ public:
* \exception TODO
*/
iores dequeue_data_record(data_tok* const dtokp,
- const bool txn_coml_commit = false);
+ const bool txn_coml_commit);
/**
* \brief Dequeues (marks as no longer needed) data record in journal.
@@ -393,7 +395,8 @@ public:
*/
iores dequeue_txn_data_record(data_tok* const dtokp,
const std::string& xid,
- const bool txn_coml_commit = false);
+ const bool tpc_flag,
+ const bool txn_coml_commit);
/**
* \brief Abort the transaction for all records enqueued or dequeued with the matching xid.
@@ -458,12 +461,12 @@ public:
* \param block_till_aio_cmpl If true, will block the thread while waiting for all
* outstanding AIO operations to complete.
*/
- void stop(const bool block_till_aio_cmpl = false);
+ void stop(const bool block_till_aio_cmpl);
/**
* \brief Force a flush of the write page cache, creating a single AIO write operation.
*/
- iores flush(const bool block_till_aio_cmpl = false);
+ iores flush(const bool block_till_aio_cmpl);
inline uint32_t get_enq_cnt() const { return _emap.size(); } // TODO: _emap: Thread safe?
@@ -480,7 +483,7 @@ public:
* false if the rid is transactionally enqueued and is not committed, or if it is
* locked (i.e. transactionally dequeued, but the dequeue has not been committed).
*/
- inline bool is_enqueued(const uint64_t rid, bool ignore_lock = false) { return _emap.is_enqueued(rid, ignore_lock); }
+ inline bool is_enqueued(const uint64_t rid, bool ignore_lock) { return _emap.is_enqueued(rid, ignore_lock); }
inline bool is_locked(const uint64_t rid) {
if (_emap.is_enqueued(rid, true) < enq_map::EMAP_OK)
diff --git a/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp b/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp
index 51e90c6bb8..01c432d37b 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp
@@ -92,6 +92,8 @@ const uint32_t jerrno::JERR_RCVM_STREAMBAD = 0x0901; ///< Read/write
const uint32_t jerrno::JERR_RCVM_READ = 0x0902; ///< Read error: no or insufficient data to read
const uint32_t jerrno::JERR_RCVM_WRITE = 0x0903; ///< Write error
const uint32_t jerrno::JERR_RCVM_NULLXID = 0x0904; ///< Null XID when XID length non-null in header
+const uint32_t jerrno::JERR_RCVM_NOTDBLKALIGNED = 0x0905; ///< Offset is not data block (dblk)-aligned
+
// class data_tok
const uint32_t jerrno::JERR_DTOK_ILLEGALSTATE = 0x0a00;
@@ -182,6 +184,7 @@ jerrno::__init()
_err_map[JERR_RCVM_READ] = "JERR_RCVM_READ: Read error: no or insufficient data to read";
_err_map[JERR_RCVM_WRITE] = "JERR_RCVM_WRITE: Write error";
_err_map[JERR_RCVM_NULLXID] = "JERR_RCVM_NULLXID: Null XID when XID length non-null in header";
+ _err_map[JERR_RCVM_NOTDBLKALIGNED] = "JERR_RCVM_NOTDBLKALIGNED: Offset is not data block (dblk)-aligned";
// class data_tok
_err_map[JERR_DTOK_ILLEGALSTATE] = "JERR_MTOK_ILLEGALSTATE: Attempted to change to illegal state.";
diff --git a/qpid/cpp/src/qpid/linearstore/journal/jerrno.h b/qpid/cpp/src/qpid/linearstore/journal/jerrno.h
index 5af0a7ada0..62f18c1878 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/jerrno.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/jerrno.h
@@ -110,6 +110,7 @@ namespace journal {
static const uint32_t JERR_RCVM_READ; ///< Read error: no or insufficient data to read
static const uint32_t JERR_RCVM_WRITE; ///< Write error
static const uint32_t JERR_RCVM_NULLXID; ///< Null XID when XID length non-null in header
+ static const uint32_t JERR_RCVM_NOTDBLKALIGNED; ///< Offset is not data block (dblk)-aligned
// class data_tok
static const uint32_t JERR_DTOK_ILLEGALSTATE; ///< Attempted to change to illegal state
diff --git a/qpid/cpp/src/qpid/linearstore/journal/txn_map.cpp b/qpid/cpp/src/qpid/linearstore/journal/txn_map.cpp
index 5e0a28814d..304ab0f6ee 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/txn_map.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/txn_map.cpp
@@ -39,12 +39,14 @@ txn_data_t::txn_data_t(const uint64_t rid,
const uint16_t pfid,
const uint64_t foffs,
const bool enq_flag,
+ const bool tpc_flag,
const bool commit_flag):
rid_(rid),
drid_(drid),
pfid_(pfid),
foffs_(foffs),
enq_flag_(enq_flag),
+ tpc_flag_(tpc_flag),
commit_flag_(commit_flag),
aio_compl_(false)
{}
diff --git a/qpid/cpp/src/qpid/linearstore/journal/txn_map.h b/qpid/cpp/src/qpid/linearstore/journal/txn_map.h
index 0f8cd5f3d7..e2df6c7d0e 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/txn_map.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/txn_map.h
@@ -42,13 +42,15 @@ namespace journal {
uint16_t pfid_; ///< Physical file id, to be used when transferring to emap on commit
uint64_t foffs_; ///< Offset in file for this record
bool enq_flag_; ///< If true, enq op, otherwise deq op
- bool commit_flag_; ///< (2PC transactions) Records 2PC complete c/a mode
+ bool tpc_flag_; ///< 2PC transaction if true
+ bool commit_flag_; ///< TPL only: (2PC transactions) Records 2PC complete c/a mode
bool aio_compl_; ///< Initially false, set to true when record AIO returns
txn_data_t(const uint64_t rid,
const uint64_t drid,
const uint16_t pfid,
const uint64_t foffs,
const bool enq_flag,
+ const bool tpc_flag,
const bool commit_flag = false);
} txn_data_t;
typedef std::vector<txn_data_t> txn_data_list;
diff --git a/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp b/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp
index b5d2e51ec0..fa5f83cd24 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp
@@ -220,11 +220,9 @@ txn_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs)
}
}
ifsp->ignore(rec_size_dblks() * QLS_DBLK_SIZE_BYTES - rec_size());
- if (::rec_tail_check(&_txn_tail, &_txn_hdr._rhdr, 0)) { // TODO: add checksum
- throw jexception(jerrno::JERR_JREC_BADRECTAIL); // TODO: complete exception detail
- }
assert(!ifsp->fail() && !ifsp->bad());
assert(_txn_hdr._xidsize > 0);
+
Checksum checksum;
checksum.addData((unsigned char*)&_txn_hdr, sizeof(_txn_hdr));
checksum.addData((unsigned char*)_buff, _txn_hdr._xidsize);
diff --git a/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp b/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp
index 9b94c7959d..403ad68922 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp
@@ -108,6 +108,7 @@ wmgr::enqueue(const void* const data_buff,
data_tok* dtokp,
const void* const xid_ptr,
const std::size_t xid_len,
+ const bool tpc_flag,
const bool transient,
const bool external)
{
@@ -196,7 +197,7 @@ wmgr::enqueue(const void* const data_buff,
if (xid_len) // If part of transaction, add to transaction map
{
std::string xid((const char*)xid_ptr, xid_len);
- _tmap.insert_txn_data(xid, txn_data_t(rid, 0, dtokp->fid(), 0, true));
+ _tmap.insert_txn_data(xid, txn_data_t(rid, 0, dtokp->fid(), 0, true, tpc_flag));
}
else
{
@@ -228,6 +229,7 @@ iores
wmgr::dequeue(data_tok* dtokp,
const void* const xid_ptr,
const std::size_t xid_len,
+ const bool tpc_flag,
const bool txn_coml_commit)
{
if (xid_len)
@@ -312,7 +314,7 @@ wmgr::dequeue(data_tok* dtokp,
// If the enqueue is part of a pending txn, it will not yet be in emap
_emap.lock(dequeue_rid); // ignore rid not found error
std::string xid((const char*)xid_ptr, xid_len);
- _tmap.insert_txn_data(xid, txn_data_t(rid, dequeue_rid, dtokp->fid(), 0, false));
+ _tmap.insert_txn_data(xid, txn_data_t(rid, dequeue_rid, dtokp->fid(), 0, false, tpc_flag));
}
else
{
diff --git a/qpid/cpp/src/qpid/linearstore/journal/wmgr.h b/qpid/cpp/src/qpid/linearstore/journal/wmgr.h
index 0aa21ca545..8837e51e97 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/wmgr.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/wmgr.h
@@ -97,11 +97,13 @@ public:
data_tok* dtokp,
const void* const xid_ptr,
const std::size_t xid_len,
+ const bool tpc_flag,
const bool transient,
const bool external);
iores dequeue(data_tok* dtokp,
const void* const xid_ptr,
const std::size_t xid_len,
+ const bool tpc_flag,
const bool txn_coml_commit);
iores abort(data_tok* dtokp,
const void* const xid_ptr,