summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2013-11-04 22:15:14 +0000
committerKim van der Riet <kpvdr@apache.org>2013-11-04 22:15:14 +0000
commitf9808a83e93e1972c615c13edc344b675986420a (patch)
treefa00b1e26e8895aef2b7e4491f0ab9de074d0c81
parent5c1efa08b20d4bca2e8e5b81c7f1e78cdcc87c8a (diff)
downloadqpid-python-f9808a83e93e1972c615c13edc344b675986420a.tar.gz
QPID-4984: WIP. Basic enqueue/dequeue/txns work, still no EFP recycling.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1538790 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/linearstore/ISSUES36
-rw-r--r--qpid/cpp/src/qpid/linearstore/JournalImpl.cpp167
-rw-r--r--qpid/cpp/src/qpid/linearstore/JournalImpl.h9
-rw-r--r--qpid/cpp/src/qpid/linearstore/JournalLogImpl.h2
-rw-r--r--qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp82
-rw-r--r--qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h2
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/AtomicCounter.h15
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.cpp90
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.h6
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.cpp86
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.h6
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp27
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.h3
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolTypes.h22
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp146
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.h37
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.cpp154
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.h43
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp224
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.h15
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp124
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp28
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h109
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/txn_map.cpp58
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/txn_map.h33
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c4
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h2
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp47
28 files changed, 653 insertions, 924 deletions
diff --git a/qpid/cpp/src/qpid/linearstore/ISSUES b/qpid/cpp/src/qpid/linearstore/ISSUES
new file mode 100644
index 0000000000..0d15744694
--- /dev/null
+++ b/qpid/cpp/src/qpid/linearstore/ISSUES
@@ -0,0 +1,36 @@
+LinearStore issues:
+
+Store:
+------
+
+1. Overwrite identity: When recovering a previously used file, if the write boundary coincides with old record start,
+ no way of discriminating old from new at boundary (used to use OWI).
+
+2. Recycling files while in use not working, however, files are recovered to EFP during recovery. Must solve #1 first.
+
+3. Checksum not implemented in record tail, not checked during read.
+
+4. Rework qpid management parameters and controls.
+
+Tests
+-----
+
+* No existing tests for linearstore:
+** Basic broker-level tests for txn and non-txn recovery
+** Store-level tests which check write boundary conditions
+** Unit tests
+** Basic performance tests
+
+Tools
+-----
+
+* Store analysis and status
+* Recovery/reading of message content
+
+Code tidy-up
+------------
+
+* Remove old comments
+* Use c++ cast templates instead of (xxx)y
+* Member names: xxx_
+* Add docs to classes
diff --git a/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp b/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp
index 412a4922ca..4f904665ee 100644
--- a/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp
+++ b/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp
@@ -63,14 +63,8 @@ JournalImpl::JournalImpl(qpid::sys::Timer& timer_,
timer(timer_),
_journalLogRef(journalLogRef),
getEventsTimerSetFlag(false),
-// lastReadRid(0),
writeActivityFlag(false),
flushTriggeredFlag(true),
-// _xidp(0),
-// _datap(0),
-// _dlen(0),
-// _dtok(),
-// _external(false),
deleteCallback(onDelete)
{
getEventsFireEventsPtr = new GetEventsFireEvent(this, getEventsTimeout);
@@ -97,7 +91,6 @@ JournalImpl::~JournalImpl()
}
getEventsFireEventsPtr->cancel();
inactivityFireEventPtr->cancel();
-// free_read_buffers();
if (_mgmtObject.get() != 0) {
_mgmtObject->resourceDestroy();
@@ -149,7 +142,7 @@ JournalImpl::initialize(qpid::qls_jrnl::EmptyFilePool* efpp_,
// oss << " wcache_pgsize_sblks=" << wcache_pgsize_sblks;
// oss << " wcache_num_pages=" << wcache_num_pages;
// QLS_LOG2(debug, _jid, oss.str());
- jcntl::initialize(/*num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,*/ efpp_, wcache_num_pages, wcache_pgsize_sblks, cbp);
+ jcntl::initialize(efpp_, wcache_num_pages, wcache_pgsize_sblks, cbp);
// QLS_LOG2(debug, _jid, "Initialization complete");
// TODO: replace for linearstore: _lpmgr
/*
@@ -183,7 +176,6 @@ JournalImpl::recover(/*const uint16_t num_jfiles,
uint64_t queue_id)
{
std::ostringstream oss1;
-// oss1 << "Recover; num_jfiles=" << num_jfiles << " jfsize_sblks=" << jfsize_sblks;
oss1 << "Recover;";
oss1 << " queue_id = 0x" << std::hex << queue_id << std::dec;
oss1 << " wcache_pgsize_sblks=" << wcache_pgsize_sblks;
@@ -210,11 +202,9 @@ JournalImpl::recover(/*const uint16_t num_jfiles,
prep_xid_list.push_back(i->xid);
}
- jcntl::recover(/*num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,*/efpm.get(), wcache_num_pages, wcache_pgsize_sblks,
- cbp, &prep_xid_list, highest_rid);
+ jcntl::recover(efpm.get(), wcache_num_pages, wcache_pgsize_sblks, cbp, &prep_xid_list, highest_rid);
} else {
- jcntl::recover(/*num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,*/efpm.get(), wcache_num_pages, wcache_pgsize_sblks,
- cbp, 0, highest_rid);
+ jcntl::recover(efpm.get(), wcache_num_pages, wcache_pgsize_sblks, cbp, 0, highest_rid);
}
// Populate PreparedTransaction lists from _tmap
@@ -223,10 +213,10 @@ JournalImpl::recover(/*const uint16_t num_jfiles,
for (PreparedTransaction::list::iterator i = prep_tx_list_ptr->begin(); i != prep_tx_list_ptr->end(); i++) {
txn_data_list tdl = _tmap.get_tdata_list(i->xid); // tdl will be empty if xid not found
for (tdl_itr tdl_itr = tdl.begin(); tdl_itr < tdl.end(); tdl_itr++) {
- if (tdl_itr->_enq_flag) { // enqueue op
- i->enqueues->add(queue_id, tdl_itr->_rid);
+ if (tdl_itr->enq_flag_) { // enqueue op
+ i->enqueues->add(queue_id, tdl_itr->rid_);
} else { // dequeue op
- i->dequeues->add(queue_id, tdl_itr->_drid);
+ i->dequeues->add(queue_id, tdl_itr->drid_);
}
}
}
@@ -260,102 +250,6 @@ JournalImpl::recover_complete()
*/
}
-//#define MAX_AIO_SLEEPS 1000000 // tot: ~10 sec
-//#define AIO_SLEEP_TIME_US 10 // 0.01 ms
-// Return true if content is recovered from store; false if content is external and must be recovered from an external store.
-// Throw exception for all errors.
-/*
-bool
-JournalImpl::loadMsgContent(uint64_t rid, std::string& data, size_t length, size_t offset)
-{
- qpid::sys::Mutex::ScopedLock sl(_read_lock);
- if (_dtok.rid() != rid)
- {
- // Free any previous msg
- free_read_buffers();
-
- // Last read encountered out-of-order rids, check if this rid is in that list
- bool oooFlag = false;
- for (std::vector<uint64_t>::const_iterator i=oooRidList.begin(); i!=oooRidList.end() && !oooFlag; i++) {
- if (*i == rid) {
- oooFlag = true;
- }
- }
-
- // TODO: This is a brutal approach - very inefficient and slow. Rather introduce a system of remembering
- // jumpover points and allow the read to jump back to the first known jumpover point - but this needs
- // a mechanism in rrfc to accomplish it. Also helpful is a struct containing a journal address - a
- // combination of lid/offset.
- // NOTE: The second part of the if stmt (rid < lastReadRid) is required to handle browsing.
- if (oooFlag || rid < lastReadRid) {
- _rmgr.invalidate();
- oooRidList.clear();
- }
- _dlen = 0;
- _dtok.reset();
- _dtok.set_wstate(DataTokenImpl::ENQ);
- _dtok.set_rid(0);
- _external = false;
- size_t xlen = 0;
- bool transient = false;
- bool done = false;
- bool rid_found = false;
- while (!done) {
- iores res = read_data_record(&_datap, _dlen, &_xidp, xlen, transient, _external, &_dtok);
- switch (res) {
- case qpid::qls_jrnl::RHM_IORES_SUCCESS:
- if (_dtok.rid() != rid) {
- // Check if this is an out-of-order rid that may impact next read
- if (_dtok.rid() > rid)
- oooRidList.push_back(_dtok.rid());
- free_read_buffers();
- // Reset data token for next read
- _dlen = 0;
- _dtok.reset();
- _dtok.set_wstate(DataTokenImpl::ENQ);
- _dtok.set_rid(0);
- } else {
- rid_found = _dtok.rid() == rid;
- lastReadRid = rid;
- done = true;
- }
- break;
- case qpid::qls_jrnl::RHM_IORES_PAGE_AIOWAIT:
- if (get_wr_events(&_aio_cmpl_timeout) == qpid::qls_jrnl::jerrno::AIO_TIMEOUT) {
- std::stringstream ss;
- ss << "read_data_record() returned " << qpid::qls_jrnl::iores_str(res);
- ss << "; timed out waiting for page to be processed.";
- throw jexception(qpid::qls_jrnl::jerrno::JERR__TIMEOUT, ss.str().c_str(), "JournalImpl",
- "loadMsgContent");
- }
- break;
- default:
- std::stringstream ss;
- ss << "read_data_record() returned " << qpid::qls_jrnl::iores_str(res);
- throw jexception(qpid::qls_jrnl::jerrno::JERR__UNEXPRESPONSE, ss.str().c_str(), "JournalImpl",
- "loadMsgContent");
- }
- }
- if (!rid_found) {
- std::stringstream ss;
- ss << "read_data_record() was unable to find rid 0x" << std::hex << rid << std::dec;
- ss << " (" << rid << "); last rid found was 0x" << std::hex << _dtok.rid() << std::dec;
- ss << " (" << _dtok.rid() << ")";
- throw jexception(qpid::qls_jrnl::jerrno::JERR__RECNFOUND, ss.str().c_str(), "JournalImpl", "loadMsgContent");
- }
- }
-
- if (_external) return false;
-
- uint32_t hdr_offs = qpid::framing::Buffer(static_cast<char*>(_datap), sizeof(uint32_t)).getLong() + sizeof(uint32_t);
- if (hdr_offs + offset + length > _dlen) {
- data.append((const char*)_datap + hdr_offs + offset, _dlen - hdr_offs - offset);
- } else {
- data.append((const char*)_datap + hdr_offs + offset, length);
- }
- return true;
-}
-*/
void
JournalImpl::enqueue_data_record(const void* const data_buff, const size_t tot_data_len,
@@ -498,29 +392,6 @@ JournalImpl::flush(const bool block_till_aio_cmpl)
return res;
}
-/*
-void
-JournalImpl::log(qpid::qls_jrnl::log_level ll, const std::string& log_stmt) const
-{
- log(ll, log_stmt.c_str());
-}
-
-void
-JournalImpl::log(qpid::qls_jrnl::log_level ll, const char* const log_stmt) const
-{
- switch (ll)
- {
- case LOG_TRACE: QPID_LOG(trace, "QLS Journal \"" << _jid << "\": " << log_stmt); break;
- case LOG_DEBUG: QPID_LOG(debug, "QLS Journal \"" << _jid << "\": " << log_stmt); break;
- case LOG_INFO: QPID_LOG(info, "QLS Journal \"" << _jid << "\": " << log_stmt); break;
- case LOG_NOTICE: QPID_LOG(notice, "QLS Journal \"" << _jid << "\": " << log_stmt); break;
- case LOG_WARN: QPID_LOG(warning, "QLS Journal \"" << _jid << "\": " << log_stmt); break;
- case LOG_ERROR: QPID_LOG(error, "QLS Journal \"" << _jid << "\": " << log_stmt); break;
- case LOG_CRITICAL: QPID_LOG(critical, "QLS Journal \"" << _jid << "\": " << log_stmt); break;
- }
-}
-*/
-
void
JournalImpl::getEventsFire()
{
@@ -581,21 +452,6 @@ void
JournalImpl::rd_aio_cb(std::vector<uint16_t>& /*pil*/)
{}
-/*
-void
-JournalImpl::free_read_buffers()
-{
- if (_xidp) {
- ::free(_xidp);
- _xidp = 0;
- _datap = 0;
- } else if (_datap) {
- ::free(_datap);
- _datap = 0;
- }
-}
-*/
-
void
JournalImpl::createStore() {
@@ -609,17 +465,6 @@ JournalImpl::handleIoResult(const iores r)
{
case qpid::qls_jrnl::RHM_IORES_SUCCESS:
return;
-/*
- case qpid::qls_jrnl::RHM_IORES_FULL:
- {
- std::ostringstream oss;
- oss << "Journal full on queue \"" << _jid << "\".";
- QLS_LOG2(critical, _jid, "Journal full.");
- if (_agent != 0)
- _agent->raiseEvent(qmf::org::apache::qpid::linearstore::EventFull(_jid, "Journal full"), qpid::management::ManagementAgent::SEV_ERROR);
- THROW_STORE_FULL_EXCEPTION(oss.str());
- }
-*/
default:
{
std::ostringstream oss;
diff --git a/qpid/cpp/src/qpid/linearstore/JournalImpl.h b/qpid/cpp/src/qpid/linearstore/JournalImpl.h
index fd2433803c..ba62b1d64a 100644
--- a/qpid/cpp/src/qpid/linearstore/JournalImpl.h
+++ b/qpid/cpp/src/qpid/linearstore/JournalImpl.h
@@ -140,11 +140,6 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public qpid::qls_jr
void recover_complete();
- // Temporary fn to read and save last msg read from journal so it can be assigned
- // in chunks. To be replaced when coding to do this direct from the journal is ready.
- // Returns true if the record is extern, false if local.
-// bool loadMsgContent(uint64_t rid, std::string& data, size_t length, size_t offset = 0);
-
// Overrides for write inactivity timer
void enqueue_data_record(const void* const data_buff, const size_t tot_data_len,
const size_t this_data_len, qpid::qls_jrnl::data_tok* dtokp,
@@ -191,7 +186,6 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public qpid::qls_jr
void resetDeleteCallback() { deleteCallback = DeleteCallback(); }
protected:
-// void free_read_buffers();
void createStore();
inline void setGetEventTimer()
@@ -227,15 +221,12 @@ class TplJournalImpl : public JournalImpl
virtual ~TplJournalImpl() {}
-/*
// Special version of read_data_record that ignores transactions - needed when reading the TPL
inline qpid::qls_jrnl::iores read_data_record(void** const datapp, std::size_t& dsize,
void** const xidpp, std::size_t& xidsize, bool& transient, bool& external,
qpid::qls_jrnl::data_tok* const dtokp) {
return JournalImpl::read_data_record(datapp, dsize, xidpp, xidsize, transient, external, dtokp, true);
}
- inline void read_reset() { _rmgr.invalidate(); }
-*/
}; // class TplJournalImpl
} // namespace msgstore
diff --git a/qpid/cpp/src/qpid/linearstore/JournalLogImpl.h b/qpid/cpp/src/qpid/linearstore/JournalLogImpl.h
index a43af23ac6..43c2d7dc9c 100644
--- a/qpid/cpp/src/qpid/linearstore/JournalLogImpl.h
+++ b/qpid/cpp/src/qpid/linearstore/JournalLogImpl.h
@@ -26,7 +26,7 @@
#include "qpid/log/Statement.h"
#define QLS_LOG(level, msg) QPID_LOG(level, "Linear Store: " << msg)
-#define QLS_LOG2(level, queue, msg) QPID_LOG(level, "Linear Store: Journal \'" << queue << "\":" << msg)
+#define QLS_LOG2(level, queue, msg) QPID_LOG(level, "Linear Store: Journal \"" << queue << "\":" << msg)
namespace qpid {
namespace linearstore {
diff --git a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
index cbc395d61a..100c8925de 100644
--- a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
+++ b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
@@ -291,7 +291,10 @@ void MessageStoreImpl::init()
}
} while (!isInit);
- efpMgr.reset(new qpid::qls_jrnl::EmptyFilePoolManager(getStoreTopLevelDir(), jrnlLog));
+ efpMgr.reset(new qpid::qls_jrnl::EmptyFilePoolManager(getStoreTopLevelDir(),
+ defaultEfpPartitionNumber,
+ defaultEfpFileSize_kib,
+ jrnlLog));
efpMgr->findEfpPartitions();
}
@@ -331,6 +334,9 @@ void MessageStoreImpl::truncateInit()
dbenv->close(0);
isInit = false;
}
+
+ // TODO: Linearstore: harvest all discareded journal files into the empy file pool(s).
+
qpid::qls_jrnl::jdir::delete_dir(getBdbBaseDir());
qpid::qls_jrnl::jdir::delete_dir(getJrnlBaseDir());
qpid::qls_jrnl::jdir::delete_dir(getTplBaseDir());
@@ -730,6 +736,7 @@ void MessageStoreImpl::recoverQueues(TxnCtxt& txn,
// Check for changes to queue store settings qpid.file_count and qpid.file_size resulting
// from recovery of a store that has had its size changed externally by the resize utility.
// If so, update the queue store settings so that QMF queries will reflect the new values.
+ // TODO: Update this for new settings, as qpid.file_count and qpid.file_size no longer apply
/*
const qpid::framing::FieldTable& storeargs = queue->getSettings().storeSettings;
qpid::framing::FieldTable::ValuePtr value;
@@ -866,8 +873,6 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/,
size_t preambleLength = sizeof(uint32_t)/*header size*/;
JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
- DataTokenImpl dtok;
- size_t readSize = 0;
unsigned msg_count = 0;
// TODO: This optimization to skip reading if there are no enqueued messages to read
@@ -876,19 +881,20 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/,
//bool read = jc->get_enq_cnt() > 0;
bool read = true;
- void* dbuff = NULL; size_t dbuffSize = 0;
- void* xidbuff = NULL; size_t xidbuffSize = 0;
+ void* dbuff = NULL;
+ size_t dbuffSize = 0;
+ void* xidbuff = NULL;
+ size_t xidbuffSize = 0;
bool transientFlag = false;
bool externalFlag = false;
-
- dtok.set_wstate(DataTokenImpl::ENQ);
+ DataTokenImpl dtok;
+ dtok.set_wstate(DataTokenImpl::NONE);
// Read the message from the Journal.
try {
unsigned aio_sleep_cnt = 0;
while (read) {
qpid::qls_jrnl::iores res = jc->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtok);
- readSize = dtok.dsize();
switch (res)
{
@@ -909,11 +915,11 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/,
// At some future point if delivery attempts are stored, then this call would
// become optional depending on that information.
msg->setRedelivered();
- // Reset the TTL for the recovered message
- msg->computeExpiration(broker->getExpiryPolicy());
+ // Reset the TTL for the recovered message
+ msg->computeExpiration(broker->getExpiryPolicy());
uint32_t contentOffset = headerSize + preambleLength;
- uint64_t contentSize = readSize - contentOffset;
+ uint64_t contentSize = dbuffSize - contentOffset;
if (msg->loadContent(contentSize) && !externalFlag) {
//now read the content
qpid::framing::Buffer contentBuff(data + contentOffset, contentSize);
@@ -947,8 +953,8 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/,
bool enq = false;
bool deq = false;
for (qpid::qls_jrnl::tdl_itr j = txnList.begin(); j<txnList.end(); j++) {
- if (j->_enq_flag && j->_rid == rid) enq = true;
- else if (!j->_enq_flag && j->_drid == rid) deq = true;
+ if (j->enq_flag_ && j->rid_ == rid) enq = true;
+ else if (!j->enq_flag_ && j->drid_ == rid) deq = true;
}
if (enq && !deq && citr->second.commit_flag) {
rcnt++;
@@ -962,7 +968,7 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/,
}
dtok.reset();
- dtok.set_wstate(DataTokenImpl::ENQ);
+ dtok.set_wstate(DataTokenImpl::NONE);
if (xidbuff)
::free(xidbuff);
@@ -1065,11 +1071,11 @@ void MessageStoreImpl::readTplStore()
bool commitFlag = true;
for (qpid::qls_jrnl::tdl_itr j = txnList.begin(); j<txnList.end(); j++) {
- if (j->_enq_flag) {
- rid = j->_rid;
+ if (j->enq_flag_) {
+ rid = j->rid_;
enqCnt++;
} else {
- commitFlag = j->_commit_flag;
+ commitFlag = j->commit_flag_;
deqCnt++;
}
}
@@ -1104,10 +1110,9 @@ void MessageStoreImpl::readTplStore()
void MessageStoreImpl::recoverTplStore()
{
QLS_LOG(info, "*** MessageStoreImpl::recoverTplStore()");
-/*
- if (qpid::qls_jrnl::jdir::exists(tplStorePtr->jrnl_dir() + tplStorePtr->base_filename() + ".jinf")) {
+ if (qpid::qls_jrnl::jdir::exists(tplStorePtr->jrnl_dir())) {
uint64_t thisHighestRid = 0ULL;
- tplStorePtr->recover(tplNumJrnlFiles, false, 0, tplJrnlFsizeSblks, tplWCachePgSizeSblks, tplWCacheNumPages, 0, thisHighestRid, 0);
+ tplStorePtr->recover(boost::dynamic_pointer_cast<qpid::qls_jrnl::EmptyFilePoolManager>(efpMgr), tplWCacheNumPages, tplWCachePgSizeSblks, 0, thisHighestRid, 0);
if (highestRid == 0ULL)
highestRid = thisHighestRid;
else if (thisHighestRid - highestRid < 0x8000000000000000ULL) // RFC 1982 comparison for unsigned 64-bit
@@ -1118,7 +1123,6 @@ void MessageStoreImpl::recoverTplStore()
tplStorePtr->recover_complete(); // start journal.
}
-*/
}
void MessageStoreImpl::recoverLockedMappings(txn_list& txns)
@@ -1137,12 +1141,10 @@ void MessageStoreImpl::recoverLockedMappings(txn_list& txns)
}
}
-void MessageStoreImpl::collectPreparedXids(std::set<std::string>& /*xids*/)
+void MessageStoreImpl::collectPreparedXids(std::set<std::string>& xids)
{
QLS_LOG(info, "*** MessageStoreImpl::collectPreparedXids()");
-/*
if (tplStorePtr->is_ready()) {
- tplStorePtr->read_reset();
readTplStore();
} else {
recoverTplStore();
@@ -1152,7 +1154,6 @@ void MessageStoreImpl::collectPreparedXids(std::set<std::string>& /*xids*/)
if (!i->second.deq_flag && i->second.tpc_flag)
xids.insert(i->first);
}
-*/
}
void MessageStoreImpl::stage(const boost::intrusive_ptr<qpid::broker::PersistableMessage>& /*msg*/)
@@ -1178,31 +1179,6 @@ void MessageStoreImpl::loadContent(const qpid::broker::PersistableQueue& /*queue
uint32_t /*length*/)
{
throw qpid::qls_jrnl::jexception(qpid::qls_jrnl::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "loadContent");
-/*
- checkInit();
- uint64_t messageId (msg->getPersistenceId());
-
- if (messageId != 0) {
- try {
- JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore());
- if (jc && jc->is_enqueued(messageId) ) {
- if (!jc->loadMsgContent(messageId, data, length, offset)) {
- std::ostringstream oss;
- oss << "Queue " << queue.getName() << ": loadContent() failed: Message " << messageId << " is extern";
- THROW_STORE_EXCEPTION(oss.str());
- }
- } else {
- std::ostringstream oss;
- oss << "Queue " << queue.getName() << ": loadContent() failed: Message " << messageId << " not enqueued";
- THROW_STORE_EXCEPTION(oss.str());
- }
- } catch (const qpid::qls_jrnl::jexception& e) {
- THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": loadContent() failed: " + e.what());
- }
- } else {
- THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");
- }
-*/
}
void MessageStoreImpl::flush(const qpid::broker::PersistableQueue& queue_)
@@ -1358,12 +1334,6 @@ void MessageStoreImpl::async_dequeue(qpid::broker::TransactionContext* ctxt_,
}
}
-uint32_t MessageStoreImpl::outstandingQueueAIO(const qpid::broker::PersistableQueue& /*queue_*/)
-{
-/* checkInit();*/
- return 0;
-}
-
void MessageStoreImpl::completed(TxnCtxt& txn_,
bool commit_)
{
diff --git a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h
index d1ce7a2b8a..a136f0ba80 100644
--- a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h
+++ b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h
@@ -331,7 +331,7 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem
void flush(const qpid::broker::PersistableQueue& queue);
- uint32_t outstandingQueueAIO(const qpid::broker::PersistableQueue& queue);
+ inline uint32_t outstandingQueueAIO(const qpid::broker::PersistableQueue& /*queue*/) { return 0; }; // TODO: Deprecate this call
void collectPreparedXids(std::set<std::string>& xids);
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/AtomicCounter.h b/qpid/cpp/src/qpid/linearstore/jrnl/AtomicCounter.h
index 2f9d406b7c..82d38e716e 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/AtomicCounter.h
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/AtomicCounter.h
@@ -23,6 +23,7 @@
#define QPID_LINEARSTORE_ATOMICCOUNTER_H_
#include "qpid/linearstore/jrnl/slock.h"
+#include <string>
namespace qpid {
namespace qls_jrnl {
@@ -31,11 +32,12 @@ template <class T>
class AtomicCounter
{
private:
+ std::string id_;
T count_;
mutable smutex countMutex;
public:
- AtomicCounter(const T& initValue = T(0)) : count_(initValue) {}
+ AtomicCounter(const std::string& id, const T& initValue) : id_(id), count_(initValue) {}
virtual ~AtomicCounter() {}
@@ -44,6 +46,11 @@ public:
return count_;
}
+ void set(const T v) {
+ slock l(countMutex);
+ count_ = v;
+ }
+
T increment() {
slock l(countMutex);
return ++count_;
@@ -57,7 +64,7 @@ public:
T addLimit(const T& a, const T& limit, const uint32_t jerr) {
slock l(countMutex);
- if (count_ + a > limit) throw jexception(jerr, "AtomicCounter", "addLimit");
+ if (count_ + a > limit) throw jexception(jerr, id_, "AtomicCounter", "addLimit");
count_ += a;
return count_;
}
@@ -70,7 +77,7 @@ public:
T decrementLimit(const T& limit = T(0), const uint32_t jerr = jerrno::JERR__UNDERFLOW) {
slock l(countMutex);
if (count_ < limit + 1) {
- throw jexception(jerr, "AtomicCounter", "decrementLimit");
+ throw jexception(jerr, id_, "AtomicCounter", "decrementLimit");
}
return --count_;
}
@@ -83,7 +90,7 @@ public:
T subtractLimit(const T& s, const T& limit = T(0), const uint32_t jerr = jerrno::JERR__UNDERFLOW) {
slock l(countMutex);
- if (count_ < limit + s) throw jexception(jerr, "AtomicCounter", "subtractLimit");
+ if (count_ < limit + s) throw jexception(jerr, id_, "AtomicCounter", "subtractLimit");
count_ -= s;
return count_;
}
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.cpp
index 613da45633..015c2cbafe 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.cpp
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.cpp
@@ -35,6 +35,8 @@
#include <uuid/uuid.h>
#include <vector>
+//#include <iostream> // DEBUG
+
namespace qpid {
namespace qls_jrnl {
@@ -42,7 +44,7 @@ EmptyFilePool::EmptyFilePool(const std::string& efpDirectory,
const EmptyFilePoolPartition* partitionPtr,
JournalLog& journalLogRef) :
efpDirectory_(efpDirectory),
- efpDataSize_kib_(fileSizeKbFromDirName(efpDirectory, partitionPtr->getPartitionNumber())),
+ efpDataSize_kib_(dataSizeFromDirName_kib(efpDirectory, partitionPtr->getPartitionNumber())),
partitionPtr_(partitionPtr),
journalLogRef_(journalLogRef)
{}
@@ -134,6 +136,39 @@ void EmptyFilePool::returnEmptyFile(const std::string& fqSrcFile) {
pushEmptyFile(emptyFileName);
}
+//static
+std::string EmptyFilePool::dirNameFromDataSize(const efpDataSize_kib_t efpDataSize_kib) {
+ std::ostringstream oss;
+ oss << efpDataSize_kib << "k";
+ return oss.str();
+}
+
+
+// static
+efpDataSize_kib_t EmptyFilePool::dataSizeFromDirName_kib(const std::string& dirName,
+ const efpPartitionNumber_t partitionNumber) {
+ // Check for dirName format 'NNNk', where NNN is a number, convert NNN into an integer. NNN cannot be 0.
+ std::string n(dirName.substr(dirName.rfind('/')+1));
+ bool valid = true;
+ for (uint16_t charNum = 0; charNum < n.length(); ++charNum) {
+ if (charNum < n.length()-1) {
+ if (!::isdigit((int)n[charNum])) {
+ valid = false;
+ break;
+ }
+ } else {
+ valid = n[charNum] == 'k';
+ }
+ }
+ efpDataSize_kib_t s = ::atol(n.c_str());
+ if (!valid || s == 0 || s % QLS_SBLK_SIZE_KIB != 0) {
+ std::ostringstream oss;
+ oss << "Partition: " << partitionNumber << "; EFP directory: \'" << n << "\'";
+ throw jexception(jerrno::JERR_EFP_BADEFPDIRNAME, oss.str(), "EmptyFilePool", "fileSizeKbFromDirName");
+ }
+ return s;
+}
+
// --- protected functions ---
void EmptyFilePool::createEmptyFile() {
@@ -148,7 +183,7 @@ void EmptyFilePool::createEmptyFile() {
ofs.put('\0');
ofs.close();
pushEmptyFile(efpfn);
-//std::cout << "WARNING: EFP " << efpDirectory << " is empty - created new journal file " << efpfn.substr(efpfn.rfind('/') + 1) << " on the fly" << std::endl;
+//std::cout << "WARNING: EFP " << efpDirectory << " is empty - created new journal file " << efpfn.substr(efpfn.rfind('/') + 1) << " on the fly" << std::endl; // DEBUG
} else {
//std::cerr << "ERROR: Unable to open file \"" << efpfn << "\"" << std::endl; // DEBUG
}
@@ -196,7 +231,8 @@ void EmptyFilePool::resetEmptyFileHeader(const std::string& fqFileName) {
std::streampos bytesRead = fs.tellg();
if (std::streamoff(bytesRead) == buffsize) {
::file_hdr_reset((::file_hdr_t*)buff);
- ::memset(buff + sizeof(::file_hdr_t), 0, MAX_FILE_HDR_LEN - sizeof(::file_hdr_t)); // set rest of buffer to 0
+ // set rest of buffer to 0
+ ::memset(buff + sizeof(::file_hdr_t), 0, MAX_FILE_HDR_LEN - sizeof(::file_hdr_t));
fs.seekp(0, std::fstream::beg);
fs.write(buff, buffsize);
std::streampos bytesWritten = fs.tellp();
@@ -224,7 +260,8 @@ bool EmptyFilePool::validateEmptyFile(const std::string& emptyFileName) const {
// Size matches pool
efpDataSize_kib_t expectedSize = (QLS_SBLK_SIZE_KIB + efpDataSize_kib_) * 1024;
if ((efpDataSize_kib_t)s.st_size != expectedSize) {
- oss << "ERROR: File " << emptyFileName << ": Incorrect size: Expected=" << expectedSize << "; actual=" << s.st_size;
+ oss << "ERROR: File " << emptyFileName << ": Incorrect size: Expected=" << expectedSize
+ << "; actual=" << s.st_size;
journalLogRef_.log(JournalLog::LOG_ERROR, oss.str());
return false;
}
@@ -241,18 +278,19 @@ bool EmptyFilePool::validateEmptyFile(const std::string& emptyFileName) const {
fs.read((char*)buff, buffsize);
std::streampos bytesRead = fs.tellg();
if (std::streamoff(bytesRead) != buffsize) {
- oss << "ERROR: Unable to read file header of file \"" << emptyFileName << "\": tried to read " << buffsize << " bytes; read " << bytesRead << " bytes";
+ oss << "ERROR: Unable to read file header of file \"" << emptyFileName << "\": tried to read "
+ << buffsize << " bytes; read " << bytesRead << " bytes";
journalLogRef_.log(JournalLog::LOG_ERROR, oss.str());
fs.close();
return false;
}
// Check file header
- ::file_hdr_t* header(reinterpret_cast< ::file_hdr_t* >(buff));
- const bool jrnlMagicError = header->_rhdr._magic != QLS_FILE_MAGIC;
- const bool jrnlVersionError = header->_rhdr._version != QLS_JRNL_VERSION;
- const bool jrnlPartitionError = header->_efp_partition != partitionPtr_->getPartitionNumber();
- const bool jrnlFileSizeError = header->_file_size_kib != efpDataSize_kib_;
+ ::file_hdr_t* fhp = (::file_hdr_t*)buff;
+ const bool jrnlMagicError = fhp->_rhdr._magic != QLS_FILE_MAGIC;
+ const bool jrnlVersionError = fhp->_rhdr._version != QLS_JRNL_VERSION;
+ const bool jrnlPartitionError = fhp->_efp_partition != partitionPtr_->getPartitionNumber();
+ const bool jrnlFileSizeError = fhp->_data_size_kib != efpDataSize_kib_;
if (jrnlMagicError || jrnlVersionError || jrnlPartitionError || jrnlFileSizeError)
{
oss << "ERROR: File " << emptyFileName << ": Invalid file header - mismatched header fields: " <<
@@ -266,14 +304,15 @@ bool EmptyFilePool::validateEmptyFile(const std::string& emptyFileName) const {
}
// Check file header is reset
- if (!::is_file_hdr_reset(header)) {
- ::file_hdr_reset(header);
+ if (!::is_file_hdr_reset(fhp)) {
+ ::file_hdr_reset(fhp);
::memset(buff + sizeof(::file_hdr_t), 0, MAX_FILE_HDR_LEN - sizeof(::file_hdr_t)); // set rest of buffer to 0
fs.seekp(0, std::fstream::beg);
fs.write(buff, buffsize);
std::streampos bytesWritten = fs.tellp();
if (std::streamoff(bytesWritten) != buffsize) {
- oss << "ERROR: Unable to write file header of file \"" << emptyFileName << "\": tried to write " << buffsize << " bytes; wrote " << bytesWritten << " bytes";
+ oss << "ERROR: Unable to write file header of file \"" << emptyFileName << "\": tried to write "
+ << buffsize << " bytes; wrote " << bytesWritten << " bytes";
journalLogRef_.log(JournalLog::LOG_ERROR, oss.str());
fs.close();
return false;
@@ -288,31 +327,6 @@ bool EmptyFilePool::validateEmptyFile(const std::string& emptyFileName) const {
}
// static
-efpDataSize_kib_t EmptyFilePool::fileSizeKbFromDirName(const std::string& dirName,
- const efpPartitionNumber_t partitionNumber) {
- // Check for dirName format 'NNNk', where NNN is a number, convert NNN into an integer. NNN cannot be 0.
- std::string n(dirName.substr(dirName.rfind('/')+1));
- bool valid = true;
- for (uint16_t charNum = 0; charNum < n.length(); ++charNum) {
- if (charNum < n.length()-1) {
- if (!::isdigit((int)n[charNum])) {
- valid = false;
- break;
- }
- } else {
- valid = n[charNum] == 'k';
- }
- }
- efpDataSize_kib_t s = ::atol(n.c_str());
- if (!valid || s == 0 || s % QLS_SBLK_SIZE_KIB != 0) {
- std::ostringstream oss;
- oss << "Partition: " << partitionNumber << "; EFP directory: \'" << n << "\'";
- throw jexception(jerrno::JERR_EFP_BADEFPDIRNAME, oss.str(), "EmptyFilePool", "fileSizeKbFromDirName");
- }
- return s;
-}
-
-// static
int EmptyFilePool::moveEmptyFile(const std::string& from,
const std::string& to) {
if (::rename(from.c_str(), to.c_str())) {
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.h b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.h
index e4eeeaa042..cb6887a095 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.h
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.h
@@ -76,6 +76,10 @@ public:
std::string takeEmptyFile(const std::string& destDirectory);
void returnEmptyFile(const std::string& srcFile);
+ static std::string dirNameFromDataSize(const efpDataSize_kib_t efpDataSize_kib);
+ static efpDataSize_kib_t dataSizeFromDirName_kib(const std::string& dirName,
+ const efpPartitionNumber_t partitionNumber);
+
protected:
void createEmptyFile();
std::string getEfpFileName();
@@ -84,8 +88,6 @@ protected:
void resetEmptyFileHeader(const std::string& fqFileName);
bool validateEmptyFile(const std::string& emptyFileName) const;
- static efpDataSize_kib_t fileSizeKbFromDirName(const std::string& dirName,
- const efpPartitionNumber_t partitionNumber);
static int moveEmptyFile(const std::string& fromFqPath,
const std::string& toFqPath);
};
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.cpp
index a9b3b12b81..3c2d7d69a2 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.cpp
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.cpp
@@ -24,15 +24,22 @@
#include <dirent.h>
#include "qpid/linearstore/jrnl/EmptyFilePoolPartition.h"
#include "qpid/linearstore/jrnl/jdir.h"
+#include "qpid/linearstore/jrnl/JournalLog.h"
#include "qpid/linearstore/jrnl/slock.h"
#include <vector>
+//#include <iostream> // DEBUG
+
namespace qpid {
namespace qls_jrnl {
EmptyFilePoolManager::EmptyFilePoolManager(const std::string& qlsStorePath,
+ const efpPartitionNumber_t defaultPartitionNumber,
+ const efpDataSize_kib_t defaultEfpDataSize_kib,
JournalLog& journalLogRef) :
qlsStorePath_(qlsStorePath),
+ defaultPartitionNumber_(defaultPartitionNumber),
+ defaultEfpDataSize_kib_(defaultEfpDataSize_kib),
journalLogRef_(journalLogRef)
{}
@@ -45,53 +52,70 @@ EmptyFilePoolManager::~EmptyFilePoolManager() {
}
void EmptyFilePoolManager::findEfpPartitions() {
- //std::cout << "*** Reading " << qlsStorePath << std::endl; // DEBUG
+//std::cout << "*** Reading " << qlsStorePath_ << std::endl; // DEBUG
+ bool foundPartition = false;
std::vector<std::string> dirList;
- jdir::read_dir(qlsStorePath_, dirList, true, false, true, false);
- for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) {
- if ((*i)[0] == 'p' && i->length() == 4) { // Filter: look only at names pNNN
- efpPartitionNumber_t pn = ::atoi(i->c_str() + 1);
- std::string fullDirPath(qlsStorePath_ + "/" + (*i));
- EmptyFilePoolPartition* efppp = 0;
- try {
- efppp = new EmptyFilePoolPartition(pn, fullDirPath, journalLogRef_);
- {
- slock l(partitionMapMutex_);
- partitionMap_[pn] = efppp;
- }
- } catch (const std::exception& e) {
- if (efppp != 0) {
- delete efppp;
- efppp = 0;
+ while (!foundPartition) {
+ jdir::read_dir(qlsStorePath_, dirList, true, false, true, false);
+ for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) {
+ efpPartitionNumber_t pn = EmptyFilePoolPartition::getPartitionNumber(*i);
+ if (pn > 0) { // valid partition name found
+ std::string fullDirPath(qlsStorePath_ + "/" + (*i));
+ EmptyFilePoolPartition* efppp = 0;
+ try {
+ efppp = new EmptyFilePoolPartition(pn, fullDirPath, journalLogRef_);
+ {
+ slock l(partitionMapMutex_);
+ partitionMap_[pn] = efppp;
+ }
+ } catch (const std::exception& e) {
+ if (efppp != 0) {
+ delete efppp;
+ efppp = 0;
+ }
+//std::cerr << "Unable to initialize partition " << pn << " (\'" << fullDirPath << "\'): " << e.what() << std::endl; // DEBUG
}
- //std::cerr << "Unable to initialize partition " << pn << " (\'" << fullDirPath << "\'): " << e.what() << std::endl;
+ if (efppp != 0)
+ efppp->findEmptyFilePools();
+ foundPartition = true;
}
- if (efppp != 0)
- efppp->findEmptyFilePools();
+ }
+
+ // If no partition was found, create an empty default partition with a warning.
+ if (!foundPartition) {
+ journalLogRef_.log(JournalLog::LOG_WARN, "No EFP partition found, creating an empty partition.");
+ std::ostringstream oss;
+ oss << qlsStorePath_ << "/" << EmptyFilePoolPartition::getPartionDirectoryName(defaultPartitionNumber_)
+ << "/" << EmptyFilePoolPartition::s_efpTopLevelDir_ << "/" << EmptyFilePool::dirNameFromDataSize(defaultEfpDataSize_kib_);
+ jdir::create_dir(oss.str());
}
}
- // TODO: Log results
-/*
- QLS_LOG(info, "EFP Manager initialization complete");
+
+ journalLogRef_.log(JournalLog::LOG_NOTICE, "EFP Manager initialization complete");
std::vector<qpid::qls_jrnl::EmptyFilePoolPartition*> partitionList;
std::vector<qpid::qls_jrnl::EmptyFilePool*> filePoolList;
getEfpPartitions(partitionList);
if (partitionList.size() == 0) {
- QLS_LOG(error, "NO EFP PARTITIONS FOUND! No queue creation is possible.")
+ journalLogRef_.log(JournalLog::LOG_WARN, "NO EFP PARTITIONS FOUND! No queue creation is possible.");
} else {
- QLS_LOG(info, "> EFP Partitions found: " << partitionList.size());
+ std::stringstream oss;
+ oss << "> EFP Partitions found: " << partitionList.size();
+ journalLogRef_.log(JournalLog::LOG_INFO, oss.str());
for (std::vector<qpid::qls_jrnl::EmptyFilePoolPartition*>::const_iterator i=partitionList.begin(); i!= partitionList.end(); ++i) {
filePoolList.clear();
(*i)->getEmptyFilePools(filePoolList);
- QLS_LOG(info, " * Partition " << (*i)->partitionNumber() << " containing " << filePoolList.size() << " pool" <<
- (filePoolList.size()>1 ? "s" : "") << " at \'" << (*i)->partitionDirectory() << "\'");
+ std::stringstream oss;
+ oss << " * Partition " << (*i)->getPartitionNumber() << " containing " << filePoolList.size()
+ << " pool" << (filePoolList.size()>1 ? "s" : "") << " at \'" << (*i)->getPartitionDirectory() << "\'";
+ journalLogRef_.log(JournalLog::LOG_INFO, oss.str());
for (std::vector<qpid::qls_jrnl::EmptyFilePool*>::const_iterator j=filePoolList.begin(); j!=filePoolList.end(); ++j) {
- QLS_LOG(info, " - EFP \'" << (*j)->dataSize_kib() << "k\' containing " << (*j)->numEmptyFiles() <<
- " files of size " << (*j)->dataSize_kib() << " KiB totaling " << (*j)->cumFileSize_kib() << " KiB");
+ std::ostringstream oss;
+ oss << " - EFP \'" << (*j)->dataSize_kib() << "k\' containing " << (*j)->numEmptyFiles() <<
+ " files of size " << (*j)->dataSize_kib() << " KiB totaling " << (*j)->cumFileSize_kib() << " KiB";
+ journalLogRef_.log(JournalLog::LOG_INFO, oss.str());
}
}
}
-*/
}
void EmptyFilePoolManager::getEfpFileSizes(std::vector<efpDataSize_kib_t>& efpFileSizeList,
@@ -155,7 +179,7 @@ void EmptyFilePoolManager::getEfpPartitions(std::vector<EmptyFilePoolPartition*>
}
EmptyFilePool* EmptyFilePoolManager::getEmptyFilePool(const efpIdentity_t efpIdentity) {
- return getEmptyFilePool(efpIdentity.first, efpIdentity.second);
+ return getEmptyFilePool(efpIdentity.pn_, efpIdentity.ds_);
}
EmptyFilePool* EmptyFilePoolManager::getEmptyFilePool(const efpPartitionNumber_t partitionNumber,
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.h b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.h
index eab3783d90..d5fab82800 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.h
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.h
@@ -36,13 +36,17 @@ protected:
typedef partitionMap_t::iterator partitionMapItr_t;
typedef partitionMap_t::const_iterator partitionMapConstItr_t;
- std::string qlsStorePath_;
+ const std::string qlsStorePath_;
+ const efpPartitionNumber_t defaultPartitionNumber_;
+ const efpDataSize_kib_t defaultEfpDataSize_kib_;
JournalLog& journalLogRef_;
partitionMap_t partitionMap_;
smutex partitionMapMutex_;
public:
EmptyFilePoolManager(const std::string& qlsStorePath_,
+ const efpPartitionNumber_t defaultPartitionNumber,
+ const efpDataSize_kib_t defaultEfpDataSize_kib,
JournalLog& journalLogRef_);
virtual ~EmptyFilePoolManager();
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp
index 4d8b697b1d..d38db60975 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp
@@ -22,11 +22,14 @@
#include "qpid/linearstore/jrnl/EmptyFilePoolPartition.h"
#include <dirent.h>
+#include <iomanip>
#include "qpid/linearstore/jrnl/jdir.h"
#include "qpid/linearstore/jrnl/jerrno.h"
#include "qpid/linearstore/jrnl/jexception.h"
#include "qpid/linearstore/jrnl/slock.h"
+//#include <iostream> // DEBUG
+
namespace qpid {
namespace qls_jrnl {
@@ -53,7 +56,7 @@ EmptyFilePoolPartition::~EmptyFilePoolPartition() {
void
EmptyFilePoolPartition::findEmptyFilePools() {
- //std::cout << "Reading " << partitionDir << std::endl; // DEBUG
+//std::cout << "Reading " << partitionDir << std::endl; // DEBUG
std::vector<std::string> dirList;
jdir::read_dir(partitionDir_, dirList, true, false, false, false);
bool foundEfpDir = false;
@@ -65,7 +68,7 @@ EmptyFilePoolPartition::findEmptyFilePools() {
}
if (foundEfpDir) {
std::string efpDir(partitionDir_ + "/" + s_efpTopLevelDir_);
- //std::cout << "Reading " << efpDir << std::endl; // DEBUG
+//std::cout << "Reading " << efpDir << std::endl; // DEBUG
dirList.clear();
jdir::read_dir(efpDir, dirList, true, false, false, true);
for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) {
@@ -117,6 +120,26 @@ efpPartitionNumber_t EmptyFilePoolPartition::getPartitionNumber() const {
return partitionNum_;
}
+// static
+std::string EmptyFilePoolPartition::getPartionDirectoryName(const efpPartitionNumber_t partitionNumber) {
+ std::ostringstream oss;
+ oss << "p" << std::setfill('0') << std::setw(3) << partitionNumber;
+ return oss.str();
+}
+
+//static
+efpPartitionNumber_t EmptyFilePoolPartition::getPartitionNumber(const std::string& name) {
+ if (name.length() == 4 && name[0] == 'p' && ::isdigit(name[1]) && ::isdigit(name[2]) && ::isdigit(name[3])) {
+ long pn = ::strtol(name.c_str() + 1, 0, 0);
+ if (pn == 0 && errno) {
+ return 0;
+ } else {
+ return (efpPartitionNumber_t)pn;
+ }
+ }
+ return 0;
+}
+
// --- protected functions ---
void EmptyFilePoolPartition::validatePartitionDir() {
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.h b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.h
index 5b8ba7b4fa..a9ca36d07b 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.h
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.h
@@ -68,6 +68,9 @@ public:
std::string getPartitionDirectory() const;
efpPartitionNumber_t getPartitionNumber() const;
+ static std::string getPartionDirectoryName(const efpPartitionNumber_t partitionNumber);
+ static efpPartitionNumber_t getPartitionNumber(const std::string& name);
+
protected:
void validatePartitionDir();
};
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolTypes.h b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolTypes.h
index 4c470e4e40..2f9693fd95 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolTypes.h
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolTypes.h
@@ -22,19 +22,27 @@
#ifndef QPID_QLS_JRNL_EMPTYFILEPOOLTYPES_H_
#define QPID_QLS_JRNL_EMPTYFILEPOOLTYPES_H_
+#include <iostream>
#include <stdint.h>
#include <utility> // std::pair
namespace qpid {
namespace qls_jrnl {
- typedef uint64_t efpDataSize_kib_t; ///< Size of data part of file (excluding file header) in kib
- typedef uint64_t efpFileSize_kib_t; ///< Size of file (header + data) in kib
- typedef uint32_t efpDataSize_sblks_t; ///< Size of data part of file (excluding file header) in sblks
- typedef uint32_t efpFileSize_sblks_t; ///< Size of file (header + data) in sblks
- typedef uint32_t efpFileCount_t; ///< Number of files in a partition or pool
- typedef uint16_t efpPartitionNumber_t; ///< Number assigned to a partition
- typedef std::pair<efpPartitionNumber_t, efpDataSize_kib_t> efpIdentity_t; ///< Unique identity of a pool, consisting of the partition number and data size
+typedef uint64_t efpDataSize_kib_t; ///< Size of data part of file (excluding file header) in kib
+typedef uint64_t efpFileSize_kib_t; ///< Size of file (header + data) in kib
+typedef uint32_t efpDataSize_sblks_t; ///< Size of data part of file (excluding file header) in sblks
+typedef uint32_t efpFileSize_sblks_t; ///< Size of file (header + data) in sblks
+typedef uint32_t efpFileCount_t; ///< Number of files in a partition or pool
+typedef uint16_t efpPartitionNumber_t; ///< Number assigned to a partition
+
+typedef struct efpIdentity_t {
+ efpPartitionNumber_t pn_;
+ efpDataSize_kib_t ds_;
+ efpIdentity_t() : pn_(0), ds_(0) {}
+ efpIdentity_t(efpPartitionNumber_t pn, efpDataSize_kib_t ds) : pn_(pn), ds_(ds) {}
+ friend std::ostream& operator<<(std::ostream& os, efpIdentity_t& id) { os << "[" << id.pn_ << "," << id.ds_ << "]"; return os; }
+} efpIdentity_t;
}} // namespace qpid::qls_jrnl
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp
index e02e965823..3de7b6c1e2 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp
@@ -28,10 +28,28 @@
#include "qpid/linearstore/jrnl/utils/file_hdr.h"
#include <unistd.h>
+//#include <iostream> // DEBUG
+
namespace qpid {
namespace qls_jrnl {
JournalFile::JournalFile(const std::string& fqFileName,
+ const ::file_hdr_t& fileHeader) :
+ fqFileName_(fqFileName),
+ fileSeqNum_(fileHeader._file_number),
+ fileHandle_(-1),
+ fileCloseFlag_(false),
+ fileHeaderBasePtr_ (0),
+ fileHeaderPtr_(0),
+ aioControlBlockPtr_(0),
+ fileSize_dblks_(((fileHeader._data_size_kib * 1024) + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_BYTES)) / QLS_DBLK_SIZE_BYTES),
+ enqueuedRecordCount_("JournalFile::enqueuedRecordCount", 0),
+ submittedDblkCount_("JournalFile::submittedDblkCount", 0),
+ completedDblkCount_("JournalFile::completedDblkCount", 0),
+ outstandingAioOpsCount_("JournalFile::outstandingAioOpsCount", 0)
+{}
+
+JournalFile::JournalFile(const std::string& fqFileName,
const uint64_t fileSeqNum,
const efpDataSize_kib_t efpDataSize_kib) :
fqFileName_(fqFileName),
@@ -42,10 +60,10 @@ JournalFile::JournalFile(const std::string& fqFileName,
fileHeaderPtr_(0),
aioControlBlockPtr_(0),
fileSize_dblks_(((efpDataSize_kib * 1024) + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_BYTES)) / QLS_DBLK_SIZE_BYTES),
- enqueuedRecordCount_(0),
- submittedDblkCount_(0),
- completedDblkCount_(0),
- outstandingAioOpsCount_(0)
+ enqueuedRecordCount_("JournalFile::enqueuedRecordCount", 0),
+ submittedDblkCount_("JournalFile::submittedDblkCount", 0),
+ completedDblkCount_("JournalFile::completedDblkCount", 0),
+ outstandingAioOpsCount_("JournalFile::outstandingAioOpsCount", 0)
{}
JournalFile::~JournalFile() {
@@ -82,14 +100,6 @@ JournalFile::finalize() {
}
}
-const std::string JournalFile::getDirectory() const {
- return fqFileName_.substr(0, fqFileName_.rfind('/'));
-}
-
-const std::string JournalFile::getFileName() const {
- return fqFileName_.substr(fqFileName_.rfind('/')+1);
-}
-
const std::string JournalFile::getFqFileName() const {
return fqFileName_;
}
@@ -98,10 +108,6 @@ uint64_t JournalFile::getFileSeqNum() const {
return fileSeqNum_;
}
-bool JournalFile::isOpen() const {
- return fileHandle_ >= 0;
-}
-
int JournalFile::open() {
fileHandle_ = ::open(fqFileName_.c_str(), O_WRONLY | O_DIRECT, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); // 0644 -rw-r--r--
if (fileHandle_ < 0) {
@@ -182,30 +188,10 @@ uint32_t JournalFile::incrEnqueuedRecordCount() {
return enqueuedRecordCount_.increment();
}
-uint32_t JournalFile::addEnqueuedRecordCount(const uint32_t a) {
- return enqueuedRecordCount_.add(a);
-}
-
uint32_t JournalFile::decrEnqueuedRecordCount() {
return enqueuedRecordCount_.decrementLimit();
}
-uint32_t JournalFile::subtrEnqueuedRecordCount(const uint32_t s) {
- return enqueuedRecordCount_.subtractLimit(s);
-}
-
-uint32_t JournalFile::getSubmittedDblkCount() const {
- return submittedDblkCount_.get();
-}
-
-uint32_t JournalFile::addSubmittedDblkCount(const uint32_t a) {
- return submittedDblkCount_.addLimit(a, fileSize_dblks_, jerrno::JERR_JNLF_FILEOFFSOVFL);
-}
-
-uint32_t JournalFile::getCompletedDblkCount() const {
- return completedDblkCount_.get();
-}
-
uint32_t JournalFile::addCompletedDblkCount(const uint32_t a) {
return completedDblkCount_.addLimit(a, submittedDblkCount_.get(), jerrno::JERR_JNLF_CMPLOFFSOVFL);
}
@@ -214,10 +200,6 @@ uint16_t JournalFile::getOutstandingAioOperationCount() const {
return outstandingAioOpsCount_.get();
}
-uint16_t JournalFile::incrOutstandingAioOperationCount() {
- return outstandingAioOpsCount_.increment();
-}
-
uint16_t JournalFile::decrOutstandingAioOperationCount() {
uint16_t r = outstandingAioOpsCount_.decrementLimit();
if (fileCloseFlag_ && outstandingAioOpsCount_ == 0) { // Delayed close
@@ -232,33 +214,10 @@ bool JournalFile::isEmpty() const {
return submittedDblkCount_ == 0;
}
-bool JournalFile::isDataEmpty() const {
- return submittedDblkCount_ <= QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_DBLKS;
-}
-
-u_int32_t JournalFile::dblksRemaining() const {
- return fileSize_dblks_ - submittedDblkCount_;
-}
-
-bool JournalFile::isFull() const {
- return submittedDblkCount_ == fileSize_dblks_;
-}
-
-bool JournalFile::isFullAndComplete() const {
- return completedDblkCount_ == fileSize_dblks_;
-}
-
-u_int32_t JournalFile::getOutstandingAioDblks() const {
- return submittedDblkCount_ - completedDblkCount_;
-}
-
-bool JournalFile::getNextFile() const {
- return isFull();
-}
-
bool JournalFile::isNoEnqueuedRecordsRemaining() const {
- return !isDataEmpty() && // Must be written to, not empty
- enqueuedRecordCount_ == 0; // No remaining enqueued records
+ return /*!enqueueStarted_ &&*/ // Not part-way through encoding an enqueue
+ isFullAndComplete() && // Full with all AIO returned
+ enqueuedRecordCount_ == 0; // No remaining enqueued records
}
// debug aid
@@ -284,4 +243,59 @@ const std::string JournalFile::status_str(const uint8_t indentDepth) const {
return oss.str();
}
+// --- protected functions ---
+
+const std::string JournalFile::getDirectory() const {
+ return fqFileName_.substr(0, fqFileName_.rfind('/'));
+}
+
+const std::string JournalFile::getFileName() const {
+ return fqFileName_.substr(fqFileName_.rfind('/')+1);
+}
+
+bool JournalFile::isOpen() const {
+ return fileHandle_ >= 0;
+}
+
+uint32_t JournalFile::getSubmittedDblkCount() const {
+ return submittedDblkCount_.get();
+}
+
+uint32_t JournalFile::addSubmittedDblkCount(const uint32_t a) {
+ return submittedDblkCount_.addLimit(a, fileSize_dblks_, jerrno::JERR_JNLF_FILEOFFSOVFL);
+}
+
+uint32_t JournalFile::getCompletedDblkCount() const {
+ return completedDblkCount_.get();
+}
+
+uint16_t JournalFile::incrOutstandingAioOperationCount() {
+ return outstandingAioOpsCount_.increment();
+}
+
+u_int32_t JournalFile::dblksRemaining() const {
+ return fileSize_dblks_ - submittedDblkCount_;
+}
+
+bool JournalFile::getNextFile() const {
+ return isFull();
+}
+
+u_int32_t JournalFile::getOutstandingAioDblks() const {
+ return submittedDblkCount_ - completedDblkCount_;
+}
+
+bool JournalFile::isDataEmpty() const {
+ return submittedDblkCount_ <= QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_DBLKS;
+}
+
+bool JournalFile::isFull() const {
+ return submittedDblkCount_ == fileSize_dblks_;
+}
+
+bool JournalFile::isFullAndComplete() const {
+ return completedDblkCount_ == fileSize_dblks_;
+}
+
+
}} // namespace qpid::qls_jrnl
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.h b/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.h
index 262d3ce147..0abbc7bcca 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.h
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.h
@@ -52,6 +52,8 @@ protected:
public:
JournalFile(const std::string& fqFileName,
+ const ::file_hdr_t& fileHeader);
+ JournalFile(const std::string& fqFileName,
const uint64_t fileSeqNum,
const efpDataSize_kib_t efpDataSize_kib);
virtual ~JournalFile();
@@ -59,12 +61,9 @@ public:
void initialize(const uint32_t completedDblkCount);
void finalize();
- const std::string getDirectory() const;
- const std::string getFileName() const;
const std::string getFqFileName() const;
uint64_t getFileSeqNum() const;
- bool isOpen() const;
int open();
void close();
void asyncFileHeaderWrite(io_context_t ioContextPtr,
@@ -81,32 +80,38 @@ public:
uint32_t getEnqueuedRecordCount() const;
uint32_t incrEnqueuedRecordCount();
- uint32_t addEnqueuedRecordCount(const uint32_t a);
uint32_t decrEnqueuedRecordCount();
- uint32_t subtrEnqueuedRecordCount(const uint32_t s);
- uint32_t getSubmittedDblkCount() const;
- uint32_t addSubmittedDblkCount(const uint32_t a);
-
- uint32_t getCompletedDblkCount() const;
uint32_t addCompletedDblkCount(const uint32_t a);
uint16_t getOutstandingAioOperationCount() const;
- uint16_t incrOutstandingAioOperationCount();
uint16_t decrOutstandingAioOperationCount();
// Status helper functions
bool isEmpty() const; ///< True if no writes of any kind have occurred
- bool isDataEmpty() const; ///< True if only file header written, data is still empty
- u_int32_t dblksRemaining() const; ///< Dblks remaining until full
- bool isFull() const; ///< True if all possible dblks have been submitted (but may not yet have returned from AIO)
- bool isFullAndComplete() const; ///< True if all submitted dblks have returned from AIO
- u_int32_t getOutstandingAioDblks() const; ///< Dblks still to be written
- bool getNextFile() const; ///< True when next file is needed
bool isNoEnqueuedRecordsRemaining() const; ///< True when all enqueued records (or parts) have been dequeued
// debug aid
const std::string status_str(const uint8_t indentDepth) const;
+
+protected:
+ const std::string getDirectory() const;
+ const std::string getFileName() const;
+ bool isOpen() const;
+
+ uint32_t getSubmittedDblkCount() const;
+ uint32_t addSubmittedDblkCount(const uint32_t a);
+
+ uint32_t getCompletedDblkCount() const;
+
+ uint16_t incrOutstandingAioOperationCount();
+
+ u_int32_t dblksRemaining() const; ///< Dblks remaining until full
+ bool getNextFile() const; ///< True when next file is needed
+ u_int32_t getOutstandingAioDblks() const; ///< Dblks still to be written
+ bool isDataEmpty() const; ///< True if only file header written, data is still empty
+ bool isFull() const; ///< True if all possible dblks have been submitted (but may not yet have returned from AIO)
+ bool isFullAndComplete() const; ///< True if all submitted dblks have returned from AIO
};
}} // namespace qpid::qls_jrnl
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.cpp
index 925e6597a9..2c1327d645 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.cpp
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.cpp
@@ -29,6 +29,8 @@
#include "qpid/linearstore/jrnl/slock.h"
#include "qpid/linearstore/jrnl/utils/file_hdr.h"
+//#include <iostream> // DEBUG
+
namespace qpid {
namespace qls_jrnl {
@@ -36,8 +38,8 @@ LinearFileController::LinearFileController(jcntl& jcntlRef) :
jcntlRef_(jcntlRef),
emptyFilePoolPtr_(0),
currentJournalFilePtr_(0),
- fileSeqCounter_(0),
- recordIdCounter_(0)
+ fileSeqCounter_("LinearFileController::fileSeqCounter", 0),
+ recordIdCounter_("LinearFileController::recordIdCounter", 0)
{}
LinearFileController::~LinearFileController() {}
@@ -47,7 +49,7 @@ void LinearFileController::initialize(const std::string& journalDirectory,
uint64_t initialFileNumberVal) {
journalDirectory_.assign(journalDirectory);
emptyFilePoolPtr_ = emptyFilePoolPtr;
- fileSeqCounter_ = initialFileNumberVal;
+ fileSeqCounter_.set(initialFileNumberVal);
}
void LinearFileController::finalize() {
@@ -57,14 +59,13 @@ void LinearFileController::finalize() {
}
}
-void LinearFileController::addJournalFile(const std::string& fileName,
- const uint64_t fileNumber,
- const uint32_t fileSize_kib,
+void LinearFileController::addJournalFile(JournalFile* journalFilePtr,
const uint32_t completedDblkCount) {
- if (currentJournalFilePtr_)
+ if (currentJournalFilePtr_) {
currentJournalFilePtr_->close();
- currentJournalFilePtr_ = new JournalFile(fileName, fileNumber, fileSize_kib);
- currentJournalFilePtr_->initialize(completedDblkCount);
+ }
+ journalFilePtr->initialize(completedDblkCount);
+ currentJournalFilePtr_ = journalFilePtr;
{
slock l(journalFileListMutex_);
journalFileList_.push_back(currentJournalFilePtr_);
@@ -72,18 +73,10 @@ void LinearFileController::addJournalFile(const std::string& fileName,
currentJournalFilePtr_->open();
}
-efpDataSize_kib_t LinearFileController::dataSize_kib() const {
- return emptyFilePoolPtr_->dataSize_kib();
-}
-
efpDataSize_sblks_t LinearFileController::dataSize_sblks() const {
return emptyFilePoolPtr_->dataSize_sblks();
}
-efpFileSize_kib_t LinearFileController::fileSize_kib() const {
- return emptyFilePoolPtr_->fileSize_kib();
-}
-
efpFileSize_sblks_t LinearFileController::fileSize_sblks() const {
return emptyFilePoolPtr_->fileSize_sblks();
}
@@ -100,28 +93,24 @@ void LinearFileController::pullEmptyFileFromEfp() {
addJournalFile(ef, getNextFileSeqNum(), emptyFilePoolPtr_->dataSize_kib(), 0);
}
-void LinearFileController::purgeFilesToEfp() {
+void LinearFileController::purgeEmptyFilesToEfp() {
slock l(journalFileListMutex_);
- while (journalFileList_.front()->isNoEnqueuedRecordsRemaining()) {
- emptyFilePoolPtr_->returnEmptyFile(journalFileList_.front()->getFqFileName());
- delete journalFileList_.front();
- journalFileList_.pop_front();
- }
+ purgeEmptyFilesToEfpNoLock();
}
uint32_t LinearFileController::getEnqueuedRecordCount(const efpFileCount_t fileSeqNumber) {
- slock l(journalFileListMutex_);
return find(fileSeqNumber)->getEnqueuedRecordCount();
}
uint32_t LinearFileController::incrEnqueuedRecordCount(const efpFileCount_t fileSeqNumber) {
- assertCurrentJournalFileValid("incrEnqueuedRecordCount");
return find(fileSeqNumber)->incrEnqueuedRecordCount();
}
uint32_t LinearFileController::decrEnqueuedRecordCount(const efpFileCount_t fileSeqNumber) {
slock l(journalFileListMutex_);
- return find(fileSeqNumber)->decrEnqueuedRecordCount();
+ uint32_t r = find(fileSeqNumber)->decrEnqueuedRecordCount();
+// purgeEmptyFilesToEfpNoLock();
+ return r;
}
uint32_t LinearFileController::addWriteCompletedDblkCount(const efpFileCount_t fileSeqNumber, const uint32_t a) {
@@ -160,101 +149,11 @@ uint64_t LinearFileController::getCurrentFileSeqNum() const {
return currentJournalFilePtr_->getFileSeqNum();
}
-uint32_t LinearFileController::getEnqueuedRecordCount() const {
- assertCurrentJournalFileValid("getEnqueuedRecordCount");
- return currentJournalFilePtr_->getEnqueuedRecordCount();
-}
-
-uint32_t LinearFileController::incrEnqueuedRecordCount() {
- assertCurrentJournalFileValid("incrEnqueuedRecordCount");
- return currentJournalFilePtr_->incrEnqueuedRecordCount();
-}
-
-uint32_t LinearFileController::addEnqueuedRecordCount(const uint32_t a) {
- assertCurrentJournalFileValid("addEnqueuedRecordCount");
- return currentJournalFilePtr_->addEnqueuedRecordCount(a);
-}
-
-uint32_t LinearFileController::decrEnqueuedRecordCount() {
- assertCurrentJournalFileValid("decrEnqueuedRecordCount");
- return currentJournalFilePtr_->decrEnqueuedRecordCount();
-}
-
-uint32_t LinearFileController::subtrEnqueuedRecordCount(const uint32_t s) {
- assertCurrentJournalFileValid("subtrEnqueuedRecordCount");
- return currentJournalFilePtr_->subtrEnqueuedRecordCount(s);
-}
-
-uint32_t LinearFileController::getWriteSubmittedDblkCount() const {
- assertCurrentJournalFileValid("getWriteSubmittedDblkCount");
- return currentJournalFilePtr_->getSubmittedDblkCount();
-}
-
-uint32_t LinearFileController::addWriteSubmittedDblkCount(const uint32_t a) {
- assertCurrentJournalFileValid("addWriteSubmittedDblkCount");
- return currentJournalFilePtr_->addSubmittedDblkCount(a);
-}
-
-uint32_t LinearFileController::getWriteCompletedDblkCount() const {
- assertCurrentJournalFileValid("getWriteCompletedDblkCount");
- return currentJournalFilePtr_->getCompletedDblkCount();
-}
-
-uint32_t LinearFileController::addWriteCompletedDblkCount(const uint32_t a) {
- assertCurrentJournalFileValid("addWriteCompletedDblkCount");
- return currentJournalFilePtr_->addCompletedDblkCount(a);
-}
-
-uint16_t LinearFileController::getOutstandingAioOperationCount() const {
- assertCurrentJournalFileValid("getOutstandingAioOperationCount");
- return currentJournalFilePtr_->getOutstandingAioOperationCount();
-}
-
-uint16_t LinearFileController::incrOutstandingAioOperationCount() {
- assertCurrentJournalFileValid("incrOutstandingAioOperationCount");
- return currentJournalFilePtr_->incrOutstandingAioOperationCount();
-}
-
-uint16_t LinearFileController::decrOutstandingAioOperationCount() {
- assertCurrentJournalFileValid("decrOutstandingAioOperationCount");
- return currentJournalFilePtr_->decrOutstandingAioOperationCount();
-}
-
bool LinearFileController::isEmpty() const {
assertCurrentJournalFileValid("isEmpty");
return currentJournalFilePtr_->isEmpty();
}
-bool LinearFileController::isDataEmpty() const {
- assertCurrentJournalFileValid("isDataEmpty");
- return currentJournalFilePtr_->isDataEmpty();
-}
-
-u_int32_t LinearFileController::dblksRemaining() const {
- assertCurrentJournalFileValid("dblksRemaining");
- return currentJournalFilePtr_->dblksRemaining();
-}
-
-bool LinearFileController::isFull() const {
- assertCurrentJournalFileValid("isFull");
- return currentJournalFilePtr_->isFull();
-}
-
-bool LinearFileController::isFullAndComplete() const {
- assertCurrentJournalFileValid("isFullAndComplete");
- return currentJournalFilePtr_->isFullAndComplete();
-}
-
-u_int32_t LinearFileController::getOutstandingAioDblks() const {
- assertCurrentJournalFileValid("getOutstandingAioDblks");
- return currentJournalFilePtr_->getOutstandingAioDblks();
-}
-
-bool LinearFileController::needNextFile() const {
- assertCurrentJournalFileValid("getNextFile");
- return currentJournalFilePtr_->getNextFile();
-}
-
const std::string LinearFileController::status(const uint8_t indentDepth) const {
std::string indent((size_t)indentDepth, '.');
std::ostringstream oss;
@@ -273,8 +172,12 @@ const std::string LinearFileController::status(const uint8_t indentDepth) const
// --- protected functions ---
-bool LinearFileController::checkCurrentJournalFileValid() const {
- return currentJournalFilePtr_ != 0;
+void LinearFileController::addJournalFile(const std::string& fileName,
+ const uint64_t fileNumber,
+ const uint32_t fileSize_kib,
+ const uint32_t completedDblkCount) {
+ JournalFile* jfp = new JournalFile(fileName, fileNumber, fileSize_kib);
+ addJournalFile(jfp, completedDblkCount);
}
void LinearFileController::assertCurrentJournalFileValid(const char* const functionName) const {
@@ -283,6 +186,10 @@ void LinearFileController::assertCurrentJournalFileValid(const char* const funct
}
}
+bool LinearFileController::checkCurrentJournalFileValid() const {
+ return currentJournalFilePtr_ != 0;
+}
+
// NOTE: NOT THREAD SAFE - journalFileList is accessed by multiple threads - use under external lock
JournalFile* LinearFileController::find(const efpFileCount_t fileSeqNumber) {
if (currentJournalFilePtr_ != 0 && currentJournalFilePtr_->getFileSeqNum() == fileSeqNumber)
@@ -301,4 +208,15 @@ uint64_t LinearFileController::getNextFileSeqNum() {
return fileSeqCounter_.increment();
}
+void LinearFileController::purgeEmptyFilesToEfpNoLock() {
+//std::cout << " >P n=" << journalFileList_.size() << " e=" << (journalFileList_.front()->isNoEnqueuedRecordsRemaining()?"T":"F") << std::flush; // DEBUG
+ while (journalFileList_.front()->isNoEnqueuedRecordsRemaining() &&
+ journalFileList_.size() > 1) { // Can't purge last file, even if it has no enqueued records
+//std::cout << " *f=" << journalFileList_.front()->getFqFileName() << std::flush; // DEBUG
+ emptyFilePoolPtr_->returnEmptyFile(journalFileList_.front()->getFqFileName());
+ delete journalFileList_.front();
+ journalFileList_.pop_front();
+ }
+}
+
}} // namespace qpid::qls_jrnl
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.h b/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.h
index 7721685196..83d360dad8 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.h
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.h
@@ -62,18 +62,14 @@ public:
uint64_t initialFileNumberVal);
void finalize();
- void addJournalFile(const std::string& fileName,
- const uint64_t fileNumber,
- const uint32_t fileSize_kib,
+ void addJournalFile(JournalFile* journalFilePtr,
const uint32_t completedDblkCount);
- efpDataSize_kib_t dataSize_kib() const;
efpDataSize_sblks_t dataSize_sblks() const;
- efpFileSize_kib_t fileSize_kib() const;
efpFileSize_sblks_t fileSize_sblks() const;
uint64_t getNextRecordId();
void pullEmptyFileFromEfp();
- void purgeFilesToEfp();
+ void purgeEmptyFilesToEfp();
// Functions for manipulating counts of non-current JournalFile instances in journalFileList_
uint32_t getEnqueuedRecordCount(const efpFileCount_t fileSeqNumber);
@@ -83,7 +79,7 @@ public:
const uint32_t a);
uint16_t decrOutstandingAioOperationCount(const efpFileCount_t fileSeqNumber);
- // Pass-through functions for JournalFile class
+ // Pass-through functions for current JournalFile class
void asyncFileHeaderWrite(io_context_t ioContextPtr,
const uint16_t userFlags,
const uint64_t recordId,
@@ -94,42 +90,25 @@ public:
uint32_t dataSize_dblks);
uint64_t getCurrentFileSeqNum() const;
-
- uint32_t getEnqueuedRecordCount() const;
- uint32_t incrEnqueuedRecordCount();
- uint32_t addEnqueuedRecordCount(const uint32_t a);
- uint32_t decrEnqueuedRecordCount();
- uint32_t subtrEnqueuedRecordCount(const uint32_t s);
-
- uint32_t getWriteSubmittedDblkCount() const;
- uint32_t addWriteSubmittedDblkCount(const uint32_t a);
-
- uint32_t getWriteCompletedDblkCount() const;
- uint32_t addWriteCompletedDblkCount(const uint32_t a);
-
- uint16_t getOutstandingAioOperationCount() const;
- uint16_t incrOutstandingAioOperationCount();
- uint16_t decrOutstandingAioOperationCount();
-
- bool isEmpty() const; // True if no writes of any kind have occurred
- bool isDataEmpty() const; // True if only file header written, data is still empty
- u_int32_t dblksRemaining() const; // Dblks remaining until full
- bool isFull() const; // True if all possible dblks have been submitted (but may not yet have returned from AIO)
- bool isFullAndComplete() const; // True if all submitted dblks have returned from AIO
- u_int32_t getOutstandingAioDblks() const; // Dblks still to be written
- bool needNextFile() const; // True when next file is needed
+ bool isEmpty() const;
// Debug aid
const std::string status(const uint8_t indentDepth) const;
protected:
+ void addJournalFile(const std::string& fileName,
+ const uint64_t fileNumber,
+ const uint32_t fileSize_kib,
+ const uint32_t completedDblkCount);
void assertCurrentJournalFileValid(const char* const functionName) const;
bool checkCurrentJournalFileValid() const;
JournalFile* find(const efpFileCount_t fileSeqNumber);
uint64_t getNextFileSeqNum();
+ void purgeEmptyFilesToEfpNoLock();
};
-typedef void (LinearFileController::*lfcAddJournalFileFn)(const std::string&, const uint64_t, const uint32_t, const uint32_t);
+typedef void (LinearFileController::*lfcAddJournalFileFn)(JournalFile* journalFilePtr,
+ const uint32_t completedDblkCount);
}} // namespace qpid::qls_jrnl
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp
index 3dc11ec27c..a1cf30bae9 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp
@@ -31,6 +31,7 @@
#include "qpid/linearstore/jrnl/enq_rec.h"
#include "qpid/linearstore/jrnl/jcfg.h"
#include "qpid/linearstore/jrnl/jdir.h"
+#include "qpid/linearstore/jrnl/JournalFile.h"
#include "qpid/linearstore/jrnl/JournalLog.h"
#include "qpid/linearstore/jrnl/jrec.h"
#include "qpid/linearstore/jrnl/LinearFileController.h"
@@ -62,7 +63,7 @@ RecoveryManager::RecoveryManager(const std::string& journalDirectory,
highestRecordId_(0ULL),
highestFileNumber_(0ULL),
lastFileFullFlag_(false),
- fileSize_kib_(0)
+ efpFileSize_kib_(0)
{}
RecoveryManager::~RecoveryManager() {}
@@ -74,18 +75,19 @@ void RecoveryManager::analyzeJournals(const std::vector<std::string>* preparedTr
efpIdentity_t efpIdentity;
analyzeJournalFileHeaders(efpIdentity);
*emptyFilePoolPtrPtr = emptyFilePoolManager->getEmptyFilePool(efpIdentity);
- fileSize_kib_ = (*emptyFilePoolPtrPtr)->fileSize_kib();
+ efpFileSize_kib_ = (*emptyFilePoolPtrPtr)->fileSize_kib();
+
// Check for file full condition
lastFileFullFlag_ = endOffset_ == (std::streamoff)(*emptyFilePoolPtrPtr)->fileSize_kib() * 1024;
- // Restore all read and write pointers and transactions
if (!journalEmptyFlag_) {
- while (getNextRecordHeader()) {
- }
+ // Read all records, establish remaining enqueued records
+ while (getNextRecordHeader()) {}
if (inFileStream_.is_open()) {
inFileStream_.close();
}
+
// Remove leading files which have no enqueued records
removeEmptyFiles(*emptyFilePoolPtrPtr);
@@ -100,14 +102,14 @@ void RecoveryManager::analyzeJournals(const std::vector<std::string>* preparedTr
txn_data_list tdl = transactionMapRef_.get_remove_tdata_list(*itr); // tdl will be empty if xid not found
// Unlock any affected enqueues in emap
for (tdl_itr i=tdl.begin(); i<tdl.end(); i++) {
- if (i->_enq_flag) { // enq op - decrement enqueue count
- enqueueCountList_[i->_pfid]--;
- } else if (enqueueMapRef_.is_enqueued(i->_drid, true)) { // deq op - unlock enq record
- int16_t ret = enqueueMapRef_.unlock(i->_drid);
+ if (i->enq_flag_) { // enq op - decrement enqueue count
+ fileNumberMap_[i->pfid_]->decrEnqueuedRecordCount();
+ } else if (enqueueMapRef_.is_enqueued(i->drid_, true)) { // deq op - unlock enq record
+ int16_t ret = enqueueMapRef_.unlock(i->drid_);
if (ret < enq_map::EMAP_OK) { // fail
// enq_map::unlock()'s only error is enq_map::EMAP_RID_NOT_FOUND
std::ostringstream oss;
- oss << std::hex << "_emap.unlock(): drid=0x\"" << i->_drid;
+ oss << std::hex << "_emap.unlock(): drid=0x\"" << i->drid_;
throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "RecoveryManager", "analyzeJournals");
}
}
@@ -115,7 +117,10 @@ void RecoveryManager::analyzeJournals(const std::vector<std::string>* preparedTr
}
}
}
+
+ // Set up recordIdList_ from enqueue map
enqueueMapRef_.rid_list(recordIdList_);
+
recordIdListConstItr_ = recordIdList_.begin();
}
}
@@ -144,19 +149,13 @@ bool RecoveryManager::readNextRemainingRecord(void** const dataPtrPtr,
bool& external,
data_tok* const dtokp,
bool /*ignore_pending_txns*/) {
- if (!dtokp->is_readable()) {
- std::ostringstream oss;
- oss << std::hex << std::setfill('0') << "dtok_id=0x" << std::setw(8) << dtokp->id();
- oss << "; dtok_rid=0x" << std::setw(16) << dtokp->rid() << "; dtok_wstate=" << dtokp->wstate_str();
- throw jexception(jerrno::JERR_JCNTL_ENQSTATE, oss.str(), "RecoveryManager", "readNextRemainingRecord");
- }
if (recordIdListConstItr_ == recordIdList_.end()) {
return false;
}
enq_map::emap_data_struct_t eds;
enqueueMapRef_.get_data(*recordIdListConstItr_, eds);
uint64_t fileNumber = eds._pfid;
- currentJournalFileConstItr_ = fileNumberNameMap_.find(fileNumber);
+ currentJournalFileConstItr_ = fileNumberMap_.find(fileNumber);
getNextFile(false);
inFileStream_.seekg(eds._file_posn, std::ifstream::beg);
@@ -195,18 +194,26 @@ bool RecoveryManager::readNextRemainingRecord(void** const dataPtrPtr,
throw jexception(jerrno::JERR__MALLOC, oss.str(), "RecoveryManager", "readNextRemainingRecord");
}
readJournalData((char*)*dataPtrPtr, dataSize);
+
+ // Set data token
+ dtokp->set_wstate(data_tok::ENQ);
+ dtokp->set_rid(enqueueHeader._rhdr._rid);
+ dtokp->set_dsize(dataSize);
+ if (xidSize) {
+ dtokp->set_xid(*xidPtrPtr, xidSize);
+ }
+
+ ++recordIdListConstItr_;
return true;
}
void RecoveryManager::setLinearFileControllerJournals(lfcAddJournalFileFn fnPtr,
LinearFileController* lfcPtr) {
-//std::cout << "****** RecoveryManager::setLinearFileControllerJournals():" << std::endl; // DEBUG
- for (fileNumberNameMapConstItr_t i = fileNumberNameMap_.begin(); i != fileNumberNameMap_.end(); ++i) {
+ for (fileNumberMapConstItr_t i = fileNumberMap_.begin(); i != fileNumberMap_.end(); ++i) {
uint32_t fileDblkCount = i->first == highestFileNumber_ ? // Is this this last file?
- endOffset_ / QLS_DBLK_SIZE_BYTES : // Last file uses _endOffset
- fileSize_kib_ * 1024 / QLS_DBLK_SIZE_BYTES; // All others use file size to make them full
- (lfcPtr->*fnPtr)(i->second, i->first, fileSize_kib_, fileDblkCount);
-//std::cout << " ** f=" << i->second.substr(i->second.rfind('/')+1) << ",fn=" << i->first << ",s=" << _fileSize_kib << ",eo=" << fileDblkCount << "(" << (fileDblkCount * QLS_DBLK_SIZE_BYTES / 1024) << "kiB)" << std::endl; // DEBUG
+ endOffset_ / QLS_DBLK_SIZE_BYTES : // Last file uses _endOffset
+ efpFileSize_kib_ * 1024 / QLS_DBLK_SIZE_BYTES; // All others use file size to make them full
+ (lfcPtr->*fnPtr)(i->second, fileDblkCount);
}
}
@@ -216,14 +223,19 @@ std::string RecoveryManager::toString(const std::string& jid,
if (compact) {
oss << "Recovery journal analysis (jid=\"" << jid << "\"):";
oss << " jfl=[";
- for (fileNumberNameMapConstItr_t i=fileNumberNameMap_.begin(); i!=fileNumberNameMap_.end(); ++i) {
- if (i!=fileNumberNameMap_.begin()) oss << " ";
- oss << i->first << ":" << i->second.substr(i->second.rfind('/')+1);
+ for (fileNumberMapConstItr_t i=fileNumberMap_.begin(); i!=fileNumberMap_.end(); ++i) {
+ if (i!=fileNumberMap_.begin()) {
+ oss << " ";
+ }
+ std::string fqFileName = i->second->getFqFileName();
+ oss << i->first << ":" << fqFileName.substr(fqFileName.rfind('/')+1);
}
oss << "] ecl=[ ";
- for (enqueueCountListConstItr_t j = enqueueCountList_.begin(); j!=enqueueCountList_.end(); ++j) {
- if (j != enqueueCountList_.begin()) oss << " ";
- oss << *j;
+ for (fileNumberMapConstItr_t j=fileNumberMap_.begin(); j!=fileNumberMap_.end(); ++j) {
+ if (j!=fileNumberMap_.begin()) {
+ oss << " ";
+ }
+ oss << j->second->getEnqueuedRecordCount();
}
oss << " ] empty=" << (journalEmptyFlag_ ? "T" : "F");
oss << " fro=0x" << std::hex << firstRecordOffset_ << std::dec << " (" << (firstRecordOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)";
@@ -233,15 +245,18 @@ std::string RecoveryManager::toString(const std::string& jid,
oss << " lffull=" << (lastFileFullFlag_ ? "T" : "F");
} else {
oss << "Recovery journal analysis (jid=\"" << jid << "\"):" << std::endl;
- oss << " Number of journal files = " << fileNumberNameMap_.size() << std::endl;
+ oss << " Number of journal files = " << fileNumberMap_.size() << std::endl;
oss << " Journal File List:" << std::endl;
- for (fileNumberNameMapConstItr_t i=fileNumberNameMap_.begin(); i!=fileNumberNameMap_.end(); ++i) {
- oss << " " << i->first << ": " << i->second.substr(i->second.rfind('/')+1) << std::endl;
+ for (fileNumberMapConstItr_t k=fileNumberMap_.begin(); k!=fileNumberMap_.end(); ++k) {
+ std::string fqFileName = k->second->getFqFileName();
+ oss << " " << k->first << ": " << fqFileName.substr(fqFileName.rfind('/')+1) << std::endl;
}
oss << " Enqueue Counts: [ " << std::endl;
- for (enqueueCountListConstItr_t j = enqueueCountList_.begin(); j!=enqueueCountList_.end(); ++j) {
- if (j != enqueueCountList_.begin()) oss << ", ";
- oss << *j;
+ for (fileNumberMapConstItr_t l=fileNumberMap_.begin(); l!=fileNumberMap_.end(); ++l) {
+ if (l != fileNumberMap_.begin()) {
+ oss << ", ";
+ }
+ oss << l->second->getEnqueuedRecordCount();
}
oss << " ]" << std::endl;
oss << " Journal empty = " << (journalEmptyFlag_ ? "TRUE" : "FALSE") << std::endl;
@@ -271,21 +286,26 @@ void RecoveryManager::analyzeJournalFileHeaders(efpIdentity_t& efpIdentity) {
oss << "Journal file " << (*i) << " belongs to queue \"" << headerQueueName << "\": ignoring";
journalLogRef_.log(JournalLog::LOG_WARN, queueName_, oss.str());
} else {
- fileNumberNameMap_[fileHeader._file_number] = *i;
+ JournalFile* jfp = new JournalFile(*i, fileHeader);
+ fileNumberMap_[fileHeader._file_number] = jfp;
if (fileHeader._file_number > highestFileNumber_) {
highestFileNumber_ = fileHeader._file_number;
}
}
}
- efpIdentity.first = fileHeader._efp_partition;
- efpIdentity.second = fileHeader._file_size_kib;
- enqueueCountList_.resize(fileNumberNameMap_.size(), 0);
- currentJournalFileConstItr_ = fileNumberNameMap_.begin();
+ efpIdentity.pn_ = fileHeader._efp_partition;
+ efpIdentity.ds_ = fileHeader._data_size_kib;
+ currentJournalFileConstItr_ = fileNumberMap_.begin();
}
void RecoveryManager::checkFileStreamOk(bool checkEof) {
if (inFileStream_.fail() || inFileStream_.bad() || checkEof ? inFileStream_.eof() : false) {
- throw jexception("read failure"); // TODO complete exception
+ std::ostringstream oss;
+ oss << "Stream status: fail=" << (inFileStream_.fail()?"T":"F") << " bad=" << (inFileStream_.bad()?"T":"F");
+ if (checkEof) {
+ oss << " eof=" << (inFileStream_.eof()?"T":"F");
+ }
+ throw jexception(jerrno::JERR_RCVM_STREAMBAD, oss.str(), "RecoveryManager", "checkFileStreamOk");
}
}
@@ -339,7 +359,6 @@ bool RecoveryManager::decodeRecord(jrec& record,
::rec_hdr_t& headerRecord,
std::streampos& fileOffset)
{
-// uint16_t start_fid = getCurrentFileNumber();
std::streampos start_file_offs = fileOffset;
if (highestRecordId_ == 0) {
@@ -354,14 +373,7 @@ bool RecoveryManager::decodeRecord(jrec& record,
done = record.rcv_decode(headerRecord, &inFileStream_, cumulativeSizeRead);
}
catch (const jexception& e) {
-// TODO - review this logic and tidy up how rd._lfid is assigned. See new jinf.get_end_file() fn.
-// Original
-// if (e.err_code() != jerrno::JERR_JREC_BADRECTAIL ||
-// fid != (rd._ffid ? rd._ffid - 1 : _num_jfiles - 1)) throw;
-// Tried this, but did not work
-// if (e.err_code() != jerrno::JERR_JREC_BADRECTAIL || h._magic != 0) throw;
checkJournalAlignment(start_file_offs);
-// rd._lfid = start_fid;
return false;
}
if (!done && !getNextFile(false)) {
@@ -373,7 +385,7 @@ bool RecoveryManager::decodeRecord(jrec& record,
}
std::string RecoveryManager::getCurrentFileName() const {
- return currentJournalFileConstItr_->second;
+ return currentJournalFileConstItr_->second->getFqFileName();
}
uint64_t RecoveryManager::getCurrentFileNumber() const {
@@ -382,19 +394,21 @@ uint64_t RecoveryManager::getCurrentFileNumber() const {
bool RecoveryManager::getNextFile(bool jumpToFirstRecordOffsetFlag) {
if (inFileStream_.is_open()) {
- if (inFileStream_.eof() || !inFileStream_.good())
- {
+ if (inFileStream_.eof() || !inFileStream_.good()) {
inFileStream_.clear();
endOffset_ = inFileStream_.tellg(); // remember file offset before closing
- if (endOffset_ == -1) { throw jexception("tellg() failure"); } // Check for error code -1 TODO: compelete exception
+ if (endOffset_ == -1) {
+ std::ostringstream oss;
+ oss << "tellg() failure: fail=" << (inFileStream_.fail()?"T":"F") << " bad=" << (inFileStream_.bad()?"T":"F");
+ throw jexception(jerrno::JERR_RCVM_STREAMBAD, oss.str(), "RecoveryManager", "getNextFile");
+ }
inFileStream_.close();
- if (++currentJournalFileConstItr_ == fileNumberNameMap_.end()) {
+ if (++currentJournalFileConstItr_ == fileNumberMap_.end()) {
return false;
}
}
}
- if (!inFileStream_.is_open())
- {
+ if (!inFileStream_.is_open()) {
inFileStream_.clear(); // clear eof flag, req'd for older versions of c++
inFileStream_.open(getCurrentFileName().c_str(), std::ios_base::in | std::ios_base::binary);
if (!inFileStream_.good()) {
@@ -402,7 +416,6 @@ bool RecoveryManager::getNextFile(bool jumpToFirstRecordOffsetFlag) {
}
// Read file header
-//std::cout << " F" << getCurrentFileNumber() << std::flush; // DEBUG
file_hdr_t fhdr;
inFileStream_.read((char*)&fhdr, sizeof(fhdr));
checkFileStreamOk(true);
@@ -412,7 +425,7 @@ bool RecoveryManager::getNextFile(bool jumpToFirstRecordOffsetFlag) {
inFileStream_.seekg(foffs);
} else {
inFileStream_.close();
- if (currentJournalFileConstItr_ == fileNumberNameMap_.begin()) {
+ if (currentJournalFileConstItr_ == fileNumberMap_.begin()) {
journalEmptyFlag_ = true;
}
return false;
@@ -436,7 +449,11 @@ bool RecoveryManager::getNextRecordHeader()
}
}
file_pos = inFileStream_.tellg();
-//std::cout << " 0x" << std::hex << file_pos << std::dec; // DEBUG
+ if (file_pos == -1) {
+ std::ostringstream oss;
+ oss << "tellg() failure: fail=" << (inFileStream_.fail()?"T":"F") << " bad=" << (inFileStream_.bad()?"T":"F");
+ throw jexception(jerrno::JERR_RCVM_STREAMBAD, oss.str(), "RecoveryManager", "getNextRecordHeader");
+ }
inFileStream_.read((char*)&h, sizeof(rec_hdr_t));
if (inFileStream_.gcount() == sizeof(rec_hdr_t)) {
hdr_ok = true;
@@ -450,19 +467,21 @@ bool RecoveryManager::getNextRecordHeader()
switch(h._magic) {
case QLS_ENQ_MAGIC:
{
-//std::cout << ".e" << std::flush; // DEBUG
+//std::cout << " 0x" << std::hex << file_pos << ".e.0x" << h._rid << std::dec << std::flush; // DEBUG
enq_rec er;
uint64_t start_fid = getCurrentFileNumber(); // fid may increment in decode() if record folds over file boundary
if (!decodeRecord(er, cum_size_read, h, file_pos)) {
return false;
}
if (!er.is_transient()) { // Ignore transient msgs
- enqueueCountList_[start_fid]++;
+ fileNumberMap_[start_fid]->incrEnqueuedRecordCount();
if (er.xid_size()) {
er.get_xid(&xidp);
- if (xidp != 0) { throw jexception("Null xid with non-null xid_size"); } // TODO complete exception
+ if (xidp == 0) {
+ throw jexception(jerrno::JERR_RCVM_NULLXID, "ENQ", "RecoveryManager", "getNextRecordHeader");
+ }
std::string xid((char*)xidp, er.xid_size());
- transactionMapRef_.insert_txn_data(xid, txn_data(h._rid, 0, start_fid, true));
+ transactionMapRef_.insert_txn_data(xid, txn_data_t(h._rid, 0, start_fid, file_pos, true));
if (transactionMapRef_.set_aio_compl(xid, h._rid) < txn_map::TMAP_OK) { // fail - xid or rid not found
std::ostringstream oss;
oss << std::hex << "_tmap.set_aio_compl: txn_enq xid=\"" << xid << "\" rid=0x" << h._rid;
@@ -482,7 +501,7 @@ bool RecoveryManager::getNextRecordHeader()
break;
case QLS_DEQ_MAGIC:
{
-//std::cout << ".d" << std::flush; // DEBUG
+//std::cout << " 0x" << std::hex << file_pos << ".d.0x" << h._rid << std::dec << std::flush; // DEBUG
deq_rec dr;
uint16_t start_fid = getCurrentFileNumber(); // fid may increment in decode() if record folds over file boundary
if (!decodeRecord(dr, cum_size_read, h, file_pos)) {
@@ -492,10 +511,12 @@ bool RecoveryManager::getNextRecordHeader()
// If the enqueue is part of a pending txn, it will not yet be in emap
enqueueMapRef_.lock(dr.deq_rid()); // ignore not found error
dr.get_xid(&xidp);
- if (xidp != 0) { throw jexception("Null xid with non-null xid_size"); } // TODO complete exception
+ if (xidp == 0) {
+ throw jexception(jerrno::JERR_RCVM_NULLXID, "DEQ", "RecoveryManager", "getNextRecordHeader");
+ }
std::string xid((char*)xidp, dr.xid_size());
- transactionMapRef_.insert_txn_data(xid, txn_data(dr.rid(), dr.deq_rid(), start_fid, false,
- dr.is_txn_coml_commit()));
+ transactionMapRef_.insert_txn_data(xid, txn_data_t(dr.rid(), dr.deq_rid(), start_fid, file_pos,
+ false, dr.is_txn_coml_commit()));
if (transactionMapRef_.set_aio_compl(xid, dr.rid()) < txn_map::TMAP_OK) { // fail - xid or rid not found
std::ostringstream oss;
oss << std::hex << "_tmap.set_aio_compl: txn_deq xid=\"" << xid << "\" rid=0x" << dr.rid();
@@ -505,30 +526,30 @@ bool RecoveryManager::getNextRecordHeader()
} else {
uint64_t enq_fid;
if (enqueueMapRef_.get_remove_pfid(dr.deq_rid(), enq_fid, true) == enq_map::EMAP_OK) { // ignore not found error
- enqueueCountList_[enq_fid]--;
+ fileNumberMap_[enq_fid]->decrEnqueuedRecordCount();
}
}
}
break;
case QLS_TXA_MAGIC:
{
-//std::cout << ".a" << std::flush; // DEBUG
+//std::cout << " 0x" << std::hex << file_pos << ".a.0x" << h._rid << std::dec << std::flush; // DEBUG
txn_rec ar;
if (!decodeRecord(ar, cum_size_read, h, file_pos)) {
return false;
}
// Delete this txn from tmap, unlock any locked records in emap
ar.get_xid(&xidp);
- if (xidp != 0) {
- throw jexception("Null xid with non-null xid_size"); // TODO complete exception
+ if (xidp == 0) {
+ throw jexception(jerrno::JERR_RCVM_NULLXID, "ABT", "RecoveryManager", "getNextRecordHeader");
}
std::string xid((char*)xidp, ar.xid_size());
txn_data_list tdl = transactionMapRef_.get_remove_tdata_list(xid); // tdl will be empty if xid not found
for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++) {
- if (itr->_enq_flag) {
- enqueueCountList_[itr->_pfid]--;
+ if (itr->enq_flag_) {
+ fileNumberMap_[itr->pfid_]->decrEnqueuedRecordCount();
} else {
- enqueueMapRef_.unlock(itr->_drid); // ignore not found error
+ enqueueMapRef_.unlock(itr->drid_); // ignore not found error
}
}
std::free(xidp);
@@ -536,30 +557,31 @@ bool RecoveryManager::getNextRecordHeader()
break;
case QLS_TXC_MAGIC:
{
-//std::cout << ".t" << std::flush; // DEBUG
+//std::cout << " 0x" << std::hex << file_pos << ".c.0x" << h._rid << std::dec << std::flush; // DEBUG
txn_rec cr;
if (!decodeRecord(cr, cum_size_read, h, file_pos)) {
return false;
}
// Delete this txn from tmap, process records into emap
cr.get_xid(&xidp);
- if (xidp != 0) {
- throw jexception("Null xid with non-null xid_size"); // TODO complete exception
+ if (xidp == 0) {
+ throw jexception(jerrno::JERR_RCVM_NULLXID, "CMT", "RecoveryManager", "getNextRecordHeader");
}
std::string xid((char*)xidp, cr.xid_size());
txn_data_list tdl = transactionMapRef_.get_remove_tdata_list(xid); // tdl will be empty if xid not found
for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++) {
- if (itr->_enq_flag) { // txn enqueue
- if (enqueueMapRef_.insert_pfid(itr->_rid, itr->_pfid, file_pos) < enq_map::EMAP_OK) { // fail
+ if (itr->enq_flag_) { // txn enqueue
+//std::cout << "[rid=0x" << std::hex << itr->rid_ << std::dec << " fid=" << itr->pfid_ << " fpos=0x" << std::hex << itr->foffs_ << "]" << std::dec << std::flush; // DEBUG
+ if (enqueueMapRef_.insert_pfid(itr->rid_, itr->pfid_, itr->foffs_) < enq_map::EMAP_OK) { // fail
// The only error code emap::insert_pfid() returns is enq_map::EMAP_DUP_RID.
std::ostringstream oss;
- oss << std::hex << "rid=0x" << itr->_rid << " _pfid=0x" << itr->_pfid;
+ oss << std::hex << "rid=0x" << itr->rid_ << " _pfid=0x" << itr->pfid_;
throw jexception(jerrno::JERR_MAP_DUPLICATE, oss.str(), "RecoveryManager", "getNextRecordHeader");
}
} else { // txn dequeue
uint64_t enq_fid;
- if (enqueueMapRef_.get_remove_pfid(itr->_drid, enq_fid, true) == enq_map::EMAP_OK) // ignore not found error
- enqueueCountList_[enq_fid]--;
+ if (enqueueMapRef_.get_remove_pfid(itr->drid_, enq_fid, true) == enq_map::EMAP_OK) // ignore not found error
+ fileNumberMap_[enq_fid]->decrEnqueuedRecordCount();
}
}
std::free(xidp);
@@ -577,11 +599,11 @@ bool RecoveryManager::getNextRecordHeader()
}
break;
case 0:
-//std::cout << ".0" << std::endl << std::flush; // DEBUG
+//std::cout << " 0x" << std::hex << file_pos << ".0" << std::dec << std::endl << std::flush; // DEBUG
checkJournalAlignment(file_pos);
return false;
default:
-//std::cout << ".?" << std::endl << std::flush; // DEBUG
+//std::cout << " 0x" << std::hex << file_pos << ".?" << std::dec << std::endl << std::flush; // DEBUG
// Stop as this is the overwrite boundary.
checkJournalAlignment(file_pos);
return false;
@@ -593,16 +615,24 @@ void RecoveryManager::readJournalData(char* target,
const std::streamsize readSize) {
std::streamoff bytesRead = 0;
while (bytesRead < readSize) {
- if (inFileStream_.eof()) {
- getNextFile(false);
+ std::streampos file_pos = inFileStream_.tellg();
+ if (file_pos == -1) {
+ std::ostringstream oss;
+ oss << "tellg() failure: fail=" << (inFileStream_.fail()?"T":"F") << " bad=" << (inFileStream_.bad()?"T":"F");
+ throw jexception(jerrno::JERR_RCVM_STREAMBAD, oss.str(), "RecoveryManager", "readJournalData");
}
- bool readFitsInFile = inFileStream_.tellg() + readSize <= fileSize_kib_ * 1024;
- std::streamoff actualReadSize = readFitsInFile ? readSize : (fileSize_kib_ * 1024) - inFileStream_.tellg();
- inFileStream_.read(target + bytesRead, actualReadSize);
- if (inFileStream_.gcount() != actualReadSize) {
- throw jexception(); // TODO - proper exception
+ inFileStream_.read(target + bytesRead, readSize - bytesRead);
+ std::streamoff thisReadSize = inFileStream_.gcount();
+ if (thisReadSize < readSize) {
+ getNextFile(false);
+ file_pos = inFileStream_.tellg();
+ if (file_pos == -1) {
+ std::ostringstream oss;
+ oss << "tellg() failure: fail=" << (inFileStream_.fail()?"T":"F") << " bad=" << (inFileStream_.bad()?"T":"F");
+ throw jexception(jerrno::JERR_RCVM_STREAMBAD, oss.str(), "RecoveryManager", "readJournalData");
+ }
}
- bytesRead += actualReadSize;
+ bytesRead += thisReadSize;
}
}
@@ -633,12 +663,10 @@ void RecoveryManager::readJournalFileHeader(const std::string& journalFileName,
}
void RecoveryManager::removeEmptyFiles(EmptyFilePool* emptyFilePoolPtr) {
- while (enqueueCountList_.front() == 0 && enqueueCountList_.size() > 1) {
- fileNumberNameMapItr_t i = fileNumberNameMap_.begin();
-//std::cout << "*** File " << i->first << ": " << i->second << " is empty." << std::endl;
- emptyFilePoolPtr->returnEmptyFile(i->second);
- fileNumberNameMap_.erase(i);
- enqueueCountList_.pop_front();
+ while (fileNumberMap_.begin()->second->getEnqueuedRecordCount() == 0 && fileNumberMap_.size() > 1) {
+//std::cout << "*** File " << i->first << ": " << i->second << " is empty." << std::endl; // DEBUG
+ emptyFilePoolPtr->returnEmptyFile(fileNumberMap_.begin()->second->getFqFileName());
+ fileNumberMap_.erase(fileNumberMap_.begin()->first);
}
}
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.h b/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.h
index 91446c6abf..3cacd7cfb3 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.h
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.h
@@ -49,11 +49,9 @@ protected:
// Types
typedef std::vector<std::string> directoryList_t;
typedef directoryList_t::const_iterator directoryListConstItr_t;
- typedef std::map<uint64_t, std::string> fileNumberNameMap_t;
- typedef fileNumberNameMap_t::iterator fileNumberNameMapItr_t;
- typedef fileNumberNameMap_t::const_iterator fileNumberNameMapConstItr_t;
- typedef std::deque<uint32_t> enqueueCountList_t;
- typedef enqueueCountList_t::const_iterator enqueueCountListConstItr_t;
+ typedef std::map<uint64_t, JournalFile*> fileNumberMap_t;
+ typedef fileNumberMap_t::iterator fileNumberMapItr_t;
+ typedef fileNumberMap_t::const_iterator fileNumberMapConstItr_t;
typedef std::vector<uint64_t> recordIdList_t;
typedef recordIdList_t::const_iterator recordIdListConstItr_t;
@@ -65,8 +63,7 @@ protected:
JournalLog& journalLogRef_;
// Initial journal analysis data
- fileNumberNameMap_t fileNumberNameMap_; ///< File number - name map
- enqueueCountList_t enqueueCountList_; ///< Number enqueued records found for each file
+ fileNumberMap_t fileNumberMap_; ///< File number - JournalFilePtr map
bool journalEmptyFlag_; ///< Journal data files empty
std::streamoff firstRecordOffset_; ///< First record offset in ffid
std::streamoff endOffset_; ///< End offset (first byte past last record)
@@ -75,8 +72,8 @@ protected:
bool lastFileFullFlag_; ///< Last file is full
// State for recovery of individual enqueued records
- uint32_t fileSize_kib_;
- fileNumberNameMapConstItr_t currentJournalFileConstItr_;
+ uint32_t efpFileSize_kib_;
+ fileNumberMapConstItr_t currentJournalFileConstItr_;
std::string currentFileName_;
std::ifstream inFileStream_;
recordIdList_t recordIdList_;
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp
index e8873cddcd..f21deead5d 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp
@@ -103,35 +103,10 @@ jcntl::initialize(EmptyFilePool* efpp,
_tmap.clear();
_linearFileController.finalize();
-
-// _lpmgr.finalize();
-
- // Set new file geometry parameters
-// assert(num_jfiles >= JRNL_MIN_NUM_FILES);
-// assert(num_jfiles <= JRNL_MAX_NUM_FILES);
-// _emap.set_num_jfiles(num_jfiles);
-// _tmap.set_num_jfiles(num_jfiles);
-
-// assert(jfsize_sblks >= JRNL_MIN_FILE_SIZE);
-// assert(jfsize_sblks <= JRNL_MAX_FILE_SIZE);
-// _jfsize_sblks = jfsize_sblks;
-
- // Clear any existing journal files
- _jdir.clear_dir();
-// _lpmgr.initialize(num_jfiles, ae, ae_max_jfiles, this, &new_fcntl); // Creates new journal files
-
+ _jdir.clear_dir(); // Clear any existing journal files
_linearFileController.initialize(_jdir.dirname(), efpp, 0ULL);
_linearFileController.pullEmptyFileFromEfp();
-// std::cout << _linearFileController.status(2);
-// _wrfc.initialize(_jfsize_sblks);
-// _rrfc.initialize();
-// _rrfc.set_findex(0);
-// _rmgr.initialize(cbp);
_wmgr.initialize(cbp, wcache_pgsize_sblks, wcache_num_pages, QLS_WMGR_MAXDTOKPP, QLS_WMGR_MAXWAITUS);
-
- // Write info file (<basename>.jinf) to disk
-// write_infofile();
-
_init_flag = true;
}
@@ -152,36 +127,14 @@ jcntl::recover(EmptyFilePoolManager* efpmp,
_linearFileController.finalize();
-// _lpmgr.finalize();
-
-// assert(num_jfiles >= JRNL_MIN_NUM_FILES);
-// assert(num_jfiles <= JRNL_MAX_NUM_FILES);
-// assert(jfsize_sblks >= JRNL_MIN_FILE_SIZE);
-// assert(jfsize_sblks <= JRNL_MAX_FILE_SIZE);
-// _jfsize_sblks = jfsize_sblks;
-
// Verify journal dir and journal files
_jdir.verify_dir();
-// _rcvdat.reset(num_jfiles/*, ae, ae_max_jfiles*/);
-
-// rcvr_janalyze(prep_txn_list_ptr, efpm);
- efpIdentity_t efpIdentity;
_recoveryManager.analyzeJournals(prep_txn_list_ptr, efpmp, &_emptyFilePoolPtr);
highest_rid = _recoveryManager.getHighestRecordId();
-// if (_rcvdat._jfull)
-// throw jexception(jerrno::JERR_JCNTL_RECOVERJFULL, "jcntl", "recover");
_jrnl_log.log(/*LOG_DEBUG*/JournalLog::LOG_INFO, _jid, _recoveryManager.toString(_jid, true));
-
-// _lpmgr.recover(_rcvdat, this, &new_fcntl);
-
_linearFileController.initialize(_jdir.dirname(), _emptyFilePoolPtr, _recoveryManager.getHighestFileNumber());
-// _linearFileController.setFileNumberCounter(_recoveryManager.getHighestFileNumber());
_recoveryManager.setLinearFileControllerJournals(&qpid::qls_jrnl::LinearFileController::addJournalFile, &_linearFileController);
-// _wrfc.initialize(_jfsize_sblks, &_rcvdat);
-// _rrfc.initialize();
-// _rrfc.set_findex(_rcvdat.ffid());
-// _rmgr.initialize(cbp);
_wmgr.initialize(cbp, wcache_pgsize_sblks, wcache_num_pages, QLS_WMGR_MAXDTOKPP, QLS_WMGR_MAXWAITUS,
(_recoveryManager.isLastFileFull() ? 0 : _recoveryManager.getEndOffset()));
@@ -194,12 +147,6 @@ jcntl::recover_complete()
{
if (!_readonly_flag)
throw jexception(jerrno::JERR_JCNTL_NOTRECOVERED, "jcntl", "recover_complete");
-// for (uint16_t i=0; i<_lpmgr.num_jfiles(); i++)
-// _lpmgr.get_fcntlp(i)->reset(&_rcvdat);
-// _wrfc.initialize(_jfsize_sblks, &_rcvdat);
-// _rrfc.initialize();
-// _rrfc.set_findex(_rcvdat.ffid());
-// _rmgr.recover_complete();
_readonly_flag = false;
}
@@ -207,7 +154,7 @@ void
jcntl::delete_jrnl_files()
{
stop(true); // wait for AIO to complete
- _linearFileController.purgeFilesToEfp();
+ _linearFileController.purgeEmptyFilesToEfp();
_jdir.delete_dir();
}
@@ -288,73 +235,10 @@ jcntl::read_data_record(void** const datapp,
bool ignore_pending_txns)
{
check_rstatus("read_data");
- if (_recoveryManager.readNextRemainingRecord(datapp, dsize, xidpp, xidsize, transient, external, dtokp, ignore_pending_txns))
+ if (_recoveryManager.readNextRemainingRecord(datapp, dsize, xidpp, xidsize, transient, external, dtokp, ignore_pending_txns)) {
return RHM_IORES_SUCCESS;
- return RHM_IORES_EMPTY;
-/*
- if (!dtokp->is_readable()) {
- std::ostringstream oss;
- oss << std::hex << std::setfill('0');
- oss << "dtok_id=0x" << std::setw(8) << dtokp->id();
- oss << "; dtok_rid=0x" << std::setw(16) << dtokp->rid();
- oss << "; dtok_wstate=" << dtokp->wstate_str();
- throw jexception(jerrno::JERR_JCNTL_ENQSTATE, oss.str(), "jcntl", "read_data_record");
}
- std::vector<uint64_t> ridl;
- _emap.rid_list(ridl);
- enq_map::emap_data_struct_t eds;
- for (std::vector<uint64_t>::const_iterator i=ridl.begin(); i!=ridl.end(); ++i) {
- short res = _emap.get_data(*i, eds);
- if (res == enq_map::EMAP_OK) {
- std::ifstream ifs(_recoveryManager._fm[eds._pfid].c_str(), std::ifstream::in | std::ifstream::binary);
- if (!ifs.good()) {
- std::ostringstream oss;
- oss << "rid=" << (*i) << " pfid=" << eds._pfid << " file=" << _recoveryManager._fm[eds._pfid] << " file_posn=" << eds._file_posn;
- throw jexception(jerrno::JERR_RCVM_OPENRD, oss.str(), "jcntl", "read_data_record");
- }
- ifs.seekg(eds._file_posn, std::ifstream::beg);
- ::enq_hdr_t eh;
- ifs.read((char*)&eh, sizeof(::enq_hdr_t));
- if (!::validate_enq_hdr(&eh, QLS_ENQ_MAGIC, QLS_JRNL_VERSION, *i)) {
- std::ostringstream oss;
- oss << "rid=" << (*i) << " pfid=" << eds._pfid << " file=" << _recoveryManager._fm[eds._pfid] << " file_posn=" << eds._file_posn;
- throw jexception(jerrno::JERR_JCNTL_INVALIDENQHDR, oss.str(), "jcntl", "read_data_record");
- }
- dsize = eh._dsize;
- xidsize = eh._xidsize;
- transient = ::is_enq_transient(&eh);
- external = ::is_enq_external(&eh);
- if (xidsize) {
- *xidpp = ::malloc(xidsize);
- ifs.read((char*)(*xidpp), xidsize);
- } else {
- *xidpp = 0;
- }
- if (dsize) {
- *datapp = ::malloc(dsize);
- ifs.read((char*)(*datapp), dsize);
- } else {
- *datapp = 0;
- }
- }
- }
-*/
-/*
- check_rstatus("read_data");
- iores res = _rmgr.read(datapp, dsize, xidpp, xidsize, transient, external, dtokp, ignore_pending_txns);
- if (res == RHM_IORES_RCINVALID)
- {
- get_wr_events(0); // check for outstanding write events
- iores sres = _rmgr.synchronize(); // flushes all outstanding read events
- if (sres != RHM_IORES_SUCCESS)
- return sres;
- // TODO: Does linear store need this?
-// _rmgr.wait_for_validity(&_aio_cmpl_timeout, true); // throw if timeout occurs
- res = _rmgr.read(datapp, dsize, xidpp, xidsize, transient, external, dtokp, ignore_pending_txns);
- }
- return res;
-*/
- return RHM_IORES_SUCCESS;
+ return RHM_IORES_EMPTY;
}
iores
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp
index 7c70be1ed7..c8fd2f55fe 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp
@@ -53,7 +53,6 @@ const uint32_t jerrno::JERR_JCNTL_NOTRECOVERED = 0x0204;
const uint32_t jerrno::JERR_JCNTL_ENQSTATE = 0x0207;
const uint32_t jerrno::JERR_JCNTL_INVALIDENQHDR = 0x0208;
-
// class jdir
const uint32_t jerrno::JERR_JDIR_NOTDIR = 0x0300;
const uint32_t jerrno::JERR_JDIR_MKDIR = 0x0301;
@@ -89,16 +88,11 @@ const uint32_t jerrno::JERR_WMGR_DEQRIDNOTENQ = 0x0805;
const uint32_t jerrno::JERR_WMGR_BADFH = 0x0806;
// class RecoveryManager
-const uint32_t jerrno::JERR_RCVM_OPENRD = 0x0900;
-const uint32_t jerrno::JERR_RCVM_READ = 0x0901;
-const uint32_t jerrno::JERR_RCVM_WRITE = 0x0902;
-
-//// class rmgr
-//const uint32_t jerrno::JERR_RMGR_UNKNOWNMAGIC = 0x0900;
-//const uint32_t jerrno::JERR_RMGR_RIDMISMATCH = 0x0901;
-////const uint32_t jerrno::JERR_RMGR_FIDMISMATCH = 0x0902;
-//const uint32_t jerrno::JERR_RMGR_ENQSTATE = 0x0903;
-//const uint32_t jerrno::JERR_RMGR_BADRECTYPE = 0x0904;
+const uint32_t jerrno::JERR_RCVM_OPENRD = 0x0900; ///< Unable to open file for read
+const uint32_t jerrno::JERR_RCVM_STREAMBAD = 0x0901; ///< Read/write stream error
+const uint32_t jerrno::JERR_RCVM_READ = 0x0902; ///< Read error: no or insufficient data to read
+const uint32_t jerrno::JERR_RCVM_WRITE = 0x0903; ///< Write error
+const uint32_t jerrno::JERR_RCVM_NULLXID = 0x0904; ///< Null XID when XID length non-null in header
// class data_tok
const uint32_t jerrno::JERR_DTOK_ILLEGALSTATE = 0x0a00;
@@ -184,15 +178,11 @@ jerrno::__init()
_err_map[JERR_WMGR_BADFH] = "JERR_WMGR_BADFH: Bad file handle.";
// class RecoveryManager
- _err_map[JERR_RCVM_OPENRD] = "JERR_JCNTL_OPENRD: Unable to open file for write";
- _err_map[JERR_RCVM_READ] = "JERR_JCNTL_READ: Read error: no or insufficient data to read";
+ _err_map[JERR_RCVM_OPENRD] = "JERR_RCVM_OPENRD: Unable to open file for read";
+ _err_map[JERR_RCVM_STREAMBAD] = "JERR_RCVM_STREAMBAD: Read/write stream error";
+ _err_map[JERR_RCVM_READ] = "JERR_RCVM_READ: Read error: no or insufficient data to read";
_err_map[JERR_RCVM_WRITE] = "JERR_RCVM_WRITE: Write error";
-// // class rmgr
-// _err_map[JERR_RMGR_UNKNOWNMAGIC] = "JERR_RMGR_UNKNOWNMAGIC: Found record with unknown magic.";
-// _err_map[JERR_RMGR_RIDMISMATCH] = "JERR_RMGR_RIDMISMATCH: RID mismatch between current record and dtok RID";
-// //_err_map[JERR_RMGR_FIDMISMATCH] = "JERR_RMGR_FIDMISMATCH: FID mismatch between emap and rrfc";
-// _err_map[JERR_RMGR_ENQSTATE] = "JERR_RMGR_ENQSTATE: Attempted read when data token wstate was not ENQ";
-// _err_map[JERR_RMGR_BADRECTYPE] = "JERR_RMGR_BADRECTYPE: Attempted operation on inappropriate record type";
+ _err_map[JERR_RCVM_NULLXID] = "JERR_RCVM_NULLXID: Null XID when XID length non-null in header";
// class data_tok
_err_map[JERR_DTOK_ILLEGALSTATE] = "JERR_MTOK_ILLEGALSTATE: Attempted to change to illegal state.";
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h b/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h
index 7bd521ecbf..4c7124ae22 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h
@@ -50,40 +50,40 @@ namespace qls_jrnl
public:
// generic errors
- static const uint32_t JERR__MALLOC; ///< Buffer memory allocation failed
- static const uint32_t JERR__UNDERFLOW; ///< Underflow error
- static const uint32_t JERR__NINIT; ///< Operation on uninitialized class
- static const uint32_t JERR__AIO; ///< AIO failure
- static const uint32_t JERR__FILEIO; ///< File read or write failure
- static const uint32_t JERR__RTCLOCK; ///< Reading real-time clock failed
- static const uint32_t JERR__PTHREAD; ///< pthread failure
- static const uint32_t JERR__TIMEOUT; ///< Timeout waiting for an event
- static const uint32_t JERR__UNEXPRESPONSE; ///< Unexpected response to call or event
- static const uint32_t JERR__RECNFOUND; ///< Record not found
- static const uint32_t JERR__NOTIMPL; ///< Not implemented
- static const uint32_t JERR__NULL; ///< Operation on null pointer
+ static const uint32_t JERR__MALLOC; ///< Buffer memory allocation failed
+ static const uint32_t JERR__UNDERFLOW; ///< Underflow error
+ static const uint32_t JERR__NINIT; ///< Operation on uninitialized class
+ static const uint32_t JERR__AIO; ///< AIO failure
+ static const uint32_t JERR__FILEIO; ///< File read or write failure
+ static const uint32_t JERR__RTCLOCK; ///< Reading real-time clock failed
+ static const uint32_t JERR__PTHREAD; ///< pthread failure
+ static const uint32_t JERR__TIMEOUT; ///< Timeout waiting for an event
+ static const uint32_t JERR__UNEXPRESPONSE; ///< Unexpected response to call or event
+ static const uint32_t JERR__RECNFOUND; ///< Record not found
+ static const uint32_t JERR__NOTIMPL; ///< Not implemented
+ static const uint32_t JERR__NULL; ///< Operation on null pointer
// class jcntl
- static const uint32_t JERR_JCNTL_STOPPED; ///< Operation on stopped journal
- static const uint32_t JERR_JCNTL_READONLY; ///< Write operation on read-only journal
- static const uint32_t JERR_JCNTL_AIOCMPLWAIT; ///< Timeout waiting for AIOs to complete
- static const uint32_t JERR_JCNTL_UNKNOWNMAGIC; ///< Found record with unknown magic
- static const uint32_t JERR_JCNTL_NOTRECOVERED; ///< Req' recover() to be called first
- static const uint32_t JERR_JCNTL_ENQSTATE; ///< Read error: Record not in ENQ state
- static const uint32_t JERR_JCNTL_INVALIDENQHDR;///< Invalid ENQ header
+ static const uint32_t JERR_JCNTL_STOPPED; ///< Operation on stopped journal
+ static const uint32_t JERR_JCNTL_READONLY; ///< Write operation on read-only journal
+ static const uint32_t JERR_JCNTL_AIOCMPLWAIT; ///< Timeout waiting for AIOs to complete
+ static const uint32_t JERR_JCNTL_UNKNOWNMAGIC; ///< Found record with unknown magic
+ static const uint32_t JERR_JCNTL_NOTRECOVERED; ///< Req' recover() to be called first
+ static const uint32_t JERR_JCNTL_ENQSTATE; ///< Read error: Record not in ENQ state
+ static const uint32_t JERR_JCNTL_INVALIDENQHDR; ///< Invalid ENQ header
// class jdir
- static const uint32_t JERR_JDIR_NOTDIR; ///< Exists but is not a directory
- static const uint32_t JERR_JDIR_MKDIR; ///< Directory creation failed
- static const uint32_t JERR_JDIR_OPENDIR; ///< Directory open failed
- static const uint32_t JERR_JDIR_READDIR; ///< Directory read failed
- static const uint32_t JERR_JDIR_CLOSEDIR; ///< Directory close failed
- static const uint32_t JERR_JDIR_RMDIR; ///< Directory delete failed
- static const uint32_t JERR_JDIR_NOSUCHFILE; ///< File does not exist
- static const uint32_t JERR_JDIR_FMOVE; ///< File move failed
- static const uint32_t JERR_JDIR_STAT; ///< File stat failed
- static const uint32_t JERR_JDIR_UNLINK; ///< File delete failed
- static const uint32_t JERR_JDIR_BADFTYPE; ///< Bad or unknown file type (stat mode)
+ static const uint32_t JERR_JDIR_NOTDIR; ///< Exists but is not a directory
+ static const uint32_t JERR_JDIR_MKDIR; ///< Directory creation failed
+ static const uint32_t JERR_JDIR_OPENDIR; ///< Directory open failed
+ static const uint32_t JERR_JDIR_READDIR; ///< Directory read failed
+ static const uint32_t JERR_JDIR_CLOSEDIR; ///< Directory close failed
+ static const uint32_t JERR_JDIR_RMDIR; ///< Directory delete failed
+ static const uint32_t JERR_JDIR_NOSUCHFILE; ///< File does not exist
+ static const uint32_t JERR_JDIR_FMOVE; ///< File move failed
+ static const uint32_t JERR_JDIR_STAT; ///< File stat failed
+ static const uint32_t JERR_JDIR_UNLINK; ///< File delete failed
+ static const uint32_t JERR_JDIR_BADFTYPE; ///< Bad or unknown file type (stat mode)
// class JournalFile
static const uint32_t JERR_JNLF_OPEN; ///< Unable to open file for write
@@ -92,47 +92,42 @@ namespace qls_jrnl
static const uint32_t JERR_JNLF_CMPLOFFSOVFL; ///< Increased cmpl offs past subm offs
// class LinearFileController
- static const uint32_t JERR_LFCR_SEQNUMNOTFOUND;///< File sequence number not found
+ static const uint32_t JERR_LFCR_SEQNUMNOTFOUND; ///< File sequence number not found
// class jrec, enq_rec, deq_rec, txn_rec
- static const uint32_t JERR_JREC_BADRECHDR; ///< Invalid data record header
- static const uint32_t JERR_JREC_BADRECTAIL; ///< Invalid data record tail
+ static const uint32_t JERR_JREC_BADRECHDR; ///< Invalid data record header
+ static const uint32_t JERR_JREC_BADRECTAIL; ///< Invalid data record tail
// class wmgr
- static const uint32_t JERR_WMGR_BADPGSTATE; ///< Page buffer in illegal state.
- static const uint32_t JERR_WMGR_BADDTOKSTATE; ///< Data token in illegal state.
- static const uint32_t JERR_WMGR_ENQDISCONT; ///< Enq. new dtok when previous part compl.
- static const uint32_t JERR_WMGR_DEQDISCONT; ///< Deq. new dtok when previous part compl.
- static const uint32_t JERR_WMGR_DEQRIDNOTENQ; ///< Deq. rid not enqueued
- static const uint32_t JERR_WMGR_BADFH; ///< Bad file handle
+ static const uint32_t JERR_WMGR_BADPGSTATE; ///< Page buffer in illegal state.
+ static const uint32_t JERR_WMGR_BADDTOKSTATE; ///< Data token in illegal state.
+ static const uint32_t JERR_WMGR_ENQDISCONT; ///< Enq. new dtok when previous part compl.
+ static const uint32_t JERR_WMGR_DEQDISCONT; ///< Deq. new dtok when previous part compl.
+ static const uint32_t JERR_WMGR_DEQRIDNOTENQ; ///< Deq. rid not enqueued
+ static const uint32_t JERR_WMGR_BADFH; ///< Bad file handle
// class RecoveryManager
- static const uint32_t JERR_RCVM_OPENRD; ///< Unable to open file for read
- static const uint32_t JERR_RCVM_READ; ///< Read error: no or insufficient data to read
+ static const uint32_t JERR_RCVM_OPENRD; ///< Unable to open file for read
+ static const uint32_t JERR_RCVM_STREAMBAD; ///< Read/write stream error
+ static const uint32_t JERR_RCVM_READ; ///< Read error: no or insufficient data to read
static const uint32_t JERR_RCVM_WRITE; ///< Write error
-
-// // class rmgr
-// static const uint32_t JERR_RMGR_UNKNOWNMAGIC; ///< Found record with unknown magic
-// static const uint32_t JERR_RMGR_RIDMISMATCH; ///< RID mismatch between rec and dtok
-// //static const uint32_t JERR_RMGR_FIDMISMATCH; ///< FID mismatch between emap and rrfc
-// static const uint32_t JERR_RMGR_ENQSTATE; ///< Attempted read when wstate not ENQ
-// static const uint32_t JERR_RMGR_BADRECTYPE; ///< Attempted op on incorrect rec type
+ static const uint32_t JERR_RCVM_NULLXID; ///< Null XID when XID length non-null in header
// class data_tok
- static const uint32_t JERR_DTOK_ILLEGALSTATE; ///< Attempted to change to illegal state
-// static const uint32_t JERR_DTOK_RIDNOTSET; ///< Record ID not set
+ static const uint32_t JERR_DTOK_ILLEGALSTATE; ///< Attempted to change to illegal state
+// static const uint32_t JERR_DTOK_RIDNOTSET; ///< Record ID not set
// class enq_map, txn_map
- static const uint32_t JERR_MAP_DUPLICATE; ///< Attempted to insert using duplicate key
- static const uint32_t JERR_MAP_NOTFOUND; ///< Key not found in map
- static const uint32_t JERR_MAP_LOCKED; ///< rid locked by pending txn
+ static const uint32_t JERR_MAP_DUPLICATE; ///< Attempted to insert using duplicate key
+ static const uint32_t JERR_MAP_NOTFOUND; ///< Key not found in map
+ static const uint32_t JERR_MAP_LOCKED; ///< rid locked by pending txn
// EFP errors
static const uint32_t JERR_EFP_BADPARTITIONNAME; ///< Partition name invalid or of value 0
- static const uint32_t JERR_EFP_BADEFPDIRNAME; ///< Empty File Pool directory name invalid
- static const uint32_t JERR_EFP_BADPARTITIONDIR; ///< Invalid partition directory
- static const uint32_t JERR_EFP_NOEFP; ///< No EFP found for given partition and file size
- static const uint32_t JERR_EFP_EMPTY; ///< EFP empty
+ static const uint32_t JERR_EFP_BADEFPDIRNAME; ///< Empty File Pool directory name invalid
+ static const uint32_t JERR_EFP_BADPARTITIONDIR; ///< Invalid partition directory
+ static const uint32_t JERR_EFP_NOEFP; ///< No EFP found for given partition and file size
+ static const uint32_t JERR_EFP_EMPTY; ///< EFP empty
// Negative returns for some functions
static const int32_t AIO_TIMEOUT; ///< Timeout waiting for AIO return
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.cpp
index d1972ea1f2..9c32f4128b 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.cpp
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.cpp
@@ -39,14 +39,19 @@ int16_t txn_map::TMAP_OK = 0;
int16_t txn_map::TMAP_NOT_SYNCED = 0;
int16_t txn_map::TMAP_SYNCED = 1;
-txn_data_struct::txn_data_struct(const uint64_t rid, const uint64_t drid, const uint16_t pfid,
- const bool enq_flag, const bool commit_flag):
- _rid(rid),
- _drid(drid),
- _pfid(pfid),
- _enq_flag(enq_flag),
- _commit_flag(commit_flag),
- _aio_compl(false)
+txn_data_t::txn_data_t(const uint64_t rid,
+ const uint64_t drid,
+ const uint16_t pfid,
+ const uint64_t foffs,
+ const bool enq_flag,
+ const bool commit_flag):
+ rid_(rid),
+ drid_(drid),
+ pfid_(pfid),
+ foffs_(foffs),
+ enq_flag_(enq_flag),
+ commit_flag_(commit_flag),
+ aio_compl_(false)
{}
txn_map::txn_map():
@@ -56,24 +61,8 @@ txn_map::txn_map():
txn_map::~txn_map() {}
-/*
-void
-txn_map::set_num_jfiles(const uint16_t num_jfiles)
-{
- _pfid_txn_cnt.resize(num_jfiles, 0);
-}
-*/
-
-/*
-uint32_t
-txn_map::get_txn_pfid_cnt(const uint16_t pfid) const
-{
- return _pfid_txn_cnt.at(pfid);
-}
-*/
-
bool
-txn_map::insert_txn_data(const std::string& xid, const txn_data& td)
+txn_map::insert_txn_data(const std::string& xid, const txn_data_t& td)
{
bool ok = true;
slock s(_mutex);
@@ -88,7 +77,6 @@ txn_map::insert_txn_data(const std::string& xid, const txn_data& td)
}
else
itr->second.push_back(td);
-// _pfid_txn_cnt.at(td._pfid)++;
return ok;
}
@@ -117,8 +105,6 @@ txn_map::get_remove_tdata_list(const std::string& xid)
return _empty_data_list;
txn_data_list list = itr->second;
_map.erase(itr);
-// for (tdl_itr i=list.begin(); i!=list.end(); i++)
-// _pfid_txn_cnt.at(i->_pfid)--;
return list;
}
@@ -151,7 +137,7 @@ txn_map::cnt(const bool enq_flag)
{
for (tdl_itr j = i->second.begin(); j < i->second.end(); j++)
{
- if (j->_enq_flag == enq_flag)
+ if (j->enq_flag_ == enq_flag)
c++;
}
}
@@ -168,7 +154,7 @@ txn_map::is_txn_synced(const std::string& xid)
bool is_synced = true;
for (tdl_itr litr = itr->second.begin(); litr < itr->second.end(); litr++)
{
- if (!litr->_aio_compl)
+ if (!litr->aio_compl_)
{
is_synced = false;
break;
@@ -186,9 +172,9 @@ txn_map::set_aio_compl(const std::string& xid, const uint64_t rid)
return TMAP_XID_NOT_FOUND;
for (tdl_itr litr = itr->second.begin(); litr < itr->second.end(); litr++)
{
- if (litr->_rid == rid)
+ if (litr->rid_ == rid)
{
- litr->_aio_compl = true;
+ litr->aio_compl_ = true;
return TMAP_OK; // rid found
}
}
@@ -206,7 +192,7 @@ txn_map::data_exists(const std::string& xid, const uint64_t rid)
tdl_itr itr = tdl.begin();
while (itr != tdl.end() && !found)
{
- found = itr->_rid == rid;
+ found = itr->rid_ == rid;
itr++;
}
}
@@ -224,10 +210,10 @@ txn_map::is_enq(const uint64_t rid)
txn_data_list list = i->second;
for (tdl_itr j = list.begin(); j < list.end() && !found; j++)
{
- if (j->_enq_flag)
- found = j->_rid == rid;
+ if (j->enq_flag_)
+ found = j->rid_ == rid;
else
- found = j->_drid == rid;
+ found = j->drid_ == rid;
}
}
}
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.h b/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.h
index b69d17da1c..146997abe0 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.h
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.h
@@ -45,19 +45,23 @@ namespace qls_jrnl
* \brief Struct encapsulating transaction data necessary for processing a transaction
* in the journal once it is closed with either a commit or abort.
*/
- struct txn_data_struct
+ typedef struct txn_data_t
{
- uint64_t _rid; ///< Record id for this operation
- uint64_t _drid; ///< Dequeue record id for this operation
- uint16_t _pfid; ///< Physical file id, to be used when transferring to emap on commit
- bool _enq_flag; ///< If true, enq op, otherwise deq op
- bool _commit_flag; ///< (2PC transactions) Records 2PC complete c/a mode
- bool _aio_compl; ///< Initially false, set to true when record AIO returns
- txn_data_struct(const uint64_t rid, const uint64_t drid, const uint16_t pfid,
- const bool enq_flag, const bool commit_flag = false);
- };
- typedef txn_data_struct txn_data;
- typedef std::vector<txn_data> txn_data_list;
+ uint64_t rid_; ///< Record id for this operation
+ uint64_t drid_; ///< Dequeue record id for this operation
+ uint16_t pfid_; ///< Physical file id, to be used when transferring to emap on commit
+ uint64_t foffs_; ///< Offset in file for this record
+ bool enq_flag_; ///< If true, enq op, otherwise deq op
+ bool commit_flag_; ///< (2PC transactions) Records 2PC complete c/a mode
+ bool aio_compl_; ///< Initially false, set to true when record AIO returns
+ txn_data_t(const uint64_t rid,
+ const uint64_t drid,
+ const uint16_t pfid,
+ const uint64_t foffs,
+ const bool enq_flag,
+ const bool commit_flag = false);
+ } txn_data_t;
+ typedef std::vector<txn_data_t> txn_data_list;
typedef txn_data_list::iterator tdl_itr;
/**
@@ -112,16 +116,13 @@ namespace qls_jrnl
xmap _map;
smutex _mutex;
-// std::vector<uint32_t> _pfid_txn_cnt;
const txn_data_list _empty_data_list;
public:
txn_map();
virtual ~txn_map();
-// void set_num_jfiles(const uint16_t num_jfiles);
-// uint32_t get_txn_pfid_cnt(const uint16_t pfid) const;
- bool insert_txn_data(const std::string& xid, const txn_data& td);
+ bool insert_txn_data(const std::string& xid, const txn_data_t& td);
const txn_data_list get_tdata_list(const std::string& xid);
const txn_data_list get_remove_tdata_list(const std::string& xid);
bool in_map(const std::string& xid);
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c b/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c
index 7e53be5e18..dcc24bac8a 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c
@@ -28,7 +28,7 @@ void file_hdr_create(file_hdr_t* dest, const uint32_t magic, const uint16_t vers
dest->_fhdr_size_sblks = fhdr_size_sblks;
dest->_efp_partition = efp_partition;
dest->_reserved = 0;
- dest->_file_size_kib = file_size;
+ dest->_data_size_kib = file_size;
dest->_fro = 0;
dest->_ts_nsec = 0;
dest->_ts_sec = 0;
@@ -58,7 +58,7 @@ void file_hdr_copy(file_hdr_t* dest, const file_hdr_t* src) {
rec_hdr_copy(&dest->_rhdr, &src->_rhdr);
dest->_fhdr_size_sblks = src->_fhdr_size_sblks; // Should this be copied?
dest->_efp_partition = src->_efp_partition; // Should this be copied?
- dest->_file_size_kib = src->_file_size_kib;
+ dest->_data_size_kib = src->_data_size_kib;
dest->_fro = src->_fro;
dest->_ts_sec = src->_ts_sec;
dest->_ts_nsec = src->_ts_nsec;
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h b/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h
index efdd97b624..6a53c631d5 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h
@@ -79,7 +79,7 @@ typedef struct file_hdr_t {
uint16_t _fhdr_size_sblks; /**< File header size in sblks (defined by JRNL_SBLK_SIZE) */
uint16_t _efp_partition; /**< EFP Partition number from which this file was obtained */
uint32_t _reserved;
- uint64_t _file_size_kib; /**< Size of this file in KiB, excluding header sblk */
+ uint64_t _data_size_kib; /**< Size of the data part of this file in KiB. (ie file size excluding file header sblk) */
uint64_t _fro; /**< First Record Offset (FRO) */
uint64_t _ts_sec; /**< Time stamp (seconds part) */
uint64_t _ts_nsec; /**< Time stamp (nanoseconds part) */
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp
index a4ed6cbf19..991e26a3b9 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp
@@ -33,7 +33,7 @@
#include <sstream>
#include <stdint.h>
-#include <iostream> // DEBUG
+//#include <iostream> // DEBUG
namespace qpid
{
@@ -99,7 +99,7 @@ wmgr::initialize(aio_callback* const cbp,
if (eo)
{
const uint32_t wr_pg_size_dblks = _cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS;
- uint32_t data_dblks = (eo / QLS_DBLK_SIZE_BYTES) - 4; // 4 dblks for file hdr
+ uint32_t data_dblks = (eo / QLS_DBLK_SIZE_BYTES) - (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_DBLKS); // exclude file header
_pg_cntr = data_dblks / wr_pg_size_dblks;
_pg_offset_dblks = data_dblks - (_pg_cntr * wr_pg_size_dblks);
}
@@ -115,6 +115,7 @@ wmgr::enqueue(const void* const data_buff,
const bool transient,
const bool external)
{
+//std::cout << _lfc.status(10) << std::endl;
if (xid_len)
assert(xid_ptr != 0);
@@ -160,7 +161,7 @@ wmgr::enqueue(const void* const data_buff,
dtokp->clear_xid();
_enq_busy = true;
}
-//std::cout << "---+++ wmgr::enqueue() ENQ rid=0x" << std::hex << rid << " " << std::dec << std::flush; // DEBUG
+//std::cout << "---+++ wmgr::enqueue() ENQ rid=0x" << std::hex << rid << " po=0x" << _pg_offset_dblks << " cs=0x" << (_cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) << " " << std::dec << std::flush; // DEBUG
bool done = false;
while (!done)
{
@@ -193,12 +194,12 @@ wmgr::enqueue(const void* const data_buff,
// message. AIO callbacks will then only process this token when entire message is
// enqueued.
_lfc.incrEnqueuedRecordCount(dtokp->fid());
-//std::cout << "[0x" << std::hex << _lfc.getEnqueuedRecordCount() << std::dec << std::flush; // DEBUG
+//std::cout << "[0x" << std::hex << _lfc.getEnqueuedRecordCount(dtokp->fid()) << std::dec << std::flush; // DEBUG
if (xid_len) // If part of transaction, add to transaction map
{
std::string xid((const char*)xid_ptr, xid_len);
- _tmap.insert_txn_data(xid, txn_data(rid, 0, dtokp->fid(), true));
+ _tmap.insert_txn_data(xid, txn_data_t(rid, 0, dtokp->fid(), 0, true));
}
else
{
@@ -213,7 +214,7 @@ wmgr::enqueue(const void* const data_buff,
done = true;
} else {
-//std::cout << "$" << std::endl << std::flush; // DEBUG
+//std::cout << "$" << std::flush; // DEBUG
dtokp->set_wstate(data_tok::ENQ_PART);
}
@@ -313,7 +314,7 @@ wmgr::dequeue(data_tok* dtokp,
// If the enqueue is part of a pending txn, it will not yet be in emap
_emap.lock(dequeue_rid); // ignore rid not found error
std::string xid((const char*)xid_ptr, xid_len);
- _tmap.insert_txn_data(xid, txn_data(rid, dequeue_rid, dtokp->fid(), false));
+ _tmap.insert_txn_data(xid, txn_data_t(rid, dequeue_rid, dtokp->fid(), 0, false));
}
else
{
@@ -427,10 +428,10 @@ wmgr::abort(data_tok* dtokp,
txn_data_list tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if xid not found
for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
{
- if (!itr->_enq_flag)
- _emap.unlock(itr->_drid); // ignore rid not found error
- if (itr->_enq_flag)
- _lfc.decrEnqueuedRecordCount(itr->_pfid);
+ if (!itr->enq_flag_)
+ _emap.unlock(itr->drid_); // ignore rid not found error
+ if (itr->enq_flag_)
+ _lfc.decrEnqueuedRecordCount(itr->pfid_);
}
std::pair<std::set<std::string>::iterator, bool> res = _txn_pending_set.insert(xid);
if (!res.second)
@@ -441,9 +442,9 @@ wmgr::abort(data_tok* dtokp,
}
done = true;
- }
- else
+ } else {
dtokp->set_wstate(data_tok::ABORT_PART);
+ }
file_header_check(rid, cont, _txn_rec.rec_size_dblks() - data_offs_dblks);
flush_check(res, cont, done, rid);
@@ -525,20 +526,20 @@ wmgr::commit(data_tok* dtokp,
txn_data_list tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if xid not found
for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
{
- if (itr->_enq_flag) // txn enqueue
+ if (itr->enq_flag_) // txn enqueue
{
- if (_emap.insert_pfid(itr->_rid, itr->_pfid, 0) < enq_map::EMAP_OK) // fail
+ if (_emap.insert_pfid(itr->rid_, itr->pfid_, 0) < enq_map::EMAP_OK) // fail
{
// The only error code emap::insert_pfid() returns is enq_map::EMAP_DUP_RID.
std::ostringstream oss;
- oss << std::hex << "rid=0x" << itr->_rid << " _pfid=0x" << itr->_pfid;
+ oss << std::hex << "rid=0x" << itr->rid_ << " _pfid=0x" << itr->pfid_;
throw jexception(jerrno::JERR_MAP_DUPLICATE, oss.str(), "wmgr", "commit");
}
}
else // txn dequeue
{
uint64_t fid;
- short eres = _emap.get_remove_pfid(itr->_drid, fid, true);
+ short eres = _emap.get_remove_pfid(itr->drid_, fid, true);
if (eres < enq_map::EMAP_OK) // fail
{
if (eres == enq_map::EMAP_RID_NOT_FOUND)
@@ -566,9 +567,9 @@ wmgr::commit(data_tok* dtokp,
}
done = true;
- }
- else
+ } else {
dtokp->set_wstate(data_tok::COMMIT_PART);
+ }
file_header_check(rid, cont, _txn_rec.rec_size_dblks() - data_offs_dblks);
flush_check(res, cont, done, rid);
@@ -585,6 +586,7 @@ wmgr::file_header_check(const uint64_t rid,
{
if (_lfc.isEmpty()) // File never written (i.e. no header or data)
{
+//std::cout << "e" << std::flush;
std::size_t fro = 0;
if (cont) {
bool file_fit = rec_dblks_rem <= _lfc.dataSize_sblks() * QLS_SBLK_SIZE_DBLKS; // Will fit within this journal file
@@ -608,6 +610,7 @@ wmgr::flush_check(iores& res,
// Is page is full, flush
if (_pg_offset_dblks >= _cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS)
{
+//std::cout << "^" << _pg_offset_dblks << ">=" << (_cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) << std::flush;
res = write_flush();
assert(res == RHM_IORES_SUCCESS);
@@ -621,6 +624,7 @@ wmgr::flush_check(iores& res,
uint32_t fileSize_pgs = _lfc.fileSize_sblks() / _cache_pgsize_sblks;
if (_pg_cntr >= fileSize_pgs)
{
+//std::cout << _pg_cntr << ">=" << fileSize_pgs << std::flush;
get_next_file();
if (!done) {
cont = true;
@@ -649,7 +653,7 @@ wmgr::write_flush()
if (_cached_offset_dblks)
{
if (_page_cb_arr[_pg_index]._state == AIO_PENDING) {
-//std::cout << "#"; // DEBUG
+//std::cout << "#" << std::flush; // DEBUG
res = RHM_IORES_PAGE_AIOWAIT;
} else {
if (_page_cb_arr[_pg_index]._state != IN_USE)
@@ -688,7 +692,7 @@ void
wmgr::get_next_file()
{
_pg_cntr = 0;
-//std::cout << "&&&&& wmgr::get_next_file(): " << status_str() << std::endl; // DEBUG
+//std::cout << "&&&&& wmgr::get_next_file(): " << status_str() << std::flush << std::endl; // DEBUG
_lfc.pullEmptyFileFromEfp();
}
@@ -972,6 +976,7 @@ wmgr::dblk_roundup()
uint32_t wdblks = jrec::size_blks(_cached_offset_dblks, QLS_SBLK_SIZE_DBLKS) * QLS_SBLK_SIZE_DBLKS;
while (_cached_offset_dblks < wdblks)
{
+//std::cout << "^0x" << std::hex << _cached_offset_dblks << "<0x" << wdblks << std::dec << std::flush;
void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * QLS_DBLK_SIZE_BYTES);
std::memcpy(wptr, (const void*)&xmagic, sizeof(xmagic));
#ifdef QLS_CLEAN