diff options
Diffstat (limited to 'qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp | 34 |
1 files changed, 17 insertions, 17 deletions
diff --git a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp index a0835530f7..cbc395d61a 100644 --- a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp +++ b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp @@ -28,7 +28,6 @@ #include "qpid/linearstore/IdDbt.h" #include "qpid/linearstore/jrnl/EmptyFilePoolManager.h" #include "qpid/linearstore/jrnl/txn_map.h" -#include "qpid/linearstore/QpidLog.h" #include "qpid/framing/FieldValue.h" #include "qmf/org/apache/qpid/linearstore/Package.h" #include "qpid/linearstore/StoreException.h" @@ -73,6 +72,7 @@ MessageStoreImpl::MessageStoreImpl(qpid::broker::Broker* broker_, const char* en isInit(false), envPath(envpath_), broker(broker_), + jrnlLog(qpid::qls_jrnl::JournalLog::LOG_NOTICE), mgmtObject(), agent(0) {} @@ -83,7 +83,7 @@ uint32_t MessageStoreImpl::chkJrnlWrPageCacheSize(const uint32_t param_, const s if (p == 0) { // For zero value, use default - p = JRNL_WMGR_DEF_PAGE_SIZE_KIB; + p = QLS_WMGR_DEF_PAGE_SIZE_KIB; QLS_LOG(warning, "parameter " << paramName_ << " (" << param_ << ") must be a power of 2 between 1 and 128; changing this parameter to default value (" << p << ")"); } else if ( p > 128 || (p & (p-1)) ) { // For any positive value that is not a power of 2, use closest value @@ -100,8 +100,8 @@ uint32_t MessageStoreImpl::chkJrnlWrPageCacheSize(const uint32_t param_, const s uint16_t MessageStoreImpl::getJrnlWrNumPages(const uint32_t wrPageSizeKib_) { - uint32_t wrPageSizeSblks = wrPageSizeKib_ / JRNL_SBLK_SIZE_KIB; // convert from KiB to number sblks - uint32_t defTotWCacheSizeSblks = JRNL_WMGR_DEF_PAGE_SIZE_SBLKS * JRNL_WMGR_DEF_PAGES; + uint32_t wrPageSizeSblks = wrPageSizeKib_ / QLS_SBLK_SIZE_KIB; // convert from KiB to number sblks + uint32_t defTotWCacheSizeSblks = QLS_WMGR_DEF_PAGE_SIZE_SBLKS * QLS_WMGR_DEF_PAGES; switch (wrPageSizeKib_) { case 1: @@ -127,13 +127,13 @@ qpid::qls_jrnl::efpPartitionNumber_t MessageStoreImpl::chkEfpPartition(const qpi qpid::qls_jrnl::efpDataSize_kib_t MessageStoreImpl::chkEfpFileSizeKiB(const qpid::qls_jrnl::efpDataSize_kib_t efpFileSizeKib_, const std::string& paramName_) { - uint8_t rem = efpFileSizeKib_ % uint64_t(JRNL_SBLK_SIZE_KIB); + uint8_t rem = efpFileSizeKib_ % uint64_t(QLS_SBLK_SIZE_KIB); if (rem != 0) { uint64_t newVal = efpFileSizeKib_ - rem; - if (rem >= (JRNL_SBLK_SIZE_KIB / 2)) - newVal += JRNL_SBLK_SIZE_KIB; + if (rem >= (QLS_SBLK_SIZE_KIB / 2)) + newVal += QLS_SBLK_SIZE_KIB; QLS_LOG(warning, "Parameter " << paramName_ << " (" << efpFileSizeKib_ << ") must be a multiple of " << - JRNL_SBLK_SIZE_KIB << "; changing this parameter to the closest allowable value (" << + QLS_SBLK_SIZE_KIB << "; changing this parameter to the closest allowable value (" << newVal << ")"); return newVal; } @@ -154,7 +154,7 @@ void MessageStoreImpl::initManagement () mgmtObject->set_location(storeDir); mgmtObject->set_tplIsInitialized(false); mgmtObject->set_tplDirectory(getTplBaseDir()); - mgmtObject->set_tplWritePageSize(tplWCachePgSizeSblks * JRNL_SBLK_SIZE_BYTES); + mgmtObject->set_tplWritePageSize(tplWCachePgSizeSblks * QLS_SBLK_SIZE_BYTES); mgmtObject->set_tplWritePages(tplWCacheNumPages); agent->addObject(mgmtObject, 0, true); @@ -193,9 +193,9 @@ bool MessageStoreImpl::init(const std::string& storeDir_, // Set geometry members (converting to correct units where req'd) defaultEfpPartitionNumber = efpPartition_; defaultEfpFileSize_kib = efpFileSize_kib_; - wCachePgSizeSblks = wCachePageSizeKib_ / JRNL_SBLK_SIZE_KIB; // convert from KiB to number sblks + wCachePgSizeSblks = wCachePageSizeKib_ / QLS_SBLK_SIZE_KIB; // convert from KiB to number sblks wCacheNumPages = getJrnlWrNumPages(wCachePageSizeKib_); - tplWCachePgSizeSblks = tplWCachePageSizeKib_ / JRNL_SBLK_SIZE_KIB; // convert from KiB to number sblks + tplWCachePgSizeSblks = tplWCachePageSizeKib_ / QLS_SBLK_SIZE_KIB; // convert from KiB to number sblks tplWCacheNumPages = getJrnlWrNumPages(tplWCachePageSizeKib_); if (storeDir_.size()>0) storeDir = storeDir_; @@ -267,7 +267,7 @@ void MessageStoreImpl::init() // NOTE: during normal initialization, agent == 0 because the store is initialized before the management infrastructure. // However during a truncated initialization in a cluster, agent != 0. We always pass 0 as the agent for the // TplStore to keep things consistent in a cluster. See https://bugzilla.redhat.com/show_bug.cgi?id=681026 - tplStorePtr.reset(new TplJournalImpl(broker->getTimer(), "TplStore", getTplBaseDir(), defJournalGetEventsTimeout, defJournalFlushTimeout, 0)); + tplStorePtr.reset(new TplJournalImpl(broker->getTimer(), "TplStore", getTplBaseDir(), jrnlLog, defJournalGetEventsTimeout, defJournalFlushTimeout, 0)); isInit = true; } catch (const DbException& e) { if (e.get_errno() == DB_VERSION_MISMATCH) @@ -291,7 +291,7 @@ void MessageStoreImpl::init() } } while (!isInit); - efpMgr.reset(new EmptyFilePoolManagerImpl(getStoreTopLevelDir())); + efpMgr.reset(new qpid::qls_jrnl::EmptyFilePoolManager(getStoreTopLevelDir(), jrnlLog)); efpMgr->findEfpPartitions(); } @@ -403,7 +403,7 @@ void MessageStoreImpl::create(qpid::broker::PersistableQueue& queue_, return; } - jQueue = new JournalImpl(broker->getTimer(), queue_.getName(), getJrnlDir(queue_.getName()), + jQueue = new JournalImpl(broker->getTimer(), queue_.getName(), getJrnlDir(queue_.getName()), jrnlLog, defJournalGetEventsTimeout, defJournalFlushTimeout, agent, boost::bind(&MessageStoreImpl::journalDeleted, this, _1)); { @@ -711,8 +711,8 @@ void MessageStoreImpl::recoverQueues(TxnCtxt& txn, QLS_LOG(error, "Cannot recover empty (null) queue name - ignoring and attempting to continue."); break; } - jQueue = new JournalImpl(broker->getTimer(), queueName, getJrnlDir(queueName), defJournalGetEventsTimeout, - defJournalFlushTimeout, agent, + jQueue = new JournalImpl(broker->getTimer(), queueName, getJrnlDir(queueName),jrnlLog, + defJournalGetEventsTimeout, defJournalFlushTimeout, agent, boost::bind(&MessageStoreImpl::journalDeleted, this, _1)); { qpid::sys::Mutex::ScopedLock sl(journalListLock); @@ -1207,7 +1207,7 @@ void MessageStoreImpl::loadContent(const qpid::broker::PersistableQueue& /*queue void MessageStoreImpl::flush(const qpid::broker::PersistableQueue& queue_) { - QLS_LOG(info, "*** MessageStoreImpl::flush() queue=\"" << queue_.getName() << "\""); +// QLS_LOG(info, "*** MessageStoreImpl::flush() queue=\"" << queue_.getName() << "\""); if (queue_.getExternalQueueStore() == 0) return; checkInit(); std::string qn = queue_.getName(); |