diff options
author | Kim van der Riet <kpvdr@apache.org> | 2013-08-21 17:42:23 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2013-08-21 17:42:23 +0000 |
commit | 341696dbb845dc620e2c916b30e312e7f47c3388 (patch) | |
tree | 462cacee4843aa64dc917e6c0e76de2e4d945c40 | |
parent | 7cfb37d4117ed1726795cd5e30f5a11ea1a555c9 (diff) | |
download | qpid-python-341696dbb845dc620e2c916b30e312e7f47c3388.tar.gz |
QPID-4984: WIP - compiles, but not functional.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/linearstore@1516229 13f79535-47bb-0310-9956-ffa450edef68
25 files changed, 277 insertions, 222 deletions
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt index 3a02b516a2..dcbb17d1e0 100644 --- a/qpid/cpp/src/CMakeLists.txt +++ b/qpid/cpp/src/CMakeLists.txt @@ -210,6 +210,7 @@ execute_process(COMMAND ${RUBY_EXECUTABLE} -I ${rgen_dir} ${rgen_dir}/generate $ ${CMAKE_CURRENT_SOURCE_DIR}/qpid/acl/management-schema.xml ${CMAKE_CURRENT_SOURCE_DIR}/qpid/ha/management-schema.xml ${CMAKE_CURRENT_SOURCE_DIR}/qpid/legacystore/management-schema.xml + ${CMAKE_CURRENT_SOURCE_DIR}/qpid/linearstore/management-schema.xml ) set(mgen_dir ${qpid-cpp_SOURCE_DIR}/managementgen) set(regen_mgmt OFF) diff --git a/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.cpp b/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.cpp index c92c9828f4..3b42c7bd5d 100644 --- a/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.cpp +++ b/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.cpp @@ -115,7 +115,7 @@ u_int32_t MessageStoreImpl::chkJrnlWrPageCacheSize(const u_int32_t param, const // For zero value, use default p = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024; QPID_LOG(warning, "parameter " << paramName << " (" << param << ") must be a power of 2 between 1 and 128; changing this parameter to default value (" << p << ")"); - } else if ( p > 128 || p & (p-1) ) { + } else if ( p > 128 || (p & (p-1)) ) { // For any positive value that is not a power of 2, use closest value if (p < 6) p = 4; else if (p < 12) p = 8; diff --git a/qpid/cpp/src/qpid/linearstore/BindingDbt.cpp b/qpid/cpp/src/qpid/linearstore/BindingDbt.cpp index a48c156e71..d50009c82e 100644 --- a/qpid/cpp/src/qpid/linearstore/BindingDbt.cpp +++ b/qpid/cpp/src/qpid/linearstore/BindingDbt.cpp @@ -19,7 +19,7 @@ * */ -#include "qpid/legacystore/BindingDbt.h" +#include "qpid/linearstore/BindingDbt.h" namespace mrg { namespace msgstore { diff --git a/qpid/cpp/src/qpid/linearstore/BufferValue.cpp b/qpid/cpp/src/qpid/linearstore/BufferValue.cpp index fb2c471cd7..4bed7d426a 100644 --- a/qpid/cpp/src/qpid/linearstore/BufferValue.cpp +++ b/qpid/cpp/src/qpid/linearstore/BufferValue.cpp @@ -19,7 +19,7 @@ * */ -#include "qpid/legacystore/BufferValue.h" +#include "qpid/linearstore/BufferValue.h" namespace mrg { namespace msgstore { diff --git a/qpid/cpp/src/qpid/linearstore/DataTokenImpl.cpp b/qpid/cpp/src/qpid/linearstore/DataTokenImpl.cpp index 796d4c02f0..c16466b05e 100644 --- a/qpid/cpp/src/qpid/linearstore/DataTokenImpl.cpp +++ b/qpid/cpp/src/qpid/linearstore/DataTokenImpl.cpp @@ -19,7 +19,7 @@ * */ -#include "qpid/legacystore/DataTokenImpl.h" +#include "qpid/linearstore/DataTokenImpl.h" using namespace mrg::msgstore; diff --git a/qpid/cpp/src/qpid/linearstore/DataTokenImpl.h b/qpid/cpp/src/qpid/linearstore/DataTokenImpl.h index e01d471e1b..3c8b6cbd12 100644 --- a/qpid/cpp/src/qpid/linearstore/DataTokenImpl.h +++ b/qpid/cpp/src/qpid/linearstore/DataTokenImpl.h @@ -22,7 +22,7 @@ #ifndef QPID_LEGACYSTORE_DATATOKENIMPL_H #define QPID_LEGACYSTORE_DATATOKENIMPL_H -#include "qpid/legacystore/jrnl/data_tok.h" +#include "qpid/linearstore/jrnl/data_tok.h" #include "qpid/broker/PersistableMessage.h" #include <boost/intrusive_ptr.hpp> diff --git a/qpid/cpp/src/qpid/linearstore/IdDbt.cpp b/qpid/cpp/src/qpid/linearstore/IdDbt.cpp index d9edaf80e6..2308d074bc 100644 --- a/qpid/cpp/src/qpid/linearstore/IdDbt.cpp +++ b/qpid/cpp/src/qpid/linearstore/IdDbt.cpp @@ -19,7 +19,7 @@ * */ -#include "qpid/legacystore/IdDbt.h" +#include "qpid/linearstore/IdDbt.h" using namespace mrg::msgstore; diff --git a/qpid/cpp/src/qpid/linearstore/IdSequence.cpp b/qpid/cpp/src/qpid/linearstore/IdSequence.cpp index 975b1107e7..9034a11fd2 100644 --- a/qpid/cpp/src/qpid/linearstore/IdSequence.cpp +++ b/qpid/cpp/src/qpid/linearstore/IdSequence.cpp @@ -19,7 +19,7 @@ * */ -#include "qpid/legacystore/IdSequence.h" +#include "qpid/linearstore/IdSequence.h" using namespace mrg::msgstore; using qpid::sys::Mutex; diff --git a/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp b/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp index ba3f2aecae..1cc9f3f08c 100644 --- a/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp +++ b/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp @@ -19,25 +19,25 @@ * */ -#include "qpid/legacystore/JournalImpl.h" +#include "qpid/linearstore/JournalImpl.h" -#include "qpid/legacystore/jrnl/jerrno.h" -#include "qpid/legacystore/jrnl/jexception.h" +#include "qpid/linearstore/jrnl/jerrno.h" +#include "qpid/linearstore/jrnl/jexception.h" #include "qpid/log/Statement.h" #include "qpid/management/ManagementAgent.h" -#include "qmf/org/apache/qpid/legacystore/ArgsJournalExpand.h" -#include "qmf/org/apache/qpid/legacystore/EventCreated.h" -#include "qmf/org/apache/qpid/legacystore/EventEnqThresholdExceeded.h" -#include "qmf/org/apache/qpid/legacystore/EventFull.h" -#include "qmf/org/apache/qpid/legacystore/EventRecovered.h" +#include "qmf/org/apache/qpid/linearstore/ArgsJournalExpand.h" +#include "qmf/org/apache/qpid/linearstore/EventCreated.h" +#include "qmf/org/apache/qpid/linearstore/EventEnqThresholdExceeded.h" +#include "qmf/org/apache/qpid/linearstore/EventFull.h" +#include "qmf/org/apache/qpid/linearstore/EventRecovered.h" #include "qpid/sys/Monitor.h" #include "qpid/sys/Timer.h" -#include "qpid/legacystore/StoreException.h" +#include "qpid/linearstore/StoreException.h" using namespace mrg::msgstore; using namespace mrg::journal; using qpid::management::ManagementAgent; -namespace _qmf = qmf::org::apache::qpid::legacystore; +namespace _qmf = qmf::org::apache::qpid::linearstore; InactivityFireEvent::InactivityFireEvent(JournalImpl* p, const qpid::sys::Duration timeout): qpid::sys::TimerTask(timeout, "JournalInactive:"+p->id()), _parent(p) {} @@ -132,22 +132,24 @@ JournalImpl::initManagement(qpid::management::ManagementAgent* a) void -JournalImpl::initialize(const u_int16_t num_jfiles, +JournalImpl::initialize(/*const u_int16_t num_jfiles, const bool auto_expand, const u_int16_t ae_max_jfiles, - const u_int32_t jfsize_sblks, + const u_int32_t jfsize_sblks,*/ const u_int16_t wcache_num_pages, const u_int32_t wcache_pgsize_sblks, mrg::journal::aio_callback* const cbp) { std::ostringstream oss; - oss << "Initialize; num_jfiles=" << num_jfiles << " jfsize_sblks=" << jfsize_sblks; +// oss << "Initialize; num_jfiles=" << num_jfiles << " jfsize_sblks=" << jfsize_sblks; + oss << "Initialize;"; oss << " wcache_pgsize_sblks=" << wcache_pgsize_sblks; oss << " wcache_num_pages=" << wcache_num_pages; log(LOG_DEBUG, oss.str()); - jcntl::initialize(num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks, cbp); + jcntl::initialize(/*num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,*/ wcache_num_pages, wcache_pgsize_sblks, cbp); log(LOG_DEBUG, "Initialization complete"); - + // TODO: replace for linearstore: _lpmgr +/* if (_mgmtObject.get() != 0) { _mgmtObject->set_initialFileCount(_lpmgr.num_jfiles()); @@ -159,15 +161,16 @@ JournalImpl::initialize(const u_int16_t num_jfiles, _mgmtObject->set_writePages(wcache_num_pages); } if (_agent != 0) - _agent->raiseEvent(qmf::org::apache::qpid::legacystore::EventCreated(_jid, _jfsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE, _lpmgr.num_jfiles()), + _agent->raiseEvent(qmf::org::apache::qpid::linearstore::EventCreated(_jid, _jfsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE, _lpmgr.num_jfiles()), qpid::management::ManagementAgent::SEV_NOTE); +*/ } void -JournalImpl::recover(const u_int16_t num_jfiles, +JournalImpl::recover(/*const u_int16_t num_jfiles, const bool auto_expand, const u_int16_t ae_max_jfiles, - const u_int32_t jfsize_sblks, + const u_int32_t jfsize_sblks,*/ const u_int16_t wcache_num_pages, const u_int32_t wcache_pgsize_sblks, mrg::journal::aio_callback* const cbp, @@ -176,12 +179,14 @@ JournalImpl::recover(const u_int16_t num_jfiles, u_int64_t queue_id) { std::ostringstream oss1; - oss1 << "Recover; num_jfiles=" << num_jfiles << " jfsize_sblks=" << jfsize_sblks; +// oss1 << "Recover; num_jfiles=" << num_jfiles << " jfsize_sblks=" << jfsize_sblks; + oss1 << "Recover;"; oss1 << " queue_id = 0x" << std::hex << queue_id << std::dec; oss1 << " wcache_pgsize_sblks=" << wcache_pgsize_sblks; oss1 << " wcache_num_pages=" << wcache_num_pages; log(LOG_DEBUG, oss1.str()); - + // TODO: replace for linearstore: _lpmgr +/* if (_mgmtObject.get() != 0) { _mgmtObject->set_initialFileCount(_lpmgr.num_jfiles()); @@ -192,6 +197,7 @@ JournalImpl::recover(const u_int16_t num_jfiles, _mgmtObject->set_writePageSize(wcache_pgsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE); _mgmtObject->set_writePages(wcache_num_pages); } +*/ if (prep_tx_list_ptr) { // Create list of prepared xids @@ -200,10 +206,10 @@ JournalImpl::recover(const u_int16_t num_jfiles, prep_xid_list.push_back(i->xid); } - jcntl::recover(num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks, + jcntl::recover(/*num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,*/ wcache_num_pages, wcache_pgsize_sblks, cbp, &prep_xid_list, highest_rid); } else { - jcntl::recover(num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks, + jcntl::recover(/*num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,*/ wcache_num_pages, wcache_pgsize_sblks, cbp, 0, highest_rid); } @@ -242,9 +248,12 @@ JournalImpl::recover_complete() { jcntl::recover_complete(); log(LOG_DEBUG, "Recover phase 2 complete; journal now writable."); + // TODO: replace for linearstore: _lpmgr +/* if (_agent != 0) - _agent->raiseEvent(qmf::org::apache::qpid::legacystore::EventRecovered(_jid, _jfsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE, _lpmgr.num_jfiles(), + _agent->raiseEvent(qmf::org::apache::qpid::linearstore::EventRecovered(_jid, _jfsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE, _lpmgr.num_jfiles(), _emap.size(), _tmap.size(), _tmap.enq_cnt(), _tmap.deq_cnt()), qpid::management::ManagementAgent::SEV_NOTE); +*/ } //#define MAX_AIO_SLEEPS 1000000 // tot: ~10 sec @@ -589,7 +598,7 @@ JournalImpl::handleIoResult(const iores r) oss << "Enqueue capacity threshold exceeded on queue \"" << _jid << "\"."; log(LOG_WARN, oss.str()); if (_agent != 0) - _agent->raiseEvent(qmf::org::apache::qpid::legacystore::EventEnqThresholdExceeded(_jid, "Journal enqueue capacity threshold exceeded"), + _agent->raiseEvent(qmf::org::apache::qpid::linearstore::EventEnqThresholdExceeded(_jid, "Journal enqueue capacity threshold exceeded"), qpid::management::ManagementAgent::SEV_WARN); THROW_STORE_FULL_EXCEPTION(oss.str()); } @@ -599,7 +608,7 @@ JournalImpl::handleIoResult(const iores r) oss << "Journal full on queue \"" << _jid << "\"."; log(LOG_CRITICAL, oss.str()); if (_agent != 0) - _agent->raiseEvent(qmf::org::apache::qpid::legacystore::EventFull(_jid, "Journal full"), qpid::management::ManagementAgent::SEV_ERROR); + _agent->raiseEvent(qmf::org::apache::qpid::linearstore::EventFull(_jid, "Journal full"), qpid::management::ManagementAgent::SEV_ERROR); THROW_STORE_FULL_EXCEPTION(oss.str()); } default: diff --git a/qpid/cpp/src/qpid/linearstore/JournalImpl.h b/qpid/cpp/src/qpid/linearstore/JournalImpl.h index 7227b2ffd4..1beb469b3f 100644 --- a/qpid/cpp/src/qpid/linearstore/JournalImpl.h +++ b/qpid/cpp/src/qpid/linearstore/JournalImpl.h @@ -23,17 +23,17 @@ #define QPID_LEGACYSTORE_JOURNALIMPL_H #include <set> -#include "qpid/legacystore/jrnl/enums.h" -#include "qpid/legacystore/jrnl/jcntl.h" -#include "qpid/legacystore/DataTokenImpl.h" -#include "qpid/legacystore/PreparedTransaction.h" +#include "qpid/linearstore/jrnl/enums.h" +#include "qpid/linearstore/jrnl/jcntl.h" +#include "qpid/linearstore/DataTokenImpl.h" +#include "qpid/linearstore/PreparedTransaction.h" #include "qpid/broker/PersistableQueue.h" #include "qpid/sys/Timer.h" #include "qpid/sys/Time.h" #include <boost/ptr_container/ptr_list.hpp> #include <boost/intrusive_ptr.hpp> #include "qpid/management/Manageable.h" -#include "qmf/org/apache/qpid/legacystore/Journal.h" +#include "qmf/org/apache/qpid/linearstore/Journal.h" namespace qpid { namespace sys { class Timer; @@ -98,7 +98,7 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public mrg::journal bool _external; qpid::management::ManagementAgent* _agent; - qmf::org::apache::qpid::legacystore::Journal::shared_ptr _mgmtObject; + qmf::org::apache::qpid::linearstore::Journal::shared_ptr _mgmtObject; DeleteCallback deleteCallback; public: @@ -116,28 +116,28 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public mrg::journal void initManagement(qpid::management::ManagementAgent* agent); - void initialize(const u_int16_t num_jfiles, + void initialize(/*const u_int16_t num_jfiles, const bool auto_expand, const u_int16_t ae_max_jfiles, - const u_int32_t jfsize_sblks, + const u_int32_t jfsize_sblks,*/ const u_int16_t wcache_num_pages, const u_int32_t wcache_pgsize_sblks, mrg::journal::aio_callback* const cbp); - inline void initialize(const u_int16_t num_jfiles, + inline void initialize(/*const u_int16_t num_jfiles, const bool auto_expand, const u_int16_t ae_max_jfiles, - const u_int32_t jfsize_sblks, + const u_int32_t jfsize_sblks,*/ const u_int16_t wcache_num_pages, const u_int32_t wcache_pgsize_sblks) { - initialize(num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks, + initialize(/*num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,*/ wcache_num_pages, wcache_pgsize_sblks, this); } - void recover(const u_int16_t num_jfiles, + void recover(/*const u_int16_t num_jfiles, const bool auto_expand, const u_int16_t ae_max_jfiles, - const u_int32_t jfsize_sblks, + const u_int32_t jfsize_sblks,*/ const u_int16_t wcache_num_pages, const u_int32_t wcache_pgsize_sblks, mrg::journal::aio_callback* const cbp, @@ -145,16 +145,16 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public mrg::journal u_int64_t& highest_rid, u_int64_t queue_id); - inline void recover(const u_int16_t num_jfiles, + inline void recover(/*const u_int16_t num_jfiles, const bool auto_expand, const u_int16_t ae_max_jfiles, - const u_int32_t jfsize_sblks, + const u_int32_t jfsize_sblks,*/ const u_int16_t wcache_num_pages, const u_int32_t wcache_pgsize_sblks, boost::ptr_list<msgstore::PreparedTransaction>* prep_tx_list_ptr, u_int64_t& highest_rid, u_int64_t queue_id) { - recover(num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks, + recover(/*num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,*/ wcache_num_pages, wcache_pgsize_sblks, this, prep_tx_list_ptr, highest_rid, queue_id); } diff --git a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp index c92c9828f4..c5a1a949c4 100644 --- a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp +++ b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp @@ -19,24 +19,24 @@ * */ -#include "qpid/legacystore/MessageStoreImpl.h" +#include "qpid/linearstore/MessageStoreImpl.h" #include "qpid/broker/QueueSettings.h" -#include "qpid/legacystore/BindingDbt.h" -#include "qpid/legacystore/BufferValue.h" -#include "qpid/legacystore/IdDbt.h" -#include "qpid/legacystore/jrnl/txn_map.h" +#include "qpid/linearstore/BindingDbt.h" +#include "qpid/linearstore/BufferValue.h" +#include "qpid/linearstore/IdDbt.h" +#include "qpid/linearstore/jrnl/txn_map.h" #include "qpid/framing/FieldValue.h" #include "qpid/log/Statement.h" -#include "qmf/org/apache/qpid/legacystore/Package.h" -#include "qpid/legacystore/StoreException.h" +#include "qmf/org/apache/qpid/linearstore/Package.h" +#include "qpid/linearstore/StoreException.h" #include <dirent.h> #include <db.h> #define MAX_AIO_SLEEPS 100000 // tot: ~1 sec #define AIO_SLEEP_TIME_US 10 // 0.01 ms -namespace _qmf = qmf::org::apache::qpid::legacystore; +namespace _qmf = qmf::org::apache::qpid::linearstore; namespace mrg { namespace msgstore { @@ -78,6 +78,7 @@ MessageStoreImpl::MessageStoreImpl(qpid::broker::Broker* broker_, const char* en agent(0) {} +/* u_int16_t MessageStoreImpl::chkJrnlNumFilesParam(const u_int16_t param, const std::string paramName) { if (param < JRNL_MIN_NUM_FILES || param > JRNL_MAX_NUM_FILES) { @@ -87,7 +88,9 @@ u_int16_t MessageStoreImpl::chkJrnlNumFilesParam(const u_int16_t param, const st } return param; } +*/ +/* u_int32_t MessageStoreImpl::chkJrnlFileSizeParam(const u_int32_t param, const std::string paramName, const u_int32_t wCachePgSizeSblks) { if (param < (JRNL_MIN_FILE_SIZE / JRNL_RMGR_PAGE_SIZE) || (param > JRNL_MAX_FILE_SIZE / JRNL_RMGR_PAGE_SIZE)) { @@ -102,16 +105,17 @@ u_int32_t MessageStoreImpl::chkJrnlFileSizeParam(const u_int32_t param, const st } return param; } +*/ -u_int32_t MessageStoreImpl::chkJrnlWrPageCacheSize(const u_int32_t param, const std::string paramName, const u_int16_t jrnlFsizePgs) +u_int32_t MessageStoreImpl::chkJrnlWrPageCacheSize(const u_int32_t param, const std::string paramName/*, const u_int16_t jrnlFsizePgs*/) { u_int32_t p = param; - if (jrnlFsizePgs == 1 && p > 64 ) { +/* if (jrnlFsizePgs == 1 && p > 64 ) { p = 64; QPID_LOG(warning, "parameter " << paramName << " (" << param << ") cannot set a page size greater than the journal file size; changing this parameter to the journal file size (" << p << ")"); } - else if (p == 0) { + else*/ if (p == 0) { // For zero value, use default p = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024; QPID_LOG(warning, "parameter " << paramName << " (" << param << ") must be a power of 2 between 1 and 128; changing this parameter to default value (" << p << ")"); @@ -149,6 +153,7 @@ u_int16_t MessageStoreImpl::getJrnlWrNumPages(const u_int32_t wrPageSizeKib) } } +/* void MessageStoreImpl::chkJrnlAutoExpandOptions(const StoreOptions* opts, bool& autoJrnlExpand, u_int16_t& autoJrnlExpandMaxFiles, @@ -198,6 +203,7 @@ void MessageStoreImpl::chkJrnlAutoExpandOptions(const StoreOptions* opts, autoJrnlExpand = true; autoJrnlExpandMaxFiles = p; } +*/ void MessageStoreImpl::initManagement () { @@ -233,45 +239,46 @@ bool MessageStoreImpl::init(const qpid::Options* options) { // Extract and check options const StoreOptions* opts = static_cast<const StoreOptions*>(options); - u_int16_t numJrnlFiles = chkJrnlNumFilesParam(opts->numJrnlFiles, "num-jfiles"); - u_int32_t jrnlFsizePgs = chkJrnlFileSizeParam(opts->jrnlFsizePgs, "jfile-size-pgs"); - u_int32_t jrnlWrCachePageSizeKib = chkJrnlWrPageCacheSize(opts->wCachePageSizeKib, "wcache-page-size", jrnlFsizePgs); - u_int16_t tplNumJrnlFiles = chkJrnlNumFilesParam(opts->tplNumJrnlFiles, "tpl-num-jfiles"); - u_int32_t tplJrnlFSizePgs = chkJrnlFileSizeParam(opts->tplJrnlFsizePgs, "tpl-jfile-size-pgs"); - u_int32_t tplJrnlWrCachePageSizeKib = chkJrnlWrPageCacheSize(opts->tplWCachePageSizeKib, "tpl-wcache-page-size", tplJrnlFSizePgs); - bool autoJrnlExpand; - u_int16_t autoJrnlExpandMaxFiles; - chkJrnlAutoExpandOptions(opts, autoJrnlExpand, autoJrnlExpandMaxFiles, "auto-expand-max-jfiles", numJrnlFiles, "num-jfiles"); +// u_int16_t numJrnlFiles = chkJrnlNumFilesParam(opts->numJrnlFiles, "num-jfiles"); +// u_int32_t jrnlFsizePgs = chkJrnlFileSizeParam(opts->jrnlFsizePgs, "jfile-size-pgs"); + u_int32_t jrnlWrCachePageSizeKib = chkJrnlWrPageCacheSize(opts->wCachePageSizeKib, "wcache-page-size"/*, jrnlFsizePgs*/); +// u_int16_t tplNumJrnlFiles = chkJrnlNumFilesParam(opts->tplNumJrnlFiles, "tpl-num-jfiles"); +// u_int32_t tplJrnlFSizePgs = chkJrnlFileSizeParam(opts->tplJrnlFsizePgs, "tpl-jfile-size-pgs"); + u_int32_t tplJrnlWrCachePageSizeKib = chkJrnlWrPageCacheSize(opts->tplWCachePageSizeKib, "tpl-wcache-page-size"/*, tplJrnlFSizePgs*/); +// bool autoJrnlExpand; +// u_int16_t autoJrnlExpandMaxFiles; +// chkJrnlAutoExpandOptions(opts, autoJrnlExpand, autoJrnlExpandMaxFiles, "auto-expand-max-jfiles", numJrnlFiles, "num-jfiles"); // Pass option values to init(...) - return init(opts->storeDir, numJrnlFiles, jrnlFsizePgs, opts->truncateFlag, jrnlWrCachePageSizeKib, tplNumJrnlFiles, tplJrnlFSizePgs, tplJrnlWrCachePageSizeKib, autoJrnlExpand, autoJrnlExpandMaxFiles); + return init(opts->storeDir, /*numJrnlFiles, jrnlFsizePgs,*/ opts->truncateFlag, jrnlWrCachePageSizeKib, + /*tplNumJrnlFiles, tplJrnlFSizePgs,*/ tplJrnlWrCachePageSizeKib/*, autoJrnlExpand, autoJrnlExpandMaxFiles*/); } // These params, taken from options, are assumed to be correct and verified bool MessageStoreImpl::init(const std::string& dir, - u_int16_t jfiles, - u_int32_t jfileSizePgs, + /*u_int16_t jfiles, + u_int32_t jfileSizePgs,*/ const bool truncateFlag, u_int32_t wCachePageSizeKib, - u_int16_t tplJfiles, - u_int32_t tplJfileSizePgs, - u_int32_t tplWCachePageSizeKib, + /*u_int16_t tplJfiles, + u_int32_t tplJfileSizePgs,*/ + u_int32_t tplWCachePageSizeKib/*, bool autoJExpand, - u_int16_t autoJExpandMaxFiles) + u_int16_t autoJExpandMaxFiles*/) { if (isInit) return true; // Set geometry members (converting to correct units where req'd) - numJrnlFiles = jfiles; - jrnlFsizeSblks = jfileSizePgs * JRNL_RMGR_PAGE_SIZE; +// numJrnlFiles = jfiles; +// jrnlFsizeSblks = jfileSizePgs * JRNL_RMGR_PAGE_SIZE; wCachePgSizeSblks = wCachePageSizeKib * 1024 / JRNL_DBLK_SIZE / JRNL_SBLK_SIZE; // convert from KiB to number sblks wCacheNumPages = getJrnlWrNumPages(wCachePageSizeKib); - tplNumJrnlFiles = tplJfiles; - tplJrnlFsizeSblks = tplJfileSizePgs * JRNL_RMGR_PAGE_SIZE; +// tplNumJrnlFiles = tplJfiles; +// tplJrnlFsizeSblks = tplJfileSizePgs * JRNL_RMGR_PAGE_SIZE; tplWCachePgSizeSblks = tplWCachePageSizeKib * 1024 / JRNL_DBLK_SIZE / JRNL_SBLK_SIZE; // convert from KiB to number sblks tplWCacheNumPages = getJrnlWrNumPages(tplWCachePageSizeKib); - autoJrnlExpand = autoJExpand; - autoJrnlExpandMaxFiles = autoJExpandMaxFiles; +// autoJrnlExpand = autoJExpand; +// autoJrnlExpandMaxFiles = autoJExpandMaxFiles; if (dir.size()>0) storeDir = dir; if (truncateFlag) @@ -280,15 +287,15 @@ bool MessageStoreImpl::init(const std::string& dir, init(); QPID_LOG(notice, "Store module initialized; store-dir=" << dir); - QPID_LOG(info, "> Default files per journal: " << jfiles); +// QPID_LOG(info, "> Default files per journal: " << jfiles); // TODO: Uncomment these lines when auto-expand is enabled. // QPID_LOG(info, "> Auto-expand " << (autoJrnlExpand ? "enabled" : "disabled")); // if (autoJrnlExpand) QPID_LOG(info, "> Max auto-expand journal files: " << autoJrnlExpandMaxFiles); - QPID_LOG(info, "> Default journal file size: " << jfileSizePgs << " (wpgs)"); +// QPID_LOG(info, "> Default journal file size: " << jfileSizePgs << " (wpgs)"); QPID_LOG(info, "> Default write cache page size: " << wCachePageSizeKib << " (KiB)"); QPID_LOG(info, "> Default number of write cache pages: " << wCacheNumPages); - QPID_LOG(info, "> TPL files per journal: " << tplNumJrnlFiles); - QPID_LOG(info, "> TPL journal file size: " << tplJfileSizePgs << " (wpgs)"); +// QPID_LOG(info, "> TPL files per journal: " << tplNumJrnlFiles); +// QPID_LOG(info, "> TPL journal file size: " << tplJfileSizePgs << " (wpgs)"); QPID_LOG(info, "> TPL write cache page size: " << tplWCachePageSizeKib << " (KiB)"); QPID_LOG(info, "> TPL number of write cache pages: " << tplWCacheNumPages); @@ -424,7 +431,7 @@ void MessageStoreImpl::chkTplStoreInit() qpid::sys::Mutex::ScopedLock sl(tplInitLock); if (!tplStorePtr->is_ready()) { journal::jdir::create_dir(getTplBaseDir()); - tplStorePtr->initialize(tplNumJrnlFiles, false, 0, tplJrnlFsizeSblks, tplWCacheNumPages, tplWCachePgSizeSblks); + tplStorePtr->initialize(/*tplNumJrnlFiles, false, 0, tplJrnlFsizeSblks,*/ tplWCacheNumPages, tplWCachePgSizeSblks); if (mgmtObject.get() != 0) mgmtObject->set_tplIsInitialized(true); } } @@ -468,7 +475,7 @@ MessageStoreImpl::~MessageStoreImpl() } void MessageStoreImpl::create(qpid::broker::PersistableQueue& queue, - const qpid::framing::FieldTable& args) + const qpid::framing::FieldTable& /*args*/) { checkInit(); if (queue.getPersistenceId()) { @@ -477,6 +484,7 @@ void MessageStoreImpl::create(qpid::broker::PersistableQueue& queue, JournalImpl* jQueue = 0; qpid::framing::FieldTable::ValuePtr value; +/* u_int16_t localFileCount = numJrnlFiles; bool localAutoExpandFlag = autoJrnlExpand; u_int16_t localAutoExpandMaxFileCount = autoJrnlExpandMaxFiles; @@ -489,6 +497,7 @@ void MessageStoreImpl::create(qpid::broker::PersistableQueue& queue, value = args.get("qpid.file_size"); if (value.get() != 0 && !value->empty() && value->convertsTo<int>()) localFileSizeSblks = chkJrnlFileSizeParam((u_int32_t) value->get<int>(), "qpid.file_size", wCachePgSizeSblks) * JRNL_RMGR_PAGE_SIZE; +*/ if (queue.getName().size() == 0) { @@ -504,6 +513,7 @@ void MessageStoreImpl::create(qpid::broker::PersistableQueue& queue, journalList[queue.getName()]=jQueue; } +/* value = args.get("qpid.auto_expand"); if (value.get() != 0 && !value->empty() && value->convertsTo<bool>()) localAutoExpandFlag = (bool) value->get<bool>(); @@ -511,11 +521,12 @@ void MessageStoreImpl::create(qpid::broker::PersistableQueue& queue, value = args.get("qpid.auto_expand_max_jfiles"); if (value.get() != 0 && !value->empty() && value->convertsTo<int>()) localAutoExpandMaxFileCount = (u_int16_t) value->get<int>(); +*/ queue.setExternalQueueStore(dynamic_cast<qpid::broker::ExternalQueueStore*>(jQueue)); try { // init will create the deque's for the init... - jQueue->initialize(localFileCount, localAutoExpandFlag, localAutoExpandMaxFileCount, localFileSizeSblks, wCacheNumPages, wCachePgSizeSblks); + jQueue->initialize(/*localFileCount, localAutoExpandFlag, localAutoExpandMaxFileCount, localFileSizeSblks,*/ wCacheNumPages, wCachePgSizeSblks); } catch (const journal::jexception& e) { THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": create() failed: " + e.what()); } @@ -796,8 +807,10 @@ void MessageStoreImpl::recoverQueues(TxnCtxt& txn, long rcnt = 0L; // recovered msg count long idcnt = 0L; // in-doubt msg count u_int64_t thisHighestRid = 0ULL; - jQueue->recover(numJrnlFiles, autoJrnlExpand, autoJrnlExpandMaxFiles, jrnlFsizeSblks, wCacheNumPages, wCachePgSizeSblks, &prepared, thisHighestRid, key.id); // start recovery + jQueue->recover(/*numJrnlFiles, autoJrnlExpand, autoJrnlExpandMaxFiles, jrnlFsizeSblks,*/ wCacheNumPages, + wCachePgSizeSblks, &prepared, thisHighestRid, key.id); // start recovery +/* // Check for changes to queue store settings qpid.file_count and qpid.file_size resulting // from recovery of a store that has had its size changed externally by the resize utility. // If so, update the queue store settings so that QMF queries will reflect the new values. @@ -811,6 +824,7 @@ void MessageStoreImpl::recoverQueues(TxnCtxt& txn, if (value.get() != 0 && !value->empty() && value->convertsTo<int>() && (u_int32_t)value->get<int>() != jQueue->jfsize_sblks()/JRNL_RMGR_PAGE_SIZE) { queue->addArgument("qpid.file_size", jQueue->jfsize_sblks()/JRNL_RMGR_PAGE_SIZE); } +*/ if (highestRid == 0ULL) highestRid = thisHighestRid; @@ -1172,7 +1186,7 @@ void MessageStoreImpl::recoverTplStore() { if (journal::jdir::exists(tplStorePtr->jrnl_dir() + tplStorePtr->base_filename() + ".jinf")) { u_int64_t thisHighestRid = 0ULL; - tplStorePtr->recover(tplNumJrnlFiles, false, 0, tplJrnlFsizeSblks, tplWCachePgSizeSblks, tplWCacheNumPages, 0, thisHighestRid, 0); + tplStorePtr->recover(/*tplNumJrnlFiles, false, 0, tplJrnlFsizeSblks,*/ tplWCachePgSizeSblks, tplWCacheNumPages, 0, thisHighestRid, 0); if (highestRid == 0ULL) highestRid = thisHighestRid; else if (thisHighestRid - highestRid < 0x8000000000000000ULL) // RFC 1982 comparison for unsigned 64-bit @@ -1675,16 +1689,17 @@ void MessageStoreImpl::journalDeleted(JournalImpl& j) { MessageStoreImpl::StoreOptions::StoreOptions(const std::string& name) : qpid::Options(name), - numJrnlFiles(defNumJrnlFiles), + /*numJrnlFiles(defNumJrnlFiles), autoJrnlExpand(defAutoJrnlExpand), autoJrnlExpandMaxFiles(defAutoJrnlExpandMaxFiles), - jrnlFsizePgs(defJrnlFileSizePgs), + jrnlFsizePgs(defJrnlFileSizePgs),*/ truncateFlag(defTruncateFlag), wCachePageSizeKib(defWCachePageSize), - tplNumJrnlFiles(defTplNumJrnlFiles), - tplJrnlFsizePgs(defTplJrnlFileSizePgs), + /*tplNumJrnlFiles(defTplNumJrnlFiles), + tplJrnlFsizePgs(defTplJrnlFileSizePgs),*/ tplWCachePageSizeKib(defTplWCachePageSize) { +/* std::ostringstream oss1; oss1 << "Default number of files for each journal instance (queue). [Allowable values: " << JRNL_MIN_NUM_FILES << " - " << JRNL_MAX_NUM_FILES << "]"; @@ -1697,12 +1712,13 @@ MessageStoreImpl::StoreOptions::StoreOptions(const std::string& name) : std::ostringstream oss4; oss4 << "Size of each transaction prepared list journal file in multiples of read pages (1 read page = 64KiB) [Allowable values: " << JRNL_MIN_FILE_SIZE / JRNL_RMGR_PAGE_SIZE << " - " << JRNL_MAX_FILE_SIZE / JRNL_RMGR_PAGE_SIZE << "]"; +*/ addOptions() ("store-dir", qpid::optValue(storeDir, "DIR"), "Store directory location for persistence (instead of using --data-dir value). " "Required if --no-data-dir is also used.") - ("num-jfiles", qpid::optValue(numJrnlFiles, "N"), oss1.str().c_str()) - ("jfile-size-pgs", qpid::optValue(jrnlFsizePgs, "N"), oss2.str().c_str()) +// ("num-jfiles", qpid::optValue(numJrnlFiles, "N"), oss1.str().c_str()) +// ("jfile-size-pgs", qpid::optValue(jrnlFsizePgs, "N"), oss2.str().c_str()) // TODO: Uncomment these lines when auto-expand is enabled. // ("auto-expand", qpid::optValue(autoJrnlExpand, "yes|no"), // "If yes|true|1, allows journal to auto-expand by adding additional journal files as needed. " @@ -1716,8 +1732,8 @@ MessageStoreImpl::StoreOptions::StoreOptions(const std::string& name) : "Size of the pages in the write page cache in KiB. " "Allowable values - powers of 2: 1, 2, 4, ... , 128. " "Lower values decrease latency at the expense of throughput.") - ("tpl-num-jfiles", qpid::optValue(tplNumJrnlFiles, "N"), oss3.str().c_str()) - ("tpl-jfile-size-pgs", qpid::optValue(tplJrnlFsizePgs, "N"), oss4.str().c_str()) +// ("tpl-num-jfiles", qpid::optValue(tplNumJrnlFiles, "N"), oss3.str().c_str()) +// ("tpl-jfile-size-pgs", qpid::optValue(tplJrnlFsizePgs, "N"), oss4.str().c_str()) ("tpl-wcache-page-size", qpid::optValue(tplWCachePageSizeKib, "N"), "Size of the pages in the transaction prepared list write page cache in KiB. " "Allowable values - powers of 2: 1, 2, 4, ... , 128. " diff --git a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h index 68aceedfbb..24e31f1be6 100644 --- a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h +++ b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h @@ -25,17 +25,17 @@ #include <string> #include "db-inc.h" -#include "qpid/legacystore/Cursor.h" -#include "qpid/legacystore/IdDbt.h" -#include "qpid/legacystore/IdSequence.h" -#include "qpid/legacystore/JournalImpl.h" -#include "qpid/legacystore/jrnl/jcfg.h" -#include "qpid/legacystore/PreparedTransaction.h" +#include "qpid/linearstore/Cursor.h" +#include "qpid/linearstore/IdDbt.h" +#include "qpid/linearstore/IdSequence.h" +#include "qpid/linearstore/JournalImpl.h" +#include "qpid/linearstore/jrnl/jcfg.h" +#include "qpid/linearstore/PreparedTransaction.h" #include "qpid/broker/Broker.h" #include "qpid/broker/MessageStore.h" #include "qpid/management/Manageable.h" -#include "qmf/org/apache/qpid/legacystore/Store.h" -#include "qpid/legacystore/TxnCtxt.h" +#include "qmf/org/apache/qpid/linearstore/Store.h" +#include "qpid/linearstore/TxnCtxt.h" // Assume DB_VERSION_MAJOR == 4 #if (DB_VERSION_MINOR == 2) @@ -63,14 +63,14 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem StoreOptions(const std::string& name="Store Options"); std::string clusterName; std::string storeDir; - u_int16_t numJrnlFiles; - bool autoJrnlExpand; - u_int16_t autoJrnlExpandMaxFiles; - u_int32_t jrnlFsizePgs; +// u_int16_t numJrnlFiles; +// bool autoJrnlExpand; +// u_int16_t autoJrnlExpandMaxFiles; +// u_int32_t jrnlFsizePgs; bool truncateFlag; u_int32_t wCachePageSizeKib; - u_int16_t tplNumJrnlFiles; - u_int32_t tplJrnlFsizePgs; +// u_int16_t tplNumJrnlFiles; +// u_int32_t tplJrnlFsizePgs; u_int32_t tplWCachePageSizeKib; }; @@ -99,16 +99,16 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem typedef JournalListMap::iterator JournalListMapItr; // Default store settings - static const u_int16_t defNumJrnlFiles = 8; - static const u_int32_t defJrnlFileSizePgs = 24; +// static const u_int16_t defNumJrnlFiles = 8; +// static const u_int32_t defJrnlFileSizePgs = 24; static const bool defTruncateFlag = false; static const u_int32_t defWCachePageSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024; - static const u_int16_t defTplNumJrnlFiles = 8; - static const u_int32_t defTplJrnlFileSizePgs = 24; +// static const u_int16_t defTplNumJrnlFiles = 8; +// static const u_int32_t defTplJrnlFileSizePgs = 24; static const u_int32_t defTplWCachePageSize = defWCachePageSize / 8; // TODO: set defAutoJrnlExpand to true and defAutoJrnlExpandMaxFiles to 16 when auto-expand comes on-line - static const bool defAutoJrnlExpand = false; - static const u_int16_t defAutoJrnlExpandMaxFiles = 0; +// static const bool defAutoJrnlExpand = false; +// static const u_int16_t defAutoJrnlExpandMaxFiles = 0; static const std::string storeTopLevelDir; static qpid::sys::Duration defJournalGetEventsTimeout; @@ -152,26 +152,26 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem const char* envPath; qpid::broker::Broker* broker; - qmf::org::apache::qpid::legacystore::Store::shared_ptr mgmtObject; + qmf::org::apache::qpid::linearstore::Store::shared_ptr mgmtObject; qpid::management::ManagementAgent* agent; // Parameter validation and calculation - static u_int16_t chkJrnlNumFilesParam(const u_int16_t param, - const std::string paramName); - static u_int32_t chkJrnlFileSizeParam(const u_int32_t param, - const std::string paramName, - const u_int32_t wCachePgSizeSblks = 0); +// static u_int16_t chkJrnlNumFilesParam(const u_int16_t param, +// const std::string paramName); +// static u_int32_t chkJrnlFileSizeParam(const u_int32_t param, +// const std::string paramName, +// const u_int32_t wCachePgSizeSblks = 0); static u_int32_t chkJrnlWrPageCacheSize(const u_int32_t param, - const std::string paramName, - const u_int16_t jrnlFsizePgs); + const std::string paramName/*, + const u_int16_t jrnlFsizePgs*/); static u_int16_t getJrnlWrNumPages(const u_int32_t wrPageSizeKib); - void chkJrnlAutoExpandOptions(const MessageStoreImpl::StoreOptions* opts, - bool& autoJrnlExpand, - u_int16_t& autoJrnlExpandMaxFiles, - const std::string& autoJrnlExpandMaxFilesParamName, - const u_int16_t numJrnlFiles, - const std::string& numJrnlFilesParamName); +// void chkJrnlAutoExpandOptions(const MessageStoreImpl::StoreOptions* opts, +// bool& autoJrnlExpand, +// u_int16_t& autoJrnlExpandMaxFiles, +// const std::string& autoJrnlExpandMaxFilesParamName, +// const u_int16_t numJrnlFiles, +// const std::string& numJrnlFilesParamName); void init(); @@ -280,15 +280,15 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem bool init(const qpid::Options* options); bool init(const std::string& dir, - u_int16_t jfiles = defNumJrnlFiles, - u_int32_t jfileSizePgs = defJrnlFileSizePgs, + /*u_int16_t jfiles = defNumJrnlFiles, + u_int32_t jfileSizePgs = defJrnlFileSizePgs,*/ const bool truncateFlag = false, u_int32_t wCachePageSize = defWCachePageSize, - u_int16_t tplJfiles = defTplNumJrnlFiles, - u_int32_t tplJfileSizePgs = defTplJrnlFileSizePgs, - u_int32_t tplWCachePageSize = defTplWCachePageSize, + /*u_int16_t tplJfiles = defTplNumJrnlFiles, + u_int32_t tplJfileSizePgs = defTplJrnlFileSizePgs,*/ + u_int32_t tplWCachePageSize = defTplWCachePageSize/*, bool autoJExpand = defAutoJrnlExpand, - u_int16_t autoJExpandMaxFiles = defAutoJrnlExpandMaxFiles); + u_int16_t autoJExpandMaxFiles = defAutoJrnlExpandMaxFiles*/); void truncateInit(const bool saveStoreContent = false); diff --git a/qpid/cpp/src/qpid/linearstore/PreparedTransaction.cpp b/qpid/cpp/src/qpid/linearstore/PreparedTransaction.cpp index 50b81e2824..ddf907d1f4 100644 --- a/qpid/cpp/src/qpid/linearstore/PreparedTransaction.cpp +++ b/qpid/cpp/src/qpid/linearstore/PreparedTransaction.cpp @@ -19,7 +19,7 @@ * */ -#include "qpid/legacystore/PreparedTransaction.h" +#include "qpid/linearstore/PreparedTransaction.h" #include <algorithm> using namespace mrg::msgstore; diff --git a/qpid/cpp/src/qpid/linearstore/StoreException.h b/qpid/cpp/src/qpid/linearstore/StoreException.h index 6624aafd5a..0f1474f102 100644 --- a/qpid/cpp/src/qpid/linearstore/StoreException.h +++ b/qpid/cpp/src/qpid/linearstore/StoreException.h @@ -22,7 +22,7 @@ #ifndef QPID_LEGACYSTORE_STOREEXCEPTION_H #define QPID_LEGACYSTORE_STOREEXCEPTION_H -#include "qpid/legacystore/IdDbt.h" +#include "qpid/linearstore/IdDbt.h" #include <boost/format.hpp> namespace mrg{ diff --git a/qpid/cpp/src/qpid/linearstore/StorePlugin.cpp b/qpid/cpp/src/qpid/linearstore/StorePlugin.cpp index f9b77ce02c..eaaaa87f26 100644 --- a/qpid/cpp/src/qpid/linearstore/StorePlugin.cpp +++ b/qpid/cpp/src/qpid/linearstore/StorePlugin.cpp @@ -24,7 +24,7 @@ #include "qpid/Options.h" #include "qpid/DataDir.h" #include "qpid/log/Statement.h" -#include "qpid/legacystore/MessageStoreImpl.h" +#include "qpid/linearstore/MessageStoreImpl.h" using mrg::msgstore::MessageStoreImpl; diff --git a/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp b/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp index 1db41f4c70..5b1fd81275 100644 --- a/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp +++ b/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp @@ -19,12 +19,12 @@ * */ -#include "qpid/legacystore/TxnCtxt.h" +#include "qpid/linearstore/TxnCtxt.h" #include <sstream> -#include "qpid/legacystore/jrnl/jexception.h" -#include "qpid/legacystore/StoreException.h" +#include "qpid/linearstore/jrnl/jexception.h" +#include "qpid/linearstore/StoreException.h" namespace mrg { namespace msgstore { diff --git a/qpid/cpp/src/qpid/linearstore/TxnCtxt.h b/qpid/cpp/src/qpid/linearstore/TxnCtxt.h index 77eaa27cd7..961efa5fe8 100644 --- a/qpid/cpp/src/qpid/linearstore/TxnCtxt.h +++ b/qpid/cpp/src/qpid/linearstore/TxnCtxt.h @@ -27,9 +27,9 @@ #include <set> #include <string> -#include "qpid/legacystore/DataTokenImpl.h" -#include "qpid/legacystore/IdSequence.h" -#include "qpid/legacystore/JournalImpl.h" +#include "qpid/linearstore/DataTokenImpl.h" +#include "qpid/linearstore/IdSequence.h" +#include "qpid/linearstore/JournalImpl.h" #include "qpid/broker/PersistableQueue.h" #include "qpid/broker/TransactionalStore.h" #include "qpid/sys/Mutex.h" diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h b/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h index ab94b6914d..f4048fd3d3 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h @@ -59,8 +59,8 @@ #define JRNL_RMGR_PAGE_SIZE 128 ///< Journal page size in softblocks #define JRNL_RMGR_PAGES 16 ///< Number of pages to use in wmgr // -//#define JRNL_WMGR_DEF_PAGE_SIZE 64 ///< Journal write page size in softblocks (default) -//#define JRNL_WMGR_DEF_PAGES 32 ///< Number of pages to use in wmgr (default) +#define JRNL_WMGR_DEF_PAGE_SIZE 64 ///< Journal write page size in softblocks (default) +#define JRNL_WMGR_DEF_PAGES 32 ///< Number of pages to use in wmgr (default) // #define JRNL_WMGR_MAXDTOKPP 1024 ///< Max. dtoks (data blocks) per page in wmgr #define JRNL_WMGR_MAXWAITUS 100 ///< Max. wait time (us) before submitting AIO diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp index 08752b27c1..8ae2cd3dd7 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp +++ b/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp @@ -90,8 +90,8 @@ jcntl::~jcntl() } void -jcntl::initialize(const uint16_t num_jfiles/*, const bool ae, const uint16_t ae_max_jfiles*/, - const uint32_t jfsize_sblks, const uint16_t wcache_num_pages, const uint32_t wcache_pgsize_sblks, +jcntl::initialize(/*const uint16_t num_jfiles, const bool ae, const uint16_t ae_max_jfiles, + const uint32_t jfsize_sblks,*/ const uint16_t wcache_num_pages, const uint32_t wcache_pgsize_sblks, aio_callback* const cbp) { _init_flag = false; @@ -106,12 +106,12 @@ jcntl::initialize(const uint16_t num_jfiles/*, const bool ae, const uint16_t ae_ // Set new file geometry parameters // assert(num_jfiles >= JRNL_MIN_NUM_FILES); // assert(num_jfiles <= JRNL_MAX_NUM_FILES); - _emap.set_num_jfiles(num_jfiles); - _tmap.set_num_jfiles(num_jfiles); +// _emap.set_num_jfiles(num_jfiles); +// _tmap.set_num_jfiles(num_jfiles); // assert(jfsize_sblks >= JRNL_MIN_FILE_SIZE); // assert(jfsize_sblks <= JRNL_MAX_FILE_SIZE); - _jfsize_sblks = jfsize_sblks; +// _jfsize_sblks = jfsize_sblks; // Clear any existing journal files _jdir.clear_dir(); @@ -130,8 +130,8 @@ jcntl::initialize(const uint16_t num_jfiles/*, const bool ae, const uint16_t ae_ } void -jcntl::recover(const uint16_t num_jfiles/*, const bool ae, const uint16_t ae_max_jfiles*/, - const uint32_t jfsize_sblks, const uint16_t wcache_num_pages, const uint32_t wcache_pgsize_sblks, +jcntl::recover(/*const uint16_t num_jfiles, const bool ae, const uint16_t ae_max_jfiles, + const uint32_t jfsize_sblks,*/ const uint16_t wcache_num_pages, const uint32_t wcache_pgsize_sblks, // const rd_aio_cb rd_cb, const wr_aio_cb wr_cb, const std::vector<std::string>* prep_txn_list_ptr, aio_callback* const cbp, const std::vector<std::string>* prep_txn_list_ptr, uint64_t& highest_rid) @@ -149,11 +149,11 @@ jcntl::recover(const uint16_t num_jfiles/*, const bool ae, const uint16_t ae_max // assert(num_jfiles <= JRNL_MAX_NUM_FILES); // assert(jfsize_sblks >= JRNL_MIN_FILE_SIZE); // assert(jfsize_sblks <= JRNL_MAX_FILE_SIZE); - _jfsize_sblks = jfsize_sblks; +// _jfsize_sblks = jfsize_sblks; // Verify journal dir and journal files _jdir.verify_dir(); - _rcvdat.reset(num_jfiles/*, ae, ae_max_jfiles*/); +// _rcvdat.reset(num_jfiles/*, ae, ae_max_jfiles*/); rcvr_janalyze(_rcvdat, prep_txn_list_ptr); highest_rid = _rcvdat._h_rid; @@ -444,6 +444,7 @@ jcntl::fhdr_wr_sync(const uint16_t /*lid*/) */ } +/* fcntl* jcntl::new_fcntl(jcntl* const jcp, const uint16_t lid, const uint16_t fid, const rcvdat* const rdp) { @@ -452,6 +453,7 @@ jcntl::new_fcntl(jcntl* const jcp, const uint16_t lid, const uint16_t fid, const oss << jcp->jrnl_dir() << "/" << jcp->base_filename(); return new fcntl(oss.str(), fid, lid, jcp->jfsize_sblks(), rdp); } +*/ // Protected/Private functions diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.h b/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.h index 868cc5be31..0e59c1e8a0 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.h @@ -190,8 +190,8 @@ namespace journal * * \exception TODO */ - void initialize(const uint16_t num_jfiles/*, const bool auto_expand, const uint16_t ae_max_jfiles*/, - const uint32_t jfsize_sblks, const uint16_t wcache_num_pages, const uint32_t wcache_pgsize_sblks, + void initialize(/*const uint16_t num_jfiles, const bool auto_expand, const uint16_t ae_max_jfiles, + const uint32_t jfsize_sblks,*/ const uint16_t wcache_num_pages, const uint32_t wcache_pgsize_sblks, aio_callback* const cbp); /** @@ -227,8 +227,8 @@ namespace journal * * \exception TODO */ - void recover(const uint16_t num_jfiles/*, const bool auto_expand, const uint16_t ae_max_jfiles*/, - const uint32_t jfsize_sblks, const uint16_t wcache_num_pages, const uint32_t wcache_pgsize_sblks, + void recover(/*const uint16_t num_jfiles, const bool auto_expand, const uint16_t ae_max_jfiles, + const uint32_t jfsize_sblks,*/ const uint16_t wcache_num_pages, const uint32_t wcache_pgsize_sblks, aio_callback* const cbp, const std::vector<std::string>* prep_txn_list_ptr, uint64_t& highest_rid); /** @@ -654,7 +654,7 @@ namespace journal /** * /brief Static function for creating new fcntl objects for use with obj_arr. */ - static fcntl* new_fcntl(jcntl* const jcp, const uint16_t lid, const uint16_t fid, const rcvdat* const rdp); +// static fcntl* new_fcntl(jcntl* const jcp, const uint16_t lid, const uint16_t fid, const rcvdat* const rdp); protected: static bool _init; diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c b/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c index c17fe0acec..e8263ea281 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c +++ b/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c @@ -21,16 +21,15 @@ #include "file_hdr.h" -void file_hdr_init(file_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t uflag, - const uint64_t rid, const uint64_t fro, const uint64_t ts_sec, const uint64_t ts_nsec, - const uint32_t file_count, const uint64_t file_size, const uint64_t file_number) { +int file_hdr_init(file_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t uflag, + const uint64_t rid, const uint64_t fro, const uint32_t file_count, const uint64_t file_size, + const uint64_t file_number) { rec_hdr_init(&dest->_rhdr, magic, version, uflag, rid); dest->_fro = fro; - dest->_ts_sec = ts_sec; - dest->_ts_nsec = ts_nsec; dest->_file_count = file_count; dest->_file_size = file_size; dest->_file_number = file_number; + return set_time_now(dest); } void file_hdr_copy(file_hdr_t* dest, const file_hdr_t* src) { diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h b/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h index a60dc43636..5076a184b1 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h @@ -78,9 +78,9 @@ typedef struct file_hdr_t { uint16_t _name_length; /**< Length of the queue name in octets, which follows this struct in the header */ } file_hdr_t; -void file_hdr_init(file_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t uflag, - const uint64_t rid, const uint64_t fro, const uint64_t ts_sec, const uint64_t ts_nsec, - const uint32_t file_count, const uint64_t file_size, const uint64_t file_number); +int file_hdr_init(file_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t uflag, + const uint64_t rid, const uint64_t fro, const uint32_t file_count, const uint64_t file_size, + const uint64_t file_number); void file_hdr_copy(file_hdr_t* dest, const file_hdr_t* src); int set_time_now(file_hdr_t *fh); void set_time(file_hdr_t *fh, struct timespec *ts); diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp index acb09dc2c3..54d20edb5c 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp +++ b/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp @@ -19,15 +19,15 @@ * */ -#include "qpid/legacystore/jrnl/wmgr.h" +#include "qpid/linearstore/jrnl/wmgr.h" #include <cassert> #include <cerrno> #include <cstdlib> #include <cstring> -#include "qpid/legacystore/jrnl/file_hdr.h" -#include "qpid/legacystore/jrnl/jcntl.h" -#include "qpid/legacystore/jrnl/jerrno.h" +#include "qpid/linearstore/jrnl/utils/file_hdr.h" +#include "qpid/linearstore/jrnl/jcntl.h" +#include "qpid/linearstore/jrnl/jerrno.h" #include <sstream> #include <stdint.h> @@ -36,9 +36,9 @@ namespace mrg namespace journal { -wmgr::wmgr(jcntl* jc, enq_map& emap, txn_map& tmap, wrfc& wrfc): +wmgr::wmgr(jcntl* jc, enq_map& emap, txn_map& tmap/*, wrfc& wrfc*/): pmgr(jc, emap, tmap), - _wrfc(wrfc), +// _wrfc(wrfc), _max_dtokpp(0), _max_io_wait_us(0), _fhdr_base_ptr(0), @@ -55,10 +55,10 @@ wmgr::wmgr(jcntl* jc, enq_map& emap, txn_map& tmap, wrfc& wrfc): _txn_pending_set() {} -wmgr::wmgr(jcntl* jc, enq_map& emap, txn_map& tmap, wrfc& wrfc, +wmgr::wmgr(jcntl* jc, enq_map& emap, txn_map& tmap/*, wrfc& wrfc*/, const uint32_t max_dtokpp, const uint32_t max_iowait_us): pmgr(jc, emap, tmap /* , dtoklp */), - _wrfc(wrfc), +// _wrfc(wrfc), _max_dtokpp(max_dtokpp), _max_io_wait_us(max_iowait_us), _fhdr_base_ptr(0), @@ -138,8 +138,8 @@ wmgr::enqueue(const void* const data_buff, const std::size_t tot_data_len, } } - uint64_t rid = (dtokp->external_rid() | cont) ? dtokp->rid() : _wrfc.get_incr_rid(); - _enq_rec.reset(rid, data_buff, tot_data_len, xid_ptr, xid_len, _wrfc.owi(), transient, + uint64_t rid = (dtokp->external_rid() | cont) ? dtokp->rid() : /*_wrfc.get_incr_rid()*/0; // TODO: replace for linearstore: _wrfc + _enq_rec.reset(rid, data_buff, tot_data_len, xid_ptr, xid_len/*, _wrfc.owi()*/, transient, external); if (!cont) { @@ -161,8 +161,9 @@ wmgr::enqueue(const void* const data_buff, const std::size_t tot_data_len, (_cache_pgsize_sblks * JRNL_SBLK_SIZE) - _pg_offset_dblks); // Remember fid which contains the record header in case record is split over several files - if (data_offs_dblks == 0) - dtokp->set_fid(_wrfc.index()); + // TODO: replace for linearstore: _wrfc +// if (data_offs_dblks == 0) +// dtokp->set_fid(_wrfc.index()); _pg_offset_dblks += ret; _cached_offset_dblks += ret; dtokp->incr_dblocks_written(ret); @@ -179,7 +180,7 @@ wmgr::enqueue(const void* const data_buff, const std::size_t tot_data_len, // long multi-page messages have their token on the page containing the END of the // message. AIO callbacks will then only process this token when entire message is // enqueued. - _wrfc.incr_enqcnt(dtokp->fid()); + //_wrfc.incr_enqcnt(dtokp->fid()); // TODO: replace for linearstore: _wrfc if (xid_len) // If part of transaction, add to transaction map { @@ -237,9 +238,9 @@ wmgr::dequeue(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_ } const bool ext_rid = dtokp->external_rid(); - uint64_t rid = (ext_rid | cont) ? dtokp->rid() : _wrfc.get_incr_rid(); + uint64_t rid = (ext_rid | cont) ? dtokp->rid() : /*_wrfc.get_incr_rid()*/0; // TODO: replace for linearstore: _wrfc uint64_t dequeue_rid = (ext_rid | cont) ? dtokp->dequeue_rid() : dtokp->rid(); - _deq_rec.reset(rid, dequeue_rid, xid_ptr, xid_len, _wrfc.owi(), txn_coml_commit); + _deq_rec.reset(rid, dequeue_rid, xid_ptr, xid_len/*, _wrfc.owi()*/, txn_coml_commit); if (!cont) { if (!ext_rid) @@ -265,8 +266,9 @@ wmgr::dequeue(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_ (_cache_pgsize_sblks * JRNL_SBLK_SIZE) - _pg_offset_dblks); // Remember fid which contains the record header in case record is split over several files - if (data_offs_dblks == 0) - dtokp->set_fid(_wrfc.index()); + // TODO: replace for linearstore: _wrfc +// if (data_offs_dblks == 0) +// dtokp->set_fid(_wrfc.index()); _pg_offset_dblks += ret; _cached_offset_dblks += ret; dtokp->incr_dblocks_written(ret); @@ -304,7 +306,7 @@ wmgr::dequeue(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_ throw jexception(jerrno::JERR_MAP_LOCKED, oss.str(), "wmgr", "dequeue"); } } - _wrfc.decr_enqcnt(fid); +// _wrfc.decr_enqcnt(fid); // TODO: replace for linearstore: _wrfc } done = true; @@ -346,8 +348,8 @@ wmgr::abort(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_le } } - uint64_t rid = (dtokp->external_rid() | cont) ? dtokp->rid() : _wrfc.get_incr_rid(); - _txn_rec.reset(RHM_JDAT_TXA_MAGIC, rid, xid_ptr, xid_len, _wrfc.owi()); + uint64_t rid = (dtokp->external_rid() | cont) ? dtokp->rid() :/* _wrfc.get_incr_rid()*/0; // TODO: replace for linearstore: _wrfc + _txn_rec.reset(QLS_TXA_MAGIC, rid, xid_ptr, xid_len/*, _wrfc.owi()*/); if (!cont) { dtokp->set_rid(rid); @@ -366,8 +368,9 @@ wmgr::abort(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_le (_cache_pgsize_sblks * JRNL_SBLK_SIZE) - _pg_offset_dblks); // Remember fid which contains the record header in case record is split over several files - if (data_offs_dblks == 0) - dtokp->set_fid(_wrfc.index()); + // TODO: replace for linearstore: _wrfc +// if (data_offs_dblks == 0) +// dtokp->set_fid(_wrfc.index()); _pg_offset_dblks += ret; _cached_offset_dblks += ret; dtokp->incr_dblocks_written(ret); @@ -386,8 +389,9 @@ wmgr::abort(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_le { if (!itr->_enq_flag) _emap.unlock(itr->_drid); // ignore rid not found error - if (itr->_enq_flag) - _wrfc.decr_enqcnt(itr->_pfid); + // TODO: replace for linearstore: _wrfc +// if (itr->_enq_flag) +// _wrfc.decr_enqcnt(itr->_pfid); } std::pair<std::set<std::string>::iterator, bool> res = _txn_pending_set.insert(xid); if (!res.second) @@ -436,8 +440,8 @@ wmgr::commit(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_l } } - uint64_t rid = (dtokp->external_rid() | cont) ? dtokp->rid() : _wrfc.get_incr_rid(); - _txn_rec.reset(RHM_JDAT_TXC_MAGIC, rid, xid_ptr, xid_len, _wrfc.owi()); + uint64_t rid = (dtokp->external_rid() | cont) ? dtokp->rid() : /*_wrfc.get_incr_rid()*/0; // TODO: replace for linearstore: _wrfc + _txn_rec.reset(QLS_TXC_MAGIC, rid, xid_ptr, xid_len/*, _wrfc.owi()*/); if (!cont) { dtokp->set_rid(rid); @@ -456,8 +460,9 @@ wmgr::commit(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_l (_cache_pgsize_sblks * JRNL_SBLK_SIZE) - _pg_offset_dblks); // Remember fid which contains the record header in case record is split over several files - if (data_offs_dblks == 0) - dtokp->set_fid(_wrfc.index()); + // TODO: replace for linearstore: _wrfc +// if (data_offs_dblks == 0) +// dtokp->set_fid(_wrfc.index()); _pg_offset_dblks += ret; _cached_offset_dblks += ret; dtokp->incr_dblocks_written(ret); @@ -502,7 +507,7 @@ wmgr::commit(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_l throw jexception(jerrno::JERR_MAP_LOCKED, oss.str(), "wmgr", "dequeue"); } } - _wrfc.decr_enqcnt(fid); +// _wrfc.decr_enqcnt(fid); // TODO: replace for linearstore: _wrfc } } std::pair<std::set<std::string>::iterator, bool> res = _txn_pending_set.insert(xid); @@ -527,10 +532,12 @@ wmgr::commit(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_l } void -wmgr::file_header_check(const uint64_t rid, const bool cont, const uint32_t rec_dblks_rem) +wmgr::file_header_check(const uint64_t /*rid*/, const bool /*cont*/, const uint32_t /*rec_dblks_rem*/) { // Has the file header been written (i.e. write pointers still at 0)? - if (_wrfc.is_void()) + // TODO: replace for linearstore: _wrfc +/* + if (_wrfc.is_void()) // TODO: replace for linearstore: _wrfc { bool file_fit = rec_dblks_rem <= _jfsize_dblks; bool file_full = rec_dblks_rem == _jfsize_dblks; @@ -542,8 +549,9 @@ wmgr::file_header_check(const uint64_t rid, const bool cont, const uint32_t rec_ } else fro = JRNL_SBLK_SIZE * JRNL_DBLK_SIZE; - write_fhdr(rid, _wrfc.index(), _wrfc.index(), fro); + write_fhdr(rid, _wrfc.index(), _wrfc.index(), fro); // TODO: replace for linearstore: _wrfc } +*/ } void @@ -595,6 +603,7 @@ iores wmgr::write_flush() { iores res = RHM_IORES_SUCCESS; +/* // Don't bother flushing an empty page or one that is still in state AIO_PENDING if (_cached_offset_dblks) { @@ -640,16 +649,21 @@ wmgr::write_flush() get_events(UNUSED, 0); if (_page_cb_arr[_pg_index]._state == UNUSED) _page_cb_arr[_pg_index]._state = IN_USE; +*/ return res; } iores wmgr::rotate_file() { + // TODO: replace for linearstore: _wrfc +/* _pg_cntr = 0; iores res = _wrfc.rotate(); _jc->chk_wr_frot(); return res; +*/ + return RHM_IORES_SUCCESS; } int32_t @@ -692,8 +706,9 @@ wmgr::get_events(page_state state, timespec* const timeout, bool flush) oss << "pg=" << pcbp->_index; else { - file_hdr* fhp = (file_hdr*)aiocbp->u.c.buf; - oss << "fid=" << fhp->_pfid; + // TODO: replace for linearstore: fhp->_pfid +// file_hdr_t* fhp = (file_hdr_t*)aiocbp->u.c.buf; +// oss << "fid=" << fhp->_pfid; } oss << " size=" << aiocbp->u.c.nbytes; oss << " offset=" << aiocbp->u.c.offset << " fh=" << aiocbp->aio_fildes << "]"; @@ -778,8 +793,9 @@ wmgr::get_events(page_state state, timespec* const timeout, bool flush) // Increment the completed write offset // NOTE: We cannot use _wrfc here, as it may have rotated since submitting count. // Use stored pointer to fcntl in the pcb instead. - pcbp->_wfh->add_wr_cmpl_cnt_dblks(pcbp->_wdblks); - pcbp->_wfh->decr_aio_cnt(); + // TODO: replace for linearstore: pcbp->_wfh +// pcbp->_wfh->add_wr_cmpl_cnt_dblks(pcbp->_wdblks); +// pcbp->_wfh->decr_aio_cnt(); _jc->instr_decr_outstanding_aio_cnt(); // Clean up this pcb's data_tok list @@ -793,12 +809,15 @@ wmgr::get_events(page_state state, timespec* const timeout, bool flush) else // File header writes have no pcb { // get lfid from original file header record, update info for that lfid - file_hdr* fhp = (file_hdr*)aiocbp->u.c.buf; + // TODO: replace for linearstore: lfid +/* + file_hdr_t* fhp = (file_hdr*)aiocbp->u.c.buf; uint32_t lfid = fhp->_lfid; fcntl* fcntlp = _jc->get_fcntlp(lfid); fcntlp->add_wr_cmpl_cnt_dblks(JRNL_SBLK_SIZE); fcntlp->decr_aio_cnt(); fcntlp->set_wr_fhdr_aio_outstanding(false); +*/ } } @@ -821,8 +840,8 @@ wmgr::initialize(aio_callback* const cbp, const uint32_t wcache_pgsize_sblks, co { pmgr::initialize(cbp, wcache_pgsize_sblks, wcache_num_pages); wmgr::clean(); - _num_jfiles = _jc->num_jfiles(); - if (::posix_memalign(&_fhdr_base_ptr, _sblksize, _sblksize * _num_jfiles)) + /*_num_jfiles = _jc->num_jfiles();*/ // TODO: replace for linearstore: _jc->num_jfiles() + if (::posix_memalign(&_fhdr_base_ptr, _sblksize, _sblksize /** _num_jfiles*/)) { wmgr::clean(); std::ostringstream oss; @@ -848,15 +867,18 @@ wmgr::initialize(aio_callback* const cbp, const uint32_t wcache_pgsize_sblks, co iores wmgr::pre_write_check(const _op_type op, const data_tok* const dtokp, - const std::size_t xidsize, const std::size_t dsize, const bool external + const std::size_t /*xidsize*/, const std::size_t /*dsize*/, const bool /*external*/ ) const { // Check status of current file + // TODO: replace for linearstore: _wrfc +/* if (!_wrfc.is_wr_reset()) { if (!_wrfc.wr_reset()) return RHM_IORES_FULL; } +*/ // Check status of current page is ok for writing if (_page_cb_arr[_pg_index]._state != IN_USE) @@ -880,10 +902,12 @@ wmgr::pre_write_check(const _op_type op, const data_tok* const dtokp, case WMGR_ENQUEUE: { // Check for enqueue reaching cutoff threshold - uint32_t size_dblks = jrec::size_dblks(enq_rec::rec_size(xidsize, dsize, - external)); + // TODO: replace for linearstore: _wrfc +/* + uint32_t size_dblks = jrec::size_dblks(enq_rec::rec_size(xidsize, dsize, external)); if (!_enq_busy && _wrfc.enq_threshold(_cached_offset_dblks + size_dblks)) return RHM_IORES_ENQCAPTHRESH; +*/ if (!dtokp->is_writable()) { std::ostringstream oss; @@ -946,7 +970,7 @@ wmgr::dequeue_check(const std::string& xid, const uint64_t drid) void wmgr::dblk_roundup() { - const uint32_t xmagic = RHM_JDAT_EMPTY_MAGIC; + const uint32_t xmagic = QLS_EMPTY_MAGIC; uint32_t wdblks = jrec::size_blks(_cached_offset_dblks, JRNL_SBLK_SIZE) * JRNL_SBLK_SIZE; while (_cached_offset_dblks < wdblks) { @@ -961,21 +985,25 @@ wmgr::dblk_roundup() } void -wmgr::write_fhdr(uint64_t rid, uint16_t fid, uint16_t lid, std::size_t fro) +wmgr::write_fhdr(uint64_t rid, uint16_t fid, uint16_t /*lid*/, std::size_t fro) { - file_hdr fhdr(RHM_JDAT_FILE_MAGIC, RHM_JDAT_VERSION, rid, fid, lid, fro, _wrfc.owi(), true); + file_hdr_t fhdr/*(QLS_FILE_MAGIC, QLS_JRNL_VERSION, rid, fid, lid, fro, _wrfc.owi(), true)*/; + /*int err =*/ ::file_hdr_init(&fhdr, QLS_FILE_MAGIC, QLS_JRNL_VERSION, 0, rid, fro, 0, 0, 0); std::memcpy(_fhdr_ptr_arr[fid], &fhdr, sizeof(fhdr)); #ifdef RHM_CLEAN std::memset((char*)_fhdr_ptr_arr[fid] + sizeof(fhdr), RHM_CLEAN_CHAR, _sblksize - sizeof(fhdr)); #endif aio_cb* aiocbp = _fhdr_aio_cb_arr[fid]; - aio::prep_pwrite(aiocbp, _wrfc.fh(), _fhdr_ptr_arr[fid], _sblksize, 0); +// aio::prep_pwrite(aiocbp, _wrfc.fh(), _fhdr_ptr_arr[fid], _sblksize, 0); // TODO: replace for linearstore: _wrfc if (aio::submit(_ioctx, 1, &aiocbp) < 0) throw jexception(jerrno::JERR__AIO, "wmgr", "write_fhdr"); _aio_evt_rem++; + // TODO: replace for linearstore: _wrfc +/* _wrfc.add_subm_cnt_dblks(JRNL_SBLK_SIZE); _wrfc.incr_aio_cnt(); _wrfc.file_controller()->set_wr_fhdr_aio_outstanding(true); +*/ } void @@ -1029,7 +1057,7 @@ wmgr::status_str() const default: oss << _page_cb_arr[i]._state; } } - oss << "] " << _wrfc.status_str(); + oss << "] " /*<< _wrfc.status_str()*/; // TODO: replace for linearstore: _wrfc return oss.str(); } diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h b/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h index 39687226a5..7d29ac675f 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h @@ -31,9 +31,9 @@ class wmgr; } #include <cstring> -#include "qpid/legacystore/jrnl/enums.h" -#include "qpid/legacystore/jrnl/pmgr.h" -#include "qpid/legacystore/jrnl/wrfc.h" +#include "qpid/linearstore/jrnl/enums.h" +#include "qpid/linearstore/jrnl/pmgr.h" +//#include "qpid/linearstore/jrnl/wrfc.h" #include <set> namespace mrg diff --git a/qpid/cpp/src/qpid/linearstore/management-schema.xml b/qpid/cpp/src/qpid/linearstore/management-schema.xml index 65969f0fb2..a9f7b143ad 100644 --- a/qpid/cpp/src/qpid/linearstore/management-schema.xml +++ b/qpid/cpp/src/qpid/linearstore/management-schema.xml @@ -1,4 +1,4 @@ -<schema package="org.apache.qpid.legacystore"> +<schema package="org.apache.qpid.linearstore"> <!-- Licensed to the Apache Software Foundation (ASF) under one |