summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2013-08-21 17:42:23 +0000
committerKim van der Riet <kpvdr@apache.org>2013-08-21 17:42:23 +0000
commit341696dbb845dc620e2c916b30e312e7f47c3388 (patch)
tree462cacee4843aa64dc917e6c0e76de2e4d945c40
parent7cfb37d4117ed1726795cd5e30f5a11ea1a555c9 (diff)
downloadqpid-python-341696dbb845dc620e2c916b30e312e7f47c3388.tar.gz
QPID-4984: WIP - compiles, but not functional.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/linearstore@1516229 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/CMakeLists.txt1
-rw-r--r--qpid/cpp/src/qpid/legacystore/MessageStoreImpl.cpp2
-rw-r--r--qpid/cpp/src/qpid/linearstore/BindingDbt.cpp2
-rw-r--r--qpid/cpp/src/qpid/linearstore/BufferValue.cpp2
-rw-r--r--qpid/cpp/src/qpid/linearstore/DataTokenImpl.cpp2
-rw-r--r--qpid/cpp/src/qpid/linearstore/DataTokenImpl.h2
-rw-r--r--qpid/cpp/src/qpid/linearstore/IdDbt.cpp2
-rw-r--r--qpid/cpp/src/qpid/linearstore/IdSequence.cpp2
-rw-r--r--qpid/cpp/src/qpid/linearstore/JournalImpl.cpp59
-rw-r--r--qpid/cpp/src/qpid/linearstore/JournalImpl.h32
-rw-r--r--qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp116
-rw-r--r--qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h80
-rw-r--r--qpid/cpp/src/qpid/linearstore/PreparedTransaction.cpp2
-rw-r--r--qpid/cpp/src/qpid/linearstore/StoreException.h2
-rw-r--r--qpid/cpp/src/qpid/linearstore/StorePlugin.cpp2
-rw-r--r--qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp6
-rw-r--r--qpid/cpp/src/qpid/linearstore/TxnCtxt.h6
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h4
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp20
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/jcntl.h10
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c9
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h6
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp122
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h6
-rw-r--r--qpid/cpp/src/qpid/linearstore/management-schema.xml2
25 files changed, 277 insertions, 222 deletions
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt
index 3a02b516a2..dcbb17d1e0 100644
--- a/qpid/cpp/src/CMakeLists.txt
+++ b/qpid/cpp/src/CMakeLists.txt
@@ -210,6 +210,7 @@ execute_process(COMMAND ${RUBY_EXECUTABLE} -I ${rgen_dir} ${rgen_dir}/generate $
${CMAKE_CURRENT_SOURCE_DIR}/qpid/acl/management-schema.xml
${CMAKE_CURRENT_SOURCE_DIR}/qpid/ha/management-schema.xml
${CMAKE_CURRENT_SOURCE_DIR}/qpid/legacystore/management-schema.xml
+ ${CMAKE_CURRENT_SOURCE_DIR}/qpid/linearstore/management-schema.xml
)
set(mgen_dir ${qpid-cpp_SOURCE_DIR}/managementgen)
set(regen_mgmt OFF)
diff --git a/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.cpp b/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.cpp
index c92c9828f4..3b42c7bd5d 100644
--- a/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.cpp
+++ b/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.cpp
@@ -115,7 +115,7 @@ u_int32_t MessageStoreImpl::chkJrnlWrPageCacheSize(const u_int32_t param, const
// For zero value, use default
p = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024;
QPID_LOG(warning, "parameter " << paramName << " (" << param << ") must be a power of 2 between 1 and 128; changing this parameter to default value (" << p << ")");
- } else if ( p > 128 || p & (p-1) ) {
+ } else if ( p > 128 || (p & (p-1)) ) {
// For any positive value that is not a power of 2, use closest value
if (p < 6) p = 4;
else if (p < 12) p = 8;
diff --git a/qpid/cpp/src/qpid/linearstore/BindingDbt.cpp b/qpid/cpp/src/qpid/linearstore/BindingDbt.cpp
index a48c156e71..d50009c82e 100644
--- a/qpid/cpp/src/qpid/linearstore/BindingDbt.cpp
+++ b/qpid/cpp/src/qpid/linearstore/BindingDbt.cpp
@@ -19,7 +19,7 @@
*
*/
-#include "qpid/legacystore/BindingDbt.h"
+#include "qpid/linearstore/BindingDbt.h"
namespace mrg {
namespace msgstore {
diff --git a/qpid/cpp/src/qpid/linearstore/BufferValue.cpp b/qpid/cpp/src/qpid/linearstore/BufferValue.cpp
index fb2c471cd7..4bed7d426a 100644
--- a/qpid/cpp/src/qpid/linearstore/BufferValue.cpp
+++ b/qpid/cpp/src/qpid/linearstore/BufferValue.cpp
@@ -19,7 +19,7 @@
*
*/
-#include "qpid/legacystore/BufferValue.h"
+#include "qpid/linearstore/BufferValue.h"
namespace mrg {
namespace msgstore {
diff --git a/qpid/cpp/src/qpid/linearstore/DataTokenImpl.cpp b/qpid/cpp/src/qpid/linearstore/DataTokenImpl.cpp
index 796d4c02f0..c16466b05e 100644
--- a/qpid/cpp/src/qpid/linearstore/DataTokenImpl.cpp
+++ b/qpid/cpp/src/qpid/linearstore/DataTokenImpl.cpp
@@ -19,7 +19,7 @@
*
*/
-#include "qpid/legacystore/DataTokenImpl.h"
+#include "qpid/linearstore/DataTokenImpl.h"
using namespace mrg::msgstore;
diff --git a/qpid/cpp/src/qpid/linearstore/DataTokenImpl.h b/qpid/cpp/src/qpid/linearstore/DataTokenImpl.h
index e01d471e1b..3c8b6cbd12 100644
--- a/qpid/cpp/src/qpid/linearstore/DataTokenImpl.h
+++ b/qpid/cpp/src/qpid/linearstore/DataTokenImpl.h
@@ -22,7 +22,7 @@
#ifndef QPID_LEGACYSTORE_DATATOKENIMPL_H
#define QPID_LEGACYSTORE_DATATOKENIMPL_H
-#include "qpid/legacystore/jrnl/data_tok.h"
+#include "qpid/linearstore/jrnl/data_tok.h"
#include "qpid/broker/PersistableMessage.h"
#include <boost/intrusive_ptr.hpp>
diff --git a/qpid/cpp/src/qpid/linearstore/IdDbt.cpp b/qpid/cpp/src/qpid/linearstore/IdDbt.cpp
index d9edaf80e6..2308d074bc 100644
--- a/qpid/cpp/src/qpid/linearstore/IdDbt.cpp
+++ b/qpid/cpp/src/qpid/linearstore/IdDbt.cpp
@@ -19,7 +19,7 @@
*
*/
-#include "qpid/legacystore/IdDbt.h"
+#include "qpid/linearstore/IdDbt.h"
using namespace mrg::msgstore;
diff --git a/qpid/cpp/src/qpid/linearstore/IdSequence.cpp b/qpid/cpp/src/qpid/linearstore/IdSequence.cpp
index 975b1107e7..9034a11fd2 100644
--- a/qpid/cpp/src/qpid/linearstore/IdSequence.cpp
+++ b/qpid/cpp/src/qpid/linearstore/IdSequence.cpp
@@ -19,7 +19,7 @@
*
*/
-#include "qpid/legacystore/IdSequence.h"
+#include "qpid/linearstore/IdSequence.h"
using namespace mrg::msgstore;
using qpid::sys::Mutex;
diff --git a/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp b/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp
index ba3f2aecae..1cc9f3f08c 100644
--- a/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp
+++ b/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp
@@ -19,25 +19,25 @@
*
*/
-#include "qpid/legacystore/JournalImpl.h"
+#include "qpid/linearstore/JournalImpl.h"
-#include "qpid/legacystore/jrnl/jerrno.h"
-#include "qpid/legacystore/jrnl/jexception.h"
+#include "qpid/linearstore/jrnl/jerrno.h"
+#include "qpid/linearstore/jrnl/jexception.h"
#include "qpid/log/Statement.h"
#include "qpid/management/ManagementAgent.h"
-#include "qmf/org/apache/qpid/legacystore/ArgsJournalExpand.h"
-#include "qmf/org/apache/qpid/legacystore/EventCreated.h"
-#include "qmf/org/apache/qpid/legacystore/EventEnqThresholdExceeded.h"
-#include "qmf/org/apache/qpid/legacystore/EventFull.h"
-#include "qmf/org/apache/qpid/legacystore/EventRecovered.h"
+#include "qmf/org/apache/qpid/linearstore/ArgsJournalExpand.h"
+#include "qmf/org/apache/qpid/linearstore/EventCreated.h"
+#include "qmf/org/apache/qpid/linearstore/EventEnqThresholdExceeded.h"
+#include "qmf/org/apache/qpid/linearstore/EventFull.h"
+#include "qmf/org/apache/qpid/linearstore/EventRecovered.h"
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Timer.h"
-#include "qpid/legacystore/StoreException.h"
+#include "qpid/linearstore/StoreException.h"
using namespace mrg::msgstore;
using namespace mrg::journal;
using qpid::management::ManagementAgent;
-namespace _qmf = qmf::org::apache::qpid::legacystore;
+namespace _qmf = qmf::org::apache::qpid::linearstore;
InactivityFireEvent::InactivityFireEvent(JournalImpl* p, const qpid::sys::Duration timeout):
qpid::sys::TimerTask(timeout, "JournalInactive:"+p->id()), _parent(p) {}
@@ -132,22 +132,24 @@ JournalImpl::initManagement(qpid::management::ManagementAgent* a)
void
-JournalImpl::initialize(const u_int16_t num_jfiles,
+JournalImpl::initialize(/*const u_int16_t num_jfiles,
const bool auto_expand,
const u_int16_t ae_max_jfiles,
- const u_int32_t jfsize_sblks,
+ const u_int32_t jfsize_sblks,*/
const u_int16_t wcache_num_pages,
const u_int32_t wcache_pgsize_sblks,
mrg::journal::aio_callback* const cbp)
{
std::ostringstream oss;
- oss << "Initialize; num_jfiles=" << num_jfiles << " jfsize_sblks=" << jfsize_sblks;
+// oss << "Initialize; num_jfiles=" << num_jfiles << " jfsize_sblks=" << jfsize_sblks;
+ oss << "Initialize;";
oss << " wcache_pgsize_sblks=" << wcache_pgsize_sblks;
oss << " wcache_num_pages=" << wcache_num_pages;
log(LOG_DEBUG, oss.str());
- jcntl::initialize(num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks, cbp);
+ jcntl::initialize(/*num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,*/ wcache_num_pages, wcache_pgsize_sblks, cbp);
log(LOG_DEBUG, "Initialization complete");
-
+ // TODO: replace for linearstore: _lpmgr
+/*
if (_mgmtObject.get() != 0)
{
_mgmtObject->set_initialFileCount(_lpmgr.num_jfiles());
@@ -159,15 +161,16 @@ JournalImpl::initialize(const u_int16_t num_jfiles,
_mgmtObject->set_writePages(wcache_num_pages);
}
if (_agent != 0)
- _agent->raiseEvent(qmf::org::apache::qpid::legacystore::EventCreated(_jid, _jfsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE, _lpmgr.num_jfiles()),
+ _agent->raiseEvent(qmf::org::apache::qpid::linearstore::EventCreated(_jid, _jfsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE, _lpmgr.num_jfiles()),
qpid::management::ManagementAgent::SEV_NOTE);
+*/
}
void
-JournalImpl::recover(const u_int16_t num_jfiles,
+JournalImpl::recover(/*const u_int16_t num_jfiles,
const bool auto_expand,
const u_int16_t ae_max_jfiles,
- const u_int32_t jfsize_sblks,
+ const u_int32_t jfsize_sblks,*/
const u_int16_t wcache_num_pages,
const u_int32_t wcache_pgsize_sblks,
mrg::journal::aio_callback* const cbp,
@@ -176,12 +179,14 @@ JournalImpl::recover(const u_int16_t num_jfiles,
u_int64_t queue_id)
{
std::ostringstream oss1;
- oss1 << "Recover; num_jfiles=" << num_jfiles << " jfsize_sblks=" << jfsize_sblks;
+// oss1 << "Recover; num_jfiles=" << num_jfiles << " jfsize_sblks=" << jfsize_sblks;
+ oss1 << "Recover;";
oss1 << " queue_id = 0x" << std::hex << queue_id << std::dec;
oss1 << " wcache_pgsize_sblks=" << wcache_pgsize_sblks;
oss1 << " wcache_num_pages=" << wcache_num_pages;
log(LOG_DEBUG, oss1.str());
-
+ // TODO: replace for linearstore: _lpmgr
+/*
if (_mgmtObject.get() != 0)
{
_mgmtObject->set_initialFileCount(_lpmgr.num_jfiles());
@@ -192,6 +197,7 @@ JournalImpl::recover(const u_int16_t num_jfiles,
_mgmtObject->set_writePageSize(wcache_pgsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE);
_mgmtObject->set_writePages(wcache_num_pages);
}
+*/
if (prep_tx_list_ptr) {
// Create list of prepared xids
@@ -200,10 +206,10 @@ JournalImpl::recover(const u_int16_t num_jfiles,
prep_xid_list.push_back(i->xid);
}
- jcntl::recover(num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks,
+ jcntl::recover(/*num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,*/ wcache_num_pages, wcache_pgsize_sblks,
cbp, &prep_xid_list, highest_rid);
} else {
- jcntl::recover(num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks,
+ jcntl::recover(/*num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,*/ wcache_num_pages, wcache_pgsize_sblks,
cbp, 0, highest_rid);
}
@@ -242,9 +248,12 @@ JournalImpl::recover_complete()
{
jcntl::recover_complete();
log(LOG_DEBUG, "Recover phase 2 complete; journal now writable.");
+ // TODO: replace for linearstore: _lpmgr
+/*
if (_agent != 0)
- _agent->raiseEvent(qmf::org::apache::qpid::legacystore::EventRecovered(_jid, _jfsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE, _lpmgr.num_jfiles(),
+ _agent->raiseEvent(qmf::org::apache::qpid::linearstore::EventRecovered(_jid, _jfsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE, _lpmgr.num_jfiles(),
_emap.size(), _tmap.size(), _tmap.enq_cnt(), _tmap.deq_cnt()), qpid::management::ManagementAgent::SEV_NOTE);
+*/
}
//#define MAX_AIO_SLEEPS 1000000 // tot: ~10 sec
@@ -589,7 +598,7 @@ JournalImpl::handleIoResult(const iores r)
oss << "Enqueue capacity threshold exceeded on queue \"" << _jid << "\".";
log(LOG_WARN, oss.str());
if (_agent != 0)
- _agent->raiseEvent(qmf::org::apache::qpid::legacystore::EventEnqThresholdExceeded(_jid, "Journal enqueue capacity threshold exceeded"),
+ _agent->raiseEvent(qmf::org::apache::qpid::linearstore::EventEnqThresholdExceeded(_jid, "Journal enqueue capacity threshold exceeded"),
qpid::management::ManagementAgent::SEV_WARN);
THROW_STORE_FULL_EXCEPTION(oss.str());
}
@@ -599,7 +608,7 @@ JournalImpl::handleIoResult(const iores r)
oss << "Journal full on queue \"" << _jid << "\".";
log(LOG_CRITICAL, oss.str());
if (_agent != 0)
- _agent->raiseEvent(qmf::org::apache::qpid::legacystore::EventFull(_jid, "Journal full"), qpid::management::ManagementAgent::SEV_ERROR);
+ _agent->raiseEvent(qmf::org::apache::qpid::linearstore::EventFull(_jid, "Journal full"), qpid::management::ManagementAgent::SEV_ERROR);
THROW_STORE_FULL_EXCEPTION(oss.str());
}
default:
diff --git a/qpid/cpp/src/qpid/linearstore/JournalImpl.h b/qpid/cpp/src/qpid/linearstore/JournalImpl.h
index 7227b2ffd4..1beb469b3f 100644
--- a/qpid/cpp/src/qpid/linearstore/JournalImpl.h
+++ b/qpid/cpp/src/qpid/linearstore/JournalImpl.h
@@ -23,17 +23,17 @@
#define QPID_LEGACYSTORE_JOURNALIMPL_H
#include <set>
-#include "qpid/legacystore/jrnl/enums.h"
-#include "qpid/legacystore/jrnl/jcntl.h"
-#include "qpid/legacystore/DataTokenImpl.h"
-#include "qpid/legacystore/PreparedTransaction.h"
+#include "qpid/linearstore/jrnl/enums.h"
+#include "qpid/linearstore/jrnl/jcntl.h"
+#include "qpid/linearstore/DataTokenImpl.h"
+#include "qpid/linearstore/PreparedTransaction.h"
#include "qpid/broker/PersistableQueue.h"
#include "qpid/sys/Timer.h"
#include "qpid/sys/Time.h"
#include <boost/ptr_container/ptr_list.hpp>
#include <boost/intrusive_ptr.hpp>
#include "qpid/management/Manageable.h"
-#include "qmf/org/apache/qpid/legacystore/Journal.h"
+#include "qmf/org/apache/qpid/linearstore/Journal.h"
namespace qpid { namespace sys {
class Timer;
@@ -98,7 +98,7 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public mrg::journal
bool _external;
qpid::management::ManagementAgent* _agent;
- qmf::org::apache::qpid::legacystore::Journal::shared_ptr _mgmtObject;
+ qmf::org::apache::qpid::linearstore::Journal::shared_ptr _mgmtObject;
DeleteCallback deleteCallback;
public:
@@ -116,28 +116,28 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public mrg::journal
void initManagement(qpid::management::ManagementAgent* agent);
- void initialize(const u_int16_t num_jfiles,
+ void initialize(/*const u_int16_t num_jfiles,
const bool auto_expand,
const u_int16_t ae_max_jfiles,
- const u_int32_t jfsize_sblks,
+ const u_int32_t jfsize_sblks,*/
const u_int16_t wcache_num_pages,
const u_int32_t wcache_pgsize_sblks,
mrg::journal::aio_callback* const cbp);
- inline void initialize(const u_int16_t num_jfiles,
+ inline void initialize(/*const u_int16_t num_jfiles,
const bool auto_expand,
const u_int16_t ae_max_jfiles,
- const u_int32_t jfsize_sblks,
+ const u_int32_t jfsize_sblks,*/
const u_int16_t wcache_num_pages,
const u_int32_t wcache_pgsize_sblks) {
- initialize(num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks,
+ initialize(/*num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,*/ wcache_num_pages, wcache_pgsize_sblks,
this);
}
- void recover(const u_int16_t num_jfiles,
+ void recover(/*const u_int16_t num_jfiles,
const bool auto_expand,
const u_int16_t ae_max_jfiles,
- const u_int32_t jfsize_sblks,
+ const u_int32_t jfsize_sblks,*/
const u_int16_t wcache_num_pages,
const u_int32_t wcache_pgsize_sblks,
mrg::journal::aio_callback* const cbp,
@@ -145,16 +145,16 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public mrg::journal
u_int64_t& highest_rid,
u_int64_t queue_id);
- inline void recover(const u_int16_t num_jfiles,
+ inline void recover(/*const u_int16_t num_jfiles,
const bool auto_expand,
const u_int16_t ae_max_jfiles,
- const u_int32_t jfsize_sblks,
+ const u_int32_t jfsize_sblks,*/
const u_int16_t wcache_num_pages,
const u_int32_t wcache_pgsize_sblks,
boost::ptr_list<msgstore::PreparedTransaction>* prep_tx_list_ptr,
u_int64_t& highest_rid,
u_int64_t queue_id) {
- recover(num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks,
+ recover(/*num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,*/ wcache_num_pages, wcache_pgsize_sblks,
this, prep_tx_list_ptr, highest_rid, queue_id);
}
diff --git a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
index c92c9828f4..c5a1a949c4 100644
--- a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
+++ b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
@@ -19,24 +19,24 @@
*
*/
-#include "qpid/legacystore/MessageStoreImpl.h"
+#include "qpid/linearstore/MessageStoreImpl.h"
#include "qpid/broker/QueueSettings.h"
-#include "qpid/legacystore/BindingDbt.h"
-#include "qpid/legacystore/BufferValue.h"
-#include "qpid/legacystore/IdDbt.h"
-#include "qpid/legacystore/jrnl/txn_map.h"
+#include "qpid/linearstore/BindingDbt.h"
+#include "qpid/linearstore/BufferValue.h"
+#include "qpid/linearstore/IdDbt.h"
+#include "qpid/linearstore/jrnl/txn_map.h"
#include "qpid/framing/FieldValue.h"
#include "qpid/log/Statement.h"
-#include "qmf/org/apache/qpid/legacystore/Package.h"
-#include "qpid/legacystore/StoreException.h"
+#include "qmf/org/apache/qpid/linearstore/Package.h"
+#include "qpid/linearstore/StoreException.h"
#include <dirent.h>
#include <db.h>
#define MAX_AIO_SLEEPS 100000 // tot: ~1 sec
#define AIO_SLEEP_TIME_US 10 // 0.01 ms
-namespace _qmf = qmf::org::apache::qpid::legacystore;
+namespace _qmf = qmf::org::apache::qpid::linearstore;
namespace mrg {
namespace msgstore {
@@ -78,6 +78,7 @@ MessageStoreImpl::MessageStoreImpl(qpid::broker::Broker* broker_, const char* en
agent(0)
{}
+/*
u_int16_t MessageStoreImpl::chkJrnlNumFilesParam(const u_int16_t param, const std::string paramName)
{
if (param < JRNL_MIN_NUM_FILES || param > JRNL_MAX_NUM_FILES) {
@@ -87,7 +88,9 @@ u_int16_t MessageStoreImpl::chkJrnlNumFilesParam(const u_int16_t param, const st
}
return param;
}
+*/
+/*
u_int32_t MessageStoreImpl::chkJrnlFileSizeParam(const u_int32_t param, const std::string paramName, const u_int32_t wCachePgSizeSblks)
{
if (param < (JRNL_MIN_FILE_SIZE / JRNL_RMGR_PAGE_SIZE) || (param > JRNL_MAX_FILE_SIZE / JRNL_RMGR_PAGE_SIZE)) {
@@ -102,16 +105,17 @@ u_int32_t MessageStoreImpl::chkJrnlFileSizeParam(const u_int32_t param, const st
}
return param;
}
+*/
-u_int32_t MessageStoreImpl::chkJrnlWrPageCacheSize(const u_int32_t param, const std::string paramName, const u_int16_t jrnlFsizePgs)
+u_int32_t MessageStoreImpl::chkJrnlWrPageCacheSize(const u_int32_t param, const std::string paramName/*, const u_int16_t jrnlFsizePgs*/)
{
u_int32_t p = param;
- if (jrnlFsizePgs == 1 && p > 64 ) {
+/* if (jrnlFsizePgs == 1 && p > 64 ) {
p = 64;
QPID_LOG(warning, "parameter " << paramName << " (" << param << ") cannot set a page size greater than the journal file size; changing this parameter to the journal file size (" << p << ")");
}
- else if (p == 0) {
+ else*/ if (p == 0) {
// For zero value, use default
p = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024;
QPID_LOG(warning, "parameter " << paramName << " (" << param << ") must be a power of 2 between 1 and 128; changing this parameter to default value (" << p << ")");
@@ -149,6 +153,7 @@ u_int16_t MessageStoreImpl::getJrnlWrNumPages(const u_int32_t wrPageSizeKib)
}
}
+/*
void MessageStoreImpl::chkJrnlAutoExpandOptions(const StoreOptions* opts,
bool& autoJrnlExpand,
u_int16_t& autoJrnlExpandMaxFiles,
@@ -198,6 +203,7 @@ void MessageStoreImpl::chkJrnlAutoExpandOptions(const StoreOptions* opts,
autoJrnlExpand = true;
autoJrnlExpandMaxFiles = p;
}
+*/
void MessageStoreImpl::initManagement ()
{
@@ -233,45 +239,46 @@ bool MessageStoreImpl::init(const qpid::Options* options)
{
// Extract and check options
const StoreOptions* opts = static_cast<const StoreOptions*>(options);
- u_int16_t numJrnlFiles = chkJrnlNumFilesParam(opts->numJrnlFiles, "num-jfiles");
- u_int32_t jrnlFsizePgs = chkJrnlFileSizeParam(opts->jrnlFsizePgs, "jfile-size-pgs");
- u_int32_t jrnlWrCachePageSizeKib = chkJrnlWrPageCacheSize(opts->wCachePageSizeKib, "wcache-page-size", jrnlFsizePgs);
- u_int16_t tplNumJrnlFiles = chkJrnlNumFilesParam(opts->tplNumJrnlFiles, "tpl-num-jfiles");
- u_int32_t tplJrnlFSizePgs = chkJrnlFileSizeParam(opts->tplJrnlFsizePgs, "tpl-jfile-size-pgs");
- u_int32_t tplJrnlWrCachePageSizeKib = chkJrnlWrPageCacheSize(opts->tplWCachePageSizeKib, "tpl-wcache-page-size", tplJrnlFSizePgs);
- bool autoJrnlExpand;
- u_int16_t autoJrnlExpandMaxFiles;
- chkJrnlAutoExpandOptions(opts, autoJrnlExpand, autoJrnlExpandMaxFiles, "auto-expand-max-jfiles", numJrnlFiles, "num-jfiles");
+// u_int16_t numJrnlFiles = chkJrnlNumFilesParam(opts->numJrnlFiles, "num-jfiles");
+// u_int32_t jrnlFsizePgs = chkJrnlFileSizeParam(opts->jrnlFsizePgs, "jfile-size-pgs");
+ u_int32_t jrnlWrCachePageSizeKib = chkJrnlWrPageCacheSize(opts->wCachePageSizeKib, "wcache-page-size"/*, jrnlFsizePgs*/);
+// u_int16_t tplNumJrnlFiles = chkJrnlNumFilesParam(opts->tplNumJrnlFiles, "tpl-num-jfiles");
+// u_int32_t tplJrnlFSizePgs = chkJrnlFileSizeParam(opts->tplJrnlFsizePgs, "tpl-jfile-size-pgs");
+ u_int32_t tplJrnlWrCachePageSizeKib = chkJrnlWrPageCacheSize(opts->tplWCachePageSizeKib, "tpl-wcache-page-size"/*, tplJrnlFSizePgs*/);
+// bool autoJrnlExpand;
+// u_int16_t autoJrnlExpandMaxFiles;
+// chkJrnlAutoExpandOptions(opts, autoJrnlExpand, autoJrnlExpandMaxFiles, "auto-expand-max-jfiles", numJrnlFiles, "num-jfiles");
// Pass option values to init(...)
- return init(opts->storeDir, numJrnlFiles, jrnlFsizePgs, opts->truncateFlag, jrnlWrCachePageSizeKib, tplNumJrnlFiles, tplJrnlFSizePgs, tplJrnlWrCachePageSizeKib, autoJrnlExpand, autoJrnlExpandMaxFiles);
+ return init(opts->storeDir, /*numJrnlFiles, jrnlFsizePgs,*/ opts->truncateFlag, jrnlWrCachePageSizeKib,
+ /*tplNumJrnlFiles, tplJrnlFSizePgs,*/ tplJrnlWrCachePageSizeKib/*, autoJrnlExpand, autoJrnlExpandMaxFiles*/);
}
// These params, taken from options, are assumed to be correct and verified
bool MessageStoreImpl::init(const std::string& dir,
- u_int16_t jfiles,
- u_int32_t jfileSizePgs,
+ /*u_int16_t jfiles,
+ u_int32_t jfileSizePgs,*/
const bool truncateFlag,
u_int32_t wCachePageSizeKib,
- u_int16_t tplJfiles,
- u_int32_t tplJfileSizePgs,
- u_int32_t tplWCachePageSizeKib,
+ /*u_int16_t tplJfiles,
+ u_int32_t tplJfileSizePgs,*/
+ u_int32_t tplWCachePageSizeKib/*,
bool autoJExpand,
- u_int16_t autoJExpandMaxFiles)
+ u_int16_t autoJExpandMaxFiles*/)
{
if (isInit) return true;
// Set geometry members (converting to correct units where req'd)
- numJrnlFiles = jfiles;
- jrnlFsizeSblks = jfileSizePgs * JRNL_RMGR_PAGE_SIZE;
+// numJrnlFiles = jfiles;
+// jrnlFsizeSblks = jfileSizePgs * JRNL_RMGR_PAGE_SIZE;
wCachePgSizeSblks = wCachePageSizeKib * 1024 / JRNL_DBLK_SIZE / JRNL_SBLK_SIZE; // convert from KiB to number sblks
wCacheNumPages = getJrnlWrNumPages(wCachePageSizeKib);
- tplNumJrnlFiles = tplJfiles;
- tplJrnlFsizeSblks = tplJfileSizePgs * JRNL_RMGR_PAGE_SIZE;
+// tplNumJrnlFiles = tplJfiles;
+// tplJrnlFsizeSblks = tplJfileSizePgs * JRNL_RMGR_PAGE_SIZE;
tplWCachePgSizeSblks = tplWCachePageSizeKib * 1024 / JRNL_DBLK_SIZE / JRNL_SBLK_SIZE; // convert from KiB to number sblks
tplWCacheNumPages = getJrnlWrNumPages(tplWCachePageSizeKib);
- autoJrnlExpand = autoJExpand;
- autoJrnlExpandMaxFiles = autoJExpandMaxFiles;
+// autoJrnlExpand = autoJExpand;
+// autoJrnlExpandMaxFiles = autoJExpandMaxFiles;
if (dir.size()>0) storeDir = dir;
if (truncateFlag)
@@ -280,15 +287,15 @@ bool MessageStoreImpl::init(const std::string& dir,
init();
QPID_LOG(notice, "Store module initialized; store-dir=" << dir);
- QPID_LOG(info, "> Default files per journal: " << jfiles);
+// QPID_LOG(info, "> Default files per journal: " << jfiles);
// TODO: Uncomment these lines when auto-expand is enabled.
// QPID_LOG(info, "> Auto-expand " << (autoJrnlExpand ? "enabled" : "disabled"));
// if (autoJrnlExpand) QPID_LOG(info, "> Max auto-expand journal files: " << autoJrnlExpandMaxFiles);
- QPID_LOG(info, "> Default journal file size: " << jfileSizePgs << " (wpgs)");
+// QPID_LOG(info, "> Default journal file size: " << jfileSizePgs << " (wpgs)");
QPID_LOG(info, "> Default write cache page size: " << wCachePageSizeKib << " (KiB)");
QPID_LOG(info, "> Default number of write cache pages: " << wCacheNumPages);
- QPID_LOG(info, "> TPL files per journal: " << tplNumJrnlFiles);
- QPID_LOG(info, "> TPL journal file size: " << tplJfileSizePgs << " (wpgs)");
+// QPID_LOG(info, "> TPL files per journal: " << tplNumJrnlFiles);
+// QPID_LOG(info, "> TPL journal file size: " << tplJfileSizePgs << " (wpgs)");
QPID_LOG(info, "> TPL write cache page size: " << tplWCachePageSizeKib << " (KiB)");
QPID_LOG(info, "> TPL number of write cache pages: " << tplWCacheNumPages);
@@ -424,7 +431,7 @@ void MessageStoreImpl::chkTplStoreInit()
qpid::sys::Mutex::ScopedLock sl(tplInitLock);
if (!tplStorePtr->is_ready()) {
journal::jdir::create_dir(getTplBaseDir());
- tplStorePtr->initialize(tplNumJrnlFiles, false, 0, tplJrnlFsizeSblks, tplWCacheNumPages, tplWCachePgSizeSblks);
+ tplStorePtr->initialize(/*tplNumJrnlFiles, false, 0, tplJrnlFsizeSblks,*/ tplWCacheNumPages, tplWCachePgSizeSblks);
if (mgmtObject.get() != 0) mgmtObject->set_tplIsInitialized(true);
}
}
@@ -468,7 +475,7 @@ MessageStoreImpl::~MessageStoreImpl()
}
void MessageStoreImpl::create(qpid::broker::PersistableQueue& queue,
- const qpid::framing::FieldTable& args)
+ const qpid::framing::FieldTable& /*args*/)
{
checkInit();
if (queue.getPersistenceId()) {
@@ -477,6 +484,7 @@ void MessageStoreImpl::create(qpid::broker::PersistableQueue& queue,
JournalImpl* jQueue = 0;
qpid::framing::FieldTable::ValuePtr value;
+/*
u_int16_t localFileCount = numJrnlFiles;
bool localAutoExpandFlag = autoJrnlExpand;
u_int16_t localAutoExpandMaxFileCount = autoJrnlExpandMaxFiles;
@@ -489,6 +497,7 @@ void MessageStoreImpl::create(qpid::broker::PersistableQueue& queue,
value = args.get("qpid.file_size");
if (value.get() != 0 && !value->empty() && value->convertsTo<int>())
localFileSizeSblks = chkJrnlFileSizeParam((u_int32_t) value->get<int>(), "qpid.file_size", wCachePgSizeSblks) * JRNL_RMGR_PAGE_SIZE;
+*/
if (queue.getName().size() == 0)
{
@@ -504,6 +513,7 @@ void MessageStoreImpl::create(qpid::broker::PersistableQueue& queue,
journalList[queue.getName()]=jQueue;
}
+/*
value = args.get("qpid.auto_expand");
if (value.get() != 0 && !value->empty() && value->convertsTo<bool>())
localAutoExpandFlag = (bool) value->get<bool>();
@@ -511,11 +521,12 @@ void MessageStoreImpl::create(qpid::broker::PersistableQueue& queue,
value = args.get("qpid.auto_expand_max_jfiles");
if (value.get() != 0 && !value->empty() && value->convertsTo<int>())
localAutoExpandMaxFileCount = (u_int16_t) value->get<int>();
+*/
queue.setExternalQueueStore(dynamic_cast<qpid::broker::ExternalQueueStore*>(jQueue));
try {
// init will create the deque's for the init...
- jQueue->initialize(localFileCount, localAutoExpandFlag, localAutoExpandMaxFileCount, localFileSizeSblks, wCacheNumPages, wCachePgSizeSblks);
+ jQueue->initialize(/*localFileCount, localAutoExpandFlag, localAutoExpandMaxFileCount, localFileSizeSblks,*/ wCacheNumPages, wCachePgSizeSblks);
} catch (const journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": create() failed: " + e.what());
}
@@ -796,8 +807,10 @@ void MessageStoreImpl::recoverQueues(TxnCtxt& txn,
long rcnt = 0L; // recovered msg count
long idcnt = 0L; // in-doubt msg count
u_int64_t thisHighestRid = 0ULL;
- jQueue->recover(numJrnlFiles, autoJrnlExpand, autoJrnlExpandMaxFiles, jrnlFsizeSblks, wCacheNumPages, wCachePgSizeSblks, &prepared, thisHighestRid, key.id); // start recovery
+ jQueue->recover(/*numJrnlFiles, autoJrnlExpand, autoJrnlExpandMaxFiles, jrnlFsizeSblks,*/ wCacheNumPages,
+ wCachePgSizeSblks, &prepared, thisHighestRid, key.id); // start recovery
+/*
// Check for changes to queue store settings qpid.file_count and qpid.file_size resulting
// from recovery of a store that has had its size changed externally by the resize utility.
// If so, update the queue store settings so that QMF queries will reflect the new values.
@@ -811,6 +824,7 @@ void MessageStoreImpl::recoverQueues(TxnCtxt& txn,
if (value.get() != 0 && !value->empty() && value->convertsTo<int>() && (u_int32_t)value->get<int>() != jQueue->jfsize_sblks()/JRNL_RMGR_PAGE_SIZE) {
queue->addArgument("qpid.file_size", jQueue->jfsize_sblks()/JRNL_RMGR_PAGE_SIZE);
}
+*/
if (highestRid == 0ULL)
highestRid = thisHighestRid;
@@ -1172,7 +1186,7 @@ void MessageStoreImpl::recoverTplStore()
{
if (journal::jdir::exists(tplStorePtr->jrnl_dir() + tplStorePtr->base_filename() + ".jinf")) {
u_int64_t thisHighestRid = 0ULL;
- tplStorePtr->recover(tplNumJrnlFiles, false, 0, tplJrnlFsizeSblks, tplWCachePgSizeSblks, tplWCacheNumPages, 0, thisHighestRid, 0);
+ tplStorePtr->recover(/*tplNumJrnlFiles, false, 0, tplJrnlFsizeSblks,*/ tplWCachePgSizeSblks, tplWCacheNumPages, 0, thisHighestRid, 0);
if (highestRid == 0ULL)
highestRid = thisHighestRid;
else if (thisHighestRid - highestRid < 0x8000000000000000ULL) // RFC 1982 comparison for unsigned 64-bit
@@ -1675,16 +1689,17 @@ void MessageStoreImpl::journalDeleted(JournalImpl& j) {
MessageStoreImpl::StoreOptions::StoreOptions(const std::string& name) :
qpid::Options(name),
- numJrnlFiles(defNumJrnlFiles),
+ /*numJrnlFiles(defNumJrnlFiles),
autoJrnlExpand(defAutoJrnlExpand),
autoJrnlExpandMaxFiles(defAutoJrnlExpandMaxFiles),
- jrnlFsizePgs(defJrnlFileSizePgs),
+ jrnlFsizePgs(defJrnlFileSizePgs),*/
truncateFlag(defTruncateFlag),
wCachePageSizeKib(defWCachePageSize),
- tplNumJrnlFiles(defTplNumJrnlFiles),
- tplJrnlFsizePgs(defTplJrnlFileSizePgs),
+ /*tplNumJrnlFiles(defTplNumJrnlFiles),
+ tplJrnlFsizePgs(defTplJrnlFileSizePgs),*/
tplWCachePageSizeKib(defTplWCachePageSize)
{
+/*
std::ostringstream oss1;
oss1 << "Default number of files for each journal instance (queue). [Allowable values: " <<
JRNL_MIN_NUM_FILES << " - " << JRNL_MAX_NUM_FILES << "]";
@@ -1697,12 +1712,13 @@ MessageStoreImpl::StoreOptions::StoreOptions(const std::string& name) :
std::ostringstream oss4;
oss4 << "Size of each transaction prepared list journal file in multiples of read pages (1 read page = 64KiB) [Allowable values: " <<
JRNL_MIN_FILE_SIZE / JRNL_RMGR_PAGE_SIZE << " - " << JRNL_MAX_FILE_SIZE / JRNL_RMGR_PAGE_SIZE << "]";
+*/
addOptions()
("store-dir", qpid::optValue(storeDir, "DIR"),
"Store directory location for persistence (instead of using --data-dir value). "
"Required if --no-data-dir is also used.")
- ("num-jfiles", qpid::optValue(numJrnlFiles, "N"), oss1.str().c_str())
- ("jfile-size-pgs", qpid::optValue(jrnlFsizePgs, "N"), oss2.str().c_str())
+// ("num-jfiles", qpid::optValue(numJrnlFiles, "N"), oss1.str().c_str())
+// ("jfile-size-pgs", qpid::optValue(jrnlFsizePgs, "N"), oss2.str().c_str())
// TODO: Uncomment these lines when auto-expand is enabled.
// ("auto-expand", qpid::optValue(autoJrnlExpand, "yes|no"),
// "If yes|true|1, allows journal to auto-expand by adding additional journal files as needed. "
@@ -1716,8 +1732,8 @@ MessageStoreImpl::StoreOptions::StoreOptions(const std::string& name) :
"Size of the pages in the write page cache in KiB. "
"Allowable values - powers of 2: 1, 2, 4, ... , 128. "
"Lower values decrease latency at the expense of throughput.")
- ("tpl-num-jfiles", qpid::optValue(tplNumJrnlFiles, "N"), oss3.str().c_str())
- ("tpl-jfile-size-pgs", qpid::optValue(tplJrnlFsizePgs, "N"), oss4.str().c_str())
+// ("tpl-num-jfiles", qpid::optValue(tplNumJrnlFiles, "N"), oss3.str().c_str())
+// ("tpl-jfile-size-pgs", qpid::optValue(tplJrnlFsizePgs, "N"), oss4.str().c_str())
("tpl-wcache-page-size", qpid::optValue(tplWCachePageSizeKib, "N"),
"Size of the pages in the transaction prepared list write page cache in KiB. "
"Allowable values - powers of 2: 1, 2, 4, ... , 128. "
diff --git a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h
index 68aceedfbb..24e31f1be6 100644
--- a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h
+++ b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h
@@ -25,17 +25,17 @@
#include <string>
#include "db-inc.h"
-#include "qpid/legacystore/Cursor.h"
-#include "qpid/legacystore/IdDbt.h"
-#include "qpid/legacystore/IdSequence.h"
-#include "qpid/legacystore/JournalImpl.h"
-#include "qpid/legacystore/jrnl/jcfg.h"
-#include "qpid/legacystore/PreparedTransaction.h"
+#include "qpid/linearstore/Cursor.h"
+#include "qpid/linearstore/IdDbt.h"
+#include "qpid/linearstore/IdSequence.h"
+#include "qpid/linearstore/JournalImpl.h"
+#include "qpid/linearstore/jrnl/jcfg.h"
+#include "qpid/linearstore/PreparedTransaction.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/MessageStore.h"
#include "qpid/management/Manageable.h"
-#include "qmf/org/apache/qpid/legacystore/Store.h"
-#include "qpid/legacystore/TxnCtxt.h"
+#include "qmf/org/apache/qpid/linearstore/Store.h"
+#include "qpid/linearstore/TxnCtxt.h"
// Assume DB_VERSION_MAJOR == 4
#if (DB_VERSION_MINOR == 2)
@@ -63,14 +63,14 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem
StoreOptions(const std::string& name="Store Options");
std::string clusterName;
std::string storeDir;
- u_int16_t numJrnlFiles;
- bool autoJrnlExpand;
- u_int16_t autoJrnlExpandMaxFiles;
- u_int32_t jrnlFsizePgs;
+// u_int16_t numJrnlFiles;
+// bool autoJrnlExpand;
+// u_int16_t autoJrnlExpandMaxFiles;
+// u_int32_t jrnlFsizePgs;
bool truncateFlag;
u_int32_t wCachePageSizeKib;
- u_int16_t tplNumJrnlFiles;
- u_int32_t tplJrnlFsizePgs;
+// u_int16_t tplNumJrnlFiles;
+// u_int32_t tplJrnlFsizePgs;
u_int32_t tplWCachePageSizeKib;
};
@@ -99,16 +99,16 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem
typedef JournalListMap::iterator JournalListMapItr;
// Default store settings
- static const u_int16_t defNumJrnlFiles = 8;
- static const u_int32_t defJrnlFileSizePgs = 24;
+// static const u_int16_t defNumJrnlFiles = 8;
+// static const u_int32_t defJrnlFileSizePgs = 24;
static const bool defTruncateFlag = false;
static const u_int32_t defWCachePageSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024;
- static const u_int16_t defTplNumJrnlFiles = 8;
- static const u_int32_t defTplJrnlFileSizePgs = 24;
+// static const u_int16_t defTplNumJrnlFiles = 8;
+// static const u_int32_t defTplJrnlFileSizePgs = 24;
static const u_int32_t defTplWCachePageSize = defWCachePageSize / 8;
// TODO: set defAutoJrnlExpand to true and defAutoJrnlExpandMaxFiles to 16 when auto-expand comes on-line
- static const bool defAutoJrnlExpand = false;
- static const u_int16_t defAutoJrnlExpandMaxFiles = 0;
+// static const bool defAutoJrnlExpand = false;
+// static const u_int16_t defAutoJrnlExpandMaxFiles = 0;
static const std::string storeTopLevelDir;
static qpid::sys::Duration defJournalGetEventsTimeout;
@@ -152,26 +152,26 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem
const char* envPath;
qpid::broker::Broker* broker;
- qmf::org::apache::qpid::legacystore::Store::shared_ptr mgmtObject;
+ qmf::org::apache::qpid::linearstore::Store::shared_ptr mgmtObject;
qpid::management::ManagementAgent* agent;
// Parameter validation and calculation
- static u_int16_t chkJrnlNumFilesParam(const u_int16_t param,
- const std::string paramName);
- static u_int32_t chkJrnlFileSizeParam(const u_int32_t param,
- const std::string paramName,
- const u_int32_t wCachePgSizeSblks = 0);
+// static u_int16_t chkJrnlNumFilesParam(const u_int16_t param,
+// const std::string paramName);
+// static u_int32_t chkJrnlFileSizeParam(const u_int32_t param,
+// const std::string paramName,
+// const u_int32_t wCachePgSizeSblks = 0);
static u_int32_t chkJrnlWrPageCacheSize(const u_int32_t param,
- const std::string paramName,
- const u_int16_t jrnlFsizePgs);
+ const std::string paramName/*,
+ const u_int16_t jrnlFsizePgs*/);
static u_int16_t getJrnlWrNumPages(const u_int32_t wrPageSizeKib);
- void chkJrnlAutoExpandOptions(const MessageStoreImpl::StoreOptions* opts,
- bool& autoJrnlExpand,
- u_int16_t& autoJrnlExpandMaxFiles,
- const std::string& autoJrnlExpandMaxFilesParamName,
- const u_int16_t numJrnlFiles,
- const std::string& numJrnlFilesParamName);
+// void chkJrnlAutoExpandOptions(const MessageStoreImpl::StoreOptions* opts,
+// bool& autoJrnlExpand,
+// u_int16_t& autoJrnlExpandMaxFiles,
+// const std::string& autoJrnlExpandMaxFilesParamName,
+// const u_int16_t numJrnlFiles,
+// const std::string& numJrnlFilesParamName);
void init();
@@ -280,15 +280,15 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem
bool init(const qpid::Options* options);
bool init(const std::string& dir,
- u_int16_t jfiles = defNumJrnlFiles,
- u_int32_t jfileSizePgs = defJrnlFileSizePgs,
+ /*u_int16_t jfiles = defNumJrnlFiles,
+ u_int32_t jfileSizePgs = defJrnlFileSizePgs,*/
const bool truncateFlag = false,
u_int32_t wCachePageSize = defWCachePageSize,
- u_int16_t tplJfiles = defTplNumJrnlFiles,
- u_int32_t tplJfileSizePgs = defTplJrnlFileSizePgs,
- u_int32_t tplWCachePageSize = defTplWCachePageSize,
+ /*u_int16_t tplJfiles = defTplNumJrnlFiles,
+ u_int32_t tplJfileSizePgs = defTplJrnlFileSizePgs,*/
+ u_int32_t tplWCachePageSize = defTplWCachePageSize/*,
bool autoJExpand = defAutoJrnlExpand,
- u_int16_t autoJExpandMaxFiles = defAutoJrnlExpandMaxFiles);
+ u_int16_t autoJExpandMaxFiles = defAutoJrnlExpandMaxFiles*/);
void truncateInit(const bool saveStoreContent = false);
diff --git a/qpid/cpp/src/qpid/linearstore/PreparedTransaction.cpp b/qpid/cpp/src/qpid/linearstore/PreparedTransaction.cpp
index 50b81e2824..ddf907d1f4 100644
--- a/qpid/cpp/src/qpid/linearstore/PreparedTransaction.cpp
+++ b/qpid/cpp/src/qpid/linearstore/PreparedTransaction.cpp
@@ -19,7 +19,7 @@
*
*/
-#include "qpid/legacystore/PreparedTransaction.h"
+#include "qpid/linearstore/PreparedTransaction.h"
#include <algorithm>
using namespace mrg::msgstore;
diff --git a/qpid/cpp/src/qpid/linearstore/StoreException.h b/qpid/cpp/src/qpid/linearstore/StoreException.h
index 6624aafd5a..0f1474f102 100644
--- a/qpid/cpp/src/qpid/linearstore/StoreException.h
+++ b/qpid/cpp/src/qpid/linearstore/StoreException.h
@@ -22,7 +22,7 @@
#ifndef QPID_LEGACYSTORE_STOREEXCEPTION_H
#define QPID_LEGACYSTORE_STOREEXCEPTION_H
-#include "qpid/legacystore/IdDbt.h"
+#include "qpid/linearstore/IdDbt.h"
#include <boost/format.hpp>
namespace mrg{
diff --git a/qpid/cpp/src/qpid/linearstore/StorePlugin.cpp b/qpid/cpp/src/qpid/linearstore/StorePlugin.cpp
index f9b77ce02c..eaaaa87f26 100644
--- a/qpid/cpp/src/qpid/linearstore/StorePlugin.cpp
+++ b/qpid/cpp/src/qpid/linearstore/StorePlugin.cpp
@@ -24,7 +24,7 @@
#include "qpid/Options.h"
#include "qpid/DataDir.h"
#include "qpid/log/Statement.h"
-#include "qpid/legacystore/MessageStoreImpl.h"
+#include "qpid/linearstore/MessageStoreImpl.h"
using mrg::msgstore::MessageStoreImpl;
diff --git a/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp b/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp
index 1db41f4c70..5b1fd81275 100644
--- a/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp
+++ b/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp
@@ -19,12 +19,12 @@
*
*/
-#include "qpid/legacystore/TxnCtxt.h"
+#include "qpid/linearstore/TxnCtxt.h"
#include <sstream>
-#include "qpid/legacystore/jrnl/jexception.h"
-#include "qpid/legacystore/StoreException.h"
+#include "qpid/linearstore/jrnl/jexception.h"
+#include "qpid/linearstore/StoreException.h"
namespace mrg {
namespace msgstore {
diff --git a/qpid/cpp/src/qpid/linearstore/TxnCtxt.h b/qpid/cpp/src/qpid/linearstore/TxnCtxt.h
index 77eaa27cd7..961efa5fe8 100644
--- a/qpid/cpp/src/qpid/linearstore/TxnCtxt.h
+++ b/qpid/cpp/src/qpid/linearstore/TxnCtxt.h
@@ -27,9 +27,9 @@
#include <set>
#include <string>
-#include "qpid/legacystore/DataTokenImpl.h"
-#include "qpid/legacystore/IdSequence.h"
-#include "qpid/legacystore/JournalImpl.h"
+#include "qpid/linearstore/DataTokenImpl.h"
+#include "qpid/linearstore/IdSequence.h"
+#include "qpid/linearstore/JournalImpl.h"
#include "qpid/broker/PersistableQueue.h"
#include "qpid/broker/TransactionalStore.h"
#include "qpid/sys/Mutex.h"
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h b/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h
index ab94b6914d..f4048fd3d3 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h
@@ -59,8 +59,8 @@
#define JRNL_RMGR_PAGE_SIZE 128 ///< Journal page size in softblocks
#define JRNL_RMGR_PAGES 16 ///< Number of pages to use in wmgr
//
-//#define JRNL_WMGR_DEF_PAGE_SIZE 64 ///< Journal write page size in softblocks (default)
-//#define JRNL_WMGR_DEF_PAGES 32 ///< Number of pages to use in wmgr (default)
+#define JRNL_WMGR_DEF_PAGE_SIZE 64 ///< Journal write page size in softblocks (default)
+#define JRNL_WMGR_DEF_PAGES 32 ///< Number of pages to use in wmgr (default)
//
#define JRNL_WMGR_MAXDTOKPP 1024 ///< Max. dtoks (data blocks) per page in wmgr
#define JRNL_WMGR_MAXWAITUS 100 ///< Max. wait time (us) before submitting AIO
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp
index 08752b27c1..8ae2cd3dd7 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp
@@ -90,8 +90,8 @@ jcntl::~jcntl()
}
void
-jcntl::initialize(const uint16_t num_jfiles/*, const bool ae, const uint16_t ae_max_jfiles*/,
- const uint32_t jfsize_sblks, const uint16_t wcache_num_pages, const uint32_t wcache_pgsize_sblks,
+jcntl::initialize(/*const uint16_t num_jfiles, const bool ae, const uint16_t ae_max_jfiles,
+ const uint32_t jfsize_sblks,*/ const uint16_t wcache_num_pages, const uint32_t wcache_pgsize_sblks,
aio_callback* const cbp)
{
_init_flag = false;
@@ -106,12 +106,12 @@ jcntl::initialize(const uint16_t num_jfiles/*, const bool ae, const uint16_t ae_
// Set new file geometry parameters
// assert(num_jfiles >= JRNL_MIN_NUM_FILES);
// assert(num_jfiles <= JRNL_MAX_NUM_FILES);
- _emap.set_num_jfiles(num_jfiles);
- _tmap.set_num_jfiles(num_jfiles);
+// _emap.set_num_jfiles(num_jfiles);
+// _tmap.set_num_jfiles(num_jfiles);
// assert(jfsize_sblks >= JRNL_MIN_FILE_SIZE);
// assert(jfsize_sblks <= JRNL_MAX_FILE_SIZE);
- _jfsize_sblks = jfsize_sblks;
+// _jfsize_sblks = jfsize_sblks;
// Clear any existing journal files
_jdir.clear_dir();
@@ -130,8 +130,8 @@ jcntl::initialize(const uint16_t num_jfiles/*, const bool ae, const uint16_t ae_
}
void
-jcntl::recover(const uint16_t num_jfiles/*, const bool ae, const uint16_t ae_max_jfiles*/,
- const uint32_t jfsize_sblks, const uint16_t wcache_num_pages, const uint32_t wcache_pgsize_sblks,
+jcntl::recover(/*const uint16_t num_jfiles, const bool ae, const uint16_t ae_max_jfiles,
+ const uint32_t jfsize_sblks,*/ const uint16_t wcache_num_pages, const uint32_t wcache_pgsize_sblks,
// const rd_aio_cb rd_cb, const wr_aio_cb wr_cb, const std::vector<std::string>* prep_txn_list_ptr,
aio_callback* const cbp, const std::vector<std::string>* prep_txn_list_ptr,
uint64_t& highest_rid)
@@ -149,11 +149,11 @@ jcntl::recover(const uint16_t num_jfiles/*, const bool ae, const uint16_t ae_max
// assert(num_jfiles <= JRNL_MAX_NUM_FILES);
// assert(jfsize_sblks >= JRNL_MIN_FILE_SIZE);
// assert(jfsize_sblks <= JRNL_MAX_FILE_SIZE);
- _jfsize_sblks = jfsize_sblks;
+// _jfsize_sblks = jfsize_sblks;
// Verify journal dir and journal files
_jdir.verify_dir();
- _rcvdat.reset(num_jfiles/*, ae, ae_max_jfiles*/);
+// _rcvdat.reset(num_jfiles/*, ae, ae_max_jfiles*/);
rcvr_janalyze(_rcvdat, prep_txn_list_ptr);
highest_rid = _rcvdat._h_rid;
@@ -444,6 +444,7 @@ jcntl::fhdr_wr_sync(const uint16_t /*lid*/)
*/
}
+/*
fcntl*
jcntl::new_fcntl(jcntl* const jcp, const uint16_t lid, const uint16_t fid, const rcvdat* const rdp)
{
@@ -452,6 +453,7 @@ jcntl::new_fcntl(jcntl* const jcp, const uint16_t lid, const uint16_t fid, const
oss << jcp->jrnl_dir() << "/" << jcp->base_filename();
return new fcntl(oss.str(), fid, lid, jcp->jfsize_sblks(), rdp);
}
+*/
// Protected/Private functions
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.h b/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.h
index 868cc5be31..0e59c1e8a0 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.h
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.h
@@ -190,8 +190,8 @@ namespace journal
*
* \exception TODO
*/
- void initialize(const uint16_t num_jfiles/*, const bool auto_expand, const uint16_t ae_max_jfiles*/,
- const uint32_t jfsize_sblks, const uint16_t wcache_num_pages, const uint32_t wcache_pgsize_sblks,
+ void initialize(/*const uint16_t num_jfiles, const bool auto_expand, const uint16_t ae_max_jfiles,
+ const uint32_t jfsize_sblks,*/ const uint16_t wcache_num_pages, const uint32_t wcache_pgsize_sblks,
aio_callback* const cbp);
/**
@@ -227,8 +227,8 @@ namespace journal
*
* \exception TODO
*/
- void recover(const uint16_t num_jfiles/*, const bool auto_expand, const uint16_t ae_max_jfiles*/,
- const uint32_t jfsize_sblks, const uint16_t wcache_num_pages, const uint32_t wcache_pgsize_sblks,
+ void recover(/*const uint16_t num_jfiles, const bool auto_expand, const uint16_t ae_max_jfiles,
+ const uint32_t jfsize_sblks,*/ const uint16_t wcache_num_pages, const uint32_t wcache_pgsize_sblks,
aio_callback* const cbp, const std::vector<std::string>* prep_txn_list_ptr, uint64_t& highest_rid);
/**
@@ -654,7 +654,7 @@ namespace journal
/**
* /brief Static function for creating new fcntl objects for use with obj_arr.
*/
- static fcntl* new_fcntl(jcntl* const jcp, const uint16_t lid, const uint16_t fid, const rcvdat* const rdp);
+// static fcntl* new_fcntl(jcntl* const jcp, const uint16_t lid, const uint16_t fid, const rcvdat* const rdp);
protected:
static bool _init;
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c b/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c
index c17fe0acec..e8263ea281 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c
@@ -21,16 +21,15 @@
#include "file_hdr.h"
-void file_hdr_init(file_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t uflag,
- const uint64_t rid, const uint64_t fro, const uint64_t ts_sec, const uint64_t ts_nsec,
- const uint32_t file_count, const uint64_t file_size, const uint64_t file_number) {
+int file_hdr_init(file_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t uflag,
+ const uint64_t rid, const uint64_t fro, const uint32_t file_count, const uint64_t file_size,
+ const uint64_t file_number) {
rec_hdr_init(&dest->_rhdr, magic, version, uflag, rid);
dest->_fro = fro;
- dest->_ts_sec = ts_sec;
- dest->_ts_nsec = ts_nsec;
dest->_file_count = file_count;
dest->_file_size = file_size;
dest->_file_number = file_number;
+ return set_time_now(dest);
}
void file_hdr_copy(file_hdr_t* dest, const file_hdr_t* src) {
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h b/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h
index a60dc43636..5076a184b1 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h
@@ -78,9 +78,9 @@ typedef struct file_hdr_t {
uint16_t _name_length; /**< Length of the queue name in octets, which follows this struct in the header */
} file_hdr_t;
-void file_hdr_init(file_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t uflag,
- const uint64_t rid, const uint64_t fro, const uint64_t ts_sec, const uint64_t ts_nsec,
- const uint32_t file_count, const uint64_t file_size, const uint64_t file_number);
+int file_hdr_init(file_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t uflag,
+ const uint64_t rid, const uint64_t fro, const uint32_t file_count, const uint64_t file_size,
+ const uint64_t file_number);
void file_hdr_copy(file_hdr_t* dest, const file_hdr_t* src);
int set_time_now(file_hdr_t *fh);
void set_time(file_hdr_t *fh, struct timespec *ts);
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp
index acb09dc2c3..54d20edb5c 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp
@@ -19,15 +19,15 @@
*
*/
-#include "qpid/legacystore/jrnl/wmgr.h"
+#include "qpid/linearstore/jrnl/wmgr.h"
#include <cassert>
#include <cerrno>
#include <cstdlib>
#include <cstring>
-#include "qpid/legacystore/jrnl/file_hdr.h"
-#include "qpid/legacystore/jrnl/jcntl.h"
-#include "qpid/legacystore/jrnl/jerrno.h"
+#include "qpid/linearstore/jrnl/utils/file_hdr.h"
+#include "qpid/linearstore/jrnl/jcntl.h"
+#include "qpid/linearstore/jrnl/jerrno.h"
#include <sstream>
#include <stdint.h>
@@ -36,9 +36,9 @@ namespace mrg
namespace journal
{
-wmgr::wmgr(jcntl* jc, enq_map& emap, txn_map& tmap, wrfc& wrfc):
+wmgr::wmgr(jcntl* jc, enq_map& emap, txn_map& tmap/*, wrfc& wrfc*/):
pmgr(jc, emap, tmap),
- _wrfc(wrfc),
+// _wrfc(wrfc),
_max_dtokpp(0),
_max_io_wait_us(0),
_fhdr_base_ptr(0),
@@ -55,10 +55,10 @@ wmgr::wmgr(jcntl* jc, enq_map& emap, txn_map& tmap, wrfc& wrfc):
_txn_pending_set()
{}
-wmgr::wmgr(jcntl* jc, enq_map& emap, txn_map& tmap, wrfc& wrfc,
+wmgr::wmgr(jcntl* jc, enq_map& emap, txn_map& tmap/*, wrfc& wrfc*/,
const uint32_t max_dtokpp, const uint32_t max_iowait_us):
pmgr(jc, emap, tmap /* , dtoklp */),
- _wrfc(wrfc),
+// _wrfc(wrfc),
_max_dtokpp(max_dtokpp),
_max_io_wait_us(max_iowait_us),
_fhdr_base_ptr(0),
@@ -138,8 +138,8 @@ wmgr::enqueue(const void* const data_buff, const std::size_t tot_data_len,
}
}
- uint64_t rid = (dtokp->external_rid() | cont) ? dtokp->rid() : _wrfc.get_incr_rid();
- _enq_rec.reset(rid, data_buff, tot_data_len, xid_ptr, xid_len, _wrfc.owi(), transient,
+ uint64_t rid = (dtokp->external_rid() | cont) ? dtokp->rid() : /*_wrfc.get_incr_rid()*/0; // TODO: replace for linearstore: _wrfc
+ _enq_rec.reset(rid, data_buff, tot_data_len, xid_ptr, xid_len/*, _wrfc.owi()*/, transient,
external);
if (!cont)
{
@@ -161,8 +161,9 @@ wmgr::enqueue(const void* const data_buff, const std::size_t tot_data_len,
(_cache_pgsize_sblks * JRNL_SBLK_SIZE) - _pg_offset_dblks);
// Remember fid which contains the record header in case record is split over several files
- if (data_offs_dblks == 0)
- dtokp->set_fid(_wrfc.index());
+ // TODO: replace for linearstore: _wrfc
+// if (data_offs_dblks == 0)
+// dtokp->set_fid(_wrfc.index());
_pg_offset_dblks += ret;
_cached_offset_dblks += ret;
dtokp->incr_dblocks_written(ret);
@@ -179,7 +180,7 @@ wmgr::enqueue(const void* const data_buff, const std::size_t tot_data_len,
// long multi-page messages have their token on the page containing the END of the
// message. AIO callbacks will then only process this token when entire message is
// enqueued.
- _wrfc.incr_enqcnt(dtokp->fid());
+ //_wrfc.incr_enqcnt(dtokp->fid()); // TODO: replace for linearstore: _wrfc
if (xid_len) // If part of transaction, add to transaction map
{
@@ -237,9 +238,9 @@ wmgr::dequeue(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_
}
const bool ext_rid = dtokp->external_rid();
- uint64_t rid = (ext_rid | cont) ? dtokp->rid() : _wrfc.get_incr_rid();
+ uint64_t rid = (ext_rid | cont) ? dtokp->rid() : /*_wrfc.get_incr_rid()*/0; // TODO: replace for linearstore: _wrfc
uint64_t dequeue_rid = (ext_rid | cont) ? dtokp->dequeue_rid() : dtokp->rid();
- _deq_rec.reset(rid, dequeue_rid, xid_ptr, xid_len, _wrfc.owi(), txn_coml_commit);
+ _deq_rec.reset(rid, dequeue_rid, xid_ptr, xid_len/*, _wrfc.owi()*/, txn_coml_commit);
if (!cont)
{
if (!ext_rid)
@@ -265,8 +266,9 @@ wmgr::dequeue(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_
(_cache_pgsize_sblks * JRNL_SBLK_SIZE) - _pg_offset_dblks);
// Remember fid which contains the record header in case record is split over several files
- if (data_offs_dblks == 0)
- dtokp->set_fid(_wrfc.index());
+ // TODO: replace for linearstore: _wrfc
+// if (data_offs_dblks == 0)
+// dtokp->set_fid(_wrfc.index());
_pg_offset_dblks += ret;
_cached_offset_dblks += ret;
dtokp->incr_dblocks_written(ret);
@@ -304,7 +306,7 @@ wmgr::dequeue(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_
throw jexception(jerrno::JERR_MAP_LOCKED, oss.str(), "wmgr", "dequeue");
}
}
- _wrfc.decr_enqcnt(fid);
+// _wrfc.decr_enqcnt(fid); // TODO: replace for linearstore: _wrfc
}
done = true;
@@ -346,8 +348,8 @@ wmgr::abort(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_le
}
}
- uint64_t rid = (dtokp->external_rid() | cont) ? dtokp->rid() : _wrfc.get_incr_rid();
- _txn_rec.reset(RHM_JDAT_TXA_MAGIC, rid, xid_ptr, xid_len, _wrfc.owi());
+ uint64_t rid = (dtokp->external_rid() | cont) ? dtokp->rid() :/* _wrfc.get_incr_rid()*/0; // TODO: replace for linearstore: _wrfc
+ _txn_rec.reset(QLS_TXA_MAGIC, rid, xid_ptr, xid_len/*, _wrfc.owi()*/);
if (!cont)
{
dtokp->set_rid(rid);
@@ -366,8 +368,9 @@ wmgr::abort(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_le
(_cache_pgsize_sblks * JRNL_SBLK_SIZE) - _pg_offset_dblks);
// Remember fid which contains the record header in case record is split over several files
- if (data_offs_dblks == 0)
- dtokp->set_fid(_wrfc.index());
+ // TODO: replace for linearstore: _wrfc
+// if (data_offs_dblks == 0)
+// dtokp->set_fid(_wrfc.index());
_pg_offset_dblks += ret;
_cached_offset_dblks += ret;
dtokp->incr_dblocks_written(ret);
@@ -386,8 +389,9 @@ wmgr::abort(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_le
{
if (!itr->_enq_flag)
_emap.unlock(itr->_drid); // ignore rid not found error
- if (itr->_enq_flag)
- _wrfc.decr_enqcnt(itr->_pfid);
+ // TODO: replace for linearstore: _wrfc
+// if (itr->_enq_flag)
+// _wrfc.decr_enqcnt(itr->_pfid);
}
std::pair<std::set<std::string>::iterator, bool> res = _txn_pending_set.insert(xid);
if (!res.second)
@@ -436,8 +440,8 @@ wmgr::commit(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_l
}
}
- uint64_t rid = (dtokp->external_rid() | cont) ? dtokp->rid() : _wrfc.get_incr_rid();
- _txn_rec.reset(RHM_JDAT_TXC_MAGIC, rid, xid_ptr, xid_len, _wrfc.owi());
+ uint64_t rid = (dtokp->external_rid() | cont) ? dtokp->rid() : /*_wrfc.get_incr_rid()*/0; // TODO: replace for linearstore: _wrfc
+ _txn_rec.reset(QLS_TXC_MAGIC, rid, xid_ptr, xid_len/*, _wrfc.owi()*/);
if (!cont)
{
dtokp->set_rid(rid);
@@ -456,8 +460,9 @@ wmgr::commit(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_l
(_cache_pgsize_sblks * JRNL_SBLK_SIZE) - _pg_offset_dblks);
// Remember fid which contains the record header in case record is split over several files
- if (data_offs_dblks == 0)
- dtokp->set_fid(_wrfc.index());
+ // TODO: replace for linearstore: _wrfc
+// if (data_offs_dblks == 0)
+// dtokp->set_fid(_wrfc.index());
_pg_offset_dblks += ret;
_cached_offset_dblks += ret;
dtokp->incr_dblocks_written(ret);
@@ -502,7 +507,7 @@ wmgr::commit(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_l
throw jexception(jerrno::JERR_MAP_LOCKED, oss.str(), "wmgr", "dequeue");
}
}
- _wrfc.decr_enqcnt(fid);
+// _wrfc.decr_enqcnt(fid); // TODO: replace for linearstore: _wrfc
}
}
std::pair<std::set<std::string>::iterator, bool> res = _txn_pending_set.insert(xid);
@@ -527,10 +532,12 @@ wmgr::commit(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_l
}
void
-wmgr::file_header_check(const uint64_t rid, const bool cont, const uint32_t rec_dblks_rem)
+wmgr::file_header_check(const uint64_t /*rid*/, const bool /*cont*/, const uint32_t /*rec_dblks_rem*/)
{
// Has the file header been written (i.e. write pointers still at 0)?
- if (_wrfc.is_void())
+ // TODO: replace for linearstore: _wrfc
+/*
+ if (_wrfc.is_void()) // TODO: replace for linearstore: _wrfc
{
bool file_fit = rec_dblks_rem <= _jfsize_dblks;
bool file_full = rec_dblks_rem == _jfsize_dblks;
@@ -542,8 +549,9 @@ wmgr::file_header_check(const uint64_t rid, const bool cont, const uint32_t rec_
}
else
fro = JRNL_SBLK_SIZE * JRNL_DBLK_SIZE;
- write_fhdr(rid, _wrfc.index(), _wrfc.index(), fro);
+ write_fhdr(rid, _wrfc.index(), _wrfc.index(), fro); // TODO: replace for linearstore: _wrfc
}
+*/
}
void
@@ -595,6 +603,7 @@ iores
wmgr::write_flush()
{
iores res = RHM_IORES_SUCCESS;
+/*
// Don't bother flushing an empty page or one that is still in state AIO_PENDING
if (_cached_offset_dblks)
{
@@ -640,16 +649,21 @@ wmgr::write_flush()
get_events(UNUSED, 0);
if (_page_cb_arr[_pg_index]._state == UNUSED)
_page_cb_arr[_pg_index]._state = IN_USE;
+*/
return res;
}
iores
wmgr::rotate_file()
{
+ // TODO: replace for linearstore: _wrfc
+/*
_pg_cntr = 0;
iores res = _wrfc.rotate();
_jc->chk_wr_frot();
return res;
+*/
+ return RHM_IORES_SUCCESS;
}
int32_t
@@ -692,8 +706,9 @@ wmgr::get_events(page_state state, timespec* const timeout, bool flush)
oss << "pg=" << pcbp->_index;
else
{
- file_hdr* fhp = (file_hdr*)aiocbp->u.c.buf;
- oss << "fid=" << fhp->_pfid;
+ // TODO: replace for linearstore: fhp->_pfid
+// file_hdr_t* fhp = (file_hdr_t*)aiocbp->u.c.buf;
+// oss << "fid=" << fhp->_pfid;
}
oss << " size=" << aiocbp->u.c.nbytes;
oss << " offset=" << aiocbp->u.c.offset << " fh=" << aiocbp->aio_fildes << "]";
@@ -778,8 +793,9 @@ wmgr::get_events(page_state state, timespec* const timeout, bool flush)
// Increment the completed write offset
// NOTE: We cannot use _wrfc here, as it may have rotated since submitting count.
// Use stored pointer to fcntl in the pcb instead.
- pcbp->_wfh->add_wr_cmpl_cnt_dblks(pcbp->_wdblks);
- pcbp->_wfh->decr_aio_cnt();
+ // TODO: replace for linearstore: pcbp->_wfh
+// pcbp->_wfh->add_wr_cmpl_cnt_dblks(pcbp->_wdblks);
+// pcbp->_wfh->decr_aio_cnt();
_jc->instr_decr_outstanding_aio_cnt();
// Clean up this pcb's data_tok list
@@ -793,12 +809,15 @@ wmgr::get_events(page_state state, timespec* const timeout, bool flush)
else // File header writes have no pcb
{
// get lfid from original file header record, update info for that lfid
- file_hdr* fhp = (file_hdr*)aiocbp->u.c.buf;
+ // TODO: replace for linearstore: lfid
+/*
+ file_hdr_t* fhp = (file_hdr*)aiocbp->u.c.buf;
uint32_t lfid = fhp->_lfid;
fcntl* fcntlp = _jc->get_fcntlp(lfid);
fcntlp->add_wr_cmpl_cnt_dblks(JRNL_SBLK_SIZE);
fcntlp->decr_aio_cnt();
fcntlp->set_wr_fhdr_aio_outstanding(false);
+*/
}
}
@@ -821,8 +840,8 @@ wmgr::initialize(aio_callback* const cbp, const uint32_t wcache_pgsize_sblks, co
{
pmgr::initialize(cbp, wcache_pgsize_sblks, wcache_num_pages);
wmgr::clean();
- _num_jfiles = _jc->num_jfiles();
- if (::posix_memalign(&_fhdr_base_ptr, _sblksize, _sblksize * _num_jfiles))
+ /*_num_jfiles = _jc->num_jfiles();*/ // TODO: replace for linearstore: _jc->num_jfiles()
+ if (::posix_memalign(&_fhdr_base_ptr, _sblksize, _sblksize /** _num_jfiles*/))
{
wmgr::clean();
std::ostringstream oss;
@@ -848,15 +867,18 @@ wmgr::initialize(aio_callback* const cbp, const uint32_t wcache_pgsize_sblks, co
iores
wmgr::pre_write_check(const _op_type op, const data_tok* const dtokp,
- const std::size_t xidsize, const std::size_t dsize, const bool external
+ const std::size_t /*xidsize*/, const std::size_t /*dsize*/, const bool /*external*/
) const
{
// Check status of current file
+ // TODO: replace for linearstore: _wrfc
+/*
if (!_wrfc.is_wr_reset())
{
if (!_wrfc.wr_reset())
return RHM_IORES_FULL;
}
+*/
// Check status of current page is ok for writing
if (_page_cb_arr[_pg_index]._state != IN_USE)
@@ -880,10 +902,12 @@ wmgr::pre_write_check(const _op_type op, const data_tok* const dtokp,
case WMGR_ENQUEUE:
{
// Check for enqueue reaching cutoff threshold
- uint32_t size_dblks = jrec::size_dblks(enq_rec::rec_size(xidsize, dsize,
- external));
+ // TODO: replace for linearstore: _wrfc
+/*
+ uint32_t size_dblks = jrec::size_dblks(enq_rec::rec_size(xidsize, dsize, external));
if (!_enq_busy && _wrfc.enq_threshold(_cached_offset_dblks + size_dblks))
return RHM_IORES_ENQCAPTHRESH;
+*/
if (!dtokp->is_writable())
{
std::ostringstream oss;
@@ -946,7 +970,7 @@ wmgr::dequeue_check(const std::string& xid, const uint64_t drid)
void
wmgr::dblk_roundup()
{
- const uint32_t xmagic = RHM_JDAT_EMPTY_MAGIC;
+ const uint32_t xmagic = QLS_EMPTY_MAGIC;
uint32_t wdblks = jrec::size_blks(_cached_offset_dblks, JRNL_SBLK_SIZE) * JRNL_SBLK_SIZE;
while (_cached_offset_dblks < wdblks)
{
@@ -961,21 +985,25 @@ wmgr::dblk_roundup()
}
void
-wmgr::write_fhdr(uint64_t rid, uint16_t fid, uint16_t lid, std::size_t fro)
+wmgr::write_fhdr(uint64_t rid, uint16_t fid, uint16_t /*lid*/, std::size_t fro)
{
- file_hdr fhdr(RHM_JDAT_FILE_MAGIC, RHM_JDAT_VERSION, rid, fid, lid, fro, _wrfc.owi(), true);
+ file_hdr_t fhdr/*(QLS_FILE_MAGIC, QLS_JRNL_VERSION, rid, fid, lid, fro, _wrfc.owi(), true)*/;
+ /*int err =*/ ::file_hdr_init(&fhdr, QLS_FILE_MAGIC, QLS_JRNL_VERSION, 0, rid, fro, 0, 0, 0);
std::memcpy(_fhdr_ptr_arr[fid], &fhdr, sizeof(fhdr));
#ifdef RHM_CLEAN
std::memset((char*)_fhdr_ptr_arr[fid] + sizeof(fhdr), RHM_CLEAN_CHAR, _sblksize - sizeof(fhdr));
#endif
aio_cb* aiocbp = _fhdr_aio_cb_arr[fid];
- aio::prep_pwrite(aiocbp, _wrfc.fh(), _fhdr_ptr_arr[fid], _sblksize, 0);
+// aio::prep_pwrite(aiocbp, _wrfc.fh(), _fhdr_ptr_arr[fid], _sblksize, 0); // TODO: replace for linearstore: _wrfc
if (aio::submit(_ioctx, 1, &aiocbp) < 0)
throw jexception(jerrno::JERR__AIO, "wmgr", "write_fhdr");
_aio_evt_rem++;
+ // TODO: replace for linearstore: _wrfc
+/*
_wrfc.add_subm_cnt_dblks(JRNL_SBLK_SIZE);
_wrfc.incr_aio_cnt();
_wrfc.file_controller()->set_wr_fhdr_aio_outstanding(true);
+*/
}
void
@@ -1029,7 +1057,7 @@ wmgr::status_str() const
default: oss << _page_cb_arr[i]._state;
}
}
- oss << "] " << _wrfc.status_str();
+ oss << "] " /*<< _wrfc.status_str()*/; // TODO: replace for linearstore: _wrfc
return oss.str();
}
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h b/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h
index 39687226a5..7d29ac675f 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h
@@ -31,9 +31,9 @@ class wmgr;
}
#include <cstring>
-#include "qpid/legacystore/jrnl/enums.h"
-#include "qpid/legacystore/jrnl/pmgr.h"
-#include "qpid/legacystore/jrnl/wrfc.h"
+#include "qpid/linearstore/jrnl/enums.h"
+#include "qpid/linearstore/jrnl/pmgr.h"
+//#include "qpid/linearstore/jrnl/wrfc.h"
#include <set>
namespace mrg
diff --git a/qpid/cpp/src/qpid/linearstore/management-schema.xml b/qpid/cpp/src/qpid/linearstore/management-schema.xml
index 65969f0fb2..a9f7b143ad 100644
--- a/qpid/cpp/src/qpid/linearstore/management-schema.xml
+++ b/qpid/cpp/src/qpid/linearstore/management-schema.xml
@@ -1,4 +1,4 @@
-<schema package="org.apache.qpid.legacystore">
+<schema package="org.apache.qpid.linearstore">
<!--
Licensed to the Apache Software Foundation (ASF) under one