summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2013-08-23 18:07:15 +0000
committerKim van der Riet <kpvdr@apache.org>2013-08-23 18:07:15 +0000
commit7defd7eb7b11a0ff83ee0a7393477a453f4fb604 (patch)
tree1f5728f2f86afb454953fe64bf748a6c42b67c47 /qpid/cpp/src
parent2c39e4c4da4d727bdca8ce6d3ff8a9d570a04b22 (diff)
downloadqpid-python-7defd7eb7b11a0ff83ee0a7393477a453f4fb604.tar.gz
QPID-4984: WIP - compiles, but not functional.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/linearstore@1516958 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/linearstore/BufferValue.cpp2
-rw-r--r--qpid/cpp/src/qpid/linearstore/BufferValue.h2
-rw-r--r--qpid/cpp/src/qpid/linearstore/Cursor.h2
-rw-r--r--qpid/cpp/src/qpid/linearstore/IdDbt.cpp6
-rw-r--r--qpid/cpp/src/qpid/linearstore/IdDbt.h4
-rw-r--r--qpid/cpp/src/qpid/linearstore/IdSequence.cpp2
-rw-r--r--qpid/cpp/src/qpid/linearstore/IdSequence.h1
-rw-r--r--qpid/cpp/src/qpid/linearstore/JournalImpl.cpp91
-rw-r--r--qpid/cpp/src/qpid/linearstore/JournalImpl.h62
-rw-r--r--qpid/cpp/src/qpid/linearstore/Log.h30
-rw-r--r--qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp353
-rw-r--r--qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h96
-rw-r--r--qpid/cpp/src/qpid/linearstore/PreparedTransaction.h5
-rw-r--r--qpid/cpp/src/qpid/linearstore/StorePlugin.cpp6
-rw-r--r--qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp8
-rw-r--r--qpid/cpp/src/qpid/linearstore/TxnCtxt.h2
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h3
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp10
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/jrec.h2
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp3
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/rmgr.cpp28
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp34
22 files changed, 322 insertions, 430 deletions
diff --git a/qpid/cpp/src/qpid/linearstore/BufferValue.cpp b/qpid/cpp/src/qpid/linearstore/BufferValue.cpp
index f78cb46af0..5115055375 100644
--- a/qpid/cpp/src/qpid/linearstore/BufferValue.cpp
+++ b/qpid/cpp/src/qpid/linearstore/BufferValue.cpp
@@ -26,7 +26,7 @@ namespace linearstore {
-BufferValue::BufferValue(u_int32_t size, u_int64_t offset)
+BufferValue::BufferValue(uint32_t size, uint64_t offset)
: data(new char[size]),
buffer(data, size)
{
diff --git a/qpid/cpp/src/qpid/linearstore/BufferValue.h b/qpid/cpp/src/qpid/linearstore/BufferValue.h
index bbd6a5b88d..5af1ac6a43 100644
--- a/qpid/cpp/src/qpid/linearstore/BufferValue.h
+++ b/qpid/cpp/src/qpid/linearstore/BufferValue.h
@@ -36,7 +36,7 @@ class BufferValue : public Dbt
public:
qpid::framing::Buffer buffer;
- BufferValue(u_int32_t size, u_int64_t offset);
+ BufferValue(uint32_t size, uint64_t offset);
BufferValue(const qpid::broker::Persistable& p);
virtual ~BufferValue();
};
diff --git a/qpid/cpp/src/qpid/linearstore/Cursor.h b/qpid/cpp/src/qpid/linearstore/Cursor.h
index 3f3023c46c..c7e7f34faa 100644
--- a/qpid/cpp/src/qpid/linearstore/Cursor.h
+++ b/qpid/cpp/src/qpid/linearstore/Cursor.h
@@ -37,7 +37,7 @@ public:
Cursor() : cursor(0) {}
virtual ~Cursor() { if(cursor) cursor->close(); }
- void open(db_ptr db, DbTxn* txn, u_int32_t flags = 0) { db->cursor(txn, &cursor, flags); }
+ void open(db_ptr db, DbTxn* txn, uint32_t flags = 0) { db->cursor(txn, &cursor, flags); }
void close() { if(cursor) cursor->close(); cursor = 0; }
Dbc* get() { return cursor; }
Dbc* operator->() { return cursor; }
diff --git a/qpid/cpp/src/qpid/linearstore/IdDbt.cpp b/qpid/cpp/src/qpid/linearstore/IdDbt.cpp
index 075234acb9..d427085bbe 100644
--- a/qpid/cpp/src/qpid/linearstore/IdDbt.cpp
+++ b/qpid/cpp/src/qpid/linearstore/IdDbt.cpp
@@ -28,7 +28,7 @@ IdDbt::IdDbt() : id(0)
init();
}
-IdDbt::IdDbt(u_int64_t _id) : id(_id)
+IdDbt::IdDbt(uint64_t _id) : id(_id)
{
init();
}
@@ -36,7 +36,7 @@ IdDbt::IdDbt(u_int64_t _id) : id(_id)
void IdDbt::init()
{
set_data(&id);
- set_size(sizeof(u_int64_t));
- set_ulen(sizeof(u_int64_t));
+ set_size(sizeof(uint64_t));
+ set_ulen(sizeof(uint64_t));
set_flags(DB_DBT_USERMEM);
}
diff --git a/qpid/cpp/src/qpid/linearstore/IdDbt.h b/qpid/cpp/src/qpid/linearstore/IdDbt.h
index 0a5b0d782d..f8bb0647db 100644
--- a/qpid/cpp/src/qpid/linearstore/IdDbt.h
+++ b/qpid/cpp/src/qpid/linearstore/IdDbt.h
@@ -31,9 +31,9 @@ class IdDbt : public Dbt
{
void init();
public:
- u_int64_t id;
+ uint64_t id;
- IdDbt(u_int64_t id);
+ IdDbt(uint64_t id);
IdDbt();
};
diff --git a/qpid/cpp/src/qpid/linearstore/IdSequence.cpp b/qpid/cpp/src/qpid/linearstore/IdSequence.cpp
index 4ad40f5329..4d3172ffe9 100644
--- a/qpid/cpp/src/qpid/linearstore/IdSequence.cpp
+++ b/qpid/cpp/src/qpid/linearstore/IdSequence.cpp
@@ -26,7 +26,7 @@ using qpid::sys::Mutex;
IdSequence::IdSequence() : id(1) {}
-u_int64_t IdSequence::next()
+uint64_t IdSequence::next()
{
Mutex::ScopedLock guard(lock);
if (!id) id++; // avoid 0 when folding around
diff --git a/qpid/cpp/src/qpid/linearstore/IdSequence.h b/qpid/cpp/src/qpid/linearstore/IdSequence.h
index 06f3db3f1e..11b31a2813 100644
--- a/qpid/cpp/src/qpid/linearstore/IdSequence.h
+++ b/qpid/cpp/src/qpid/linearstore/IdSequence.h
@@ -24,7 +24,6 @@
#include "qpid/framing/amqp_types.h"
#include "qpid/sys/Mutex.h"
-#include <sys/types.h>
namespace qpid{
namespace linearstore{
diff --git a/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp b/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp
index ea091ae941..f8b7777269 100644
--- a/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp
+++ b/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp
@@ -32,6 +32,7 @@
#include "qmf/org/apache/qpid/linearstore/EventRecovered.h"
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Timer.h"
+#include "qpid/linearstore/Log.h"
#include "qpid/linearstore/StoreException.h"
using namespace qpid::qls_jrnl;
@@ -79,10 +80,10 @@ JournalImpl::JournalImpl(qpid::sys::Timer& timer_,
initManagement(a);
- log(LOG_NOTICE, "Created");
+ QLS_LOG2(notice, _jid, "Created");
std::ostringstream oss;
oss << "Journal directory = \"" << journalDirectory << "\"; Base file name = \"" << journalBaseFilename << "\"";
- log(LOG_DEBUG, oss.str());
+ QLS_LOG2(debug, _jid, oss.str());
}
JournalImpl::~JournalImpl()
@@ -90,7 +91,7 @@ JournalImpl::~JournalImpl()
if (deleteCallback) deleteCallback(*this);
if (_init_flag && !_stop_flag){
try { stop(true); } // NOTE: This will *block* until all outstanding disk aio calls are complete!
- catch (const jexception& e) { log(LOG_ERROR, e.what()); }
+ catch (const jexception& e) { QLS_LOG2(error, _jid, e.what()); }
}
getEventsFireEventsPtr->cancel();
inactivityFireEventPtr->cancel();
@@ -101,7 +102,7 @@ JournalImpl::~JournalImpl()
_mgmtObject.reset();
}
- log(LOG_NOTICE, "Destroyed");
+ QLS_LOG2(notice, _jid, "Destroyed");
}
void
@@ -116,7 +117,7 @@ JournalImpl::initManagement(qpid::management::ManagementAgent* a)
_mgmtObject->set_name(_jid);
_mgmtObject->set_directory(_jdir.dirname());
_mgmtObject->set_baseFileName(_base_filename);
- _mgmtObject->set_readPageSize(JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE);
+ _mgmtObject->set_readPageSize(JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE);
_mgmtObject->set_readPages(JRNL_RMGR_PAGES);
// The following will be set on initialize(), but being properties, these must be set to 0 in the meantime
@@ -132,12 +133,12 @@ JournalImpl::initManagement(qpid::management::ManagementAgent* a)
void
-JournalImpl::initialize(/*const u_int16_t num_jfiles,
+JournalImpl::initialize(/*const uint16_t num_jfiles,
const bool auto_expand,
- const u_int16_t ae_max_jfiles,
- const u_int32_t jfsize_sblks,*/
- const u_int16_t wcache_num_pages,
- const u_int32_t wcache_pgsize_sblks,
+ const uint16_t ae_max_jfiles,
+ const uint32_t jfsize_sblks,*/
+ const uint16_t wcache_num_pages,
+ const uint32_t wcache_pgsize_sblks,
qpid::qls_jrnl::aio_callback* const cbp)
{
std::ostringstream oss;
@@ -145,9 +146,9 @@ JournalImpl::initialize(/*const u_int16_t num_jfiles,
oss << "Initialize;";
oss << " wcache_pgsize_sblks=" << wcache_pgsize_sblks;
oss << " wcache_num_pages=" << wcache_num_pages;
- log(LOG_DEBUG, oss.str());
+ QLS_LOG2(debug, _jid, oss.str());
jcntl::initialize(/*num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,*/ wcache_num_pages, wcache_pgsize_sblks, cbp);
- log(LOG_DEBUG, "Initialization complete");
+ QLS_LOG2(debug, _jid, "Initialization complete");
// TODO: replace for linearstore: _lpmgr
/*
if (_mgmtObject.get() != 0)
@@ -156,27 +157,27 @@ JournalImpl::initialize(/*const u_int16_t num_jfiles,
_mgmtObject->set_autoExpand(_lpmgr.is_ae());
_mgmtObject->set_currentFileCount(_lpmgr.num_jfiles());
_mgmtObject->set_maxFileCount(_lpmgr.ae_max_jfiles());
- _mgmtObject->set_dataFileSize(_jfsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE);
- _mgmtObject->set_writePageSize(wcache_pgsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE);
+ _mgmtObject->set_dataFileSize(_jfsize_sblks * JRNL_SBLK_SIZE);
+ _mgmtObject->set_writePageSize(wcache_pgsize_sblks * JRNL_SBLK_SIZE);
_mgmtObject->set_writePages(wcache_num_pages);
}
if (_agent != 0)
- _agent->raiseEvent(qmf::org::apache::qpid::linearstore::EventCreated(_jid, _jfsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE, _lpmgr.num_jfiles()),
+ _agent->raiseEvent(qmf::org::apache::qpid::linearstore::EventCreated(_jid, _jfsize_sblks * JRNL_SBLK_SIZE, _lpmgr.num_jfiles()),
qpid::management::ManagementAgent::SEV_NOTE);
*/
}
void
-JournalImpl::recover(/*const u_int16_t num_jfiles,
+JournalImpl::recover(/*const uint16_t num_jfiles,
const bool auto_expand,
- const u_int16_t ae_max_jfiles,
- const u_int32_t jfsize_sblks,*/
- const u_int16_t wcache_num_pages,
- const u_int32_t wcache_pgsize_sblks,
+ const uint16_t ae_max_jfiles,
+ const uint32_t jfsize_sblks,*/
+ const uint16_t wcache_num_pages,
+ const uint32_t wcache_pgsize_sblks,
qpid::qls_jrnl::aio_callback* const cbp,
boost::ptr_list<PreparedTransaction>* prep_tx_list_ptr,
- u_int64_t& highest_rid,
- u_int64_t queue_id)
+ uint64_t& highest_rid,
+ uint64_t queue_id)
{
std::ostringstream oss1;
// oss1 << "Recover; num_jfiles=" << num_jfiles << " jfsize_sblks=" << jfsize_sblks;
@@ -184,7 +185,7 @@ JournalImpl::recover(/*const u_int16_t num_jfiles,
oss1 << " queue_id = 0x" << std::hex << queue_id << std::dec;
oss1 << " wcache_pgsize_sblks=" << wcache_pgsize_sblks;
oss1 << " wcache_num_pages=" << wcache_num_pages;
- log(LOG_DEBUG, oss1.str());
+ QLS_LOG2(debug, _jid, oss1.str());
// TODO: replace for linearstore: _lpmgr
/*
if (_mgmtObject.get() != 0)
@@ -193,8 +194,8 @@ JournalImpl::recover(/*const u_int16_t num_jfiles,
_mgmtObject->set_autoExpand(_lpmgr.is_ae());
_mgmtObject->set_currentFileCount(_lpmgr.num_jfiles());
_mgmtObject->set_maxFileCount(_lpmgr.ae_max_jfiles());
- _mgmtObject->set_dataFileSize(_jfsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE);
- _mgmtObject->set_writePageSize(wcache_pgsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE);
+ _mgmtObject->set_dataFileSize(_jfsize_sblks * JRNL_SBLK_SIZE);
+ _mgmtObject->set_writePageSize(wcache_pgsize_sblks * JRNL_SBLK_SIZE);
_mgmtObject->set_writePages(wcache_num_pages);
}
*/
@@ -231,7 +232,7 @@ JournalImpl::recover(/*const u_int16_t num_jfiles,
oss2 << "Recover phase 1 complete; highest rid found = 0x" << std::hex << highest_rid;
oss2 << std::dec << "; emap.size=" << _emap.size() << "; tmap.size=" << _tmap.size();
oss2 << "; journal now read-only.";
- log(LOG_DEBUG, oss2.str());
+ QLS_LOG2(debug, _jid, oss2.str());
if (_mgmtObject.get() != 0)
{
@@ -247,11 +248,11 @@ void
JournalImpl::recover_complete()
{
jcntl::recover_complete();
- log(LOG_DEBUG, "Recover phase 2 complete; journal now writable.");
+ QLS_LOG2(debug, _jid, "Recover phase 2 complete; journal now writable.");
// TODO: replace for linearstore: _lpmgr
/*
if (_agent != 0)
- _agent->raiseEvent(qmf::org::apache::qpid::linearstore::EventRecovered(_jid, _jfsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE, _lpmgr.num_jfiles(),
+ _agent->raiseEvent(qmf::org::apache::qpid::linearstore::EventRecovered(_jid, _jfsize_sblks * JRNL_SBLK_SIZE, _lpmgr.num_jfiles(),
_emap.size(), _tmap.size(), _tmap.enq_cnt(), _tmap.deq_cnt()), qpid::management::ManagementAgent::SEV_NOTE);
*/
}
@@ -261,7 +262,7 @@ JournalImpl::recover_complete()
// Return true if content is recovered from store; false if content is external and must be recovered from an external store.
// Throw exception for all errors.
bool
-JournalImpl::loadMsgContent(u_int64_t rid, std::string& data, size_t length, size_t offset)
+JournalImpl::loadMsgContent(uint64_t rid, std::string& data, size_t length, size_t offset)
{
qpid::sys::Mutex::ScopedLock sl(_read_lock);
if (_dtok.rid() != rid)
@@ -271,7 +272,7 @@ JournalImpl::loadMsgContent(u_int64_t rid, std::string& data, size_t length, siz
// Last read encountered out-of-order rids, check if this rid is in that list
bool oooFlag = false;
- for (std::vector<u_int64_t>::const_iterator i=oooRidList.begin(); i!=oooRidList.end() && !oooFlag; i++) {
+ for (std::vector<uint64_t>::const_iterator i=oooRidList.begin(); i!=oooRidList.end() && !oooFlag; i++) {
if (*i == rid) {
oooFlag = true;
}
@@ -342,7 +343,7 @@ JournalImpl::loadMsgContent(u_int64_t rid, std::string& data, size_t length, siz
if (_external) return false;
- u_int32_t hdr_offs = qpid::framing::Buffer(static_cast<char*>(_datap), sizeof(u_int32_t)).getLong() + sizeof(u_int32_t);
+ uint32_t hdr_offs = qpid::framing::Buffer(static_cast<char*>(_datap), sizeof(uint32_t)).getLong() + sizeof(uint32_t);
if (hdr_offs + offset + length > _dlen) {
data.append((const char*)_datap + hdr_offs + offset, _dlen - hdr_offs - offset);
} else {
@@ -492,6 +493,7 @@ JournalImpl::flush(const bool block_till_aio_cmpl)
return res;
}
+/*
void
JournalImpl::log(qpid::qls_jrnl::log_level ll, const std::string& log_stmt) const
{
@@ -503,15 +505,16 @@ JournalImpl::log(qpid::qls_jrnl::log_level ll, const char* const log_stmt) const
{
switch (ll)
{
- case LOG_TRACE: QPID_LOG(trace, "Journal \"" << _jid << "\": " << log_stmt); break;
- case LOG_DEBUG: QPID_LOG(debug, "Journal \"" << _jid << "\": " << log_stmt); break;
- case LOG_INFO: QPID_LOG(info, "Journal \"" << _jid << "\": " << log_stmt); break;
- case LOG_NOTICE: QPID_LOG(notice, "Journal \"" << _jid << "\": " << log_stmt); break;
- case LOG_WARN: QPID_LOG(warning, "Journal \"" << _jid << "\": " << log_stmt); break;
- case LOG_ERROR: QPID_LOG(error, "Journal \"" << _jid << "\": " << log_stmt); break;
- case LOG_CRITICAL: QPID_LOG(critical, "Journal \"" << _jid << "\": " << log_stmt); break;
+ case LOG_TRACE: QPID_LOG(trace, "QLS Journal \"" << _jid << "\": " << log_stmt); break;
+ case LOG_DEBUG: QPID_LOG(debug, "QLS Journal \"" << _jid << "\": " << log_stmt); break;
+ case LOG_INFO: QPID_LOG(info, "QLS Journal \"" << _jid << "\": " << log_stmt); break;
+ case LOG_NOTICE: QPID_LOG(notice, "QLS Journal \"" << _jid << "\": " << log_stmt); break;
+ case LOG_WARN: QPID_LOG(warning, "QLS Journal \"" << _jid << "\": " << log_stmt); break;
+ case LOG_ERROR: QPID_LOG(error, "QLS Journal \"" << _jid << "\": " << log_stmt); break;
+ case LOG_CRITICAL: QPID_LOG(critical, "QLS Journal \"" << _jid << "\": " << log_stmt); break;
}
}
+*/
void
JournalImpl::getEventsFire()
@@ -568,7 +571,7 @@ JournalImpl::wr_aio_cb(std::vector<data_tok*>& dtokl)
}
void
-JournalImpl::rd_aio_cb(std::vector<u_int16_t>& /*pil*/)
+JournalImpl::rd_aio_cb(std::vector<uint16_t>& /*pil*/)
{}
void
@@ -595,8 +598,8 @@ JournalImpl::handleIoResult(const iores r)
case qpid::qls_jrnl::RHM_IORES_ENQCAPTHRESH:
{
std::ostringstream oss;
- oss << "Enqueue capacity threshold exceeded on queue \"" << _jid << "\".";
- log(LOG_WARN, oss.str());
+ oss << "Enqueue capacity threshold exceeded.";
+ QLS_LOG2(warning, _jid, oss.str());
if (_agent != 0)
_agent->raiseEvent(qmf::org::apache::qpid::linearstore::EventEnqThresholdExceeded(_jid, "Journal enqueue capacity threshold exceeded"),
qpid::management::ManagementAgent::SEV_WARN);
@@ -606,7 +609,7 @@ JournalImpl::handleIoResult(const iores r)
{
std::ostringstream oss;
oss << "Journal full on queue \"" << _jid << "\".";
- log(LOG_CRITICAL, oss.str());
+ QLS_LOG2(critical, _jid, "Journal full.");
if (_agent != 0)
_agent->raiseEvent(qmf::org::apache::qpid::linearstore::EventFull(_jid, "Journal full"), qpid::management::ManagementAgent::SEV_ERROR);
THROW_STORE_FULL_EXCEPTION(oss.str());
@@ -614,8 +617,8 @@ JournalImpl::handleIoResult(const iores r)
default:
{
std::ostringstream oss;
- oss << "Unexpected I/O response (" << qpid::qls_jrnl::iores_str(r) << ") on queue " << _jid << "\".";
- log(LOG_ERROR, oss.str());
+ oss << "Unexpected I/O response (" << qpid::qls_jrnl::iores_str(r) << ").";
+ QLS_LOG2(error, _jid, oss.str());
THROW_STORE_FULL_EXCEPTION(oss.str());
}
}
diff --git a/qpid/cpp/src/qpid/linearstore/JournalImpl.h b/qpid/cpp/src/qpid/linearstore/JournalImpl.h
index c70a044bf5..692bccc9b0 100644
--- a/qpid/cpp/src/qpid/linearstore/JournalImpl.h
+++ b/qpid/cpp/src/qpid/linearstore/JournalImpl.h
@@ -76,7 +76,7 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public qpid::qls_jr
private:
// static qpid::sys::Mutex _static_lock;
-// static u_int32_t cnt;
+// static uint32_t cnt;
qpid::sys::Timer& timer;
bool getEventsTimerSetFlag;
@@ -84,8 +84,8 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public qpid::qls_jr
qpid::sys::Mutex _getf_lock;
qpid::sys::Mutex _read_lock;
- u_int64_t lastReadRid; // rid of last read msg for loadMsgContent() - detects out-of-order read requests
- std::vector<u_int64_t> oooRidList; // list of out-of-order rids (greater than current rid) encountered during read sequence
+ uint64_t lastReadRid; // rid of last read msg for loadMsgContent() - detects out-of-order read requests
+ std::vector<uint64_t> oooRidList; // list of out-of-order rids (greater than current rid) encountered during read sequence
bool writeActivityFlag;
bool flushTriggeredFlag;
@@ -117,44 +117,44 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public qpid::qls_jr
void initManagement(qpid::management::ManagementAgent* agent);
- void initialize(/*const u_int16_t num_jfiles,
+ void initialize(/*const uint16_t num_jfiles,
const bool auto_expand,
- const u_int16_t ae_max_jfiles,
- const u_int32_t jfsize_sblks,*/
- const u_int16_t wcache_num_pages,
- const u_int32_t wcache_pgsize_sblks,
+ const uint16_t ae_max_jfiles,
+ const uint32_t jfsize_sblks,*/
+ const uint16_t wcache_num_pages,
+ const uint32_t wcache_pgsize_sblks,
qpid::qls_jrnl::aio_callback* const cbp);
- inline void initialize(/*const u_int16_t num_jfiles,
+ inline void initialize(/*const uint16_t num_jfiles,
const bool auto_expand,
- const u_int16_t ae_max_jfiles,
- const u_int32_t jfsize_sblks,*/
- const u_int16_t wcache_num_pages,
- const u_int32_t wcache_pgsize_sblks) {
+ const uint16_t ae_max_jfiles,
+ const uint32_t jfsize_sblks,*/
+ const uint16_t wcache_num_pages,
+ const uint32_t wcache_pgsize_sblks) {
initialize(/*num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,*/ wcache_num_pages, wcache_pgsize_sblks,
this);
}
- void recover(/*const u_int16_t num_jfiles,
+ void recover(/*const uint16_t num_jfiles,
const bool auto_expand,
- const u_int16_t ae_max_jfiles,
- const u_int32_t jfsize_sblks,*/
- const u_int16_t wcache_num_pages,
- const u_int32_t wcache_pgsize_sblks,
+ const uint16_t ae_max_jfiles,
+ const uint32_t jfsize_sblks,*/
+ const uint16_t wcache_num_pages,
+ const uint32_t wcache_pgsize_sblks,
qpid::qls_jrnl::aio_callback* const cbp,
boost::ptr_list<PreparedTransaction>* prep_tx_list_ptr,
- u_int64_t& highest_rid,
- u_int64_t queue_id);
+ uint64_t& highest_rid,
+ uint64_t queue_id);
- inline void recover(/*const u_int16_t num_jfiles,
+ inline void recover(/*const uint16_t num_jfiles,
const bool auto_expand,
- const u_int16_t ae_max_jfiles,
- const u_int32_t jfsize_sblks,*/
- const u_int16_t wcache_num_pages,
- const u_int32_t wcache_pgsize_sblks,
+ const uint16_t ae_max_jfiles,
+ const uint32_t jfsize_sblks,*/
+ const uint16_t wcache_num_pages,
+ const uint32_t wcache_pgsize_sblks,
boost::ptr_list<PreparedTransaction>* prep_tx_list_ptr,
- u_int64_t& highest_rid,
- u_int64_t queue_id) {
+ uint64_t& highest_rid,
+ uint64_t queue_id) {
recover(/*num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,*/ wcache_num_pages, wcache_pgsize_sblks,
this, prep_tx_list_ptr, highest_rid, queue_id);
}
@@ -164,7 +164,7 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public qpid::qls_jr
// Temporary fn to read and save last msg read from journal so it can be assigned
// in chunks. To be replaced when coding to do this direct from the journal is ready.
// Returns true if the record is extern, false if local.
- bool loadMsgContent(u_int64_t rid, std::string& data, size_t length, size_t offset = 0);
+ bool loadMsgContent(uint64_t rid, std::string& data, size_t length, size_t offset = 0);
// Overrides for write inactivity timer
void enqueue_data_record(const void* const data_buff, const size_t tot_data_len,
@@ -192,8 +192,8 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public qpid::qls_jr
void stop(bool block_till_aio_cmpl = false);
// Logging
- void log(qpid::qls_jrnl::log_level level, const std::string& log_stmt) const;
- void log(qpid::qls_jrnl::log_level level, const char* const log_stmt) const;
+// void log(qpid::qls_jrnl::log_level level, const std::string& log_stmt) const;
+// void log(qpid::qls_jrnl::log_level level, const char* const log_stmt) const;
// Overrides for get_events timer
qpid::qls_jrnl::iores flush(const bool block_till_aio_cmpl = false);
@@ -204,7 +204,7 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public qpid::qls_jr
// AIO callbacks
virtual void wr_aio_cb(std::vector<qpid::qls_jrnl::data_tok*>& dtokl);
- virtual void rd_aio_cb(std::vector<u_int16_t>& pil);
+ virtual void rd_aio_cb(std::vector<uint16_t>& pil);
qpid::management::ManagementObject::shared_ptr GetManagementObject (void) const
{ return _mgmtObject; }
diff --git a/qpid/cpp/src/qpid/linearstore/Log.h b/qpid/cpp/src/qpid/linearstore/Log.h
new file mode 100644
index 0000000000..b03ea7ac9d
--- /dev/null
+++ b/qpid/cpp/src/qpid/linearstore/Log.h
@@ -0,0 +1,30 @@
+ /*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_LEGACYSTORE_LOG_H
+#define QPID_LEGACYSTORE_LOG_H
+
+#include "qpid/log/Statement.h"
+
+#define QLS_LOG(level, msg) QPID_LOG(level, "Linear Store: " << msg)
+#define QLS_LOG2(level, queue, msg) QPID_LOG(level, "Linear Store: Journal \'" << queue << "\":" << msg)
+
+#endif // QPID_LEGACYSTORE_LOG_H
diff --git a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
index a51ca0bf37..f4b9a40455 100644
--- a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
+++ b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
@@ -26,8 +26,8 @@
#include "qpid/linearstore/BufferValue.h"
#include "qpid/linearstore/IdDbt.h"
#include "qpid/linearstore/jrnl/txn_map.h"
+#include "qpid/linearstore/Log.h"
#include "qpid/framing/FieldValue.h"
-#include "qpid/log/Statement.h"
#include "qmf/org/apache/qpid/linearstore/Package.h"
#include "qpid/linearstore/StoreException.h"
#include <dirent.h>
@@ -42,13 +42,13 @@ namespace qpid{
namespace linearstore{
-const std::string MessageStoreImpl::storeTopLevelDir("rhm"); // Sets the top-level store dir name
+const std::string MessageStoreImpl::storeTopLevelDir("qls"); // Sets the top-level store dir name
// FIXME aconway 2010-03-09: was 10
qpid::sys::Duration MessageStoreImpl::defJournalGetEventsTimeout(1 * qpid::sys::TIME_MSEC); // 10ms
qpid::sys::Duration MessageStoreImpl::defJournalFlushTimeout(500 * qpid::sys::TIME_MSEC); // 0.5s
qpid::sys::Mutex TxnCtxt::globalSerialiser;
-MessageStoreImpl::TplRecoverStruct::TplRecoverStruct(const u_int64_t _rid,
+MessageStoreImpl::TplRecoverStruct::TplRecoverStruct(const uint64_t _rid,
const bool _deq_flag,
const bool _commit_flag,
const bool _tpc_flag) :
@@ -78,48 +78,19 @@ MessageStoreImpl::MessageStoreImpl(qpid::broker::Broker* broker_, const char* en
agent(0)
{}
-/*
-u_int16_t MessageStoreImpl::chkJrnlNumFilesParam(const u_int16_t param, const std::string paramName)
+uint32_t MessageStoreImpl::chkJrnlWrPageCacheSize(const uint32_t param, const std::string paramName/*, const uint16_t jrnlFsizePgs*/)
{
- if (param < JRNL_MIN_NUM_FILES || param > JRNL_MAX_NUM_FILES) {
- std::ostringstream oss;
- oss << "Parameter " << paramName << ": Illegal number of store journal files (" << param << "), must be " << JRNL_MIN_NUM_FILES << " to " << JRNL_MAX_NUM_FILES << " inclusive.";
- THROW_STORE_EXCEPTION(oss.str());
- }
- return param;
-}
-*/
-
-/*
-u_int32_t MessageStoreImpl::chkJrnlFileSizeParam(const u_int32_t param, const std::string paramName, const u_int32_t wCachePgSizeSblks)
-{
- if (param < (JRNL_MIN_FILE_SIZE / JRNL_RMGR_PAGE_SIZE) || (param > JRNL_MAX_FILE_SIZE / JRNL_RMGR_PAGE_SIZE)) {
- std::ostringstream oss;
- oss << "Parameter " << paramName << ": Illegal store journal file size (" << param << "), must be " << JRNL_MIN_FILE_SIZE / JRNL_RMGR_PAGE_SIZE << " to " << JRNL_MAX_FILE_SIZE / JRNL_RMGR_PAGE_SIZE << " inclusive.";
- THROW_STORE_EXCEPTION(oss.str());
- }
- if (wCachePgSizeSblks > param * JRNL_RMGR_PAGE_SIZE) {
- std::ostringstream oss;
- oss << "Cannot create store with file size less than write page cache size. [file size = " << param << " (" << (param * JRNL_RMGR_PAGE_SIZE / 2) << " kB); write page cache = " << (wCachePgSizeSblks / 2) << " kB]";
- THROW_STORE_EXCEPTION(oss.str());
- }
- return param;
-}
-*/
-
-u_int32_t MessageStoreImpl::chkJrnlWrPageCacheSize(const u_int32_t param, const std::string paramName/*, const u_int16_t jrnlFsizePgs*/)
-{
- u_int32_t p = param;
+ uint32_t p = param;
/* if (jrnlFsizePgs == 1 && p > 64 ) {
p = 64;
- QPID_LOG(warning, "parameter " << paramName << " (" << param << ") cannot set a page size greater than the journal file size; changing this parameter to the journal file size (" << p << ")");
+ QLS_LOG(warning, "parameter " << paramName << " (" << param << ") cannot set a page size greater than the journal file size; changing this parameter to the journal file size (" << p << ")");
}
else*/ if (p == 0) {
// For zero value, use default
- p = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024;
- QPID_LOG(warning, "parameter " << paramName << " (" << param << ") must be a power of 2 between 1 and 128; changing this parameter to default value (" << p << ")");
- } else if ( p > 128 || p & (p-1) ) {
+ p = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_SBLK_SIZE / 1024;
+ QLS_LOG(warning, "parameter " << paramName << " (" << param << ") must be a power of 2 between 1 and 128; changing this parameter to default value (" << p << ")");
+ } else if ( p > 128 || (p & (p-1)) ) {
// For any positive value that is not a power of 2, use closest value
if (p < 6) p = 4;
else if (p < 12) p = 8;
@@ -127,15 +98,15 @@ u_int32_t MessageStoreImpl::chkJrnlWrPageCacheSize(const u_int32_t param, const
else if (p < 48) p = 32;
else if (p < 96) p = 64;
else p = 128;
- QPID_LOG(warning, "parameter " << paramName << " (" << param << ") must be a power of 2 between 1 and 128; changing this parameter to closest allowable value (" << p << ")");
+ QLS_LOG(warning, "parameter " << paramName << " (" << param << ") must be a power of 2 between 1 and 128; changing this parameter to closest allowable value (" << p << ")");
}
return p;
}
-u_int16_t MessageStoreImpl::getJrnlWrNumPages(const u_int32_t wrPageSizeKib)
+uint16_t MessageStoreImpl::getJrnlWrNumPages(const uint32_t wrPageSizeKib)
{
- u_int32_t wrPageSizeSblks = wrPageSizeKib * 1024 / JRNL_DBLK_SIZE / JRNL_SBLK_SIZE; // convert from KiB to number sblks
- u_int32_t defTotWCacheSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_WMGR_DEF_PAGES; // in sblks. Currently 2014 sblks (1 MiB).
+ uint32_t wrPageSizeSblks = wrPageSizeKib * 1024 / JRNL_SBLK_SIZE; // convert from KiB to number sblks
+ uint32_t defTotWCacheSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_WMGR_DEF_PAGES; // in sblks. Currently 2014 sblks (1 MiB).
switch (wrPageSizeKib)
{
case 1:
@@ -153,58 +124,6 @@ u_int16_t MessageStoreImpl::getJrnlWrNumPages(const u_int32_t wrPageSizeKib)
}
}
-/*
-void MessageStoreImpl::chkJrnlAutoExpandOptions(const StoreOptions* opts,
- bool& autoJrnlExpand,
- u_int16_t& autoJrnlExpandMaxFiles,
- const std::string& autoJrnlExpandMaxFilesParamName,
- const u_int16_t numJrnlFiles,
- const std::string& numJrnlFilesParamName)
-{
- if (!opts->autoJrnlExpand) {
- // auto-expand disabled
- autoJrnlExpand = false;
- autoJrnlExpandMaxFiles = 0;
- return;
- }
- u_int16_t p = opts->autoJrnlExpandMaxFiles;
- if (numJrnlFiles == JRNL_MAX_NUM_FILES) {
- // num-jfiles at max; disable auto-expand
- autoJrnlExpand = false;
- autoJrnlExpandMaxFiles = 0;
- QPID_LOG(warning, "parameter " << autoJrnlExpandMaxFilesParamName << " (" << p << ") must be higher than parameter "
- << numJrnlFilesParamName << " (" << numJrnlFiles << ") which is at the maximum allowable value; disabling auto-expand.");
- return;
- }
- if (p > JRNL_MAX_NUM_FILES) {
- // auto-expand-max-jfiles higher than max allowable, adjust
- autoJrnlExpand = true;
- autoJrnlExpandMaxFiles = JRNL_MAX_NUM_FILES;
- QPID_LOG(warning, "parameter " << autoJrnlExpandMaxFilesParamName << " (" << p << ") is above allowable maximum ("
- << JRNL_MAX_NUM_FILES << "); changing this parameter to maximum value.");
- return;
- }
- if (p && p == defAutoJrnlExpandMaxFiles && numJrnlFiles != defTplNumJrnlFiles) {
- // num-jfiles is different from the default AND max-auto-expand-jfiles is still at default
- // change value of max-auto-expand-jfiles
- autoJrnlExpand = true;
- if (2 * numJrnlFiles <= JRNL_MAX_NUM_FILES) {
- autoJrnlExpandMaxFiles = 2 * numJrnlFiles <= JRNL_MAX_NUM_FILES ? 2 * numJrnlFiles : JRNL_MAX_NUM_FILES;
- QPID_LOG(warning, "parameter " << autoJrnlExpandMaxFilesParamName << " adjusted from its default value ("
- << defAutoJrnlExpandMaxFiles << ") to twice that of parameter " << numJrnlFilesParamName << " (" << autoJrnlExpandMaxFiles << ").");
- } else {
- autoJrnlExpandMaxFiles = 2 * numJrnlFiles <= JRNL_MAX_NUM_FILES ? 2 * numJrnlFiles : JRNL_MAX_NUM_FILES;
- QPID_LOG(warning, "parameter " << autoJrnlExpandMaxFilesParamName << " adjusted from its default to maximum allowable value ("
- << JRNL_MAX_NUM_FILES << ") because of the value of " << numJrnlFilesParamName << " (" << numJrnlFiles << ").");
- }
- return;
- }
- // No adjustments req'd, set values
- autoJrnlExpand = true;
- autoJrnlExpandMaxFiles = p;
-}
-*/
-
void MessageStoreImpl::initManagement ()
{
if (broker != 0) {
@@ -219,10 +138,10 @@ void MessageStoreImpl::initManagement ()
mgmtObject->set_defaultDataFileSize(jrnlFsizeSblks / JRNL_RMGR_PAGE_SIZE);
mgmtObject->set_tplIsInitialized(false);
mgmtObject->set_tplDirectory(getTplBaseDir());
- mgmtObject->set_tplWritePageSize(tplWCachePgSizeSblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE);
+ mgmtObject->set_tplWritePageSize(tplWCachePgSizeSblks * JRNL_SBLK_SIZE);
mgmtObject->set_tplWritePages(tplWCacheNumPages);
mgmtObject->set_tplInitialFileCount(tplNumJrnlFiles);
- mgmtObject->set_tplDataFileSize(tplJrnlFsizeSblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE);
+ mgmtObject->set_tplDataFileSize(tplJrnlFsizeSblks * JRNL_SBLK_SIZE);
mgmtObject->set_tplCurrentFileCount(tplNumJrnlFiles);
agent->addObject(mgmtObject, 0, true);
@@ -239,43 +158,35 @@ bool MessageStoreImpl::init(const qpid::Options* options)
{
// Extract and check options
const StoreOptions* opts = static_cast<const StoreOptions*>(options);
-// u_int16_t numJrnlFiles = chkJrnlNumFilesParam(opts->numJrnlFiles, "num-jfiles");
-// u_int32_t jrnlFsizePgs = chkJrnlFileSizeParam(opts->jrnlFsizePgs, "jfile-size-pgs");
- u_int32_t jrnlWrCachePageSizeKib = chkJrnlWrPageCacheSize(opts->wCachePageSizeKib, "wcache-page-size"/*, jrnlFsizePgs*/);
-// u_int16_t tplNumJrnlFiles = chkJrnlNumFilesParam(opts->tplNumJrnlFiles, "tpl-num-jfiles");
-// u_int32_t tplJrnlFSizePgs = chkJrnlFileSizeParam(opts->tplJrnlFsizePgs, "tpl-jfile-size-pgs");
- u_int32_t tplJrnlWrCachePageSizeKib = chkJrnlWrPageCacheSize(opts->tplWCachePageSizeKib, "tpl-wcache-page-size"/*, tplJrnlFSizePgs*/);
-// bool autoJrnlExpand;
-// u_int16_t autoJrnlExpandMaxFiles;
-// chkJrnlAutoExpandOptions(opts, autoJrnlExpand, autoJrnlExpandMaxFiles, "auto-expand-max-jfiles", numJrnlFiles, "num-jfiles");
+ uint32_t jrnlWrCachePageSizeKib = chkJrnlWrPageCacheSize(opts->wCachePageSizeKib, "wcache-page-size");
+ uint32_t tplJrnlWrCachePageSizeKib = chkJrnlWrPageCacheSize(opts->tplWCachePageSizeKib, "tpl-wcache-page-size");
- // Pass option values to init(...)
- return init(opts->storeDir, /*numJrnlFiles, jrnlFsizePgs,*/ opts->truncateFlag, jrnlWrCachePageSizeKib,
- /*tplNumJrnlFiles, tplJrnlFSizePgs,*/ tplJrnlWrCachePageSizeKib/*, autoJrnlExpand, autoJrnlExpandMaxFiles*/);
+ // Pass option values to init()
+ return init(opts->storeDir, opts->truncateFlag, jrnlWrCachePageSizeKib, tplJrnlWrCachePageSizeKib);
}
// These params, taken from options, are assumed to be correct and verified
bool MessageStoreImpl::init(const std::string& dir,
- /*u_int16_t jfiles,
- u_int32_t jfileSizePgs,*/
+ /*uint16_t jfiles,
+ uint32_t jfileSizePgs,*/
const bool truncateFlag,
- u_int32_t wCachePageSizeKib,
- /*u_int16_t tplJfiles,
- u_int32_t tplJfileSizePgs,*/
- u_int32_t tplWCachePageSizeKib/*,
+ uint32_t wCachePageSizeKib,
+ /*uint16_t tplJfiles,
+ uint32_t tplJfileSizePgs,*/
+ uint32_t tplWCachePageSizeKib/*,
bool autoJExpand,
- u_int16_t autoJExpandMaxFiles*/)
+ uint16_t autoJExpandMaxFiles*/)
{
if (isInit) return true;
// Set geometry members (converting to correct units where req'd)
// numJrnlFiles = jfiles;
// jrnlFsizeSblks = jfileSizePgs * JRNL_RMGR_PAGE_SIZE;
- wCachePgSizeSblks = wCachePageSizeKib * 1024 / JRNL_DBLK_SIZE / JRNL_SBLK_SIZE; // convert from KiB to number sblks
+ wCachePgSizeSblks = wCachePageSizeKib * 1024 / JRNL_SBLK_SIZE; // convert from KiB to number sblks
wCacheNumPages = getJrnlWrNumPages(wCachePageSizeKib);
// tplNumJrnlFiles = tplJfiles;
// tplJrnlFsizeSblks = tplJfileSizePgs * JRNL_RMGR_PAGE_SIZE;
- tplWCachePgSizeSblks = tplWCachePageSizeKib * 1024 / JRNL_DBLK_SIZE / JRNL_SBLK_SIZE; // convert from KiB to number sblks
+ tplWCachePgSizeSblks = tplWCachePageSizeKib * 1024 / JRNL_SBLK_SIZE; // convert from KiB to number sblks
tplWCacheNumPages = getJrnlWrNumPages(tplWCachePageSizeKib);
// autoJrnlExpand = autoJExpand;
// autoJrnlExpandMaxFiles = autoJExpandMaxFiles;
@@ -286,18 +197,18 @@ bool MessageStoreImpl::init(const std::string& dir,
else
init();
- QPID_LOG(notice, "Store module initialized; store-dir=" << dir);
-// QPID_LOG(info, "> Default files per journal: " << jfiles);
+ QLS_LOG(notice, "Store module initialized; store-dir=" << dir);
+// QLS_LOG(info, "> Default files per journal: " << jfiles);
// TODO: Uncomment these lines when auto-expand is enabled.
-// QPID_LOG(info, "> Auto-expand " << (autoJrnlExpand ? "enabled" : "disabled"));
-// if (autoJrnlExpand) QPID_LOG(info, "> Max auto-expand journal files: " << autoJrnlExpandMaxFiles);
-// QPID_LOG(info, "> Default journal file size: " << jfileSizePgs << " (wpgs)");
- QPID_LOG(info, "> Default write cache page size: " << wCachePageSizeKib << " (KiB)");
- QPID_LOG(info, "> Default number of write cache pages: " << wCacheNumPages);
-// QPID_LOG(info, "> TPL files per journal: " << tplNumJrnlFiles);
-// QPID_LOG(info, "> TPL journal file size: " << tplJfileSizePgs << " (wpgs)");
- QPID_LOG(info, "> TPL write cache page size: " << tplWCachePageSizeKib << " (KiB)");
- QPID_LOG(info, "> TPL number of write cache pages: " << tplWCacheNumPages);
+// QLS_LOG(info, "> Auto-expand " << (autoJrnlExpand ? "enabled" : "disabled"));
+// if (autoJrnlExpand) QLS_LOG(info, "> Max auto-expand journal files: " << autoJrnlExpandMaxFiles);
+// QLS_LOG(info, "> Default journal file size: " << jfileSizePgs << " (wpgs)");
+ QLS_LOG(info, "> Default write cache page size: " << wCachePageSizeKib << " (KiB)");
+ QLS_LOG(info, "> Default number of write cache pages: " << wCacheNumPages);
+// QLS_LOG(info, "> TPL files per journal: " << tplNumJrnlFiles);
+// QLS_LOG(info, "> TPL journal file size: " << tplJfileSizePgs << " (wpgs)");
+ QLS_LOG(info, "> TPL write cache page size: " << tplWCachePageSizeKib << " (KiB)");
+ QLS_LOG(info, "> TPL number of write cache pages: " << tplWCacheNumPages);
return isInit;
}
@@ -311,14 +222,14 @@ void MessageStoreImpl::init()
{
closeDbs();
::usleep(1000000); // 1 sec delay
- QPID_LOG(error, "Previoius BDB store initialization failed, retrying (" << bdbRetryCnt << " of " << retryMax << ")...");
+ QLS_LOG(error, "Previoius BDB store initialization failed, retrying (" << bdbRetryCnt << " of " << retryMax << ")...");
}
try {
qpid::qls_jrnl::jdir::create_dir(getBdbBaseDir());
dbenv.reset(new DbEnv(0));
- dbenv->set_errpfx("msgstore");
+ dbenv->set_errpfx("linearstore");
dbenv->set_lg_regionmax(256000); // default = 65000
dbenv->open(getBdbBaseDir().c_str(), DB_THREAD | DB_CREATE | DB_INIT_TXN | DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | DB_USE_ENVIRON | DB_RECOVER, 0);
@@ -357,21 +268,21 @@ void MessageStoreImpl::init()
} catch (const DbException& e) {
if (e.get_errno() == DB_VERSION_MISMATCH)
{
- QPID_LOG(error, "Database environment mismatch: This version of db4 does not match that which created the store database.: " << e.what());
+ QLS_LOG(error, "Database environment mismatch: This version of db4 does not match that which created the store database.: " << e.what());
THROW_STORE_EXCEPTION_2("Database environment mismatch: This version of db4 does not match that which created the store database. "
"(If recovery is not important, delete the contents of the store directory. Otherwise, try upgrading the database using "
"db_upgrade or using db_recover - but the db4-utils package must also be installed to use these utilities.)", e);
}
- QPID_LOG(error, "BDB exception occurred while initializing store: " << e.what());
+ QLS_LOG(error, "BDB exception occurred while initializing store: " << e.what());
if (bdbRetryCnt >= retryMax)
THROW_STORE_EXCEPTION_2("BDB exception occurred while initializing store", e);
} catch (const StoreException&) {
throw;
} catch (const qpid::qls_jrnl::jexception& e) {
- QPID_LOG(error, "Journal Exception occurred while initializing store: " << e);
+ QLS_LOG(error, "Journal Exception occurred while initializing store: " << e);
THROW_STORE_EXCEPTION_2("Journal Exception occurred while initializing store", e.what());
} catch (...) {
- QPID_LOG(error, "Unknown exception occurred while initializing store.");
+ QLS_LOG(error, "Unknown exception occurred while initializing store.");
throw;
}
} while (!isInit);
@@ -417,10 +328,10 @@ void MessageStoreImpl::truncateInit(const bool saveStoreContent)
oss << storeDir << "/" << storeTopLevelDir;
if (saveStoreContent) {
std::string dir = qpid::qls_jrnl::jdir::push_down(storeDir, storeTopLevelDir, "cluster");
- QPID_LOG(notice, "Store directory " << oss.str() << " was pushed down (saved) into directory " << dir << ".");
+ QLS_LOG(notice, "Store directory " << oss.str() << " was pushed down (saved) into directory " << dir << ".");
} else {
qpid::qls_jrnl::jdir::delete_dir(oss.str().c_str());
- QPID_LOG(notice, "Store directory " << oss.str() << " was truncated.");
+ QLS_LOG(notice, "Store directory " << oss.str() << " was truncated.");
}
init();
}
@@ -459,13 +370,13 @@ MessageStoreImpl::~MessageStoreImpl()
try {
closeDbs();
} catch (const DbException& e) {
- QPID_LOG(error, "Error closing BDB databases: " << e.what());
+ QLS_LOG(error, "Error closing BDB databases: " << e.what());
} catch (const qpid::qls_jrnl::jexception& e) {
- QPID_LOG(error, "Error: " << e.what());
+ QLS_LOG(error, "Error: " << e.what());
} catch (const std::exception& e) {
- QPID_LOG(error, "Error: " << e.what());
+ QLS_LOG(error, "Error: " << e.what());
} catch (...) {
- QPID_LOG(error, "Unknown error in MessageStoreImpl::~MessageStoreImpl()");
+ QLS_LOG(error, "Unknown error in MessageStoreImpl::~MessageStoreImpl()");
}
if (mgmtObject.get() != 0) {
@@ -484,24 +395,22 @@ void MessageStoreImpl::create(qpid::broker::PersistableQueue& queue,
JournalImpl* jQueue = 0;
qpid::framing::FieldTable::ValuePtr value;
-/*
- u_int16_t localFileCount = numJrnlFiles;
- bool localAutoExpandFlag = autoJrnlExpand;
- u_int16_t localAutoExpandMaxFileCount = autoJrnlExpandMaxFiles;
- u_int32_t localFileSizeSblks = jrnlFsizeSblks;
-
- value = args.get("qpid.file_count");
- if (value.get() != 0 && !value->empty() && value->convertsTo<int>())
- localFileCount = chkJrnlNumFilesParam((u_int16_t) value->get<int>(), "qpid.file_count");
-
- value = args.get("qpid.file_size");
- if (value.get() != 0 && !value->empty() && value->convertsTo<int>())
- localFileSizeSblks = chkJrnlFileSizeParam((u_int32_t) value->get<int>(), "qpid.file_size", wCachePgSizeSblks) * JRNL_RMGR_PAGE_SIZE;
-*/
+// uint16_t localFileCount = numJrnlFiles;
+// bool localAutoExpandFlag = autoJrnlExpand;
+// uint16_t localAutoExpandMaxFileCount = autoJrnlExpandMaxFiles;
+// uint32_t localFileSizeSblks = jrnlFsizeSblks;
+//
+// value = args.get("qpid.file_count");
+// if (value.get() != 0 && !value->empty() && value->convertsTo<int>())
+// localFileCount = chkJrnlNumFilesParam((uint16_t) value->get<int>(), "qpid.file_count");
+//
+// value = args.get("qpid.file_size");
+// if (value.get() != 0 && !value->empty() && value->convertsTo<int>())
+// localFileSizeSblks = chkJrnlFileSizeParam((uint32_t) value->get<int>(), "qpid.file_size", wCachePgSizeSblks) * JRNL_RMGR_PAGE_SIZE;
if (queue.getName().size() == 0)
{
- QPID_LOG(error, "Cannot create store for empty (null) queue name - ignoring and attempting to continue.");
+ QLS_LOG(error, "Cannot create store for empty (null) queue name - ignoring and attempting to continue.");
return;
}
@@ -513,15 +422,13 @@ void MessageStoreImpl::create(qpid::broker::PersistableQueue& queue,
journalList[queue.getName()]=jQueue;
}
-/*
- value = args.get("qpid.auto_expand");
- if (value.get() != 0 && !value->empty() && value->convertsTo<bool>())
- localAutoExpandFlag = (bool) value->get<bool>();
-
- value = args.get("qpid.auto_expand_max_jfiles");
- if (value.get() != 0 && !value->empty() && value->convertsTo<int>())
- localAutoExpandMaxFileCount = (u_int16_t) value->get<int>();
-*/
+// value = args.get("qpid.auto_expand");
+// if (value.get() != 0 && !value->empty() && value->convertsTo<bool>())
+// localAutoExpandFlag = (bool) value->get<bool>();
+//
+// value = args.get("qpid.auto_expand_max_jfiles");
+// if (value.get() != 0 && !value->empty() && value->convertsTo<int>())
+// localAutoExpandMaxFileCount = (uint16_t) value->get<int>();
queue.setExternalQueueStore(dynamic_cast<qpid::broker::ExternalQueueStore*>(jQueue));
try {
@@ -606,7 +513,7 @@ bool MessageStoreImpl::create(db_ptr db,
IdSequence& seq,
const qpid::broker::Persistable& p)
{
- u_int64_t id (seq.next());
+ uint64_t id (seq.next());
Dbt key(&id, sizeof(id));
BufferValue value (p);
@@ -774,7 +681,7 @@ void MessageStoreImpl::recoverQueues(TxnCtxt& txn,
Cursor queues;
queues.open(queueDb, txn.get());
- u_int64_t maxQueueId(1);
+ uint64_t maxQueueId(1);
IdDbt key;
Dbt value;
@@ -790,7 +697,7 @@ void MessageStoreImpl::recoverQueues(TxnCtxt& txn,
JournalImpl* jQueue = 0;
if (queueName.size() == 0)
{
- QPID_LOG(error, "Cannot recover empty (null) queue name - ignoring and attempting to continue.");
+ QLS_LOG(error, "Cannot recover empty (null) queue name - ignoring and attempting to continue.");
break;
}
jQueue = new JournalImpl(broker->getTimer(), queueName, getJrnlHashDir(queueName), std::string("JournalData"),
@@ -806,7 +713,7 @@ void MessageStoreImpl::recoverQueues(TxnCtxt& txn,
{
long rcnt = 0L; // recovered msg count
long idcnt = 0L; // in-doubt msg count
- u_int64_t thisHighestRid = 0ULL;
+ uint64_t thisHighestRid = 0ULL;
jQueue->recover(/*numJrnlFiles, autoJrnlExpand, autoJrnlExpandMaxFiles, jrnlFsizeSblks,*/ wCacheNumPages,
wCachePgSizeSblks, &prepared, thisHighestRid, key.id); // start recovery
@@ -817,11 +724,11 @@ void MessageStoreImpl::recoverQueues(TxnCtxt& txn,
const qpid::framing::FieldTable& storeargs = queue->getSettings().storeSettings;
qpid::framing::FieldTable::ValuePtr value;
value = storeargs.get("qpid.file_count");
- if (value.get() != 0 && !value->empty() && value->convertsTo<int>() && (u_int16_t)value->get<int>() != jQueue->num_jfiles()) {
+ if (value.get() != 0 && !value->empty() && value->convertsTo<int>() && (uint16_t)value->get<int>() != jQueue->num_jfiles()) {
queue->addArgument("qpid.file_count", jQueue->num_jfiles());
}
value = storeargs.get("qpid.file_size");
- if (value.get() != 0 && !value->empty() && value->convertsTo<int>() && (u_int32_t)value->get<int>() != jQueue->jfsize_sblks()/JRNL_RMGR_PAGE_SIZE) {
+ if (value.get() != 0 && !value->empty() && value->convertsTo<int>() && (uint32_t)value->get<int>() != jQueue->jfsize_sblks()/JRNL_RMGR_PAGE_SIZE) {
queue->addArgument("qpid.file_size", jQueue->jfsize_sblks()/JRNL_RMGR_PAGE_SIZE);
}
*/
@@ -831,7 +738,7 @@ void MessageStoreImpl::recoverQueues(TxnCtxt& txn,
else if (thisHighestRid - highestRid < 0x8000000000000000ULL) // RFC 1982 comparison for unsigned 64-bit
highestRid = thisHighestRid;
recoverMessages(txn, registry, queue, prepared, messages, rcnt, idcnt);
- QPID_LOG(info, "Recovered queue \"" << queueName << "\": " << rcnt << " messages recovered; " << idcnt << " messages in-doubt.");
+ QLS_LOG(info, "Recovered queue \"" << queueName << "\": " << rcnt << " messages recovered; " << idcnt << " messages in-doubt.");
jQueue->recover_complete(); // start journal.
} catch (const qpid::qls_jrnl::jexception& e) {
THROW_STORE_EXCEPTION(std::string("Queue ") + queueName + ": recoverQueues() failed: " + e.what());
@@ -845,7 +752,7 @@ void MessageStoreImpl::recoverQueues(TxnCtxt& txn,
// NOTE: highestRid is set by both recoverQueues() and recoverTplStore() as
// the messageIdSequence is used for both queue journals and the tpl journal.
messageIdSequence.reset(highestRid + 1);
- QPID_LOG(info, "Most recent persistence id found: 0x" << std::hex << highestRid << std::dec);
+ QLS_LOG(info, "Most recent persistence id found: 0x" << std::hex << highestRid << std::dec);
queueIdSequence.reset(maxQueueId + 1);
}
@@ -859,7 +766,7 @@ void MessageStoreImpl::recoverExchanges(TxnCtxt& txn,
Cursor exchanges;
exchanges.open(exchangeDb, txn.get());
- u_int64_t maxExchangeId(1);
+ uint64_t maxExchangeId(1);
IdDbt key;
Dbt value;
//read all exchanges
@@ -871,7 +778,7 @@ void MessageStoreImpl::recoverExchanges(TxnCtxt& txn,
//set the persistenceId and update max as required
exchange->setPersistenceId(key.id);
index[key.id] = exchange;
- QPID_LOG(info, "Recovered exchange \"" << exchange->getName() << '"');
+ QLS_LOG(info, "Recovered exchange \"" << exchange->getName() << '"');
}
maxExchangeId = std::max(key.id, maxExchangeId);
}
@@ -890,7 +797,7 @@ void MessageStoreImpl::recoverBindings(TxnCtxt& txn,
while (bindings.next(key, value)) {
qpid::framing::Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size());
if (buffer.available() < 8) {
- QPID_LOG(error, "Not enough data for binding: " << buffer.available());
+ QLS_LOG(error, "Not enough data for binding: " << buffer.available());
THROW_STORE_EXCEPTION("Not enough data for binding");
}
uint64_t queueId = buffer.getLongLong();
@@ -905,12 +812,12 @@ void MessageStoreImpl::recoverBindings(TxnCtxt& txn,
if (exchange != exchanges.end() && queue != queues.end()) {
//could use the recoverable queue here rather than the name...
exchange->second->bind(queueName, routingkey, args);
- QPID_LOG(info, "Recovered binding exchange=" << exchange->second->getName()
+ QLS_LOG(info, "Recovered binding exchange=" << exchange->second->getName()
<< " key=" << routingkey
<< " queue=" << queueName);
} else {
//stale binding, delete it
- QPID_LOG(warning, "Deleting stale binding");
+ QLS_LOG(warning, "Deleting stale binding");
bindings->del(0);
}
}
@@ -922,7 +829,7 @@ void MessageStoreImpl::recoverGeneral(TxnCtxt& txn,
Cursor items;
items.open(generalDb, txn.get());
- u_int64_t maxGeneralId(1);
+ uint64_t maxGeneralId(1);
IdDbt key;
Dbt value;
//read all items
@@ -945,7 +852,7 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/,
long& rcnt,
long& idcnt)
{
- size_t preambleLength = sizeof(u_int32_t)/*header size*/;
+ size_t preambleLength = sizeof(uint32_t)/*header size*/;
JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
DataTokenImpl dtok;
@@ -994,8 +901,8 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/,
// Reset the TTL for the recovered message
msg->computeExpiration(broker->getExpiryPolicy());
- u_int32_t contentOffset = headerSize + preambleLength;
- u_int64_t contentSize = readSize - contentOffset;
+ uint32_t contentOffset = headerSize + preambleLength;
+ uint64_t contentSize = readSize - contentOffset;
if (msg->loadContent(contentSize) && !externalFlag) {
//now read the content
qpid::framing::Buffer contentBuff(data + contentOffset, contentSize);
@@ -1007,7 +914,7 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/,
rcnt++;
queue->recover(msg);
} else {
- u_int64_t rid = dtok.rid();
+ uint64_t rid = dtok.rid();
std::string xid(i->xid);
TplRecoverMapCitr citr = tplRecoverMap.find(xid);
if (citr == tplRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in tplRecoverMap");
@@ -1094,7 +1001,7 @@ int MessageStoreImpl::enqueueMessage(TxnCtxt& txn,
int count(0);
for (int status = mappings->get(&msgId, &value, DB_SET); status == 0; status = mappings->get(&msgId, &value, DB_NEXT_DUP)) {
if (index.find(value.id) == index.end()) {
- QPID_LOG(warning, "Recovered message for queue that no longer exists");
+ QLS_LOG(warning, "Recovered message for queue that no longer exists");
mappings->del(0);
} else {
qpid::broker::RecoverableQueue::shared_ptr queue = index[value.id];
@@ -1139,7 +1046,7 @@ void MessageStoreImpl::readTplStore()
if (!txnList.empty()) { // xid found in tmap
unsigned enqCnt = 0;
unsigned deqCnt = 0;
- u_int64_t rid = 0;
+ uint64_t rid = 0;
// Assume commit (roll forward) in cases where only prepare has been called - ie only enqueue record exists.
// Note: will apply to both 1PC and 2PC transactions.
@@ -1185,7 +1092,7 @@ void MessageStoreImpl::readTplStore()
void MessageStoreImpl::recoverTplStore()
{
if (qpid::qls_jrnl::jdir::exists(tplStorePtr->jrnl_dir() + tplStorePtr->base_filename() + ".jinf")) {
- u_int64_t thisHighestRid = 0ULL;
+ uint64_t thisHighestRid = 0ULL;
tplStorePtr->recover(/*tplNumJrnlFiles, false, 0, tplJrnlFsizeSblks,*/ tplWCachePgSizeSblks, tplWCacheNumPages, 0, thisHighestRid, 0);
if (highestRid == 0ULL)
highestRid = thisHighestRid;
@@ -1248,11 +1155,11 @@ void MessageStoreImpl::appendContent(const boost::intrusive_ptr<const qpid::brok
void MessageStoreImpl::loadContent(const qpid::broker::PersistableQueue& queue,
const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg,
std::string& data,
- u_int64_t offset,
- u_int32_t length)
+ uint64_t offset,
+ uint32_t length)
{
checkInit();
- u_int64_t messageId (msg->getPersistenceId());
+ uint64_t messageId (msg->getPersistenceId());
if (messageId != 0) {
try {
@@ -1297,8 +1204,8 @@ void MessageStoreImpl::enqueue(qpid::broker::TransactionContext* ctxt,
const qpid::broker::PersistableQueue& queue)
{
checkInit();
- u_int64_t queueId (queue.getPersistenceId());
- u_int64_t messageId (msg->getPersistenceId());
+ uint64_t queueId (queue.getPersistenceId());
+ uint64_t messageId (msg->getPersistenceId());
if (queueId == 0) {
THROW_STORE_EXCEPTION("Queue not created: " + queue.getName());
}
@@ -1323,10 +1230,10 @@ void MessageStoreImpl::enqueue(qpid::broker::TransactionContext* ctxt,
if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
}
-u_int64_t MessageStoreImpl::msgEncode(std::vector<char>& buff, const boost::intrusive_ptr<qpid::broker::PersistableMessage>& message)
+uint64_t MessageStoreImpl::msgEncode(std::vector<char>& buff, const boost::intrusive_ptr<qpid::broker::PersistableMessage>& message)
{
- u_int32_t headerSize = message->encodedHeaderSize();
- u_int64_t size = message->encodedSize() + sizeof(u_int32_t);
+ uint32_t headerSize = message->encodedHeaderSize();
+ uint64_t size = message->encodedSize() + sizeof(uint32_t);
try { buff = std::vector<char>(size); } // long + headers + content
catch (const std::exception& e) {
std::ostringstream oss;
@@ -1345,7 +1252,7 @@ void MessageStoreImpl::store(const qpid::broker::PersistableQueue* queue,
bool /*newId*/)
{
std::vector<char> buff;
- u_int64_t size = msgEncode(buff, message);
+ uint64_t size = msgEncode(buff, message);
try {
if (queue) {
@@ -1375,8 +1282,8 @@ void MessageStoreImpl::dequeue(qpid::broker::TransactionContext* ctxt,
const qpid::broker::PersistableQueue& queue)
{
checkInit();
- u_int64_t queueId (queue.getPersistenceId());
- u_int64_t messageId (msg->getPersistenceId());
+ uint64_t queueId (queue.getPersistenceId());
+ uint64_t messageId (msg->getPersistenceId());
if (queueId == 0) {
THROW_STORE_EXCEPTION("Queue \"" + queue.getName() + "\" has null queue Id (has not been created)");
}
@@ -1429,7 +1336,7 @@ void MessageStoreImpl::async_dequeue(qpid::broker::TransactionContext* ctxt,
}
}
-u_int32_t MessageStoreImpl::outstandingQueueAIO(const qpid::broker::PersistableQueue& /*queue*/)
+uint32_t MessageStoreImpl::outstandingQueueAIO(const qpid::broker::PersistableQueue& /*queue*/)
{
checkInit();
return 0;
@@ -1458,7 +1365,7 @@ void MessageStoreImpl::completed(TxnCtxt& txn,
mgmtObject->inc_tplTxnAborts();
}
} catch (const std::exception& e) {
- QPID_LOG(error, "Error completing xid " << txn.getXid() << ": " << e.what());
+ QLS_LOG(error, "Error completing xid " << txn.getXid() << ": " << e.what());
throw;
}
}
@@ -1509,7 +1416,7 @@ void MessageStoreImpl::localPrepare(TxnCtxt* ctxt)
mgmtObject->inc_tplTxnPrepares();
}
} catch (const std::exception& e) {
- QPID_LOG(error, "Error preparing xid " << ctxt->getXid() << ": " << e.what());
+ QLS_LOG(error, "Error preparing xid " << ctxt->getXid() << ": " << e.what());
throw;
}
}
@@ -1579,7 +1486,7 @@ void MessageStoreImpl::deleteBindingsForQueue(const qpid::broker::PersistableQue
uint64_t queueId = buffer.getLongLong();
if (queue.getPersistenceId() == queueId) {
bindings->del(0);
- QPID_LOG(debug, "Deleting binding for " << queue.getName() << " " << key.id << "->" << queueId);
+ QLS_LOG(debug, "Deleting binding for " << queue.getName() << " " << key.id << "->" << queueId);
}
}
}
@@ -1591,7 +1498,7 @@ void MessageStoreImpl::deleteBindingsForQueue(const qpid::broker::PersistableQue
txn.abort();
throw;
}
- QPID_LOG(debug, "Deleted all bindings for " << queue.getName() << ":" << queue.getPersistenceId());
+ QLS_LOG(debug, "Deleted all bindings for " << queue.getName() << ":" << queue.getPersistenceId());
}
void MessageStoreImpl::deleteBinding(const qpid::broker::PersistableExchange& exchange,
@@ -1621,7 +1528,7 @@ void MessageStoreImpl::deleteBinding(const qpid::broker::PersistableExchange& ex
buffer.getShortString(k);
if (bkey == k) {
bindings->del(0);
- QPID_LOG(debug, "Deleting binding for " << queue.getName() << " " << key.id << "->" << queueId);
+ QLS_LOG(debug, "Deleting binding for " << queue.getName() << " " << key.id << "->" << queueId);
}
}
}
@@ -1662,10 +1569,10 @@ std::string MessageStoreImpl::getJrnlDir(const qpid::broker::PersistableQueue& q
return getJrnlHashDir(queue.getName().c_str());
}
-u_int32_t MessageStoreImpl::bHash(const std::string str)
+uint32_t MessageStoreImpl::bHash(const std::string str)
{
// Daniel Bernstein hash fn
- u_int32_t h = 0;
+ uint32_t h = 0;
for (std::string::const_iterator i = str.begin(); i < str.end(); i++)
h = 33*h + *i;
return h;
@@ -1689,42 +1596,16 @@ void MessageStoreImpl::journalDeleted(JournalImpl& j) {
MessageStoreImpl::StoreOptions::StoreOptions(const std::string& name) :
qpid::Options(name),
- /*numJrnlFiles(defNumJrnlFiles),
- autoJrnlExpand(defAutoJrnlExpand),
- autoJrnlExpandMaxFiles(defAutoJrnlExpandMaxFiles),
- jrnlFsizePgs(defJrnlFileSizePgs),*/
truncateFlag(defTruncateFlag),
wCachePageSizeKib(defWCachePageSize),
- /*tplNumJrnlFiles(defTplNumJrnlFiles),
- tplJrnlFsizePgs(defTplJrnlFileSizePgs),*/
- tplWCachePageSizeKib(defTplWCachePageSize)
+ tplWCachePageSizeKib(defTplWCachePageSize),
+ efpPartition(defEfpPartition),
+ efpFileSize(defEfpFileSize)
{
-/*
- std::ostringstream oss1;
- oss1 << "Default number of files for each journal instance (queue). [Allowable values: " <<
- JRNL_MIN_NUM_FILES << " - " << JRNL_MAX_NUM_FILES << "]";
- std::ostringstream oss2;
- oss2 << "Default size for each journal file in multiples of read pages (1 read page = 64KiB). [Allowable values: " <<
- JRNL_MIN_FILE_SIZE / JRNL_RMGR_PAGE_SIZE << " - " << JRNL_MAX_FILE_SIZE / JRNL_RMGR_PAGE_SIZE << "]";
- std::ostringstream oss3;
- oss3 << "Number of files for transaction prepared list journal instance. [Allowable values: " <<
- JRNL_MIN_NUM_FILES << " - " << JRNL_MAX_NUM_FILES << "]";
- std::ostringstream oss4;
- oss4 << "Size of each transaction prepared list journal file in multiples of read pages (1 read page = 64KiB) [Allowable values: " <<
- JRNL_MIN_FILE_SIZE / JRNL_RMGR_PAGE_SIZE << " - " << JRNL_MAX_FILE_SIZE / JRNL_RMGR_PAGE_SIZE << "]";
-*/
addOptions()
("store-dir", qpid::optValue(storeDir, "DIR"),
"Store directory location for persistence (instead of using --data-dir value). "
"Required if --no-data-dir is also used.")
-// ("num-jfiles", qpid::optValue(numJrnlFiles, "N"), oss1.str().c_str())
-// ("jfile-size-pgs", qpid::optValue(jrnlFsizePgs, "N"), oss2.str().c_str())
-// TODO: Uncomment these lines when auto-expand is enabled.
-// ("auto-expand", qpid::optValue(autoJrnlExpand, "yes|no"),
-// "If yes|true|1, allows journal to auto-expand by adding additional journal files as needed. "
-// "If no|false|0, the number of journal files will remain fixed (num-jfiles).")
-// ("max-auto-expand-jfiles", qpid::optValue(autoJrnlExpandMaxFiles, "N"),
-// "Maximum number of journal files allowed from auto-expanding; must be greater than --num-jfiles parameter.")
("truncate", qpid::optValue(truncateFlag, "yes|no"),
"If yes|true|1, will truncate the store (discard any existing records). If no|false|0, will preserve "
"the existing store files for recovery.")
@@ -1732,12 +1613,14 @@ MessageStoreImpl::StoreOptions::StoreOptions(const std::string& name) :
"Size of the pages in the write page cache in KiB. "
"Allowable values - powers of 2: 1, 2, 4, ... , 128. "
"Lower values decrease latency at the expense of throughput.")
-// ("tpl-num-jfiles", qpid::optValue(tplNumJrnlFiles, "N"), oss3.str().c_str())
-// ("tpl-jfile-size-pgs", qpid::optValue(tplJrnlFsizePgs, "N"), oss4.str().c_str())
("tpl-wcache-page-size", qpid::optValue(tplWCachePageSizeKib, "N"),
"Size of the pages in the transaction prepared list write page cache in KiB. "
"Allowable values - powers of 2: 1, 2, 4, ... , 128. "
"Lower values decrease latency at the expense of throughput.")
+ ("efp-partition", qpid::optValue(efpPartition, "N"),
+ "Empty File Pool partition to use for finding empty journal files")
+ ("efp-file-size", qpid::optValue(efpFileSize, "N"),
+ "Empty File Pool file size to use for journal files")
;
}
diff --git a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h
index 26e4962ead..5d5fa28ff8 100644
--- a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h
+++ b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h
@@ -60,35 +60,31 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem
typedef boost::shared_ptr<DbEnv> dbEnv_ptr;
struct StoreOptions : public qpid::Options {
- StoreOptions(const std::string& name="Store Options");
+ StoreOptions(const std::string& name="Linear Store Options");
std::string clusterName;
std::string storeDir;
-// u_int16_t numJrnlFiles;
-// bool autoJrnlExpand;
-// u_int16_t autoJrnlExpandMaxFiles;
-// u_int32_t jrnlFsizePgs;
- bool truncateFlag;
- u_int32_t wCachePageSizeKib;
-// u_int16_t tplNumJrnlFiles;
-// u_int32_t tplJrnlFsizePgs;
- u_int32_t tplWCachePageSizeKib;
+ bool truncateFlag;
+ uint32_t wCachePageSizeKib;
+ uint32_t tplWCachePageSizeKib;
+ uint16_t efpPartition;
+ uint64_t efpFileSize;
};
protected:
- typedef std::map<u_int64_t, qpid::broker::RecoverableQueue::shared_ptr> queue_index;
- typedef std::map<u_int64_t, qpid::broker::RecoverableExchange::shared_ptr> exchange_index;
- typedef std::map<u_int64_t, qpid::broker::RecoverableMessage::shared_ptr> message_index;
+ typedef std::map<uint64_t, qpid::broker::RecoverableQueue::shared_ptr> queue_index;
+ typedef std::map<uint64_t, qpid::broker::RecoverableExchange::shared_ptr> exchange_index;
+ typedef std::map<uint64_t, qpid::broker::RecoverableMessage::shared_ptr> message_index;
typedef LockedMappings::map txn_lock_map;
typedef boost::ptr_list<PreparedTransaction> txn_list;
// Structs for Transaction Recover List (TPL) recover state
struct TplRecoverStruct {
- u_int64_t rid; // rid of TPL record
+ uint64_t rid; // rid of TPL record
bool deq_flag;
bool commit_flag;
bool tpc_flag;
- TplRecoverStruct(const u_int64_t _rid, const bool _deq_flag, const bool _commit_flag, const bool _tpc_flag);
+ TplRecoverStruct(const uint64_t _rid, const bool _deq_flag, const bool _commit_flag, const bool _tpc_flag);
};
typedef TplRecoverStruct TplRecover;
typedef std::pair<std::string, TplRecover> TplRecoverMapPair;
@@ -99,16 +95,11 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem
typedef JournalListMap::iterator JournalListMapItr;
// Default store settings
-// static const u_int16_t defNumJrnlFiles = 8;
-// static const u_int32_t defJrnlFileSizePgs = 24;
- static const bool defTruncateFlag = false;
- static const u_int32_t defWCachePageSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024;
-// static const u_int16_t defTplNumJrnlFiles = 8;
-// static const u_int32_t defTplJrnlFileSizePgs = 24;
- static const u_int32_t defTplWCachePageSize = defWCachePageSize / 8;
- // TODO: set defAutoJrnlExpand to true and defAutoJrnlExpandMaxFiles to 16 when auto-expand comes on-line
-// static const bool defAutoJrnlExpand = false;
-// static const u_int16_t defAutoJrnlExpandMaxFiles = 0;
+ static const bool defTruncateFlag = false;
+ static const uint32_t defWCachePageSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_SBLK_SIZE / 1024;
+ static const uint32_t defTplWCachePageSize = defWCachePageSize / 8;
+ static const uint16_t defEfpPartition = 0;
+ static const uint64_t defEfpFileSize = 512 * JRNL_SBLK_SIZE;
static const std::string storeTopLevelDir;
static qpid::sys::Duration defJournalGetEventsTimeout;
@@ -136,18 +127,18 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem
IdSequence generalIdSequence;
IdSequence messageIdSequence;
std::string storeDir;
- u_int16_t numJrnlFiles;
+ uint16_t numJrnlFiles;
bool autoJrnlExpand;
- u_int16_t autoJrnlExpandMaxFiles;
- u_int32_t jrnlFsizeSblks;
+ uint16_t autoJrnlExpandMaxFiles;
+ uint32_t jrnlFsizeSblks;
bool truncateFlag;
- u_int32_t wCachePgSizeSblks;
- u_int16_t wCacheNumPages;
- u_int16_t tplNumJrnlFiles;
- u_int32_t tplJrnlFsizeSblks;
- u_int32_t tplWCachePgSizeSblks;
- u_int16_t tplWCacheNumPages;
- u_int64_t highestRid;
+ uint32_t wCachePgSizeSblks;
+ uint16_t wCacheNumPages;
+ uint16_t tplNumJrnlFiles;
+ uint32_t tplJrnlFsizeSblks;
+ uint32_t tplWCachePgSizeSblks;
+ uint16_t tplWCacheNumPages;
+ uint64_t highestRid;
bool isInit;
const char* envPath;
qpid::broker::Broker* broker;
@@ -157,21 +148,10 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem
// Parameter validation and calculation
-// static u_int16_t chkJrnlNumFilesParam(const u_int16_t param,
-// const std::string paramName);
-// static u_int32_t chkJrnlFileSizeParam(const u_int32_t param,
-// const std::string paramName,
-// const u_int32_t wCachePgSizeSblks = 0);
- static u_int32_t chkJrnlWrPageCacheSize(const u_int32_t param,
+ static uint32_t chkJrnlWrPageCacheSize(const uint32_t param,
const std::string paramName/*,
- const u_int16_t jrnlFsizePgs*/);
- static u_int16_t getJrnlWrNumPages(const u_int32_t wrPageSizeKib);
-// void chkJrnlAutoExpandOptions(const MessageStoreImpl::StoreOptions* opts,
-// bool& autoJrnlExpand,
-// u_int16_t& autoJrnlExpandMaxFiles,
-// const std::string& autoJrnlExpandMaxFilesParamName,
-// const u_int16_t numJrnlFiles,
-// const std::string& numJrnlFilesParamName);
+ const uint16_t jrnlFsizePgs*/);
+ static uint16_t getJrnlWrNumPages(const uint32_t wrPageSizeKib);
void init();
@@ -213,7 +193,7 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem
void recoverTplStore();
void recoverLockedMappings(txn_list& txns);
TxnCtxt* check(qpid::broker::TransactionContext* ctxt);
- u_int64_t msgEncode(std::vector<char>& buff, const boost::intrusive_ptr<qpid::broker::PersistableMessage>& message);
+ uint64_t msgEncode(std::vector<char>& buff, const boost::intrusive_ptr<qpid::broker::PersistableMessage>& message);
void store(const qpid::broker::PersistableQueue* queue,
TxnCtxt* txn,
const boost::intrusive_ptr<qpid::broker::PersistableMessage>& message,
@@ -245,7 +225,7 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem
// journal functions
void createJrnlQueue(const qpid::broker::PersistableQueue& queue);
- u_int32_t bHash(const std::string str);
+ uint32_t bHash(const std::string str);
std::string getJrnlDir(const qpid::broker::PersistableQueue& queue); //for exmaple /var/rhm/ + queueDir/
std::string getJrnlHashDir(const std::string& queueName);
std::string getJrnlBaseDir();
@@ -280,15 +260,9 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem
bool init(const qpid::Options* options);
bool init(const std::string& dir,
- /*u_int16_t jfiles = defNumJrnlFiles,
- u_int32_t jfileSizePgs = defJrnlFileSizePgs,*/
const bool truncateFlag = false,
- u_int32_t wCachePageSize = defWCachePageSize,
- /*u_int16_t tplJfiles = defTplNumJrnlFiles,
- u_int32_t tplJfileSizePgs = defTplJrnlFileSizePgs,*/
- u_int32_t tplWCachePageSize = defTplWCachePageSize/*,
- bool autoJExpand = defAutoJrnlExpand,
- u_int16_t autoJExpandMaxFiles = defAutoJrnlExpandMaxFiles*/);
+ uint32_t wCachePageSize = defWCachePageSize,
+ uint32_t tplWCachePageSize = defTplWCachePageSize);
void truncateInit(const bool saveStoreContent = false);
@@ -345,7 +319,7 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem
void flush(const qpid::broker::PersistableQueue& queue);
- u_int32_t outstandingQueueAIO(const qpid::broker::PersistableQueue& queue);
+ uint32_t outstandingQueueAIO(const qpid::broker::PersistableQueue& queue);
void collectPreparedXids(std::set<std::string>& xids);
@@ -364,7 +338,7 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem
qpid::management::ManagementObject::shared_ptr GetManagementObject (void) const
{ return mgmtObject; }
- inline qpid::management::Manageable::status_t ManagementMethod (u_int32_t, qpid::management::Args&, std::string&)
+ inline qpid::management::Manageable::status_t ManagementMethod (uint32_t, qpid::management::Args&, std::string&)
{ return qpid::management::Manageable::STATUS_OK; }
std::string getStoreDir() const;
diff --git a/qpid/cpp/src/qpid/linearstore/PreparedTransaction.h b/qpid/cpp/src/qpid/linearstore/PreparedTransaction.h
index 74b93abfde..9888342579 100644
--- a/qpid/cpp/src/qpid/linearstore/PreparedTransaction.h
+++ b/qpid/cpp/src/qpid/linearstore/PreparedTransaction.h
@@ -25,6 +25,7 @@
#include <list>
#include <map>
#include <set>
+#include <stdint.h>
#include <string>
#include <boost/shared_ptr.hpp>
#include <boost/ptr_container/ptr_list.hpp>
@@ -32,8 +33,8 @@
namespace qpid{
namespace linearstore{
-typedef u_int64_t queue_id;
-typedef u_int64_t message_id;
+typedef uint64_t queue_id;
+typedef uint64_t message_id;
class LockedMappings
{
diff --git a/qpid/cpp/src/qpid/linearstore/StorePlugin.cpp b/qpid/cpp/src/qpid/linearstore/StorePlugin.cpp
index 98b2275c96..840468d8bd 100644
--- a/qpid/cpp/src/qpid/linearstore/StorePlugin.cpp
+++ b/qpid/cpp/src/qpid/linearstore/StorePlugin.cpp
@@ -23,7 +23,7 @@
#include "qpid/Plugin.h"
#include "qpid/Options.h"
#include "qpid/DataDir.h"
-#include "qpid/log/Statement.h"
+#include "qpid/linearstore/Log.h"
#include "qpid/linearstore/MessageStoreImpl.h"
using qpid::linearstore::MessageStoreImpl;
@@ -49,7 +49,7 @@ struct StorePlugin : public Plugin {
if (options.storeDir.empty ())
{
if (!dataDir.isEnabled ())
- throw Exception ("msgstore: If --data-dir is blank or --no-data-dir is specified, --store-dir must be present.");
+ throw Exception ("linearstore: If broker option --data-dir is blank or --no-data-dir is specified, linearstore option --store-dir must be present.");
options.storeDir = dataDir.getPath ();
}
@@ -64,7 +64,7 @@ struct StorePlugin : public Plugin {
Broker* broker = dynamic_cast<Broker*>(&target);
if (!broker) return;
if (!store) return;
- QPID_LOG(info, "Enabling management instrumentation for the store.");
+ QLS_LOG(info, "Enabling management instrumentation.");
store->initManagement();
}
diff --git a/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp b/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp
index 4084fa992d..022d9c13e2 100644
--- a/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp
+++ b/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp
@@ -78,15 +78,15 @@ TxnCtxt::TxnCtxt(IdSequence* _loggedtx) : loggedtx(_loggedtx), dtokp(new DataTok
// // Human-readable tid: 53 bytes
// // uuit_t is a char[16]
// tid.reserve(53);
-// u_int64_t* u1 = (u_int64_t*)uuid;
-// u_int64_t* u2 = (u_int64_t*)(uuid + sizeof(u_int64_t));
+// uint64_t* u1 = (uint64_t*)uuid;
+// uint64_t* u2 = (uint64_t*)(uuid + sizeof(uint64_t));
// std::stringstream s;
// s << "tid:" << std::hex << std::setfill('0') << std::setw(16) << uuidSeq.next() << ":" << std::setw(16) << *u1 << std::setw(16) << *u2;
// tid.assign(s.str());
// Binary tid: 24 bytes
tid.reserve(24);
- u_int64_t c = uuidSeq.next();
+ uint64_t c = uuidSeq.next();
tid.append((char*)&c, sizeof(c));
tid.append((char*)&uuid, sizeof(uuid));
}
@@ -172,7 +172,7 @@ DataTokenImpl* TxnCtxt::getDtok() { return dtokp.get(); }
void TxnCtxt::incrDtokRef() { dtokp->addRef(); }
-void TxnCtxt::recoverDtok(const u_int64_t rid, const std::string xid) {
+void TxnCtxt::recoverDtok(const uint64_t rid, const std::string xid) {
dtokp->set_rid(rid);
dtokp->set_wstate(DataTokenImpl::ENQ);
dtokp->set_xid(xid);
diff --git a/qpid/cpp/src/qpid/linearstore/TxnCtxt.h b/qpid/cpp/src/qpid/linearstore/TxnCtxt.h
index f66b9fd74f..4c009d7a94 100644
--- a/qpid/cpp/src/qpid/linearstore/TxnCtxt.h
+++ b/qpid/cpp/src/qpid/linearstore/TxnCtxt.h
@@ -95,7 +95,7 @@ class TxnCtxt : public qpid::broker::TransactionContext
bool impactedQueuesEmpty();
DataTokenImpl* getDtok();
void incrDtokRef();
- void recoverDtok(const u_int64_t rid, const std::string xid);
+ void recoverDtok(const uint64_t rid, const std::string xid);
};
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h b/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h
index f4048fd3d3..fc44e35331 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h
@@ -49,7 +49,8 @@
* (The disk softblock size is 512 for Linux kernels >= 2.6)
*/
#define JRNL_DBLK_SIZE 128 /**< Data block size in bytes (CANNOT BE LESS THAN 32!) */
-#define JRNL_SBLK_SIZE 32 /**< Disk softblock size in multiples of JRNL_DBLK_SIZE */
+#define JRNL_SBLK_SIZE_DBLKS 32 /**< Disk softblock size in multiples of JRNL_DBLK_SIZE */
+#define JRNL_SBLK_SIZE JRNL_SBLK_SIZE_DBLKS * JRNL_DBLK_SIZE /**< Disk softblock size in bytes */
//#define JRNL_MIN_FILE_SIZE 128 ///< Min. jrnl file size in sblks (excl. file_hdr)
//#define JRNL_MAX_FILE_SIZE 4194176 ///< Max. jrnl file size in sblks (excl. file_hdr)
//#define JRNL_MIN_NUM_FILES 4 ///< Min. number of journal files
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp
index cabb43191a..552aa92b9c 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp
@@ -644,7 +644,7 @@ jcntl::rcvr_janalyze(rcvdat& /*rd*/, const std::vector<std::string>* /*prep_txn_
}
// Check for file full condition - add one to _jfsize_sblks to account for file header
- rd._lffull = rd._eo == (1 + _jfsize_sblks) * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE;
+ rd._lffull = rd._eo == (1 + _jfsize_sblks) * JRNL_SBLK_SIZE;
// Check for journal full condition
uint16_t next_wr_fid = (rd._lfid + 1) % rd._njf;
@@ -893,7 +893,7 @@ jcntl::jfile_cycle(uint16_t& fid, std::ifstream* ifsp/*, bool& lowi*/, rcvdat& r
// assert(fhdr._lfid == fid);
if (!rd._fro)
rd._fro = fhdr._fro;
- std::streamoff foffs = jump_fro ? fhdr._fro : JRNL_DBLK_SIZE * JRNL_SBLK_SIZE;
+ std::streamoff foffs = jump_fro ? fhdr._fro : JRNL_SBLK_SIZE;
ifsp->seekg(foffs);
}
else
@@ -938,14 +938,14 @@ jcntl::check_owi(const uint16_t fid, rec_hdr& h, bool& lowi, rcvdat& rd, std::st
void
jcntl::check_journal_alignment(const uint16_t fid, std::streampos& file_pos, rcvdat& rd)
{
- unsigned sblk_offs = file_pos % (JRNL_DBLK_SIZE * JRNL_SBLK_SIZE);
+ unsigned sblk_offs = file_pos % JRNL_SBLK_SIZE;
if (sblk_offs)
{
{
std::ostringstream oss;
oss << std::hex << "Bad record alignment found at fid=0x" << fid;
oss << " offs=0x" << file_pos << " (likely journal overwrite boundary); " << std::dec;
- oss << (JRNL_SBLK_SIZE - (sblk_offs/JRNL_DBLK_SIZE)) << " filler record(s) required.";
+ oss << (JRNL_SBLK_SIZE_DBLKS - (sblk_offs/JRNL_DBLK_SIZE)) << " filler record(s) required.";
this->log(LOG_WARN, oss.str());
}
const uint32_t xmagic = QLS_EMPTY_MAGIC;
@@ -965,7 +965,7 @@ jcntl::check_journal_alignment(const uint16_t fid, std::streampos& file_pos, rcv
// clear should inspection of the file be required.
std::memset((char*)buff + sizeof(xmagic), QLS_CLEAN_CHAR, JRNL_DBLK_SIZE - sizeof(xmagic));
- while (file_pos % (JRNL_DBLK_SIZE * JRNL_SBLK_SIZE))
+ while (file_pos % JRNL_SBLK_SIZE)
{
ofsp.write((const char*)buff, JRNL_DBLK_SIZE);
assert(!ofsp.fail());
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jrec.h b/qpid/cpp/src/qpid/linearstore/jrnl/jrec.h
index e1253ce9c4..59c5df2ff2 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/jrec.h
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/jrec.h
@@ -150,7 +150,7 @@ namespace qls_jrnl
static inline uint32_t size_dblks(const std::size_t size)
{ return size_blks(size, JRNL_DBLK_SIZE); }
static inline uint32_t size_sblks(const std::size_t size)
- { return size_blks(size, JRNL_DBLK_SIZE * JRNL_SBLK_SIZE); }
+ { return size_blks(size, JRNL_SBLK_SIZE); }
static inline uint32_t size_blks(const std::size_t size, const std::size_t blksize)
{ return (size + blksize - 1)/blksize; }
virtual uint64_t rid() const = 0;
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp
index 7f9d0aed98..40611692cf 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp
@@ -38,6 +38,7 @@ namespace qls_jrnl
pmgr::page_cb::page_cb(uint16_t index):
_index(index),
_state(UNUSED),
+ _frid(0),
_wdblks(0),
_rdblks(0),
_pdtokl(0),
@@ -63,7 +64,7 @@ pmgr::page_cb::state_str() const
return "<unknown>";
}
-const uint32_t pmgr::_sblksize = JRNL_SBLK_SIZE * JRNL_DBLK_SIZE;
+const uint32_t pmgr::_sblksize = JRNL_SBLK_SIZE;
pmgr::pmgr(jcntl* jc, enq_map& emap, txn_map& tmap):
_cache_pgsize_sblks(0),
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/rmgr.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/rmgr.cpp
index f49066c38d..b6d0f0f2ad 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/rmgr.cpp
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/rmgr.cpp
@@ -266,7 +266,7 @@ rmgr::get_events(page_state /*state*/, timespec* const timeout, bool flush)
{
// TODO: replace for linear store: _rfh
/*
- if (pcbp->_rfh->rd_subm_cnt_dblks() >= JRNL_SBLK_SIZE) // Detects if write reset of this fcntl obj has occurred.
+ if (pcbp->_rfh->rd_subm_cnt_dblks() >= JRNL_SBLK_SIZE_DBLKS) // Detects if write reset of this fcntl obj has occurred.
{
// Increment the completed read offset
// NOTE: We cannot use _rrfc here, as it may have rotated since submitting count.
@@ -281,15 +281,15 @@ rmgr::get_events(page_state /*state*/, timespec* const timeout, bool flush)
else // File header reads have no pcb
{
std::memcpy(&_fhdr, _fhdr_buffer, sizeof(file_hdr_t));
- /*_rrfc.add_cmpl_cnt_dblks(JRNL_SBLK_SIZE);*/ // TODO: replace for linear store: _rrfc
+ /*_rrfc.add_cmpl_cnt_dblks(JRNL_SBLK_SIZE_DBLKS);*/ // TODO: replace for linear store: _rrfc
- uint32_t fro_dblks = (_fhdr._fro / JRNL_DBLK_SIZE) - JRNL_SBLK_SIZE;
+ uint32_t fro_dblks = (_fhdr._fro / JRNL_DBLK_SIZE) - JRNL_SBLK_SIZE_DBLKS;
// Check fro_dblks does not exceed the write pointers which can happen in some corrupted journal recoveries
// TODO: replace for linear store: _fhdr._pfid, _rrfc
-// if (fro_dblks > _jc->wr_subm_cnt_dblks(_fhdr._pfid) - JRNL_SBLK_SIZE)
-// fro_dblks = _jc->wr_subm_cnt_dblks(_fhdr._pfid) - JRNL_SBLK_SIZE;
- _pg_cntr = fro_dblks / (JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE);
- uint32_t tot_pg_offs_dblks = _pg_cntr * JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE;
+// if (fro_dblks > _jc->wr_subm_cnt_dblks(_fhdr._pfid) - JRNL_SBLK_SIZE_DBLKS)
+// fro_dblks = _jc->wr_subm_cnt_dblks(_fhdr._pfid) - JRNL_SBLK_SIZE_DBLKS;
+ _pg_cntr = fro_dblks / (JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE_DBLKS);
+ uint32_t tot_pg_offs_dblks = _pg_cntr * JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE_DBLKS;
_pg_index = _pg_cntr % JRNL_RMGR_PAGES;
_pg_offset_dblks = fro_dblks - tot_pg_offs_dblks;
// _rrfc.add_subm_cnt_dblks(tot_pg_offs_dblks);
@@ -601,16 +601,16 @@ rmgr::init_aio_reads(const int16_t /*first_uninit*/, const uint16_t /*num_uninit
if (_rrfc.subm_offs() == 0)
{
- _rrfc.add_subm_cnt_dblks(JRNL_SBLK_SIZE);
- _rrfc.add_cmpl_cnt_dblks(JRNL_SBLK_SIZE);
+ _rrfc.add_subm_cnt_dblks(JRNL_SBLK_SIZE_DBLKS);
+ _rrfc.add_cmpl_cnt_dblks(JRNL_SBLK_SIZE_DBLKS);
}
// TODO: Future perf improvement: Do a single AIO read for all available file
// space into all contiguous empty pages in one AIO operation.
uint32_t file_rem_dblks = _rrfc.remaining_dblks();
- file_rem_dblks -= file_rem_dblks % JRNL_SBLK_SIZE; // round down to closest sblk boundary
- uint32_t pg_size_dblks = JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE;
+ file_rem_dblks -= file_rem_dblks % JRNL_SBLK_SIZE_DBLKS; // round down to closest sblk boundary
+ uint32_t pg_size_dblks = JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE_DBLKS;
uint32_t rd_size = file_rem_dblks > pg_size_dblks ? pg_size_dblks : file_rem_dblks;
if (rd_size)
{
@@ -618,7 +618,7 @@ rmgr::init_aio_reads(const int16_t /*first_uninit*/, const uint16_t /*num_uninit
// TODO: For perf, combine contiguous pages into single read
// 1 or 2 AIOs needed depending on whether read block folds
aio_cb* aiocbp = &_aio_cb_arr[pi];
- aio::prep_pread_2(aiocbp, _rrfc.fh(), _page_ptr_arr[pi], rd_size * JRNL_DBLK_SIZE, _rrfc.subm_offs());
+ aio::prep_pread_2(aiocbp, _rrfc.fh(), _page_ptr_arr[pi], rd_size * JRNL_DBLK_SIZE_DBLKS, _rrfc.subm_offs());
if (aio::submit(_ioctx, 1, &aiocbp) < 0)
throw jexception(jerrno::JERR__AIO, "rmgr", "init_aio_reads");
_rrfc.add_subm_cnt_dblks(rd_size);
@@ -640,7 +640,7 @@ rmgr::rotate_page()
{
_page_cb_arr[_pg_index]._rdblks = 0;
_page_cb_arr[_pg_index]._state = UNUSED;
- if (_pg_offset_dblks >= JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE)
+ if (_pg_offset_dblks >= JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE_DBLKS)
{
_pg_offset_dblks = 0;
_pg_cntr++;
@@ -682,7 +682,7 @@ rmgr::init_file_header_read()
if (aio::submit(_ioctx, 1, &_fhdr_aio_cb_ptr) < 0)
throw jexception(jerrno::JERR__AIO, "rmgr", "init_file_header_read");
_aio_evt_rem++;
- _rrfc.add_subm_cnt_dblks(JRNL_SBLK_SIZE);
+ _rrfc.add_subm_cnt_dblks(JRNL_SBLK_SIZE_DBLKS);
_fhdr_rd_outstanding = true;
*/
}
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp
index 8ea129a945..ccc9bf6bab 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp
@@ -94,13 +94,13 @@ wmgr::initialize(aio_callback* const cbp, const uint32_t wcache_pgsize_sblks,
initialize(cbp, wcache_pgsize_sblks, wcache_num_pages);
- _jfsize_dblks = _jc->jfsize_sblks() * JRNL_SBLK_SIZE;
+ _jfsize_dblks = _jc->jfsize_sblks() * JRNL_SBLK_SIZE_DBLKS;
_jfsize_pgs = _jc->jfsize_sblks() / _cache_pgsize_sblks;
assert(_jc->jfsize_sblks() % JRNL_RMGR_PAGE_SIZE == 0);
if (eo)
{
- const uint32_t wr_pg_size_dblks = _cache_pgsize_sblks * JRNL_SBLK_SIZE;
+ const uint32_t wr_pg_size_dblks = _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS;
uint32_t data_dblks = (eo / JRNL_DBLK_SIZE) - 4; // 4 dblks for file hdr
_pg_cntr = data_dblks / wr_pg_size_dblks;
_pg_offset_dblks = data_dblks - (_pg_cntr * wr_pg_size_dblks);
@@ -154,11 +154,11 @@ wmgr::enqueue(const void* const data_buff, const std::size_t tot_data_len,
bool done = false;
while (!done)
{
- assert(_pg_offset_dblks < _cache_pgsize_sblks * JRNL_SBLK_SIZE);
+ assert(_pg_offset_dblks < _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS);
void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE);
uint32_t data_offs_dblks = dtokp->dblocks_written();
uint32_t ret = _enq_rec.encode(wptr, data_offs_dblks,
- (_cache_pgsize_sblks * JRNL_SBLK_SIZE) - _pg_offset_dblks);
+ (_cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS) - _pg_offset_dblks);
// Remember fid which contains the record header in case record is split over several files
// TODO: replace for linearstore: _wrfc
@@ -259,11 +259,11 @@ wmgr::dequeue(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_
bool done = false;
while (!done)
{
- assert(_pg_offset_dblks < _cache_pgsize_sblks * JRNL_SBLK_SIZE);
+ assert(_pg_offset_dblks < _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS);
void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE);
uint32_t data_offs_dblks = dtokp->dblocks_written();
uint32_t ret = _deq_rec.encode(wptr, data_offs_dblks,
- (_cache_pgsize_sblks * JRNL_SBLK_SIZE) - _pg_offset_dblks);
+ (_cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS) - _pg_offset_dblks);
// Remember fid which contains the record header in case record is split over several files
// TODO: replace for linearstore: _wrfc
@@ -361,11 +361,11 @@ wmgr::abort(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_le
bool done = false;
while (!done)
{
- assert(_pg_offset_dblks < _cache_pgsize_sblks * JRNL_SBLK_SIZE);
+ assert(_pg_offset_dblks < _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS);
void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE);
uint32_t data_offs_dblks = dtokp->dblocks_written();
uint32_t ret = _txn_rec.encode(wptr, data_offs_dblks,
- (_cache_pgsize_sblks * JRNL_SBLK_SIZE) - _pg_offset_dblks);
+ (_cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS) - _pg_offset_dblks);
// Remember fid which contains the record header in case record is split over several files
// TODO: replace for linearstore: _wrfc
@@ -453,11 +453,11 @@ wmgr::commit(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_l
bool done = false;
while (!done)
{
- assert(_pg_offset_dblks < _cache_pgsize_sblks * JRNL_SBLK_SIZE);
+ assert(_pg_offset_dblks < _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS);
void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE);
uint32_t data_offs_dblks = dtokp->dblocks_written();
uint32_t ret = _txn_rec.encode(wptr, data_offs_dblks,
- (_cache_pgsize_sblks * JRNL_SBLK_SIZE) - _pg_offset_dblks);
+ (_cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS) - _pg_offset_dblks);
// Remember fid which contains the record header in case record is split over several files
// TODO: replace for linearstore: _wrfc
@@ -545,10 +545,10 @@ wmgr::file_header_check(const uint64_t /*rid*/, const bool /*cont*/, const uint3
if (cont)
{
if (file_fit && !file_full)
- fro = (rec_dblks_rem + JRNL_SBLK_SIZE) * JRNL_DBLK_SIZE;
+ fro = (rec_dblks_rem + JRNL_SBLK_SIZE_DBLKS) * JRNL_DBLK_SIZE;
}
else
- fro = JRNL_SBLK_SIZE * JRNL_DBLK_SIZE;
+ fro = JRNL_SBLK_SIZE;
write_fhdr(rid, _wrfc.index(), _wrfc.index(), fro); // TODO: replace for linearstore: _wrfc
}
*/
@@ -558,7 +558,7 @@ void
wmgr::flush_check(iores& res, bool& cont, bool& done)
{
// Is page is full, flush
- if (_pg_offset_dblks >= _cache_pgsize_sblks * JRNL_SBLK_SIZE)
+ if (_pg_offset_dblks >= _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS)
{
res = write_flush();
assert(res == RHM_IORES_SUCCESS);
@@ -814,7 +814,7 @@ wmgr::get_events(page_state state, timespec* const timeout, bool flush)
file_hdr_t* fhp = (file_hdr*)aiocbp->u.c.buf;
uint32_t lfid = fhp->_lfid;
fcntl* fcntlp = _jc->get_fcntlp(lfid);
- fcntlp->add_wr_cmpl_cnt_dblks(JRNL_SBLK_SIZE);
+ fcntlp->add_wr_cmpl_cnt_dblks(JRNL_SBLK_SIZE_DBLKS);
fcntlp->decr_aio_cnt();
fcntlp->set_wr_fhdr_aio_outstanding(false);
*/
@@ -971,7 +971,7 @@ void
wmgr::dblk_roundup()
{
const uint32_t xmagic = QLS_EMPTY_MAGIC;
- uint32_t wdblks = jrec::size_blks(_cached_offset_dblks, JRNL_SBLK_SIZE) * JRNL_SBLK_SIZE;
+ uint32_t wdblks = jrec::size_blks(_cached_offset_dblks, JRNL_SBLK_SIZE_DBLKS) * JRNL_SBLK_SIZE_DBLKS;
while (_cached_offset_dblks < wdblks)
{
void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE);
@@ -1000,7 +1000,7 @@ wmgr::write_fhdr(uint64_t rid, uint16_t fid, uint16_t /*lid*/, std::size_t fro)
_aio_evt_rem++;
// TODO: replace for linearstore: _wrfc
/*
- _wrfc.add_subm_cnt_dblks(JRNL_SBLK_SIZE);
+ _wrfc.add_subm_cnt_dblks(JRNL_SBLK_SIZE_DBLKS);
_wrfc.incr_aio_cnt();
_wrfc.file_controller()->set_wr_fhdr_aio_outstanding(true);
*/
@@ -1010,7 +1010,7 @@ void
wmgr::rotate_page()
{
_page_cb_arr[_pg_index]._state = AIO_PENDING;
- if (_pg_offset_dblks >= _cache_pgsize_sblks * JRNL_SBLK_SIZE)
+ if (_pg_offset_dblks >= _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS)
{
_pg_offset_dblks = 0;
_pg_cntr++;