diff options
author | Kim van der Riet <kpvdr@apache.org> | 2013-08-23 18:07:15 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2013-08-23 18:07:15 +0000 |
commit | 7defd7eb7b11a0ff83ee0a7393477a453f4fb604 (patch) | |
tree | 1f5728f2f86afb454953fe64bf748a6c42b67c47 /qpid/cpp/src | |
parent | 2c39e4c4da4d727bdca8ce6d3ff8a9d570a04b22 (diff) | |
download | qpid-python-7defd7eb7b11a0ff83ee0a7393477a453f4fb604.tar.gz |
QPID-4984: WIP - compiles, but not functional.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/linearstore@1516958 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
22 files changed, 322 insertions, 430 deletions
diff --git a/qpid/cpp/src/qpid/linearstore/BufferValue.cpp b/qpid/cpp/src/qpid/linearstore/BufferValue.cpp index f78cb46af0..5115055375 100644 --- a/qpid/cpp/src/qpid/linearstore/BufferValue.cpp +++ b/qpid/cpp/src/qpid/linearstore/BufferValue.cpp @@ -26,7 +26,7 @@ namespace linearstore { -BufferValue::BufferValue(u_int32_t size, u_int64_t offset) +BufferValue::BufferValue(uint32_t size, uint64_t offset) : data(new char[size]), buffer(data, size) { diff --git a/qpid/cpp/src/qpid/linearstore/BufferValue.h b/qpid/cpp/src/qpid/linearstore/BufferValue.h index bbd6a5b88d..5af1ac6a43 100644 --- a/qpid/cpp/src/qpid/linearstore/BufferValue.h +++ b/qpid/cpp/src/qpid/linearstore/BufferValue.h @@ -36,7 +36,7 @@ class BufferValue : public Dbt public: qpid::framing::Buffer buffer; - BufferValue(u_int32_t size, u_int64_t offset); + BufferValue(uint32_t size, uint64_t offset); BufferValue(const qpid::broker::Persistable& p); virtual ~BufferValue(); }; diff --git a/qpid/cpp/src/qpid/linearstore/Cursor.h b/qpid/cpp/src/qpid/linearstore/Cursor.h index 3f3023c46c..c7e7f34faa 100644 --- a/qpid/cpp/src/qpid/linearstore/Cursor.h +++ b/qpid/cpp/src/qpid/linearstore/Cursor.h @@ -37,7 +37,7 @@ public: Cursor() : cursor(0) {} virtual ~Cursor() { if(cursor) cursor->close(); } - void open(db_ptr db, DbTxn* txn, u_int32_t flags = 0) { db->cursor(txn, &cursor, flags); } + void open(db_ptr db, DbTxn* txn, uint32_t flags = 0) { db->cursor(txn, &cursor, flags); } void close() { if(cursor) cursor->close(); cursor = 0; } Dbc* get() { return cursor; } Dbc* operator->() { return cursor; } diff --git a/qpid/cpp/src/qpid/linearstore/IdDbt.cpp b/qpid/cpp/src/qpid/linearstore/IdDbt.cpp index 075234acb9..d427085bbe 100644 --- a/qpid/cpp/src/qpid/linearstore/IdDbt.cpp +++ b/qpid/cpp/src/qpid/linearstore/IdDbt.cpp @@ -28,7 +28,7 @@ IdDbt::IdDbt() : id(0) init(); } -IdDbt::IdDbt(u_int64_t _id) : id(_id) +IdDbt::IdDbt(uint64_t _id) : id(_id) { init(); } @@ -36,7 +36,7 @@ IdDbt::IdDbt(u_int64_t _id) : id(_id) void IdDbt::init() { set_data(&id); - set_size(sizeof(u_int64_t)); - set_ulen(sizeof(u_int64_t)); + set_size(sizeof(uint64_t)); + set_ulen(sizeof(uint64_t)); set_flags(DB_DBT_USERMEM); } diff --git a/qpid/cpp/src/qpid/linearstore/IdDbt.h b/qpid/cpp/src/qpid/linearstore/IdDbt.h index 0a5b0d782d..f8bb0647db 100644 --- a/qpid/cpp/src/qpid/linearstore/IdDbt.h +++ b/qpid/cpp/src/qpid/linearstore/IdDbt.h @@ -31,9 +31,9 @@ class IdDbt : public Dbt { void init(); public: - u_int64_t id; + uint64_t id; - IdDbt(u_int64_t id); + IdDbt(uint64_t id); IdDbt(); }; diff --git a/qpid/cpp/src/qpid/linearstore/IdSequence.cpp b/qpid/cpp/src/qpid/linearstore/IdSequence.cpp index 4ad40f5329..4d3172ffe9 100644 --- a/qpid/cpp/src/qpid/linearstore/IdSequence.cpp +++ b/qpid/cpp/src/qpid/linearstore/IdSequence.cpp @@ -26,7 +26,7 @@ using qpid::sys::Mutex; IdSequence::IdSequence() : id(1) {} -u_int64_t IdSequence::next() +uint64_t IdSequence::next() { Mutex::ScopedLock guard(lock); if (!id) id++; // avoid 0 when folding around diff --git a/qpid/cpp/src/qpid/linearstore/IdSequence.h b/qpid/cpp/src/qpid/linearstore/IdSequence.h index 06f3db3f1e..11b31a2813 100644 --- a/qpid/cpp/src/qpid/linearstore/IdSequence.h +++ b/qpid/cpp/src/qpid/linearstore/IdSequence.h @@ -24,7 +24,6 @@ #include "qpid/framing/amqp_types.h" #include "qpid/sys/Mutex.h" -#include <sys/types.h> namespace qpid{ namespace linearstore{ diff --git a/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp b/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp index ea091ae941..f8b7777269 100644 --- a/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp +++ b/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp @@ -32,6 +32,7 @@ #include "qmf/org/apache/qpid/linearstore/EventRecovered.h" #include "qpid/sys/Monitor.h" #include "qpid/sys/Timer.h" +#include "qpid/linearstore/Log.h" #include "qpid/linearstore/StoreException.h" using namespace qpid::qls_jrnl; @@ -79,10 +80,10 @@ JournalImpl::JournalImpl(qpid::sys::Timer& timer_, initManagement(a); - log(LOG_NOTICE, "Created"); + QLS_LOG2(notice, _jid, "Created"); std::ostringstream oss; oss << "Journal directory = \"" << journalDirectory << "\"; Base file name = \"" << journalBaseFilename << "\""; - log(LOG_DEBUG, oss.str()); + QLS_LOG2(debug, _jid, oss.str()); } JournalImpl::~JournalImpl() @@ -90,7 +91,7 @@ JournalImpl::~JournalImpl() if (deleteCallback) deleteCallback(*this); if (_init_flag && !_stop_flag){ try { stop(true); } // NOTE: This will *block* until all outstanding disk aio calls are complete! - catch (const jexception& e) { log(LOG_ERROR, e.what()); } + catch (const jexception& e) { QLS_LOG2(error, _jid, e.what()); } } getEventsFireEventsPtr->cancel(); inactivityFireEventPtr->cancel(); @@ -101,7 +102,7 @@ JournalImpl::~JournalImpl() _mgmtObject.reset(); } - log(LOG_NOTICE, "Destroyed"); + QLS_LOG2(notice, _jid, "Destroyed"); } void @@ -116,7 +117,7 @@ JournalImpl::initManagement(qpid::management::ManagementAgent* a) _mgmtObject->set_name(_jid); _mgmtObject->set_directory(_jdir.dirname()); _mgmtObject->set_baseFileName(_base_filename); - _mgmtObject->set_readPageSize(JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE); + _mgmtObject->set_readPageSize(JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE); _mgmtObject->set_readPages(JRNL_RMGR_PAGES); // The following will be set on initialize(), but being properties, these must be set to 0 in the meantime @@ -132,12 +133,12 @@ JournalImpl::initManagement(qpid::management::ManagementAgent* a) void -JournalImpl::initialize(/*const u_int16_t num_jfiles, +JournalImpl::initialize(/*const uint16_t num_jfiles, const bool auto_expand, - const u_int16_t ae_max_jfiles, - const u_int32_t jfsize_sblks,*/ - const u_int16_t wcache_num_pages, - const u_int32_t wcache_pgsize_sblks, + const uint16_t ae_max_jfiles, + const uint32_t jfsize_sblks,*/ + const uint16_t wcache_num_pages, + const uint32_t wcache_pgsize_sblks, qpid::qls_jrnl::aio_callback* const cbp) { std::ostringstream oss; @@ -145,9 +146,9 @@ JournalImpl::initialize(/*const u_int16_t num_jfiles, oss << "Initialize;"; oss << " wcache_pgsize_sblks=" << wcache_pgsize_sblks; oss << " wcache_num_pages=" << wcache_num_pages; - log(LOG_DEBUG, oss.str()); + QLS_LOG2(debug, _jid, oss.str()); jcntl::initialize(/*num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,*/ wcache_num_pages, wcache_pgsize_sblks, cbp); - log(LOG_DEBUG, "Initialization complete"); + QLS_LOG2(debug, _jid, "Initialization complete"); // TODO: replace for linearstore: _lpmgr /* if (_mgmtObject.get() != 0) @@ -156,27 +157,27 @@ JournalImpl::initialize(/*const u_int16_t num_jfiles, _mgmtObject->set_autoExpand(_lpmgr.is_ae()); _mgmtObject->set_currentFileCount(_lpmgr.num_jfiles()); _mgmtObject->set_maxFileCount(_lpmgr.ae_max_jfiles()); - _mgmtObject->set_dataFileSize(_jfsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE); - _mgmtObject->set_writePageSize(wcache_pgsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE); + _mgmtObject->set_dataFileSize(_jfsize_sblks * JRNL_SBLK_SIZE); + _mgmtObject->set_writePageSize(wcache_pgsize_sblks * JRNL_SBLK_SIZE); _mgmtObject->set_writePages(wcache_num_pages); } if (_agent != 0) - _agent->raiseEvent(qmf::org::apache::qpid::linearstore::EventCreated(_jid, _jfsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE, _lpmgr.num_jfiles()), + _agent->raiseEvent(qmf::org::apache::qpid::linearstore::EventCreated(_jid, _jfsize_sblks * JRNL_SBLK_SIZE, _lpmgr.num_jfiles()), qpid::management::ManagementAgent::SEV_NOTE); */ } void -JournalImpl::recover(/*const u_int16_t num_jfiles, +JournalImpl::recover(/*const uint16_t num_jfiles, const bool auto_expand, - const u_int16_t ae_max_jfiles, - const u_int32_t jfsize_sblks,*/ - const u_int16_t wcache_num_pages, - const u_int32_t wcache_pgsize_sblks, + const uint16_t ae_max_jfiles, + const uint32_t jfsize_sblks,*/ + const uint16_t wcache_num_pages, + const uint32_t wcache_pgsize_sblks, qpid::qls_jrnl::aio_callback* const cbp, boost::ptr_list<PreparedTransaction>* prep_tx_list_ptr, - u_int64_t& highest_rid, - u_int64_t queue_id) + uint64_t& highest_rid, + uint64_t queue_id) { std::ostringstream oss1; // oss1 << "Recover; num_jfiles=" << num_jfiles << " jfsize_sblks=" << jfsize_sblks; @@ -184,7 +185,7 @@ JournalImpl::recover(/*const u_int16_t num_jfiles, oss1 << " queue_id = 0x" << std::hex << queue_id << std::dec; oss1 << " wcache_pgsize_sblks=" << wcache_pgsize_sblks; oss1 << " wcache_num_pages=" << wcache_num_pages; - log(LOG_DEBUG, oss1.str()); + QLS_LOG2(debug, _jid, oss1.str()); // TODO: replace for linearstore: _lpmgr /* if (_mgmtObject.get() != 0) @@ -193,8 +194,8 @@ JournalImpl::recover(/*const u_int16_t num_jfiles, _mgmtObject->set_autoExpand(_lpmgr.is_ae()); _mgmtObject->set_currentFileCount(_lpmgr.num_jfiles()); _mgmtObject->set_maxFileCount(_lpmgr.ae_max_jfiles()); - _mgmtObject->set_dataFileSize(_jfsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE); - _mgmtObject->set_writePageSize(wcache_pgsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE); + _mgmtObject->set_dataFileSize(_jfsize_sblks * JRNL_SBLK_SIZE); + _mgmtObject->set_writePageSize(wcache_pgsize_sblks * JRNL_SBLK_SIZE); _mgmtObject->set_writePages(wcache_num_pages); } */ @@ -231,7 +232,7 @@ JournalImpl::recover(/*const u_int16_t num_jfiles, oss2 << "Recover phase 1 complete; highest rid found = 0x" << std::hex << highest_rid; oss2 << std::dec << "; emap.size=" << _emap.size() << "; tmap.size=" << _tmap.size(); oss2 << "; journal now read-only."; - log(LOG_DEBUG, oss2.str()); + QLS_LOG2(debug, _jid, oss2.str()); if (_mgmtObject.get() != 0) { @@ -247,11 +248,11 @@ void JournalImpl::recover_complete() { jcntl::recover_complete(); - log(LOG_DEBUG, "Recover phase 2 complete; journal now writable."); + QLS_LOG2(debug, _jid, "Recover phase 2 complete; journal now writable."); // TODO: replace for linearstore: _lpmgr /* if (_agent != 0) - _agent->raiseEvent(qmf::org::apache::qpid::linearstore::EventRecovered(_jid, _jfsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE, _lpmgr.num_jfiles(), + _agent->raiseEvent(qmf::org::apache::qpid::linearstore::EventRecovered(_jid, _jfsize_sblks * JRNL_SBLK_SIZE, _lpmgr.num_jfiles(), _emap.size(), _tmap.size(), _tmap.enq_cnt(), _tmap.deq_cnt()), qpid::management::ManagementAgent::SEV_NOTE); */ } @@ -261,7 +262,7 @@ JournalImpl::recover_complete() // Return true if content is recovered from store; false if content is external and must be recovered from an external store. // Throw exception for all errors. bool -JournalImpl::loadMsgContent(u_int64_t rid, std::string& data, size_t length, size_t offset) +JournalImpl::loadMsgContent(uint64_t rid, std::string& data, size_t length, size_t offset) { qpid::sys::Mutex::ScopedLock sl(_read_lock); if (_dtok.rid() != rid) @@ -271,7 +272,7 @@ JournalImpl::loadMsgContent(u_int64_t rid, std::string& data, size_t length, siz // Last read encountered out-of-order rids, check if this rid is in that list bool oooFlag = false; - for (std::vector<u_int64_t>::const_iterator i=oooRidList.begin(); i!=oooRidList.end() && !oooFlag; i++) { + for (std::vector<uint64_t>::const_iterator i=oooRidList.begin(); i!=oooRidList.end() && !oooFlag; i++) { if (*i == rid) { oooFlag = true; } @@ -342,7 +343,7 @@ JournalImpl::loadMsgContent(u_int64_t rid, std::string& data, size_t length, siz if (_external) return false; - u_int32_t hdr_offs = qpid::framing::Buffer(static_cast<char*>(_datap), sizeof(u_int32_t)).getLong() + sizeof(u_int32_t); + uint32_t hdr_offs = qpid::framing::Buffer(static_cast<char*>(_datap), sizeof(uint32_t)).getLong() + sizeof(uint32_t); if (hdr_offs + offset + length > _dlen) { data.append((const char*)_datap + hdr_offs + offset, _dlen - hdr_offs - offset); } else { @@ -492,6 +493,7 @@ JournalImpl::flush(const bool block_till_aio_cmpl) return res; } +/* void JournalImpl::log(qpid::qls_jrnl::log_level ll, const std::string& log_stmt) const { @@ -503,15 +505,16 @@ JournalImpl::log(qpid::qls_jrnl::log_level ll, const char* const log_stmt) const { switch (ll) { - case LOG_TRACE: QPID_LOG(trace, "Journal \"" << _jid << "\": " << log_stmt); break; - case LOG_DEBUG: QPID_LOG(debug, "Journal \"" << _jid << "\": " << log_stmt); break; - case LOG_INFO: QPID_LOG(info, "Journal \"" << _jid << "\": " << log_stmt); break; - case LOG_NOTICE: QPID_LOG(notice, "Journal \"" << _jid << "\": " << log_stmt); break; - case LOG_WARN: QPID_LOG(warning, "Journal \"" << _jid << "\": " << log_stmt); break; - case LOG_ERROR: QPID_LOG(error, "Journal \"" << _jid << "\": " << log_stmt); break; - case LOG_CRITICAL: QPID_LOG(critical, "Journal \"" << _jid << "\": " << log_stmt); break; + case LOG_TRACE: QPID_LOG(trace, "QLS Journal \"" << _jid << "\": " << log_stmt); break; + case LOG_DEBUG: QPID_LOG(debug, "QLS Journal \"" << _jid << "\": " << log_stmt); break; + case LOG_INFO: QPID_LOG(info, "QLS Journal \"" << _jid << "\": " << log_stmt); break; + case LOG_NOTICE: QPID_LOG(notice, "QLS Journal \"" << _jid << "\": " << log_stmt); break; + case LOG_WARN: QPID_LOG(warning, "QLS Journal \"" << _jid << "\": " << log_stmt); break; + case LOG_ERROR: QPID_LOG(error, "QLS Journal \"" << _jid << "\": " << log_stmt); break; + case LOG_CRITICAL: QPID_LOG(critical, "QLS Journal \"" << _jid << "\": " << log_stmt); break; } } +*/ void JournalImpl::getEventsFire() @@ -568,7 +571,7 @@ JournalImpl::wr_aio_cb(std::vector<data_tok*>& dtokl) } void -JournalImpl::rd_aio_cb(std::vector<u_int16_t>& /*pil*/) +JournalImpl::rd_aio_cb(std::vector<uint16_t>& /*pil*/) {} void @@ -595,8 +598,8 @@ JournalImpl::handleIoResult(const iores r) case qpid::qls_jrnl::RHM_IORES_ENQCAPTHRESH: { std::ostringstream oss; - oss << "Enqueue capacity threshold exceeded on queue \"" << _jid << "\"."; - log(LOG_WARN, oss.str()); + oss << "Enqueue capacity threshold exceeded."; + QLS_LOG2(warning, _jid, oss.str()); if (_agent != 0) _agent->raiseEvent(qmf::org::apache::qpid::linearstore::EventEnqThresholdExceeded(_jid, "Journal enqueue capacity threshold exceeded"), qpid::management::ManagementAgent::SEV_WARN); @@ -606,7 +609,7 @@ JournalImpl::handleIoResult(const iores r) { std::ostringstream oss; oss << "Journal full on queue \"" << _jid << "\"."; - log(LOG_CRITICAL, oss.str()); + QLS_LOG2(critical, _jid, "Journal full."); if (_agent != 0) _agent->raiseEvent(qmf::org::apache::qpid::linearstore::EventFull(_jid, "Journal full"), qpid::management::ManagementAgent::SEV_ERROR); THROW_STORE_FULL_EXCEPTION(oss.str()); @@ -614,8 +617,8 @@ JournalImpl::handleIoResult(const iores r) default: { std::ostringstream oss; - oss << "Unexpected I/O response (" << qpid::qls_jrnl::iores_str(r) << ") on queue " << _jid << "\"."; - log(LOG_ERROR, oss.str()); + oss << "Unexpected I/O response (" << qpid::qls_jrnl::iores_str(r) << ")."; + QLS_LOG2(error, _jid, oss.str()); THROW_STORE_FULL_EXCEPTION(oss.str()); } } diff --git a/qpid/cpp/src/qpid/linearstore/JournalImpl.h b/qpid/cpp/src/qpid/linearstore/JournalImpl.h index c70a044bf5..692bccc9b0 100644 --- a/qpid/cpp/src/qpid/linearstore/JournalImpl.h +++ b/qpid/cpp/src/qpid/linearstore/JournalImpl.h @@ -76,7 +76,7 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public qpid::qls_jr private: // static qpid::sys::Mutex _static_lock; -// static u_int32_t cnt; +// static uint32_t cnt; qpid::sys::Timer& timer; bool getEventsTimerSetFlag; @@ -84,8 +84,8 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public qpid::qls_jr qpid::sys::Mutex _getf_lock; qpid::sys::Mutex _read_lock; - u_int64_t lastReadRid; // rid of last read msg for loadMsgContent() - detects out-of-order read requests - std::vector<u_int64_t> oooRidList; // list of out-of-order rids (greater than current rid) encountered during read sequence + uint64_t lastReadRid; // rid of last read msg for loadMsgContent() - detects out-of-order read requests + std::vector<uint64_t> oooRidList; // list of out-of-order rids (greater than current rid) encountered during read sequence bool writeActivityFlag; bool flushTriggeredFlag; @@ -117,44 +117,44 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public qpid::qls_jr void initManagement(qpid::management::ManagementAgent* agent); - void initialize(/*const u_int16_t num_jfiles, + void initialize(/*const uint16_t num_jfiles, const bool auto_expand, - const u_int16_t ae_max_jfiles, - const u_int32_t jfsize_sblks,*/ - const u_int16_t wcache_num_pages, - const u_int32_t wcache_pgsize_sblks, + const uint16_t ae_max_jfiles, + const uint32_t jfsize_sblks,*/ + const uint16_t wcache_num_pages, + const uint32_t wcache_pgsize_sblks, qpid::qls_jrnl::aio_callback* const cbp); - inline void initialize(/*const u_int16_t num_jfiles, + inline void initialize(/*const uint16_t num_jfiles, const bool auto_expand, - const u_int16_t ae_max_jfiles, - const u_int32_t jfsize_sblks,*/ - const u_int16_t wcache_num_pages, - const u_int32_t wcache_pgsize_sblks) { + const uint16_t ae_max_jfiles, + const uint32_t jfsize_sblks,*/ + const uint16_t wcache_num_pages, + const uint32_t wcache_pgsize_sblks) { initialize(/*num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,*/ wcache_num_pages, wcache_pgsize_sblks, this); } - void recover(/*const u_int16_t num_jfiles, + void recover(/*const uint16_t num_jfiles, const bool auto_expand, - const u_int16_t ae_max_jfiles, - const u_int32_t jfsize_sblks,*/ - const u_int16_t wcache_num_pages, - const u_int32_t wcache_pgsize_sblks, + const uint16_t ae_max_jfiles, + const uint32_t jfsize_sblks,*/ + const uint16_t wcache_num_pages, + const uint32_t wcache_pgsize_sblks, qpid::qls_jrnl::aio_callback* const cbp, boost::ptr_list<PreparedTransaction>* prep_tx_list_ptr, - u_int64_t& highest_rid, - u_int64_t queue_id); + uint64_t& highest_rid, + uint64_t queue_id); - inline void recover(/*const u_int16_t num_jfiles, + inline void recover(/*const uint16_t num_jfiles, const bool auto_expand, - const u_int16_t ae_max_jfiles, - const u_int32_t jfsize_sblks,*/ - const u_int16_t wcache_num_pages, - const u_int32_t wcache_pgsize_sblks, + const uint16_t ae_max_jfiles, + const uint32_t jfsize_sblks,*/ + const uint16_t wcache_num_pages, + const uint32_t wcache_pgsize_sblks, boost::ptr_list<PreparedTransaction>* prep_tx_list_ptr, - u_int64_t& highest_rid, - u_int64_t queue_id) { + uint64_t& highest_rid, + uint64_t queue_id) { recover(/*num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,*/ wcache_num_pages, wcache_pgsize_sblks, this, prep_tx_list_ptr, highest_rid, queue_id); } @@ -164,7 +164,7 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public qpid::qls_jr // Temporary fn to read and save last msg read from journal so it can be assigned // in chunks. To be replaced when coding to do this direct from the journal is ready. // Returns true if the record is extern, false if local. - bool loadMsgContent(u_int64_t rid, std::string& data, size_t length, size_t offset = 0); + bool loadMsgContent(uint64_t rid, std::string& data, size_t length, size_t offset = 0); // Overrides for write inactivity timer void enqueue_data_record(const void* const data_buff, const size_t tot_data_len, @@ -192,8 +192,8 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public qpid::qls_jr void stop(bool block_till_aio_cmpl = false); // Logging - void log(qpid::qls_jrnl::log_level level, const std::string& log_stmt) const; - void log(qpid::qls_jrnl::log_level level, const char* const log_stmt) const; +// void log(qpid::qls_jrnl::log_level level, const std::string& log_stmt) const; +// void log(qpid::qls_jrnl::log_level level, const char* const log_stmt) const; // Overrides for get_events timer qpid::qls_jrnl::iores flush(const bool block_till_aio_cmpl = false); @@ -204,7 +204,7 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public qpid::qls_jr // AIO callbacks virtual void wr_aio_cb(std::vector<qpid::qls_jrnl::data_tok*>& dtokl); - virtual void rd_aio_cb(std::vector<u_int16_t>& pil); + virtual void rd_aio_cb(std::vector<uint16_t>& pil); qpid::management::ManagementObject::shared_ptr GetManagementObject (void) const { return _mgmtObject; } diff --git a/qpid/cpp/src/qpid/linearstore/Log.h b/qpid/cpp/src/qpid/linearstore/Log.h new file mode 100644 index 0000000000..b03ea7ac9d --- /dev/null +++ b/qpid/cpp/src/qpid/linearstore/Log.h @@ -0,0 +1,30 @@ + /* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifndef QPID_LEGACYSTORE_LOG_H +#define QPID_LEGACYSTORE_LOG_H + +#include "qpid/log/Statement.h" + +#define QLS_LOG(level, msg) QPID_LOG(level, "Linear Store: " << msg) +#define QLS_LOG2(level, queue, msg) QPID_LOG(level, "Linear Store: Journal \'" << queue << "\":" << msg) + +#endif // QPID_LEGACYSTORE_LOG_H diff --git a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp index a51ca0bf37..f4b9a40455 100644 --- a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp +++ b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp @@ -26,8 +26,8 @@ #include "qpid/linearstore/BufferValue.h" #include "qpid/linearstore/IdDbt.h" #include "qpid/linearstore/jrnl/txn_map.h" +#include "qpid/linearstore/Log.h" #include "qpid/framing/FieldValue.h" -#include "qpid/log/Statement.h" #include "qmf/org/apache/qpid/linearstore/Package.h" #include "qpid/linearstore/StoreException.h" #include <dirent.h> @@ -42,13 +42,13 @@ namespace qpid{ namespace linearstore{ -const std::string MessageStoreImpl::storeTopLevelDir("rhm"); // Sets the top-level store dir name +const std::string MessageStoreImpl::storeTopLevelDir("qls"); // Sets the top-level store dir name // FIXME aconway 2010-03-09: was 10 qpid::sys::Duration MessageStoreImpl::defJournalGetEventsTimeout(1 * qpid::sys::TIME_MSEC); // 10ms qpid::sys::Duration MessageStoreImpl::defJournalFlushTimeout(500 * qpid::sys::TIME_MSEC); // 0.5s qpid::sys::Mutex TxnCtxt::globalSerialiser; -MessageStoreImpl::TplRecoverStruct::TplRecoverStruct(const u_int64_t _rid, +MessageStoreImpl::TplRecoverStruct::TplRecoverStruct(const uint64_t _rid, const bool _deq_flag, const bool _commit_flag, const bool _tpc_flag) : @@ -78,48 +78,19 @@ MessageStoreImpl::MessageStoreImpl(qpid::broker::Broker* broker_, const char* en agent(0) {} -/* -u_int16_t MessageStoreImpl::chkJrnlNumFilesParam(const u_int16_t param, const std::string paramName) +uint32_t MessageStoreImpl::chkJrnlWrPageCacheSize(const uint32_t param, const std::string paramName/*, const uint16_t jrnlFsizePgs*/) { - if (param < JRNL_MIN_NUM_FILES || param > JRNL_MAX_NUM_FILES) { - std::ostringstream oss; - oss << "Parameter " << paramName << ": Illegal number of store journal files (" << param << "), must be " << JRNL_MIN_NUM_FILES << " to " << JRNL_MAX_NUM_FILES << " inclusive."; - THROW_STORE_EXCEPTION(oss.str()); - } - return param; -} -*/ - -/* -u_int32_t MessageStoreImpl::chkJrnlFileSizeParam(const u_int32_t param, const std::string paramName, const u_int32_t wCachePgSizeSblks) -{ - if (param < (JRNL_MIN_FILE_SIZE / JRNL_RMGR_PAGE_SIZE) || (param > JRNL_MAX_FILE_SIZE / JRNL_RMGR_PAGE_SIZE)) { - std::ostringstream oss; - oss << "Parameter " << paramName << ": Illegal store journal file size (" << param << "), must be " << JRNL_MIN_FILE_SIZE / JRNL_RMGR_PAGE_SIZE << " to " << JRNL_MAX_FILE_SIZE / JRNL_RMGR_PAGE_SIZE << " inclusive."; - THROW_STORE_EXCEPTION(oss.str()); - } - if (wCachePgSizeSblks > param * JRNL_RMGR_PAGE_SIZE) { - std::ostringstream oss; - oss << "Cannot create store with file size less than write page cache size. [file size = " << param << " (" << (param * JRNL_RMGR_PAGE_SIZE / 2) << " kB); write page cache = " << (wCachePgSizeSblks / 2) << " kB]"; - THROW_STORE_EXCEPTION(oss.str()); - } - return param; -} -*/ - -u_int32_t MessageStoreImpl::chkJrnlWrPageCacheSize(const u_int32_t param, const std::string paramName/*, const u_int16_t jrnlFsizePgs*/) -{ - u_int32_t p = param; + uint32_t p = param; /* if (jrnlFsizePgs == 1 && p > 64 ) { p = 64; - QPID_LOG(warning, "parameter " << paramName << " (" << param << ") cannot set a page size greater than the journal file size; changing this parameter to the journal file size (" << p << ")"); + QLS_LOG(warning, "parameter " << paramName << " (" << param << ") cannot set a page size greater than the journal file size; changing this parameter to the journal file size (" << p << ")"); } else*/ if (p == 0) { // For zero value, use default - p = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024; - QPID_LOG(warning, "parameter " << paramName << " (" << param << ") must be a power of 2 between 1 and 128; changing this parameter to default value (" << p << ")"); - } else if ( p > 128 || p & (p-1) ) { + p = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_SBLK_SIZE / 1024; + QLS_LOG(warning, "parameter " << paramName << " (" << param << ") must be a power of 2 between 1 and 128; changing this parameter to default value (" << p << ")"); + } else if ( p > 128 || (p & (p-1)) ) { // For any positive value that is not a power of 2, use closest value if (p < 6) p = 4; else if (p < 12) p = 8; @@ -127,15 +98,15 @@ u_int32_t MessageStoreImpl::chkJrnlWrPageCacheSize(const u_int32_t param, const else if (p < 48) p = 32; else if (p < 96) p = 64; else p = 128; - QPID_LOG(warning, "parameter " << paramName << " (" << param << ") must be a power of 2 between 1 and 128; changing this parameter to closest allowable value (" << p << ")"); + QLS_LOG(warning, "parameter " << paramName << " (" << param << ") must be a power of 2 between 1 and 128; changing this parameter to closest allowable value (" << p << ")"); } return p; } -u_int16_t MessageStoreImpl::getJrnlWrNumPages(const u_int32_t wrPageSizeKib) +uint16_t MessageStoreImpl::getJrnlWrNumPages(const uint32_t wrPageSizeKib) { - u_int32_t wrPageSizeSblks = wrPageSizeKib * 1024 / JRNL_DBLK_SIZE / JRNL_SBLK_SIZE; // convert from KiB to number sblks - u_int32_t defTotWCacheSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_WMGR_DEF_PAGES; // in sblks. Currently 2014 sblks (1 MiB). + uint32_t wrPageSizeSblks = wrPageSizeKib * 1024 / JRNL_SBLK_SIZE; // convert from KiB to number sblks + uint32_t defTotWCacheSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_WMGR_DEF_PAGES; // in sblks. Currently 2014 sblks (1 MiB). switch (wrPageSizeKib) { case 1: @@ -153,58 +124,6 @@ u_int16_t MessageStoreImpl::getJrnlWrNumPages(const u_int32_t wrPageSizeKib) } } -/* -void MessageStoreImpl::chkJrnlAutoExpandOptions(const StoreOptions* opts, - bool& autoJrnlExpand, - u_int16_t& autoJrnlExpandMaxFiles, - const std::string& autoJrnlExpandMaxFilesParamName, - const u_int16_t numJrnlFiles, - const std::string& numJrnlFilesParamName) -{ - if (!opts->autoJrnlExpand) { - // auto-expand disabled - autoJrnlExpand = false; - autoJrnlExpandMaxFiles = 0; - return; - } - u_int16_t p = opts->autoJrnlExpandMaxFiles; - if (numJrnlFiles == JRNL_MAX_NUM_FILES) { - // num-jfiles at max; disable auto-expand - autoJrnlExpand = false; - autoJrnlExpandMaxFiles = 0; - QPID_LOG(warning, "parameter " << autoJrnlExpandMaxFilesParamName << " (" << p << ") must be higher than parameter " - << numJrnlFilesParamName << " (" << numJrnlFiles << ") which is at the maximum allowable value; disabling auto-expand."); - return; - } - if (p > JRNL_MAX_NUM_FILES) { - // auto-expand-max-jfiles higher than max allowable, adjust - autoJrnlExpand = true; - autoJrnlExpandMaxFiles = JRNL_MAX_NUM_FILES; - QPID_LOG(warning, "parameter " << autoJrnlExpandMaxFilesParamName << " (" << p << ") is above allowable maximum (" - << JRNL_MAX_NUM_FILES << "); changing this parameter to maximum value."); - return; - } - if (p && p == defAutoJrnlExpandMaxFiles && numJrnlFiles != defTplNumJrnlFiles) { - // num-jfiles is different from the default AND max-auto-expand-jfiles is still at default - // change value of max-auto-expand-jfiles - autoJrnlExpand = true; - if (2 * numJrnlFiles <= JRNL_MAX_NUM_FILES) { - autoJrnlExpandMaxFiles = 2 * numJrnlFiles <= JRNL_MAX_NUM_FILES ? 2 * numJrnlFiles : JRNL_MAX_NUM_FILES; - QPID_LOG(warning, "parameter " << autoJrnlExpandMaxFilesParamName << " adjusted from its default value (" - << defAutoJrnlExpandMaxFiles << ") to twice that of parameter " << numJrnlFilesParamName << " (" << autoJrnlExpandMaxFiles << ")."); - } else { - autoJrnlExpandMaxFiles = 2 * numJrnlFiles <= JRNL_MAX_NUM_FILES ? 2 * numJrnlFiles : JRNL_MAX_NUM_FILES; - QPID_LOG(warning, "parameter " << autoJrnlExpandMaxFilesParamName << " adjusted from its default to maximum allowable value (" - << JRNL_MAX_NUM_FILES << ") because of the value of " << numJrnlFilesParamName << " (" << numJrnlFiles << ")."); - } - return; - } - // No adjustments req'd, set values - autoJrnlExpand = true; - autoJrnlExpandMaxFiles = p; -} -*/ - void MessageStoreImpl::initManagement () { if (broker != 0) { @@ -219,10 +138,10 @@ void MessageStoreImpl::initManagement () mgmtObject->set_defaultDataFileSize(jrnlFsizeSblks / JRNL_RMGR_PAGE_SIZE); mgmtObject->set_tplIsInitialized(false); mgmtObject->set_tplDirectory(getTplBaseDir()); - mgmtObject->set_tplWritePageSize(tplWCachePgSizeSblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE); + mgmtObject->set_tplWritePageSize(tplWCachePgSizeSblks * JRNL_SBLK_SIZE); mgmtObject->set_tplWritePages(tplWCacheNumPages); mgmtObject->set_tplInitialFileCount(tplNumJrnlFiles); - mgmtObject->set_tplDataFileSize(tplJrnlFsizeSblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE); + mgmtObject->set_tplDataFileSize(tplJrnlFsizeSblks * JRNL_SBLK_SIZE); mgmtObject->set_tplCurrentFileCount(tplNumJrnlFiles); agent->addObject(mgmtObject, 0, true); @@ -239,43 +158,35 @@ bool MessageStoreImpl::init(const qpid::Options* options) { // Extract and check options const StoreOptions* opts = static_cast<const StoreOptions*>(options); -// u_int16_t numJrnlFiles = chkJrnlNumFilesParam(opts->numJrnlFiles, "num-jfiles"); -// u_int32_t jrnlFsizePgs = chkJrnlFileSizeParam(opts->jrnlFsizePgs, "jfile-size-pgs"); - u_int32_t jrnlWrCachePageSizeKib = chkJrnlWrPageCacheSize(opts->wCachePageSizeKib, "wcache-page-size"/*, jrnlFsizePgs*/); -// u_int16_t tplNumJrnlFiles = chkJrnlNumFilesParam(opts->tplNumJrnlFiles, "tpl-num-jfiles"); -// u_int32_t tplJrnlFSizePgs = chkJrnlFileSizeParam(opts->tplJrnlFsizePgs, "tpl-jfile-size-pgs"); - u_int32_t tplJrnlWrCachePageSizeKib = chkJrnlWrPageCacheSize(opts->tplWCachePageSizeKib, "tpl-wcache-page-size"/*, tplJrnlFSizePgs*/); -// bool autoJrnlExpand; -// u_int16_t autoJrnlExpandMaxFiles; -// chkJrnlAutoExpandOptions(opts, autoJrnlExpand, autoJrnlExpandMaxFiles, "auto-expand-max-jfiles", numJrnlFiles, "num-jfiles"); + uint32_t jrnlWrCachePageSizeKib = chkJrnlWrPageCacheSize(opts->wCachePageSizeKib, "wcache-page-size"); + uint32_t tplJrnlWrCachePageSizeKib = chkJrnlWrPageCacheSize(opts->tplWCachePageSizeKib, "tpl-wcache-page-size"); - // Pass option values to init(...) - return init(opts->storeDir, /*numJrnlFiles, jrnlFsizePgs,*/ opts->truncateFlag, jrnlWrCachePageSizeKib, - /*tplNumJrnlFiles, tplJrnlFSizePgs,*/ tplJrnlWrCachePageSizeKib/*, autoJrnlExpand, autoJrnlExpandMaxFiles*/); + // Pass option values to init() + return init(opts->storeDir, opts->truncateFlag, jrnlWrCachePageSizeKib, tplJrnlWrCachePageSizeKib); } // These params, taken from options, are assumed to be correct and verified bool MessageStoreImpl::init(const std::string& dir, - /*u_int16_t jfiles, - u_int32_t jfileSizePgs,*/ + /*uint16_t jfiles, + uint32_t jfileSizePgs,*/ const bool truncateFlag, - u_int32_t wCachePageSizeKib, - /*u_int16_t tplJfiles, - u_int32_t tplJfileSizePgs,*/ - u_int32_t tplWCachePageSizeKib/*, + uint32_t wCachePageSizeKib, + /*uint16_t tplJfiles, + uint32_t tplJfileSizePgs,*/ + uint32_t tplWCachePageSizeKib/*, bool autoJExpand, - u_int16_t autoJExpandMaxFiles*/) + uint16_t autoJExpandMaxFiles*/) { if (isInit) return true; // Set geometry members (converting to correct units where req'd) // numJrnlFiles = jfiles; // jrnlFsizeSblks = jfileSizePgs * JRNL_RMGR_PAGE_SIZE; - wCachePgSizeSblks = wCachePageSizeKib * 1024 / JRNL_DBLK_SIZE / JRNL_SBLK_SIZE; // convert from KiB to number sblks + wCachePgSizeSblks = wCachePageSizeKib * 1024 / JRNL_SBLK_SIZE; // convert from KiB to number sblks wCacheNumPages = getJrnlWrNumPages(wCachePageSizeKib); // tplNumJrnlFiles = tplJfiles; // tplJrnlFsizeSblks = tplJfileSizePgs * JRNL_RMGR_PAGE_SIZE; - tplWCachePgSizeSblks = tplWCachePageSizeKib * 1024 / JRNL_DBLK_SIZE / JRNL_SBLK_SIZE; // convert from KiB to number sblks + tplWCachePgSizeSblks = tplWCachePageSizeKib * 1024 / JRNL_SBLK_SIZE; // convert from KiB to number sblks tplWCacheNumPages = getJrnlWrNumPages(tplWCachePageSizeKib); // autoJrnlExpand = autoJExpand; // autoJrnlExpandMaxFiles = autoJExpandMaxFiles; @@ -286,18 +197,18 @@ bool MessageStoreImpl::init(const std::string& dir, else init(); - QPID_LOG(notice, "Store module initialized; store-dir=" << dir); -// QPID_LOG(info, "> Default files per journal: " << jfiles); + QLS_LOG(notice, "Store module initialized; store-dir=" << dir); +// QLS_LOG(info, "> Default files per journal: " << jfiles); // TODO: Uncomment these lines when auto-expand is enabled. -// QPID_LOG(info, "> Auto-expand " << (autoJrnlExpand ? "enabled" : "disabled")); -// if (autoJrnlExpand) QPID_LOG(info, "> Max auto-expand journal files: " << autoJrnlExpandMaxFiles); -// QPID_LOG(info, "> Default journal file size: " << jfileSizePgs << " (wpgs)"); - QPID_LOG(info, "> Default write cache page size: " << wCachePageSizeKib << " (KiB)"); - QPID_LOG(info, "> Default number of write cache pages: " << wCacheNumPages); -// QPID_LOG(info, "> TPL files per journal: " << tplNumJrnlFiles); -// QPID_LOG(info, "> TPL journal file size: " << tplJfileSizePgs << " (wpgs)"); - QPID_LOG(info, "> TPL write cache page size: " << tplWCachePageSizeKib << " (KiB)"); - QPID_LOG(info, "> TPL number of write cache pages: " << tplWCacheNumPages); +// QLS_LOG(info, "> Auto-expand " << (autoJrnlExpand ? "enabled" : "disabled")); +// if (autoJrnlExpand) QLS_LOG(info, "> Max auto-expand journal files: " << autoJrnlExpandMaxFiles); +// QLS_LOG(info, "> Default journal file size: " << jfileSizePgs << " (wpgs)"); + QLS_LOG(info, "> Default write cache page size: " << wCachePageSizeKib << " (KiB)"); + QLS_LOG(info, "> Default number of write cache pages: " << wCacheNumPages); +// QLS_LOG(info, "> TPL files per journal: " << tplNumJrnlFiles); +// QLS_LOG(info, "> TPL journal file size: " << tplJfileSizePgs << " (wpgs)"); + QLS_LOG(info, "> TPL write cache page size: " << tplWCachePageSizeKib << " (KiB)"); + QLS_LOG(info, "> TPL number of write cache pages: " << tplWCacheNumPages); return isInit; } @@ -311,14 +222,14 @@ void MessageStoreImpl::init() { closeDbs(); ::usleep(1000000); // 1 sec delay - QPID_LOG(error, "Previoius BDB store initialization failed, retrying (" << bdbRetryCnt << " of " << retryMax << ")..."); + QLS_LOG(error, "Previoius BDB store initialization failed, retrying (" << bdbRetryCnt << " of " << retryMax << ")..."); } try { qpid::qls_jrnl::jdir::create_dir(getBdbBaseDir()); dbenv.reset(new DbEnv(0)); - dbenv->set_errpfx("msgstore"); + dbenv->set_errpfx("linearstore"); dbenv->set_lg_regionmax(256000); // default = 65000 dbenv->open(getBdbBaseDir().c_str(), DB_THREAD | DB_CREATE | DB_INIT_TXN | DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | DB_USE_ENVIRON | DB_RECOVER, 0); @@ -357,21 +268,21 @@ void MessageStoreImpl::init() } catch (const DbException& e) { if (e.get_errno() == DB_VERSION_MISMATCH) { - QPID_LOG(error, "Database environment mismatch: This version of db4 does not match that which created the store database.: " << e.what()); + QLS_LOG(error, "Database environment mismatch: This version of db4 does not match that which created the store database.: " << e.what()); THROW_STORE_EXCEPTION_2("Database environment mismatch: This version of db4 does not match that which created the store database. " "(If recovery is not important, delete the contents of the store directory. Otherwise, try upgrading the database using " "db_upgrade or using db_recover - but the db4-utils package must also be installed to use these utilities.)", e); } - QPID_LOG(error, "BDB exception occurred while initializing store: " << e.what()); + QLS_LOG(error, "BDB exception occurred while initializing store: " << e.what()); if (bdbRetryCnt >= retryMax) THROW_STORE_EXCEPTION_2("BDB exception occurred while initializing store", e); } catch (const StoreException&) { throw; } catch (const qpid::qls_jrnl::jexception& e) { - QPID_LOG(error, "Journal Exception occurred while initializing store: " << e); + QLS_LOG(error, "Journal Exception occurred while initializing store: " << e); THROW_STORE_EXCEPTION_2("Journal Exception occurred while initializing store", e.what()); } catch (...) { - QPID_LOG(error, "Unknown exception occurred while initializing store."); + QLS_LOG(error, "Unknown exception occurred while initializing store."); throw; } } while (!isInit); @@ -417,10 +328,10 @@ void MessageStoreImpl::truncateInit(const bool saveStoreContent) oss << storeDir << "/" << storeTopLevelDir; if (saveStoreContent) { std::string dir = qpid::qls_jrnl::jdir::push_down(storeDir, storeTopLevelDir, "cluster"); - QPID_LOG(notice, "Store directory " << oss.str() << " was pushed down (saved) into directory " << dir << "."); + QLS_LOG(notice, "Store directory " << oss.str() << " was pushed down (saved) into directory " << dir << "."); } else { qpid::qls_jrnl::jdir::delete_dir(oss.str().c_str()); - QPID_LOG(notice, "Store directory " << oss.str() << " was truncated."); + QLS_LOG(notice, "Store directory " << oss.str() << " was truncated."); } init(); } @@ -459,13 +370,13 @@ MessageStoreImpl::~MessageStoreImpl() try { closeDbs(); } catch (const DbException& e) { - QPID_LOG(error, "Error closing BDB databases: " << e.what()); + QLS_LOG(error, "Error closing BDB databases: " << e.what()); } catch (const qpid::qls_jrnl::jexception& e) { - QPID_LOG(error, "Error: " << e.what()); + QLS_LOG(error, "Error: " << e.what()); } catch (const std::exception& e) { - QPID_LOG(error, "Error: " << e.what()); + QLS_LOG(error, "Error: " << e.what()); } catch (...) { - QPID_LOG(error, "Unknown error in MessageStoreImpl::~MessageStoreImpl()"); + QLS_LOG(error, "Unknown error in MessageStoreImpl::~MessageStoreImpl()"); } if (mgmtObject.get() != 0) { @@ -484,24 +395,22 @@ void MessageStoreImpl::create(qpid::broker::PersistableQueue& queue, JournalImpl* jQueue = 0; qpid::framing::FieldTable::ValuePtr value; -/* - u_int16_t localFileCount = numJrnlFiles; - bool localAutoExpandFlag = autoJrnlExpand; - u_int16_t localAutoExpandMaxFileCount = autoJrnlExpandMaxFiles; - u_int32_t localFileSizeSblks = jrnlFsizeSblks; - - value = args.get("qpid.file_count"); - if (value.get() != 0 && !value->empty() && value->convertsTo<int>()) - localFileCount = chkJrnlNumFilesParam((u_int16_t) value->get<int>(), "qpid.file_count"); - - value = args.get("qpid.file_size"); - if (value.get() != 0 && !value->empty() && value->convertsTo<int>()) - localFileSizeSblks = chkJrnlFileSizeParam((u_int32_t) value->get<int>(), "qpid.file_size", wCachePgSizeSblks) * JRNL_RMGR_PAGE_SIZE; -*/ +// uint16_t localFileCount = numJrnlFiles; +// bool localAutoExpandFlag = autoJrnlExpand; +// uint16_t localAutoExpandMaxFileCount = autoJrnlExpandMaxFiles; +// uint32_t localFileSizeSblks = jrnlFsizeSblks; +// +// value = args.get("qpid.file_count"); +// if (value.get() != 0 && !value->empty() && value->convertsTo<int>()) +// localFileCount = chkJrnlNumFilesParam((uint16_t) value->get<int>(), "qpid.file_count"); +// +// value = args.get("qpid.file_size"); +// if (value.get() != 0 && !value->empty() && value->convertsTo<int>()) +// localFileSizeSblks = chkJrnlFileSizeParam((uint32_t) value->get<int>(), "qpid.file_size", wCachePgSizeSblks) * JRNL_RMGR_PAGE_SIZE; if (queue.getName().size() == 0) { - QPID_LOG(error, "Cannot create store for empty (null) queue name - ignoring and attempting to continue."); + QLS_LOG(error, "Cannot create store for empty (null) queue name - ignoring and attempting to continue."); return; } @@ -513,15 +422,13 @@ void MessageStoreImpl::create(qpid::broker::PersistableQueue& queue, journalList[queue.getName()]=jQueue; } -/* - value = args.get("qpid.auto_expand"); - if (value.get() != 0 && !value->empty() && value->convertsTo<bool>()) - localAutoExpandFlag = (bool) value->get<bool>(); - - value = args.get("qpid.auto_expand_max_jfiles"); - if (value.get() != 0 && !value->empty() && value->convertsTo<int>()) - localAutoExpandMaxFileCount = (u_int16_t) value->get<int>(); -*/ +// value = args.get("qpid.auto_expand"); +// if (value.get() != 0 && !value->empty() && value->convertsTo<bool>()) +// localAutoExpandFlag = (bool) value->get<bool>(); +// +// value = args.get("qpid.auto_expand_max_jfiles"); +// if (value.get() != 0 && !value->empty() && value->convertsTo<int>()) +// localAutoExpandMaxFileCount = (uint16_t) value->get<int>(); queue.setExternalQueueStore(dynamic_cast<qpid::broker::ExternalQueueStore*>(jQueue)); try { @@ -606,7 +513,7 @@ bool MessageStoreImpl::create(db_ptr db, IdSequence& seq, const qpid::broker::Persistable& p) { - u_int64_t id (seq.next()); + uint64_t id (seq.next()); Dbt key(&id, sizeof(id)); BufferValue value (p); @@ -774,7 +681,7 @@ void MessageStoreImpl::recoverQueues(TxnCtxt& txn, Cursor queues; queues.open(queueDb, txn.get()); - u_int64_t maxQueueId(1); + uint64_t maxQueueId(1); IdDbt key; Dbt value; @@ -790,7 +697,7 @@ void MessageStoreImpl::recoverQueues(TxnCtxt& txn, JournalImpl* jQueue = 0; if (queueName.size() == 0) { - QPID_LOG(error, "Cannot recover empty (null) queue name - ignoring and attempting to continue."); + QLS_LOG(error, "Cannot recover empty (null) queue name - ignoring and attempting to continue."); break; } jQueue = new JournalImpl(broker->getTimer(), queueName, getJrnlHashDir(queueName), std::string("JournalData"), @@ -806,7 +713,7 @@ void MessageStoreImpl::recoverQueues(TxnCtxt& txn, { long rcnt = 0L; // recovered msg count long idcnt = 0L; // in-doubt msg count - u_int64_t thisHighestRid = 0ULL; + uint64_t thisHighestRid = 0ULL; jQueue->recover(/*numJrnlFiles, autoJrnlExpand, autoJrnlExpandMaxFiles, jrnlFsizeSblks,*/ wCacheNumPages, wCachePgSizeSblks, &prepared, thisHighestRid, key.id); // start recovery @@ -817,11 +724,11 @@ void MessageStoreImpl::recoverQueues(TxnCtxt& txn, const qpid::framing::FieldTable& storeargs = queue->getSettings().storeSettings; qpid::framing::FieldTable::ValuePtr value; value = storeargs.get("qpid.file_count"); - if (value.get() != 0 && !value->empty() && value->convertsTo<int>() && (u_int16_t)value->get<int>() != jQueue->num_jfiles()) { + if (value.get() != 0 && !value->empty() && value->convertsTo<int>() && (uint16_t)value->get<int>() != jQueue->num_jfiles()) { queue->addArgument("qpid.file_count", jQueue->num_jfiles()); } value = storeargs.get("qpid.file_size"); - if (value.get() != 0 && !value->empty() && value->convertsTo<int>() && (u_int32_t)value->get<int>() != jQueue->jfsize_sblks()/JRNL_RMGR_PAGE_SIZE) { + if (value.get() != 0 && !value->empty() && value->convertsTo<int>() && (uint32_t)value->get<int>() != jQueue->jfsize_sblks()/JRNL_RMGR_PAGE_SIZE) { queue->addArgument("qpid.file_size", jQueue->jfsize_sblks()/JRNL_RMGR_PAGE_SIZE); } */ @@ -831,7 +738,7 @@ void MessageStoreImpl::recoverQueues(TxnCtxt& txn, else if (thisHighestRid - highestRid < 0x8000000000000000ULL) // RFC 1982 comparison for unsigned 64-bit highestRid = thisHighestRid; recoverMessages(txn, registry, queue, prepared, messages, rcnt, idcnt); - QPID_LOG(info, "Recovered queue \"" << queueName << "\": " << rcnt << " messages recovered; " << idcnt << " messages in-doubt."); + QLS_LOG(info, "Recovered queue \"" << queueName << "\": " << rcnt << " messages recovered; " << idcnt << " messages in-doubt."); jQueue->recover_complete(); // start journal. } catch (const qpid::qls_jrnl::jexception& e) { THROW_STORE_EXCEPTION(std::string("Queue ") + queueName + ": recoverQueues() failed: " + e.what()); @@ -845,7 +752,7 @@ void MessageStoreImpl::recoverQueues(TxnCtxt& txn, // NOTE: highestRid is set by both recoverQueues() and recoverTplStore() as // the messageIdSequence is used for both queue journals and the tpl journal. messageIdSequence.reset(highestRid + 1); - QPID_LOG(info, "Most recent persistence id found: 0x" << std::hex << highestRid << std::dec); + QLS_LOG(info, "Most recent persistence id found: 0x" << std::hex << highestRid << std::dec); queueIdSequence.reset(maxQueueId + 1); } @@ -859,7 +766,7 @@ void MessageStoreImpl::recoverExchanges(TxnCtxt& txn, Cursor exchanges; exchanges.open(exchangeDb, txn.get()); - u_int64_t maxExchangeId(1); + uint64_t maxExchangeId(1); IdDbt key; Dbt value; //read all exchanges @@ -871,7 +778,7 @@ void MessageStoreImpl::recoverExchanges(TxnCtxt& txn, //set the persistenceId and update max as required exchange->setPersistenceId(key.id); index[key.id] = exchange; - QPID_LOG(info, "Recovered exchange \"" << exchange->getName() << '"'); + QLS_LOG(info, "Recovered exchange \"" << exchange->getName() << '"'); } maxExchangeId = std::max(key.id, maxExchangeId); } @@ -890,7 +797,7 @@ void MessageStoreImpl::recoverBindings(TxnCtxt& txn, while (bindings.next(key, value)) { qpid::framing::Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size()); if (buffer.available() < 8) { - QPID_LOG(error, "Not enough data for binding: " << buffer.available()); + QLS_LOG(error, "Not enough data for binding: " << buffer.available()); THROW_STORE_EXCEPTION("Not enough data for binding"); } uint64_t queueId = buffer.getLongLong(); @@ -905,12 +812,12 @@ void MessageStoreImpl::recoverBindings(TxnCtxt& txn, if (exchange != exchanges.end() && queue != queues.end()) { //could use the recoverable queue here rather than the name... exchange->second->bind(queueName, routingkey, args); - QPID_LOG(info, "Recovered binding exchange=" << exchange->second->getName() + QLS_LOG(info, "Recovered binding exchange=" << exchange->second->getName() << " key=" << routingkey << " queue=" << queueName); } else { //stale binding, delete it - QPID_LOG(warning, "Deleting stale binding"); + QLS_LOG(warning, "Deleting stale binding"); bindings->del(0); } } @@ -922,7 +829,7 @@ void MessageStoreImpl::recoverGeneral(TxnCtxt& txn, Cursor items; items.open(generalDb, txn.get()); - u_int64_t maxGeneralId(1); + uint64_t maxGeneralId(1); IdDbt key; Dbt value; //read all items @@ -945,7 +852,7 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/, long& rcnt, long& idcnt) { - size_t preambleLength = sizeof(u_int32_t)/*header size*/; + size_t preambleLength = sizeof(uint32_t)/*header size*/; JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore()); DataTokenImpl dtok; @@ -994,8 +901,8 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/, // Reset the TTL for the recovered message msg->computeExpiration(broker->getExpiryPolicy()); - u_int32_t contentOffset = headerSize + preambleLength; - u_int64_t contentSize = readSize - contentOffset; + uint32_t contentOffset = headerSize + preambleLength; + uint64_t contentSize = readSize - contentOffset; if (msg->loadContent(contentSize) && !externalFlag) { //now read the content qpid::framing::Buffer contentBuff(data + contentOffset, contentSize); @@ -1007,7 +914,7 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/, rcnt++; queue->recover(msg); } else { - u_int64_t rid = dtok.rid(); + uint64_t rid = dtok.rid(); std::string xid(i->xid); TplRecoverMapCitr citr = tplRecoverMap.find(xid); if (citr == tplRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in tplRecoverMap"); @@ -1094,7 +1001,7 @@ int MessageStoreImpl::enqueueMessage(TxnCtxt& txn, int count(0); for (int status = mappings->get(&msgId, &value, DB_SET); status == 0; status = mappings->get(&msgId, &value, DB_NEXT_DUP)) { if (index.find(value.id) == index.end()) { - QPID_LOG(warning, "Recovered message for queue that no longer exists"); + QLS_LOG(warning, "Recovered message for queue that no longer exists"); mappings->del(0); } else { qpid::broker::RecoverableQueue::shared_ptr queue = index[value.id]; @@ -1139,7 +1046,7 @@ void MessageStoreImpl::readTplStore() if (!txnList.empty()) { // xid found in tmap unsigned enqCnt = 0; unsigned deqCnt = 0; - u_int64_t rid = 0; + uint64_t rid = 0; // Assume commit (roll forward) in cases where only prepare has been called - ie only enqueue record exists. // Note: will apply to both 1PC and 2PC transactions. @@ -1185,7 +1092,7 @@ void MessageStoreImpl::readTplStore() void MessageStoreImpl::recoverTplStore() { if (qpid::qls_jrnl::jdir::exists(tplStorePtr->jrnl_dir() + tplStorePtr->base_filename() + ".jinf")) { - u_int64_t thisHighestRid = 0ULL; + uint64_t thisHighestRid = 0ULL; tplStorePtr->recover(/*tplNumJrnlFiles, false, 0, tplJrnlFsizeSblks,*/ tplWCachePgSizeSblks, tplWCacheNumPages, 0, thisHighestRid, 0); if (highestRid == 0ULL) highestRid = thisHighestRid; @@ -1248,11 +1155,11 @@ void MessageStoreImpl::appendContent(const boost::intrusive_ptr<const qpid::brok void MessageStoreImpl::loadContent(const qpid::broker::PersistableQueue& queue, const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg, std::string& data, - u_int64_t offset, - u_int32_t length) + uint64_t offset, + uint32_t length) { checkInit(); - u_int64_t messageId (msg->getPersistenceId()); + uint64_t messageId (msg->getPersistenceId()); if (messageId != 0) { try { @@ -1297,8 +1204,8 @@ void MessageStoreImpl::enqueue(qpid::broker::TransactionContext* ctxt, const qpid::broker::PersistableQueue& queue) { checkInit(); - u_int64_t queueId (queue.getPersistenceId()); - u_int64_t messageId (msg->getPersistenceId()); + uint64_t queueId (queue.getPersistenceId()); + uint64_t messageId (msg->getPersistenceId()); if (queueId == 0) { THROW_STORE_EXCEPTION("Queue not created: " + queue.getName()); } @@ -1323,10 +1230,10 @@ void MessageStoreImpl::enqueue(qpid::broker::TransactionContext* ctxt, if (ctxt) txn->addXidRecord(queue.getExternalQueueStore()); } -u_int64_t MessageStoreImpl::msgEncode(std::vector<char>& buff, const boost::intrusive_ptr<qpid::broker::PersistableMessage>& message) +uint64_t MessageStoreImpl::msgEncode(std::vector<char>& buff, const boost::intrusive_ptr<qpid::broker::PersistableMessage>& message) { - u_int32_t headerSize = message->encodedHeaderSize(); - u_int64_t size = message->encodedSize() + sizeof(u_int32_t); + uint32_t headerSize = message->encodedHeaderSize(); + uint64_t size = message->encodedSize() + sizeof(uint32_t); try { buff = std::vector<char>(size); } // long + headers + content catch (const std::exception& e) { std::ostringstream oss; @@ -1345,7 +1252,7 @@ void MessageStoreImpl::store(const qpid::broker::PersistableQueue* queue, bool /*newId*/) { std::vector<char> buff; - u_int64_t size = msgEncode(buff, message); + uint64_t size = msgEncode(buff, message); try { if (queue) { @@ -1375,8 +1282,8 @@ void MessageStoreImpl::dequeue(qpid::broker::TransactionContext* ctxt, const qpid::broker::PersistableQueue& queue) { checkInit(); - u_int64_t queueId (queue.getPersistenceId()); - u_int64_t messageId (msg->getPersistenceId()); + uint64_t queueId (queue.getPersistenceId()); + uint64_t messageId (msg->getPersistenceId()); if (queueId == 0) { THROW_STORE_EXCEPTION("Queue \"" + queue.getName() + "\" has null queue Id (has not been created)"); } @@ -1429,7 +1336,7 @@ void MessageStoreImpl::async_dequeue(qpid::broker::TransactionContext* ctxt, } } -u_int32_t MessageStoreImpl::outstandingQueueAIO(const qpid::broker::PersistableQueue& /*queue*/) +uint32_t MessageStoreImpl::outstandingQueueAIO(const qpid::broker::PersistableQueue& /*queue*/) { checkInit(); return 0; @@ -1458,7 +1365,7 @@ void MessageStoreImpl::completed(TxnCtxt& txn, mgmtObject->inc_tplTxnAborts(); } } catch (const std::exception& e) { - QPID_LOG(error, "Error completing xid " << txn.getXid() << ": " << e.what()); + QLS_LOG(error, "Error completing xid " << txn.getXid() << ": " << e.what()); throw; } } @@ -1509,7 +1416,7 @@ void MessageStoreImpl::localPrepare(TxnCtxt* ctxt) mgmtObject->inc_tplTxnPrepares(); } } catch (const std::exception& e) { - QPID_LOG(error, "Error preparing xid " << ctxt->getXid() << ": " << e.what()); + QLS_LOG(error, "Error preparing xid " << ctxt->getXid() << ": " << e.what()); throw; } } @@ -1579,7 +1486,7 @@ void MessageStoreImpl::deleteBindingsForQueue(const qpid::broker::PersistableQue uint64_t queueId = buffer.getLongLong(); if (queue.getPersistenceId() == queueId) { bindings->del(0); - QPID_LOG(debug, "Deleting binding for " << queue.getName() << " " << key.id << "->" << queueId); + QLS_LOG(debug, "Deleting binding for " << queue.getName() << " " << key.id << "->" << queueId); } } } @@ -1591,7 +1498,7 @@ void MessageStoreImpl::deleteBindingsForQueue(const qpid::broker::PersistableQue txn.abort(); throw; } - QPID_LOG(debug, "Deleted all bindings for " << queue.getName() << ":" << queue.getPersistenceId()); + QLS_LOG(debug, "Deleted all bindings for " << queue.getName() << ":" << queue.getPersistenceId()); } void MessageStoreImpl::deleteBinding(const qpid::broker::PersistableExchange& exchange, @@ -1621,7 +1528,7 @@ void MessageStoreImpl::deleteBinding(const qpid::broker::PersistableExchange& ex buffer.getShortString(k); if (bkey == k) { bindings->del(0); - QPID_LOG(debug, "Deleting binding for " << queue.getName() << " " << key.id << "->" << queueId); + QLS_LOG(debug, "Deleting binding for " << queue.getName() << " " << key.id << "->" << queueId); } } } @@ -1662,10 +1569,10 @@ std::string MessageStoreImpl::getJrnlDir(const qpid::broker::PersistableQueue& q return getJrnlHashDir(queue.getName().c_str()); } -u_int32_t MessageStoreImpl::bHash(const std::string str) +uint32_t MessageStoreImpl::bHash(const std::string str) { // Daniel Bernstein hash fn - u_int32_t h = 0; + uint32_t h = 0; for (std::string::const_iterator i = str.begin(); i < str.end(); i++) h = 33*h + *i; return h; @@ -1689,42 +1596,16 @@ void MessageStoreImpl::journalDeleted(JournalImpl& j) { MessageStoreImpl::StoreOptions::StoreOptions(const std::string& name) : qpid::Options(name), - /*numJrnlFiles(defNumJrnlFiles), - autoJrnlExpand(defAutoJrnlExpand), - autoJrnlExpandMaxFiles(defAutoJrnlExpandMaxFiles), - jrnlFsizePgs(defJrnlFileSizePgs),*/ truncateFlag(defTruncateFlag), wCachePageSizeKib(defWCachePageSize), - /*tplNumJrnlFiles(defTplNumJrnlFiles), - tplJrnlFsizePgs(defTplJrnlFileSizePgs),*/ - tplWCachePageSizeKib(defTplWCachePageSize) + tplWCachePageSizeKib(defTplWCachePageSize), + efpPartition(defEfpPartition), + efpFileSize(defEfpFileSize) { -/* - std::ostringstream oss1; - oss1 << "Default number of files for each journal instance (queue). [Allowable values: " << - JRNL_MIN_NUM_FILES << " - " << JRNL_MAX_NUM_FILES << "]"; - std::ostringstream oss2; - oss2 << "Default size for each journal file in multiples of read pages (1 read page = 64KiB). [Allowable values: " << - JRNL_MIN_FILE_SIZE / JRNL_RMGR_PAGE_SIZE << " - " << JRNL_MAX_FILE_SIZE / JRNL_RMGR_PAGE_SIZE << "]"; - std::ostringstream oss3; - oss3 << "Number of files for transaction prepared list journal instance. [Allowable values: " << - JRNL_MIN_NUM_FILES << " - " << JRNL_MAX_NUM_FILES << "]"; - std::ostringstream oss4; - oss4 << "Size of each transaction prepared list journal file in multiples of read pages (1 read page = 64KiB) [Allowable values: " << - JRNL_MIN_FILE_SIZE / JRNL_RMGR_PAGE_SIZE << " - " << JRNL_MAX_FILE_SIZE / JRNL_RMGR_PAGE_SIZE << "]"; -*/ addOptions() ("store-dir", qpid::optValue(storeDir, "DIR"), "Store directory location for persistence (instead of using --data-dir value). " "Required if --no-data-dir is also used.") -// ("num-jfiles", qpid::optValue(numJrnlFiles, "N"), oss1.str().c_str()) -// ("jfile-size-pgs", qpid::optValue(jrnlFsizePgs, "N"), oss2.str().c_str()) -// TODO: Uncomment these lines when auto-expand is enabled. -// ("auto-expand", qpid::optValue(autoJrnlExpand, "yes|no"), -// "If yes|true|1, allows journal to auto-expand by adding additional journal files as needed. " -// "If no|false|0, the number of journal files will remain fixed (num-jfiles).") -// ("max-auto-expand-jfiles", qpid::optValue(autoJrnlExpandMaxFiles, "N"), -// "Maximum number of journal files allowed from auto-expanding; must be greater than --num-jfiles parameter.") ("truncate", qpid::optValue(truncateFlag, "yes|no"), "If yes|true|1, will truncate the store (discard any existing records). If no|false|0, will preserve " "the existing store files for recovery.") @@ -1732,12 +1613,14 @@ MessageStoreImpl::StoreOptions::StoreOptions(const std::string& name) : "Size of the pages in the write page cache in KiB. " "Allowable values - powers of 2: 1, 2, 4, ... , 128. " "Lower values decrease latency at the expense of throughput.") -// ("tpl-num-jfiles", qpid::optValue(tplNumJrnlFiles, "N"), oss3.str().c_str()) -// ("tpl-jfile-size-pgs", qpid::optValue(tplJrnlFsizePgs, "N"), oss4.str().c_str()) ("tpl-wcache-page-size", qpid::optValue(tplWCachePageSizeKib, "N"), "Size of the pages in the transaction prepared list write page cache in KiB. " "Allowable values - powers of 2: 1, 2, 4, ... , 128. " "Lower values decrease latency at the expense of throughput.") + ("efp-partition", qpid::optValue(efpPartition, "N"), + "Empty File Pool partition to use for finding empty journal files") + ("efp-file-size", qpid::optValue(efpFileSize, "N"), + "Empty File Pool file size to use for journal files") ; } diff --git a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h index 26e4962ead..5d5fa28ff8 100644 --- a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h +++ b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h @@ -60,35 +60,31 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem typedef boost::shared_ptr<DbEnv> dbEnv_ptr; struct StoreOptions : public qpid::Options { - StoreOptions(const std::string& name="Store Options"); + StoreOptions(const std::string& name="Linear Store Options"); std::string clusterName; std::string storeDir; -// u_int16_t numJrnlFiles; -// bool autoJrnlExpand; -// u_int16_t autoJrnlExpandMaxFiles; -// u_int32_t jrnlFsizePgs; - bool truncateFlag; - u_int32_t wCachePageSizeKib; -// u_int16_t tplNumJrnlFiles; -// u_int32_t tplJrnlFsizePgs; - u_int32_t tplWCachePageSizeKib; + bool truncateFlag; + uint32_t wCachePageSizeKib; + uint32_t tplWCachePageSizeKib; + uint16_t efpPartition; + uint64_t efpFileSize; }; protected: - typedef std::map<u_int64_t, qpid::broker::RecoverableQueue::shared_ptr> queue_index; - typedef std::map<u_int64_t, qpid::broker::RecoverableExchange::shared_ptr> exchange_index; - typedef std::map<u_int64_t, qpid::broker::RecoverableMessage::shared_ptr> message_index; + typedef std::map<uint64_t, qpid::broker::RecoverableQueue::shared_ptr> queue_index; + typedef std::map<uint64_t, qpid::broker::RecoverableExchange::shared_ptr> exchange_index; + typedef std::map<uint64_t, qpid::broker::RecoverableMessage::shared_ptr> message_index; typedef LockedMappings::map txn_lock_map; typedef boost::ptr_list<PreparedTransaction> txn_list; // Structs for Transaction Recover List (TPL) recover state struct TplRecoverStruct { - u_int64_t rid; // rid of TPL record + uint64_t rid; // rid of TPL record bool deq_flag; bool commit_flag; bool tpc_flag; - TplRecoverStruct(const u_int64_t _rid, const bool _deq_flag, const bool _commit_flag, const bool _tpc_flag); + TplRecoverStruct(const uint64_t _rid, const bool _deq_flag, const bool _commit_flag, const bool _tpc_flag); }; typedef TplRecoverStruct TplRecover; typedef std::pair<std::string, TplRecover> TplRecoverMapPair; @@ -99,16 +95,11 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem typedef JournalListMap::iterator JournalListMapItr; // Default store settings -// static const u_int16_t defNumJrnlFiles = 8; -// static const u_int32_t defJrnlFileSizePgs = 24; - static const bool defTruncateFlag = false; - static const u_int32_t defWCachePageSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024; -// static const u_int16_t defTplNumJrnlFiles = 8; -// static const u_int32_t defTplJrnlFileSizePgs = 24; - static const u_int32_t defTplWCachePageSize = defWCachePageSize / 8; - // TODO: set defAutoJrnlExpand to true and defAutoJrnlExpandMaxFiles to 16 when auto-expand comes on-line -// static const bool defAutoJrnlExpand = false; -// static const u_int16_t defAutoJrnlExpandMaxFiles = 0; + static const bool defTruncateFlag = false; + static const uint32_t defWCachePageSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_SBLK_SIZE / 1024; + static const uint32_t defTplWCachePageSize = defWCachePageSize / 8; + static const uint16_t defEfpPartition = 0; + static const uint64_t defEfpFileSize = 512 * JRNL_SBLK_SIZE; static const std::string storeTopLevelDir; static qpid::sys::Duration defJournalGetEventsTimeout; @@ -136,18 +127,18 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem IdSequence generalIdSequence; IdSequence messageIdSequence; std::string storeDir; - u_int16_t numJrnlFiles; + uint16_t numJrnlFiles; bool autoJrnlExpand; - u_int16_t autoJrnlExpandMaxFiles; - u_int32_t jrnlFsizeSblks; + uint16_t autoJrnlExpandMaxFiles; + uint32_t jrnlFsizeSblks; bool truncateFlag; - u_int32_t wCachePgSizeSblks; - u_int16_t wCacheNumPages; - u_int16_t tplNumJrnlFiles; - u_int32_t tplJrnlFsizeSblks; - u_int32_t tplWCachePgSizeSblks; - u_int16_t tplWCacheNumPages; - u_int64_t highestRid; + uint32_t wCachePgSizeSblks; + uint16_t wCacheNumPages; + uint16_t tplNumJrnlFiles; + uint32_t tplJrnlFsizeSblks; + uint32_t tplWCachePgSizeSblks; + uint16_t tplWCacheNumPages; + uint64_t highestRid; bool isInit; const char* envPath; qpid::broker::Broker* broker; @@ -157,21 +148,10 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem // Parameter validation and calculation -// static u_int16_t chkJrnlNumFilesParam(const u_int16_t param, -// const std::string paramName); -// static u_int32_t chkJrnlFileSizeParam(const u_int32_t param, -// const std::string paramName, -// const u_int32_t wCachePgSizeSblks = 0); - static u_int32_t chkJrnlWrPageCacheSize(const u_int32_t param, + static uint32_t chkJrnlWrPageCacheSize(const uint32_t param, const std::string paramName/*, - const u_int16_t jrnlFsizePgs*/); - static u_int16_t getJrnlWrNumPages(const u_int32_t wrPageSizeKib); -// void chkJrnlAutoExpandOptions(const MessageStoreImpl::StoreOptions* opts, -// bool& autoJrnlExpand, -// u_int16_t& autoJrnlExpandMaxFiles, -// const std::string& autoJrnlExpandMaxFilesParamName, -// const u_int16_t numJrnlFiles, -// const std::string& numJrnlFilesParamName); + const uint16_t jrnlFsizePgs*/); + static uint16_t getJrnlWrNumPages(const uint32_t wrPageSizeKib); void init(); @@ -213,7 +193,7 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem void recoverTplStore(); void recoverLockedMappings(txn_list& txns); TxnCtxt* check(qpid::broker::TransactionContext* ctxt); - u_int64_t msgEncode(std::vector<char>& buff, const boost::intrusive_ptr<qpid::broker::PersistableMessage>& message); + uint64_t msgEncode(std::vector<char>& buff, const boost::intrusive_ptr<qpid::broker::PersistableMessage>& message); void store(const qpid::broker::PersistableQueue* queue, TxnCtxt* txn, const boost::intrusive_ptr<qpid::broker::PersistableMessage>& message, @@ -245,7 +225,7 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem // journal functions void createJrnlQueue(const qpid::broker::PersistableQueue& queue); - u_int32_t bHash(const std::string str); + uint32_t bHash(const std::string str); std::string getJrnlDir(const qpid::broker::PersistableQueue& queue); //for exmaple /var/rhm/ + queueDir/ std::string getJrnlHashDir(const std::string& queueName); std::string getJrnlBaseDir(); @@ -280,15 +260,9 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem bool init(const qpid::Options* options); bool init(const std::string& dir, - /*u_int16_t jfiles = defNumJrnlFiles, - u_int32_t jfileSizePgs = defJrnlFileSizePgs,*/ const bool truncateFlag = false, - u_int32_t wCachePageSize = defWCachePageSize, - /*u_int16_t tplJfiles = defTplNumJrnlFiles, - u_int32_t tplJfileSizePgs = defTplJrnlFileSizePgs,*/ - u_int32_t tplWCachePageSize = defTplWCachePageSize/*, - bool autoJExpand = defAutoJrnlExpand, - u_int16_t autoJExpandMaxFiles = defAutoJrnlExpandMaxFiles*/); + uint32_t wCachePageSize = defWCachePageSize, + uint32_t tplWCachePageSize = defTplWCachePageSize); void truncateInit(const bool saveStoreContent = false); @@ -345,7 +319,7 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem void flush(const qpid::broker::PersistableQueue& queue); - u_int32_t outstandingQueueAIO(const qpid::broker::PersistableQueue& queue); + uint32_t outstandingQueueAIO(const qpid::broker::PersistableQueue& queue); void collectPreparedXids(std::set<std::string>& xids); @@ -364,7 +338,7 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem qpid::management::ManagementObject::shared_ptr GetManagementObject (void) const { return mgmtObject; } - inline qpid::management::Manageable::status_t ManagementMethod (u_int32_t, qpid::management::Args&, std::string&) + inline qpid::management::Manageable::status_t ManagementMethod (uint32_t, qpid::management::Args&, std::string&) { return qpid::management::Manageable::STATUS_OK; } std::string getStoreDir() const; diff --git a/qpid/cpp/src/qpid/linearstore/PreparedTransaction.h b/qpid/cpp/src/qpid/linearstore/PreparedTransaction.h index 74b93abfde..9888342579 100644 --- a/qpid/cpp/src/qpid/linearstore/PreparedTransaction.h +++ b/qpid/cpp/src/qpid/linearstore/PreparedTransaction.h @@ -25,6 +25,7 @@ #include <list> #include <map> #include <set> +#include <stdint.h> #include <string> #include <boost/shared_ptr.hpp> #include <boost/ptr_container/ptr_list.hpp> @@ -32,8 +33,8 @@ namespace qpid{ namespace linearstore{ -typedef u_int64_t queue_id; -typedef u_int64_t message_id; +typedef uint64_t queue_id; +typedef uint64_t message_id; class LockedMappings { diff --git a/qpid/cpp/src/qpid/linearstore/StorePlugin.cpp b/qpid/cpp/src/qpid/linearstore/StorePlugin.cpp index 98b2275c96..840468d8bd 100644 --- a/qpid/cpp/src/qpid/linearstore/StorePlugin.cpp +++ b/qpid/cpp/src/qpid/linearstore/StorePlugin.cpp @@ -23,7 +23,7 @@ #include "qpid/Plugin.h" #include "qpid/Options.h" #include "qpid/DataDir.h" -#include "qpid/log/Statement.h" +#include "qpid/linearstore/Log.h" #include "qpid/linearstore/MessageStoreImpl.h" using qpid::linearstore::MessageStoreImpl; @@ -49,7 +49,7 @@ struct StorePlugin : public Plugin { if (options.storeDir.empty ()) { if (!dataDir.isEnabled ()) - throw Exception ("msgstore: If --data-dir is blank or --no-data-dir is specified, --store-dir must be present."); + throw Exception ("linearstore: If broker option --data-dir is blank or --no-data-dir is specified, linearstore option --store-dir must be present."); options.storeDir = dataDir.getPath (); } @@ -64,7 +64,7 @@ struct StorePlugin : public Plugin { Broker* broker = dynamic_cast<Broker*>(&target); if (!broker) return; if (!store) return; - QPID_LOG(info, "Enabling management instrumentation for the store."); + QLS_LOG(info, "Enabling management instrumentation."); store->initManagement(); } diff --git a/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp b/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp index 4084fa992d..022d9c13e2 100644 --- a/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp +++ b/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp @@ -78,15 +78,15 @@ TxnCtxt::TxnCtxt(IdSequence* _loggedtx) : loggedtx(_loggedtx), dtokp(new DataTok // // Human-readable tid: 53 bytes // // uuit_t is a char[16] // tid.reserve(53); -// u_int64_t* u1 = (u_int64_t*)uuid; -// u_int64_t* u2 = (u_int64_t*)(uuid + sizeof(u_int64_t)); +// uint64_t* u1 = (uint64_t*)uuid; +// uint64_t* u2 = (uint64_t*)(uuid + sizeof(uint64_t)); // std::stringstream s; // s << "tid:" << std::hex << std::setfill('0') << std::setw(16) << uuidSeq.next() << ":" << std::setw(16) << *u1 << std::setw(16) << *u2; // tid.assign(s.str()); // Binary tid: 24 bytes tid.reserve(24); - u_int64_t c = uuidSeq.next(); + uint64_t c = uuidSeq.next(); tid.append((char*)&c, sizeof(c)); tid.append((char*)&uuid, sizeof(uuid)); } @@ -172,7 +172,7 @@ DataTokenImpl* TxnCtxt::getDtok() { return dtokp.get(); } void TxnCtxt::incrDtokRef() { dtokp->addRef(); } -void TxnCtxt::recoverDtok(const u_int64_t rid, const std::string xid) { +void TxnCtxt::recoverDtok(const uint64_t rid, const std::string xid) { dtokp->set_rid(rid); dtokp->set_wstate(DataTokenImpl::ENQ); dtokp->set_xid(xid); diff --git a/qpid/cpp/src/qpid/linearstore/TxnCtxt.h b/qpid/cpp/src/qpid/linearstore/TxnCtxt.h index f66b9fd74f..4c009d7a94 100644 --- a/qpid/cpp/src/qpid/linearstore/TxnCtxt.h +++ b/qpid/cpp/src/qpid/linearstore/TxnCtxt.h @@ -95,7 +95,7 @@ class TxnCtxt : public qpid::broker::TransactionContext bool impactedQueuesEmpty(); DataTokenImpl* getDtok(); void incrDtokRef(); - void recoverDtok(const u_int64_t rid, const std::string xid); + void recoverDtok(const uint64_t rid, const std::string xid); }; diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h b/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h index f4048fd3d3..fc44e35331 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h @@ -49,7 +49,8 @@ * (The disk softblock size is 512 for Linux kernels >= 2.6) */ #define JRNL_DBLK_SIZE 128 /**< Data block size in bytes (CANNOT BE LESS THAN 32!) */ -#define JRNL_SBLK_SIZE 32 /**< Disk softblock size in multiples of JRNL_DBLK_SIZE */ +#define JRNL_SBLK_SIZE_DBLKS 32 /**< Disk softblock size in multiples of JRNL_DBLK_SIZE */ +#define JRNL_SBLK_SIZE JRNL_SBLK_SIZE_DBLKS * JRNL_DBLK_SIZE /**< Disk softblock size in bytes */ //#define JRNL_MIN_FILE_SIZE 128 ///< Min. jrnl file size in sblks (excl. file_hdr) //#define JRNL_MAX_FILE_SIZE 4194176 ///< Max. jrnl file size in sblks (excl. file_hdr) //#define JRNL_MIN_NUM_FILES 4 ///< Min. number of journal files diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp index cabb43191a..552aa92b9c 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp +++ b/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp @@ -644,7 +644,7 @@ jcntl::rcvr_janalyze(rcvdat& /*rd*/, const std::vector<std::string>* /*prep_txn_ } // Check for file full condition - add one to _jfsize_sblks to account for file header - rd._lffull = rd._eo == (1 + _jfsize_sblks) * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE; + rd._lffull = rd._eo == (1 + _jfsize_sblks) * JRNL_SBLK_SIZE; // Check for journal full condition uint16_t next_wr_fid = (rd._lfid + 1) % rd._njf; @@ -893,7 +893,7 @@ jcntl::jfile_cycle(uint16_t& fid, std::ifstream* ifsp/*, bool& lowi*/, rcvdat& r // assert(fhdr._lfid == fid); if (!rd._fro) rd._fro = fhdr._fro; - std::streamoff foffs = jump_fro ? fhdr._fro : JRNL_DBLK_SIZE * JRNL_SBLK_SIZE; + std::streamoff foffs = jump_fro ? fhdr._fro : JRNL_SBLK_SIZE; ifsp->seekg(foffs); } else @@ -938,14 +938,14 @@ jcntl::check_owi(const uint16_t fid, rec_hdr& h, bool& lowi, rcvdat& rd, std::st void jcntl::check_journal_alignment(const uint16_t fid, std::streampos& file_pos, rcvdat& rd) { - unsigned sblk_offs = file_pos % (JRNL_DBLK_SIZE * JRNL_SBLK_SIZE); + unsigned sblk_offs = file_pos % JRNL_SBLK_SIZE; if (sblk_offs) { { std::ostringstream oss; oss << std::hex << "Bad record alignment found at fid=0x" << fid; oss << " offs=0x" << file_pos << " (likely journal overwrite boundary); " << std::dec; - oss << (JRNL_SBLK_SIZE - (sblk_offs/JRNL_DBLK_SIZE)) << " filler record(s) required."; + oss << (JRNL_SBLK_SIZE_DBLKS - (sblk_offs/JRNL_DBLK_SIZE)) << " filler record(s) required."; this->log(LOG_WARN, oss.str()); } const uint32_t xmagic = QLS_EMPTY_MAGIC; @@ -965,7 +965,7 @@ jcntl::check_journal_alignment(const uint16_t fid, std::streampos& file_pos, rcv // clear should inspection of the file be required. std::memset((char*)buff + sizeof(xmagic), QLS_CLEAN_CHAR, JRNL_DBLK_SIZE - sizeof(xmagic)); - while (file_pos % (JRNL_DBLK_SIZE * JRNL_SBLK_SIZE)) + while (file_pos % JRNL_SBLK_SIZE) { ofsp.write((const char*)buff, JRNL_DBLK_SIZE); assert(!ofsp.fail()); diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jrec.h b/qpid/cpp/src/qpid/linearstore/jrnl/jrec.h index e1253ce9c4..59c5df2ff2 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/jrec.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/jrec.h @@ -150,7 +150,7 @@ namespace qls_jrnl static inline uint32_t size_dblks(const std::size_t size) { return size_blks(size, JRNL_DBLK_SIZE); } static inline uint32_t size_sblks(const std::size_t size) - { return size_blks(size, JRNL_DBLK_SIZE * JRNL_SBLK_SIZE); } + { return size_blks(size, JRNL_SBLK_SIZE); } static inline uint32_t size_blks(const std::size_t size, const std::size_t blksize) { return (size + blksize - 1)/blksize; } virtual uint64_t rid() const = 0; diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp index 7f9d0aed98..40611692cf 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp +++ b/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp @@ -38,6 +38,7 @@ namespace qls_jrnl pmgr::page_cb::page_cb(uint16_t index): _index(index), _state(UNUSED), + _frid(0), _wdblks(0), _rdblks(0), _pdtokl(0), @@ -63,7 +64,7 @@ pmgr::page_cb::state_str() const return "<unknown>"; } -const uint32_t pmgr::_sblksize = JRNL_SBLK_SIZE * JRNL_DBLK_SIZE; +const uint32_t pmgr::_sblksize = JRNL_SBLK_SIZE; pmgr::pmgr(jcntl* jc, enq_map& emap, txn_map& tmap): _cache_pgsize_sblks(0), diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/rmgr.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/rmgr.cpp index f49066c38d..b6d0f0f2ad 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/rmgr.cpp +++ b/qpid/cpp/src/qpid/linearstore/jrnl/rmgr.cpp @@ -266,7 +266,7 @@ rmgr::get_events(page_state /*state*/, timespec* const timeout, bool flush) { // TODO: replace for linear store: _rfh /* - if (pcbp->_rfh->rd_subm_cnt_dblks() >= JRNL_SBLK_SIZE) // Detects if write reset of this fcntl obj has occurred. + if (pcbp->_rfh->rd_subm_cnt_dblks() >= JRNL_SBLK_SIZE_DBLKS) // Detects if write reset of this fcntl obj has occurred. { // Increment the completed read offset // NOTE: We cannot use _rrfc here, as it may have rotated since submitting count. @@ -281,15 +281,15 @@ rmgr::get_events(page_state /*state*/, timespec* const timeout, bool flush) else // File header reads have no pcb { std::memcpy(&_fhdr, _fhdr_buffer, sizeof(file_hdr_t)); - /*_rrfc.add_cmpl_cnt_dblks(JRNL_SBLK_SIZE);*/ // TODO: replace for linear store: _rrfc + /*_rrfc.add_cmpl_cnt_dblks(JRNL_SBLK_SIZE_DBLKS);*/ // TODO: replace for linear store: _rrfc - uint32_t fro_dblks = (_fhdr._fro / JRNL_DBLK_SIZE) - JRNL_SBLK_SIZE; + uint32_t fro_dblks = (_fhdr._fro / JRNL_DBLK_SIZE) - JRNL_SBLK_SIZE_DBLKS; // Check fro_dblks does not exceed the write pointers which can happen in some corrupted journal recoveries // TODO: replace for linear store: _fhdr._pfid, _rrfc -// if (fro_dblks > _jc->wr_subm_cnt_dblks(_fhdr._pfid) - JRNL_SBLK_SIZE) -// fro_dblks = _jc->wr_subm_cnt_dblks(_fhdr._pfid) - JRNL_SBLK_SIZE; - _pg_cntr = fro_dblks / (JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE); - uint32_t tot_pg_offs_dblks = _pg_cntr * JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE; +// if (fro_dblks > _jc->wr_subm_cnt_dblks(_fhdr._pfid) - JRNL_SBLK_SIZE_DBLKS) +// fro_dblks = _jc->wr_subm_cnt_dblks(_fhdr._pfid) - JRNL_SBLK_SIZE_DBLKS; + _pg_cntr = fro_dblks / (JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE_DBLKS); + uint32_t tot_pg_offs_dblks = _pg_cntr * JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE_DBLKS; _pg_index = _pg_cntr % JRNL_RMGR_PAGES; _pg_offset_dblks = fro_dblks - tot_pg_offs_dblks; // _rrfc.add_subm_cnt_dblks(tot_pg_offs_dblks); @@ -601,16 +601,16 @@ rmgr::init_aio_reads(const int16_t /*first_uninit*/, const uint16_t /*num_uninit if (_rrfc.subm_offs() == 0) { - _rrfc.add_subm_cnt_dblks(JRNL_SBLK_SIZE); - _rrfc.add_cmpl_cnt_dblks(JRNL_SBLK_SIZE); + _rrfc.add_subm_cnt_dblks(JRNL_SBLK_SIZE_DBLKS); + _rrfc.add_cmpl_cnt_dblks(JRNL_SBLK_SIZE_DBLKS); } // TODO: Future perf improvement: Do a single AIO read for all available file // space into all contiguous empty pages in one AIO operation. uint32_t file_rem_dblks = _rrfc.remaining_dblks(); - file_rem_dblks -= file_rem_dblks % JRNL_SBLK_SIZE; // round down to closest sblk boundary - uint32_t pg_size_dblks = JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE; + file_rem_dblks -= file_rem_dblks % JRNL_SBLK_SIZE_DBLKS; // round down to closest sblk boundary + uint32_t pg_size_dblks = JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE_DBLKS; uint32_t rd_size = file_rem_dblks > pg_size_dblks ? pg_size_dblks : file_rem_dblks; if (rd_size) { @@ -618,7 +618,7 @@ rmgr::init_aio_reads(const int16_t /*first_uninit*/, const uint16_t /*num_uninit // TODO: For perf, combine contiguous pages into single read // 1 or 2 AIOs needed depending on whether read block folds aio_cb* aiocbp = &_aio_cb_arr[pi]; - aio::prep_pread_2(aiocbp, _rrfc.fh(), _page_ptr_arr[pi], rd_size * JRNL_DBLK_SIZE, _rrfc.subm_offs()); + aio::prep_pread_2(aiocbp, _rrfc.fh(), _page_ptr_arr[pi], rd_size * JRNL_DBLK_SIZE_DBLKS, _rrfc.subm_offs()); if (aio::submit(_ioctx, 1, &aiocbp) < 0) throw jexception(jerrno::JERR__AIO, "rmgr", "init_aio_reads"); _rrfc.add_subm_cnt_dblks(rd_size); @@ -640,7 +640,7 @@ rmgr::rotate_page() { _page_cb_arr[_pg_index]._rdblks = 0; _page_cb_arr[_pg_index]._state = UNUSED; - if (_pg_offset_dblks >= JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE) + if (_pg_offset_dblks >= JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE_DBLKS) { _pg_offset_dblks = 0; _pg_cntr++; @@ -682,7 +682,7 @@ rmgr::init_file_header_read() if (aio::submit(_ioctx, 1, &_fhdr_aio_cb_ptr) < 0) throw jexception(jerrno::JERR__AIO, "rmgr", "init_file_header_read"); _aio_evt_rem++; - _rrfc.add_subm_cnt_dblks(JRNL_SBLK_SIZE); + _rrfc.add_subm_cnt_dblks(JRNL_SBLK_SIZE_DBLKS); _fhdr_rd_outstanding = true; */ } diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp index 8ea129a945..ccc9bf6bab 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp +++ b/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp @@ -94,13 +94,13 @@ wmgr::initialize(aio_callback* const cbp, const uint32_t wcache_pgsize_sblks, initialize(cbp, wcache_pgsize_sblks, wcache_num_pages); - _jfsize_dblks = _jc->jfsize_sblks() * JRNL_SBLK_SIZE; + _jfsize_dblks = _jc->jfsize_sblks() * JRNL_SBLK_SIZE_DBLKS; _jfsize_pgs = _jc->jfsize_sblks() / _cache_pgsize_sblks; assert(_jc->jfsize_sblks() % JRNL_RMGR_PAGE_SIZE == 0); if (eo) { - const uint32_t wr_pg_size_dblks = _cache_pgsize_sblks * JRNL_SBLK_SIZE; + const uint32_t wr_pg_size_dblks = _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS; uint32_t data_dblks = (eo / JRNL_DBLK_SIZE) - 4; // 4 dblks for file hdr _pg_cntr = data_dblks / wr_pg_size_dblks; _pg_offset_dblks = data_dblks - (_pg_cntr * wr_pg_size_dblks); @@ -154,11 +154,11 @@ wmgr::enqueue(const void* const data_buff, const std::size_t tot_data_len, bool done = false; while (!done) { - assert(_pg_offset_dblks < _cache_pgsize_sblks * JRNL_SBLK_SIZE); + assert(_pg_offset_dblks < _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS); void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE); uint32_t data_offs_dblks = dtokp->dblocks_written(); uint32_t ret = _enq_rec.encode(wptr, data_offs_dblks, - (_cache_pgsize_sblks * JRNL_SBLK_SIZE) - _pg_offset_dblks); + (_cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS) - _pg_offset_dblks); // Remember fid which contains the record header in case record is split over several files // TODO: replace for linearstore: _wrfc @@ -259,11 +259,11 @@ wmgr::dequeue(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_ bool done = false; while (!done) { - assert(_pg_offset_dblks < _cache_pgsize_sblks * JRNL_SBLK_SIZE); + assert(_pg_offset_dblks < _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS); void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE); uint32_t data_offs_dblks = dtokp->dblocks_written(); uint32_t ret = _deq_rec.encode(wptr, data_offs_dblks, - (_cache_pgsize_sblks * JRNL_SBLK_SIZE) - _pg_offset_dblks); + (_cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS) - _pg_offset_dblks); // Remember fid which contains the record header in case record is split over several files // TODO: replace for linearstore: _wrfc @@ -361,11 +361,11 @@ wmgr::abort(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_le bool done = false; while (!done) { - assert(_pg_offset_dblks < _cache_pgsize_sblks * JRNL_SBLK_SIZE); + assert(_pg_offset_dblks < _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS); void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE); uint32_t data_offs_dblks = dtokp->dblocks_written(); uint32_t ret = _txn_rec.encode(wptr, data_offs_dblks, - (_cache_pgsize_sblks * JRNL_SBLK_SIZE) - _pg_offset_dblks); + (_cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS) - _pg_offset_dblks); // Remember fid which contains the record header in case record is split over several files // TODO: replace for linearstore: _wrfc @@ -453,11 +453,11 @@ wmgr::commit(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_l bool done = false; while (!done) { - assert(_pg_offset_dblks < _cache_pgsize_sblks * JRNL_SBLK_SIZE); + assert(_pg_offset_dblks < _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS); void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE); uint32_t data_offs_dblks = dtokp->dblocks_written(); uint32_t ret = _txn_rec.encode(wptr, data_offs_dblks, - (_cache_pgsize_sblks * JRNL_SBLK_SIZE) - _pg_offset_dblks); + (_cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS) - _pg_offset_dblks); // Remember fid which contains the record header in case record is split over several files // TODO: replace for linearstore: _wrfc @@ -545,10 +545,10 @@ wmgr::file_header_check(const uint64_t /*rid*/, const bool /*cont*/, const uint3 if (cont) { if (file_fit && !file_full) - fro = (rec_dblks_rem + JRNL_SBLK_SIZE) * JRNL_DBLK_SIZE; + fro = (rec_dblks_rem + JRNL_SBLK_SIZE_DBLKS) * JRNL_DBLK_SIZE; } else - fro = JRNL_SBLK_SIZE * JRNL_DBLK_SIZE; + fro = JRNL_SBLK_SIZE; write_fhdr(rid, _wrfc.index(), _wrfc.index(), fro); // TODO: replace for linearstore: _wrfc } */ @@ -558,7 +558,7 @@ void wmgr::flush_check(iores& res, bool& cont, bool& done) { // Is page is full, flush - if (_pg_offset_dblks >= _cache_pgsize_sblks * JRNL_SBLK_SIZE) + if (_pg_offset_dblks >= _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS) { res = write_flush(); assert(res == RHM_IORES_SUCCESS); @@ -814,7 +814,7 @@ wmgr::get_events(page_state state, timespec* const timeout, bool flush) file_hdr_t* fhp = (file_hdr*)aiocbp->u.c.buf; uint32_t lfid = fhp->_lfid; fcntl* fcntlp = _jc->get_fcntlp(lfid); - fcntlp->add_wr_cmpl_cnt_dblks(JRNL_SBLK_SIZE); + fcntlp->add_wr_cmpl_cnt_dblks(JRNL_SBLK_SIZE_DBLKS); fcntlp->decr_aio_cnt(); fcntlp->set_wr_fhdr_aio_outstanding(false); */ @@ -971,7 +971,7 @@ void wmgr::dblk_roundup() { const uint32_t xmagic = QLS_EMPTY_MAGIC; - uint32_t wdblks = jrec::size_blks(_cached_offset_dblks, JRNL_SBLK_SIZE) * JRNL_SBLK_SIZE; + uint32_t wdblks = jrec::size_blks(_cached_offset_dblks, JRNL_SBLK_SIZE_DBLKS) * JRNL_SBLK_SIZE_DBLKS; while (_cached_offset_dblks < wdblks) { void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE); @@ -1000,7 +1000,7 @@ wmgr::write_fhdr(uint64_t rid, uint16_t fid, uint16_t /*lid*/, std::size_t fro) _aio_evt_rem++; // TODO: replace for linearstore: _wrfc /* - _wrfc.add_subm_cnt_dblks(JRNL_SBLK_SIZE); + _wrfc.add_subm_cnt_dblks(JRNL_SBLK_SIZE_DBLKS); _wrfc.incr_aio_cnt(); _wrfc.file_controller()->set_wr_fhdr_aio_outstanding(true); */ @@ -1010,7 +1010,7 @@ void wmgr::rotate_page() { _page_cb_arr[_pg_index]._state = AIO_PENDING; - if (_pg_offset_dblks >= _cache_pgsize_sblks * JRNL_SBLK_SIZE) + if (_pg_offset_dblks >= _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS) { _pg_offset_dblks = 0; _pg_cntr++; |