diff options
author | Kim van der Riet <kpvdr@apache.org> | 2013-10-07 18:39:24 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2013-10-07 18:39:24 +0000 |
commit | 127d7a7cd4f6a36ae3e44aaa6a12a28692b12b90 (patch) | |
tree | 9be501e6a790807892bc7a3f2c3cbd803d2cc01d /qpid/cpp | |
parent | c70bf3ea28cdf6bafd8571690d3e5c466a0658a2 (diff) | |
download | qpid-python-127d7a7cd4f6a36ae3e44aaa6a12a28692b12b90.tar.gz |
QPID-4984: WIP - Compiles, but functionally incomplete
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/linearstore@1530024 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
52 files changed, 1984 insertions, 1413 deletions
diff --git a/qpid/cpp/src/linearstore.cmake b/qpid/cpp/src/linearstore.cmake index 5377acd207..2237ed8a0f 100644 --- a/qpid/cpp/src/linearstore.cmake +++ b/qpid/cpp/src/linearstore.cmake @@ -83,27 +83,20 @@ if (BUILD_LINEARSTORE) qpid/linearstore/jrnl/EmptyFilePool.cpp qpid/linearstore/jrnl/EmptyFilePoolManager.cpp qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp - #qpid/linearstore/jrnl/fcntl.cpp qpid/linearstore/jrnl/jcntl.cpp qpid/linearstore/jrnl/jdir.cpp qpid/linearstore/jrnl/jerrno.cpp qpid/linearstore/jrnl/jexception.cpp - #qpid/linearstore/jrnl/jinf.cpp qpid/linearstore/jrnl/JournalFile.cpp - qpid/linearstore/jrnl/JournalFileController.cpp qpid/linearstore/jrnl/JournalLog.cpp qpid/linearstore/jrnl/jrec.cpp - #qpid/linearstore/jrnl/lp_map.cpp - #qpid/linearstore/jrnl/lpmgr.cpp + qpid/linearstore/jrnl/LinearFileController.cpp qpid/linearstore/jrnl/pmgr.cpp - qpid/linearstore/jrnl/rmgr.cpp - #qpid/linearstore/jrnl/rfc.cpp - #qpid/linearstore/jrnl/rrfc.cpp + qpid/linearstore/jrnl/RecoveryManager.cpp qpid/linearstore/jrnl/time_ns.cpp qpid/linearstore/jrnl/txn_map.cpp qpid/linearstore/jrnl/txn_rec.cpp qpid/linearstore/jrnl/wmgr.cpp - #qpid/linearstore/jrnl/wrfc.cpp ) # linearstore source files diff --git a/qpid/cpp/src/qpid/linearstore/EmptyFilePoolManagerImpl.cpp b/qpid/cpp/src/qpid/linearstore/EmptyFilePoolManagerImpl.cpp index 37e3922846..2904afb944 100644 --- a/qpid/cpp/src/qpid/linearstore/EmptyFilePoolManagerImpl.cpp +++ b/qpid/cpp/src/qpid/linearstore/EmptyFilePoolManagerImpl.cpp @@ -49,8 +49,8 @@ void EmptyFilePoolManagerImpl::findEfpPartitions() { QLS_LOG(info, " * Partition " << (*i)->partitionNumber() << " containing " << filePoolList.size() << " pool" << (filePoolList.size()>1 ? "s" : "") << " at \'" << (*i)->partitionDirectory() << "\'"); for (std::vector<qpid::qls_jrnl::EmptyFilePool*>::const_iterator j=filePoolList.begin(); j!=filePoolList.end(); ++j) { - QLS_LOG(info, " - EFP \'" << (*j)->fileSizeKib() << "k\' containing " << (*j)->numEmptyFiles() << - " files of size " << (*j)->fileSizeKib() << " KiB totaling " << (*j)->cumFileSizeKib() << " KiB"); + QLS_LOG(info, " - EFP \'" << (*j)->dataSize_kib() << "k\' containing " << (*j)->numEmptyFiles() << + " files of size " << (*j)->dataSize_kib() << " KiB totaling " << (*j)->cumFileSize_kib() << " KiB"); } } } diff --git a/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp b/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp index 3fdfb24592..42ca113dc2 100644 --- a/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp +++ b/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp @@ -62,7 +62,6 @@ JournalImpl::JournalImpl(qpid::sys::Timer& timer_, jcntl(journalId, journalDirectory/*, journalBaseFilename*/), timer(timer_), getEventsTimerSetFlag(false), - efpp(0), // lastReadRid(0), writeActivityFlag(false), flushTriggeredFlag(true), @@ -119,8 +118,8 @@ JournalImpl::initManagement(qpid::management::ManagementAgent* a) _mgmtObject->set_name(_jid); _mgmtObject->set_directory(_jdir.dirname()); // _mgmtObject->set_baseFileName(_base_filename); - _mgmtObject->set_readPageSize(JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE); - _mgmtObject->set_readPages(JRNL_RMGR_PAGES); +// _mgmtObject->set_readPageSize(JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE); +// _mgmtObject->set_readPages(JRNL_RMGR_PAGES); // The following will be set on initialize(), but being properties, these must be set to 0 in the meantime //_mgmtObject->set_initialFileCount(0); @@ -140,7 +139,6 @@ JournalImpl::initialize(qpid::qls_jrnl::EmptyFilePool* efpp_, const uint32_t wcache_pgsize_sblks, qpid::qls_jrnl::aio_callback* const cbp) { - efpp = efpp_; // efpp->createJournal(_jdir); // QLS_LOG2(notice, _jid, "Initialized"); // std::ostringstream oss; @@ -150,7 +148,7 @@ JournalImpl::initialize(qpid::qls_jrnl::EmptyFilePool* efpp_, // oss << " wcache_pgsize_sblks=" << wcache_pgsize_sblks; // oss << " wcache_num_pages=" << wcache_num_pages; // QLS_LOG2(debug, _jid, oss.str()); - jcntl::initialize(/*num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,*/ efpp, wcache_num_pages, wcache_pgsize_sblks, cbp); + jcntl::initialize(/*num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,*/ efpp_, wcache_num_pages, wcache_pgsize_sblks, cbp); // QLS_LOG2(debug, _jid, "Initialization complete"); // TODO: replace for linearstore: _lpmgr /* @@ -175,6 +173,7 @@ JournalImpl::recover(/*const uint16_t num_jfiles, const bool auto_expand, const uint16_t ae_max_jfiles, const uint32_t jfsize_sblks,*/ + boost::shared_ptr<qpid::qls_jrnl::EmptyFilePoolManager> efpm, const uint16_t wcache_num_pages, const uint32_t wcache_pgsize_sblks, qpid::qls_jrnl::aio_callback* const cbp, @@ -210,10 +209,10 @@ JournalImpl::recover(/*const uint16_t num_jfiles, prep_xid_list.push_back(i->xid); } - jcntl::recover(/*num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,*/ wcache_num_pages, wcache_pgsize_sblks, + jcntl::recover(/*num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,*/efpm.get(), wcache_num_pages, wcache_pgsize_sblks, cbp, &prep_xid_list, highest_rid); } else { - jcntl::recover(/*num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,*/ wcache_num_pages, wcache_pgsize_sblks, + jcntl::recover(/*num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,*/efpm.get(), wcache_num_pages, wcache_pgsize_sblks, cbp, 0, highest_rid); } @@ -559,9 +558,11 @@ JournalImpl::wr_aio_cb(std::vector<data_tok*>& dtokl) switch (dtokp->wstate()) { case data_tok::ENQ: + std::cout << "<<<>>> JournalImpl::wr_aio_cb() ENQ dtokp rid=" << dtokp->rid() << std::endl << std::flush; // DEBUG dtokp->getSourceMessage()->enqueueComplete(); break; case data_tok::DEQ: + std::cout << "<<<>>> JournalImpl::wr_aio_cb() DEQ dtokp rid=" << dtokp->rid() << std::endl << std::flush; // DEBUG /* Don't need to signal until we have a way to ack completion of dequeue in AMQP dtokp->getSourceMessage()->dequeueComplete(); if ( dtokp->getSourceMessage()->isDequeueComplete() ) // clear id after last dequeue @@ -607,16 +608,7 @@ JournalImpl::handleIoResult(const iores r) { case qpid::qls_jrnl::RHM_IORES_SUCCESS: return; - case qpid::qls_jrnl::RHM_IORES_ENQCAPTHRESH: - { - std::ostringstream oss; - oss << "Enqueue capacity threshold exceeded."; - QLS_LOG2(warning, _jid, oss.str()); - if (_agent != 0) - _agent->raiseEvent(qmf::org::apache::qpid::linearstore::EventEnqThresholdExceeded(_jid, "Journal enqueue capacity threshold exceeded"), - qpid::management::ManagementAgent::SEV_WARN); - THROW_STORE_FULL_EXCEPTION(oss.str()); - } +/* case qpid::qls_jrnl::RHM_IORES_FULL: { std::ostringstream oss; @@ -626,6 +618,7 @@ JournalImpl::handleIoResult(const iores r) _agent->raiseEvent(qmf::org::apache::qpid::linearstore::EventFull(_jid, "Journal full"), qpid::management::ManagementAgent::SEV_ERROR); THROW_STORE_FULL_EXCEPTION(oss.str()); } +*/ default: { std::ostringstream oss; diff --git a/qpid/cpp/src/qpid/linearstore/JournalImpl.h b/qpid/cpp/src/qpid/linearstore/JournalImpl.h index 264162e5bb..4352eda074 100644 --- a/qpid/cpp/src/qpid/linearstore/JournalImpl.h +++ b/qpid/cpp/src/qpid/linearstore/JournalImpl.h @@ -79,30 +79,16 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public qpid::qls_jr typedef boost::function<void (JournalImpl&)> DeleteCallback; private: -// static qpid::sys::Mutex _static_lock; -// static uint32_t cnt; - qpid::sys::Timer& timer; bool getEventsTimerSetFlag; boost::intrusive_ptr<qpid::sys::TimerTask> getEventsFireEventsPtr; qpid::sys::Mutex _getf_lock; qpid::sys::Mutex _read_lock; - qpid::qls_jrnl::EmptyFilePool* efpp; - -// uint64_t lastReadRid; // rid of last read msg for loadMsgContent() - detects out-of-order read requests -// std::vector<uint64_t> oooRidList; // list of out-of-order rids (greater than current rid) encountered during read sequence bool writeActivityFlag; bool flushTriggeredFlag; boost::intrusive_ptr<qpid::sys::TimerTask> inactivityFireEventPtr; - // temp local vars for loadMsgContent below -// void* _xidp; -// void* _datap; -// size_t _dlen; -// qpid::qls_jrnl::data_tok _dtok; -// bool _external; - qpid::management::ManagementAgent* _agent; qmf::org::apache::qpid::linearstore::Journal::shared_ptr _mgmtObject; DeleteCallback deleteCallback; @@ -112,7 +98,6 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public qpid::qls_jr JournalImpl(qpid::sys::Timer& timer, const std::string& journalId, const std::string& journalDirectory, -// const std::string& journalBaseFilename, const qpid::sys::Duration getEventsTimeout, const qpid::sys::Duration flushTimeout, qpid::management::ManagementAgent* agent, @@ -122,29 +107,18 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public qpid::qls_jr void initManagement(qpid::management::ManagementAgent* agent); - void initialize(/*const uint16_t num_jfiles, - const bool auto_expand, - const uint16_t ae_max_jfiles, - const uint32_t jfsize_sblks,*/ - qpid::qls_jrnl::EmptyFilePool* efp, + void initialize(qpid::qls_jrnl::EmptyFilePool* efp, const uint16_t wcache_num_pages, const uint32_t wcache_pgsize_sblks, qpid::qls_jrnl::aio_callback* const cbp); - inline void initialize(/*const uint16_t num_jfiles, - const bool auto_expand, - const uint16_t ae_max_jfiles, - const uint32_t jfsize_sblks,*/ - qpid::qls_jrnl::EmptyFilePool* efp, + inline void initialize(qpid::qls_jrnl::EmptyFilePool* efpp, const uint16_t wcache_num_pages, const uint32_t wcache_pgsize_sblks) { - initialize(/*num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,*/ efp, wcache_num_pages, wcache_pgsize_sblks, this); + initialize(efpp, wcache_num_pages, wcache_pgsize_sblks, this); } - void recover(/*const uint16_t num_jfiles, - const bool auto_expand, - const uint16_t ae_max_jfiles, - const uint32_t jfsize_sblks,*/ + void recover(boost::shared_ptr<qpid::qls_jrnl::EmptyFilePoolManager> efpm, const uint16_t wcache_num_pages, const uint32_t wcache_pgsize_sblks, qpid::qls_jrnl::aio_callback* const cbp, @@ -152,17 +126,13 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public qpid::qls_jr uint64_t& highest_rid, uint64_t queue_id); - inline void recover(/*const uint16_t num_jfiles, - const bool auto_expand, - const uint16_t ae_max_jfiles, - const uint32_t jfsize_sblks,*/ + inline void recover(boost::shared_ptr<qpid::qls_jrnl::EmptyFilePoolManager> efpm, const uint16_t wcache_num_pages, const uint32_t wcache_pgsize_sblks, boost::ptr_list<PreparedTransaction>* prep_tx_list_ptr, uint64_t& highest_rid, uint64_t queue_id) { - recover(/*num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,*/ wcache_num_pages, wcache_pgsize_sblks, - this, prep_tx_list_ptr, highest_rid, queue_id); + recover(efpm, wcache_num_pages, wcache_pgsize_sblks, this, prep_tx_list_ptr, highest_rid, queue_id); } void recover_complete(); @@ -197,10 +167,6 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public qpid::qls_jr void stop(bool block_till_aio_cmpl = false); - // Logging -// void log(qpid::qls_jrnl::log_level level, const std::string& log_stmt) const; -// void log(qpid::qls_jrnl::log_level level, const char* const log_stmt) const; - // Overrides for get_events timer qpid::qls_jrnl::iores flush(const bool block_till_aio_cmpl = false); @@ -249,15 +215,15 @@ class TplJournalImpl : public JournalImpl TplJournalImpl(qpid::sys::Timer& timer, const std::string& journalId, const std::string& journalDirectory, -// const std::string& journalBaseFilename, const qpid::sys::Duration getEventsTimeout, const qpid::sys::Duration flushTimeout, qpid::management::ManagementAgent* agent) : - JournalImpl(timer, journalId, journalDirectory/*, journalBaseFilename*/, getEventsTimeout, flushTimeout, agent) + JournalImpl(timer, journalId, journalDirectory, getEventsTimeout, flushTimeout, agent) {} virtual ~TplJournalImpl() {} +/* // Special version of read_data_record that ignores transactions - needed when reading the TPL inline qpid::qls_jrnl::iores read_data_record(void** const datapp, std::size_t& dsize, void** const xidpp, std::size_t& xidsize, bool& transient, bool& external, @@ -265,6 +231,7 @@ class TplJournalImpl : public JournalImpl return JournalImpl::read_data_record(datapp, dsize, xidpp, xidsize, transient, external, dtokp, true); } inline void read_reset() { _rmgr.invalidate(); } +*/ }; // class TplJournalImpl } // namespace msgstore diff --git a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp index 5372963bc2..a0835530f7 100644 --- a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp +++ b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp @@ -63,7 +63,7 @@ MessageStoreImpl::TplRecoverStruct::TplRecoverStruct(const uint64_t rid_, MessageStoreImpl::MessageStoreImpl(qpid::broker::Broker* broker_, const char* envpath_) : defaultEfpPartitionNumber(0), - defaultEfpFileSizeKib(0), + defaultEfpFileSize_kib(0), truncateFlag(false), wCachePgSizeSblks(0), wCacheNumPages(0), @@ -83,7 +83,7 @@ uint32_t MessageStoreImpl::chkJrnlWrPageCacheSize(const uint32_t param_, const s if (p == 0) { // For zero value, use default - p = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_SBLK_SIZE / 1024; + p = JRNL_WMGR_DEF_PAGE_SIZE_KIB; QLS_LOG(warning, "parameter " << paramName_ << " (" << param_ << ") must be a power of 2 between 1 and 128; changing this parameter to default value (" << p << ")"); } else if ( p > 128 || (p & (p-1)) ) { // For any positive value that is not a power of 2, use closest value @@ -100,22 +100,22 @@ uint32_t MessageStoreImpl::chkJrnlWrPageCacheSize(const uint32_t param_, const s uint16_t MessageStoreImpl::getJrnlWrNumPages(const uint32_t wrPageSizeKib_) { - uint32_t wrPageSizeSblks = wrPageSizeKib_ * 1024 / JRNL_SBLK_SIZE; // convert from KiB to number sblks - uint32_t defTotWCacheSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_WMGR_DEF_PAGES; // in sblks. Currently 2014 sblks (1 MiB). + uint32_t wrPageSizeSblks = wrPageSizeKib_ / JRNL_SBLK_SIZE_KIB; // convert from KiB to number sblks + uint32_t defTotWCacheSizeSblks = JRNL_WMGR_DEF_PAGE_SIZE_SBLKS * JRNL_WMGR_DEF_PAGES; switch (wrPageSizeKib_) { case 1: case 2: case 4: // 256 KiB total cache - return defTotWCacheSize / wrPageSizeSblks / 4; + return defTotWCacheSizeSblks / wrPageSizeSblks / 4; case 8: case 16: // 512 KiB total cache - return defTotWCacheSize / wrPageSizeSblks / 2; + return defTotWCacheSizeSblks / wrPageSizeSblks / 2; default: // 32, 64, 128 // 1 MiB total cache - return defTotWCacheSize / wrPageSizeSblks; + return defTotWCacheSizeSblks / wrPageSizeSblks; } } @@ -125,7 +125,7 @@ qpid::qls_jrnl::efpPartitionNumber_t MessageStoreImpl::chkEfpPartition(const qpi return partition_; } -qpid::qls_jrnl::efpFileSizeKib_t MessageStoreImpl::chkEfpFileSizeKiB(const qpid::qls_jrnl::efpFileSizeKib_t efpFileSizeKib_, +qpid::qls_jrnl::efpDataSize_kib_t MessageStoreImpl::chkEfpFileSizeKiB(const qpid::qls_jrnl::efpDataSize_kib_t efpFileSizeKib_, const std::string& paramName_) { uint8_t rem = efpFileSizeKib_ % uint64_t(JRNL_SBLK_SIZE_KIB); if (rem != 0) { @@ -154,7 +154,7 @@ void MessageStoreImpl::initManagement () mgmtObject->set_location(storeDir); mgmtObject->set_tplIsInitialized(false); mgmtObject->set_tplDirectory(getTplBaseDir()); - mgmtObject->set_tplWritePageSize(tplWCachePgSizeSblks * JRNL_SBLK_SIZE); + mgmtObject->set_tplWritePageSize(tplWCachePgSizeSblks * JRNL_SBLK_SIZE_BYTES); mgmtObject->set_tplWritePages(tplWCacheNumPages); agent->addObject(mgmtObject, 0, true); @@ -172,36 +172,30 @@ bool MessageStoreImpl::init(const qpid::Options* options_) // Extract and check options const StoreOptions* opts = static_cast<const StoreOptions*>(options_); qpid::qls_jrnl::efpPartitionNumber_t efpPartition = chkEfpPartition(opts->efpPartition, "efp-partition"); - qpid::qls_jrnl::efpFileSizeKib_t efpFilePoolSize = chkEfpFileSizeKiB(opts->efpFileSizeKib, "efp-file-size"); + qpid::qls_jrnl::efpDataSize_kib_t efpFilePoolSize_kib = chkEfpFileSizeKiB(opts->efpFileSizeKib, "efp-file-size"); uint32_t jrnlWrCachePageSizeKib = chkJrnlWrPageCacheSize(opts->wCachePageSizeKib, "wcache-page-size"); uint32_t tplJrnlWrCachePageSizeKib = chkJrnlWrPageCacheSize(opts->tplWCachePageSizeKib, "tpl-wcache-page-size"); // Pass option values to init() - return init(opts->storeDir, efpPartition, efpFilePoolSize, opts->truncateFlag, jrnlWrCachePageSizeKib, tplJrnlWrCachePageSizeKib); + return init(opts->storeDir, efpPartition, efpFilePoolSize_kib, opts->truncateFlag, jrnlWrCachePageSizeKib, tplJrnlWrCachePageSizeKib); } // These params, taken from options, are assumed to be correct and verified bool MessageStoreImpl::init(const std::string& storeDir_, - /*uint16_t jfiles, - uint32_t jfileSizePgs,*/ qpid::qls_jrnl::efpPartitionNumber_t efpPartition_, - qpid::qls_jrnl::efpFileSizeKib_t efpFileSizeKib_, + qpid::qls_jrnl::efpDataSize_kib_t efpFileSize_kib_, const bool truncateFlag_, uint32_t wCachePageSizeKib_, - /*uint16_t tplJfiles, - uint32_t tplJfileSizePgs,*/ uint32_t tplWCachePageSizeKib_) - /*bool autoJExpand, - uint16_t autoJExpandMaxFiles)*/ { if (isInit) return true; // Set geometry members (converting to correct units where req'd) defaultEfpPartitionNumber = efpPartition_; - defaultEfpFileSizeKib = efpFileSizeKib_; - wCachePgSizeSblks = wCachePageSizeKib_ * 1024 / JRNL_SBLK_SIZE; // convert from KiB to number sblks + defaultEfpFileSize_kib = efpFileSize_kib_; + wCachePgSizeSblks = wCachePageSizeKib_ / JRNL_SBLK_SIZE_KIB; // convert from KiB to number sblks wCacheNumPages = getJrnlWrNumPages(wCachePageSizeKib_); - tplWCachePgSizeSblks = tplWCachePageSizeKib_ * 1024 / JRNL_SBLK_SIZE; // convert from KiB to number sblks + tplWCachePgSizeSblks = tplWCachePageSizeKib_ / JRNL_SBLK_SIZE_KIB; // convert from KiB to number sblks tplWCacheNumPages = getJrnlWrNumPages(tplWCachePageSizeKib_); if (storeDir_.size()>0) storeDir = storeDir_; @@ -212,13 +206,13 @@ bool MessageStoreImpl::init(const std::string& storeDir_, QLS_LOG(notice, "Store module initialized; store-dir=" << storeDir_); QLS_LOG(info, "> Default EFP partition: " << defaultEfpPartitionNumber); - QLS_LOG(info, "> Default EFP file size: " << defaultEfpFileSizeKib << " (KiB)"); + QLS_LOG(info, "> Default EFP file size: " << defaultEfpFileSize_kib << " (KiB)"); QLS_LOG(info, "> Default write cache page size: " << wCachePageSizeKib_ << " (KiB)"); QLS_LOG(info, "> Default number of write cache pages: " << wCacheNumPages); QLS_LOG(info, "> TPL write cache page size: " << tplWCachePageSizeKib_ << " (KiB)"); QLS_LOG(info, "> TPL number of write cache pages: " << tplWCacheNumPages); QLS_LOG(info, "> EFP partition: " << defaultEfpPartitionNumber); - QLS_LOG(info, "> EFP file size pool: " << defaultEfpFileSizeKib << " (KiB)"); + QLS_LOG(info, "> EFP file size pool: " << defaultEfpFileSize_kib << " (KiB)"); return isInit; } @@ -273,7 +267,7 @@ void MessageStoreImpl::init() // NOTE: during normal initialization, agent == 0 because the store is initialized before the management infrastructure. // However during a truncated initialization in a cluster, agent != 0. We always pass 0 as the agent for the // TplStore to keep things consistent in a cluster. See https://bugzilla.redhat.com/show_bug.cgi?id=681026 - tplStorePtr.reset(new TplJournalImpl(broker->getTimer(), "TplStore", getTplBaseDir(), /*"tpl",*/ defJournalGetEventsTimeout, defJournalFlushTimeout, 0)); + tplStorePtr.reset(new TplJournalImpl(broker->getTimer(), "TplStore", getTplBaseDir(), defJournalGetEventsTimeout, defJournalFlushTimeout, 0)); isInit = true; } catch (const DbException& e) { if (e.get_errno() == DB_VERSION_MISMATCH) @@ -350,7 +344,7 @@ void MessageStoreImpl::chkTplStoreInit() qpid::sys::Mutex::ScopedLock sl(tplInitLock); if (!tplStorePtr->is_ready()) { qpid::qls_jrnl::jdir::create_dir(getTplBaseDir()); - tplStorePtr->initialize(/*tplNumJrnlFiles, false, 0, tplJrnlFsizeSblks,*/ getEmptyFilePool(defaultEfpPartitionNumber, defaultEfpFileSizeKib), tplWCacheNumPages, tplWCachePgSizeSblks); + tplStorePtr->initialize(/*tplNumJrnlFiles, false, 0, tplJrnlFsizeSblks,*/ getEmptyFilePool(defaultEfpPartitionNumber, defaultEfpFileSize_kib), tplWCacheNumPages, tplWCachePgSizeSblks); if (mgmtObject.get() != 0) mgmtObject->set_tplIsInitialized(true); } } @@ -403,26 +397,13 @@ void MessageStoreImpl::create(qpid::broker::PersistableQueue& queue_, } JournalImpl* jQueue = 0; -// uint16_t localFileCount = numJrnlFiles; -// bool localAutoExpandFlag = autoJrnlExpand; -// uint16_t localAutoExpandMaxFileCount = autoJrnlExpandMaxFiles; -// uint32_t localFileSizeSblks = jrnlFsizeSblks; -// -// value = args.get("qpid.file_count"); -// if (value.get() != 0 && !value->empty() && value->convertsTo<int>()) -// localFileCount = chkJrnlNumFilesParam((uint16_t) value->get<int>(), "qpid.file_count"); -// -// value = args.get("qpid.file_size"); -// if (value.get() != 0 && !value->empty() && value->convertsTo<int>()) -// localFileSizeSblks = chkJrnlFileSizeParam((uint32_t) value->get<int>(), "qpid.file_size", wCachePgSizeSblks) * JRNL_RMGR_PAGE_SIZE; - if (queue_.getName().size() == 0) { QLS_LOG(error, "Cannot create store for empty (null) queue name - ignoring and attempting to continue."); return; } - jQueue = new JournalImpl(broker->getTimer(), queue_.getName(), getJrnlDir(queue_), /*std::string("JournalData"),*/ + jQueue = new JournalImpl(broker->getTimer(), queue_.getName(), getJrnlDir(queue_.getName()), defJournalGetEventsTimeout, defJournalFlushTimeout, agent, boost::bind(&MessageStoreImpl::journalDeleted, this, _1)); { @@ -430,32 +411,8 @@ void MessageStoreImpl::create(qpid::broker::PersistableQueue& queue_, journalList[queue_.getName()]=jQueue; } -// value = args.get("qpid.auto_expand"); -// if (value.get() != 0 && !value->empty() && value->convertsTo<bool>()) -// localAutoExpandFlag = (bool) value->get<bool>(); -// -// value = args.get("qpid.auto_expand_max_jfiles"); -// if (value.get() != 0 && !value->empty() && value->convertsTo<int>()) -// localAutoExpandMaxFileCount = (uint16_t) value->get<int>(); -/* - qpid::framing::FieldTable::ValuePtr value; - qpid::qls_jrnl::efpPartitionNumber_t localEfpPartition = efpPartition; - value = args_.get("qpid.efp_partition"); - if (value.get() != 0 && !value->empty() && value->convertsTo<int>()) { - localEfpPartition = chkEfpPartition((uint32_t)value->get<int>(), "qpid.efp_partition"); - } - - qpid::qls_jrnl::efpFileSizeKib_t localEfpFileSizeKib = efpFileSizeKib; - value = args_.get("qpid.efp_file_size"); - if (value.get() != 0 && !value->empty() && value->convertsTo<int>()) { - localEfpFileSizeKib = chkEfpFileSizeKiB((uint32_t)value->get<int>(),"qpid.efp_file_size" ); - } -*/ - queue_.setExternalQueueStore(dynamic_cast<qpid::broker::ExternalQueueStore*>(jQueue)); try { - // init will create the deque's for the init... -// jQueue->initialize(localFileCount, localAutoExpandFlag, localAutoExpandMaxFileCount, localFileSizeSblks, wCacheNumPages, wCachePgSizeSblks); jQueue->initialize(getEmptyFilePool(args_), wCacheNumPages, wCachePgSizeSblks); } catch (const qpid::qls_jrnl::jexception& e) { THROW_STORE_EXCEPTION(std::string("Queue ") + queue_.getName() + ": create() failed: " + e.what()); @@ -471,7 +428,7 @@ void MessageStoreImpl::create(qpid::broker::PersistableQueue& queue_, qpid::qls_jrnl::EmptyFilePool* MessageStoreImpl::getEmptyFilePool(const qpid::qls_jrnl::efpPartitionNumber_t efpPartitionNumber_, - const qpid::qls_jrnl::efpFileSizeKib_t efpFileSizeKib_) { + const qpid::qls_jrnl::efpDataSize_kib_t efpFileSizeKib_) { qpid::qls_jrnl::EmptyFilePool* efpp = efpMgr->getEmptyFilePool(efpPartitionNumber_, efpFileSizeKib_); if (efpp == 0) { std::ostringstream oss; @@ -490,7 +447,7 @@ MessageStoreImpl::getEmptyFilePool(const qpid::framing::FieldTable& args_) { localEfpPartition = chkEfpPartition((uint32_t)value->get<int>(), "qpid.efp_partition"); } - qpid::qls_jrnl::efpFileSizeKib_t localEfpFileSizeKib = defaultEfpFileSizeKib; + qpid::qls_jrnl::efpDataSize_kib_t localEfpFileSizeKib = defaultEfpFileSize_kib; value = args_.get("qpid.efp_file_size"); if (value.get() != 0 && !value->empty() && value->convertsTo<int>()) { localEfpFileSizeKib = chkEfpFileSizeKiB((uint32_t)value->get<int>(),"qpid.efp_file_size" ); @@ -501,21 +458,19 @@ MessageStoreImpl::getEmptyFilePool(const qpid::framing::FieldTable& args_) { void MessageStoreImpl::destroy(qpid::broker::PersistableQueue& queue_) { QLS_LOG(info, "*** MessageStoreImpl::destroy() queue=\"" << queue_.getName() << "\""); -/* checkInit(); - destroy(queueDb, queue); - deleteBindingsForQueue(queue); - qpid::broker::ExternalQueueStore* eqs = queue.getExternalQueueStore(); + destroy(queueDb, queue_); + deleteBindingsForQueue(queue_); + qpid::broker::ExternalQueueStore* eqs = queue_.getExternalQueueStore(); if (eqs) { JournalImpl* jQueue = static_cast<JournalImpl*>(eqs); jQueue->delete_jrnl_files(); - queue.setExternalQueueStore(0); // will delete the journal if exists + queue_.setExternalQueueStore(0); // will delete the journal if exists { qpid::sys::Mutex::ScopedLock sl(journalListLock); - journalList.erase(queue.getName()); + journalList.erase(queue_.getName()); } } -*/ } void MessageStoreImpl::create(const qpid::broker::PersistableExchange& exchange_, @@ -727,14 +682,13 @@ void MessageStoreImpl::recover(qpid::broker::RecoveryManager& registry_) registry_.recoveryComplete(); } -void MessageStoreImpl::recoverQueues(TxnCtxt& /*txn*/, - qpid::broker::RecoveryManager& /*registry*/, - queue_index& /*queue_index*/, - txn_list& /*prepared*/, - message_index& /*messages*/) +void MessageStoreImpl::recoverQueues(TxnCtxt& txn, + qpid::broker::RecoveryManager& registry, + queue_index& queue_index, + txn_list& prepared, + message_index& messages) { QLS_LOG(info, "*** MessageStoreImpl::recoverQueues()"); -/* Cursor queues; queues.open(queueDb, txn.get()); @@ -757,8 +711,8 @@ void MessageStoreImpl::recoverQueues(TxnCtxt& /*txn*/, QLS_LOG(error, "Cannot recover empty (null) queue name - ignoring and attempting to continue."); break; } - jQueue = new JournalImpl(broker->getTimer(), queueName, getJrnlHashDir(queueName), std::string("JournalData"), - defJournalGetEventsTimeout, defJournalFlushTimeout, agent, + jQueue = new JournalImpl(broker->getTimer(), queueName, getJrnlDir(queueName), defJournalGetEventsTimeout, + defJournalFlushTimeout, agent, boost::bind(&MessageStoreImpl::journalDeleted, this, _1)); { qpid::sys::Mutex::ScopedLock sl(journalListLock); @@ -771,22 +725,23 @@ void MessageStoreImpl::recoverQueues(TxnCtxt& /*txn*/, long rcnt = 0L; // recovered msg count long idcnt = 0L; // in-doubt msg count uint64_t thisHighestRid = 0ULL; - jQueue->recover(numJrnlFiles, autoJrnlExpand, autoJrnlExpandMaxFiles, jrnlFsizeSblks, wCacheNumPages, - wCachePgSizeSblks, &prepared, thisHighestRid, key.id); // start recovery - -// // Check for changes to queue store settings qpid.file_count and qpid.file_size resulting -// // from recovery of a store that has had its size changed externally by the resize utility. -// // If so, update the queue store settings so that QMF queries will reflect the new values. -// const qpid::framing::FieldTable& storeargs = queue->getSettings().storeSettings; -// qpid::framing::FieldTable::ValuePtr value; -// value = storeargs.get("qpid.file_count"); -// if (value.get() != 0 && !value->empty() && value->convertsTo<int>() && (uint16_t)value->get<int>() != jQueue->num_jfiles()) { -// queue->addArgument("qpid.file_count", jQueue->num_jfiles()); -// } -// value = storeargs.get("qpid.file_size"); -// if (value.get() != 0 && !value->empty() && value->convertsTo<int>() && (uint32_t)value->get<int>() != jQueue->jfsize_sblks()/JRNL_RMGR_PAGE_SIZE) { -// queue->addArgument("qpid.file_size", jQueue->jfsize_sblks()/JRNL_RMGR_PAGE_SIZE); -// } + jQueue->recover(boost::dynamic_pointer_cast<qpid::qls_jrnl::EmptyFilePoolManager>(efpMgr), wCacheNumPages, wCachePgSizeSblks, &prepared, thisHighestRid, key.id); + + // Check for changes to queue store settings qpid.file_count and qpid.file_size resulting + // from recovery of a store that has had its size changed externally by the resize utility. + // If so, update the queue store settings so that QMF queries will reflect the new values. +/* + const qpid::framing::FieldTable& storeargs = queue->getSettings().storeSettings; + qpid::framing::FieldTable::ValuePtr value; + value = storeargs.get("qpid.file_count"); + if (value.get() != 0 && !value->empty() && value->convertsTo<int>() && (uint16_t)value->get<int>() != jQueue->num_jfiles()) { + queue->addArgument("qpid.file_count", jQueue->num_jfiles()); + } + value = storeargs.get("qpid.file_size"); + if (value.get() != 0 && !value->empty() && value->convertsTo<int>() && (uint32_t)value->get<int>() != jQueue->jfsize_sblks()/JRNL_RMGR_PAGE_SIZE) { + queue->addArgument("qpid.file_size", jQueue->jfsize_sblks()/JRNL_RMGR_PAGE_SIZE); + } +*/ if (highestRid == 0ULL) highestRid = thisHighestRid; @@ -810,7 +765,6 @@ void MessageStoreImpl::recoverQueues(TxnCtxt& /*txn*/, QLS_LOG(info, "Most recent persistence id found: 0x" << std::hex << highestRid << std::dec); queueIdSequence.reset(maxQueueId + 1); -*/ } @@ -901,16 +855,15 @@ void MessageStoreImpl::recoverGeneral(TxnCtxt& txn_, } void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/, - qpid::broker::RecoveryManager& /*recovery*/, - qpid::broker::RecoverableQueue::shared_ptr& queue_, - txn_list& /*prepared*/, - message_index& /*messages*/, - long& /*rcnt*/, - long& /*idcnt*/) + qpid::broker::RecoveryManager& recovery, + qpid::broker::RecoverableQueue::shared_ptr& queue, + txn_list& prepared, + message_index& messages, + long& rcnt, + long& idcnt) { - QLS_LOG(info, "*** MessageStoreImpl::recoverMessages() queue=\"" << queue_->getName() << "\""); -/* - size_t preambleLength = sizeof(uint32_t)header size; + QLS_LOG(info, "*** MessageStoreImpl::recoverMessages() queue=\"" << queue->getName() << "\""); + size_t preambleLength = sizeof(uint32_t)/*header size*/; JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore()); DataTokenImpl dtok; @@ -1035,7 +988,6 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/, } catch (const qpid::qls_jrnl::jexception& e) { THROW_STORE_EXCEPTION(std::string("Queue ") + queue->getName() + ": recoverMessages() failed: " + e.what()); } -*/ } qpid::broker::RecoverableMessage::shared_ptr MessageStoreImpl::getExternMessage(qpid::broker::RecoveryManager& /*recovery*/, @@ -1079,7 +1031,6 @@ int MessageStoreImpl::enqueueMessage(TxnCtxt& txn_, void MessageStoreImpl::readTplStore() { QLS_LOG(info, "*** MessageStoreImpl::readTplStore()"); -/* tplRecoverMap.clear(); qpid::qls_jrnl::txn_map& tmap = tplStorePtr->get_txn_map(); DataTokenImpl dtok; @@ -1148,7 +1099,6 @@ void MessageStoreImpl::readTplStore() } catch (const qpid::qls_jrnl::jexception& e) { THROW_STORE_EXCEPTION(std::string("TPL recoverTplStore() failed: ") + e.what()); } -*/ } void MessageStoreImpl::recoverTplStore() @@ -1171,10 +1121,9 @@ void MessageStoreImpl::recoverTplStore() */ } -void MessageStoreImpl::recoverLockedMappings(txn_list& /*txns*/) +void MessageStoreImpl::recoverLockedMappings(txn_list& txns) { QLS_LOG(info, "*** MessageStoreImpl::recoverLockedMappings()"); -/* if (!tplStorePtr->is_ready()) recoverTplStore(); @@ -1186,7 +1135,6 @@ void MessageStoreImpl::recoverLockedMappings(txn_list& /*txns*/) deq_ptr.reset(new LockedMappings); txns.push_back(new PreparedTransaction(i->first, enq_ptr, deq_ptr)); } -*/ } void MessageStoreImpl::collectPreparedXids(std::set<std::string>& /*xids*/) @@ -1260,55 +1208,46 @@ void MessageStoreImpl::loadContent(const qpid::broker::PersistableQueue& /*queue void MessageStoreImpl::flush(const qpid::broker::PersistableQueue& queue_) { QLS_LOG(info, "*** MessageStoreImpl::flush() queue=\"" << queue_.getName() << "\""); -/* - if (queue.getExternalQueueStore() == 0) return; + if (queue_.getExternalQueueStore() == 0) return; checkInit(); - std::string qn = queue.getName(); + std::string qn = queue_.getName(); try { - JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore()); + JournalImpl* jc = static_cast<JournalImpl*>(queue_.getExternalQueueStore()); if (jc) { // TODO: check if this result should be used... - mrg::journal::iores res = jc->flush(); + /*mrg::journal::iores res =*/ jc->flush(); } } catch (const qpid::qls_jrnl::jexception& e) { THROW_STORE_EXCEPTION(std::string("Queue ") + qn + ": flush() failed: " + e.what() ); } -*/ } -void MessageStoreImpl::enqueue(qpid::broker::TransactionContext* /*ctxt*/, +void MessageStoreImpl::enqueue(qpid::broker::TransactionContext* ctxt_, const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg_, - const qpid::broker::PersistableQueue& /*queue*/) + const qpid::broker::PersistableQueue& queue_) { -// QLS_LOG(info, "*** MessageStoreImpl::enqueue() queue=\"" << queue.getName() << "\""); -/* + //QLS_LOG(info, "*** MessageStoreImpl::enqueue() queue=\"" << queue_.getName() << "\""); checkInit(); - uint64_t queueId (queue.getPersistenceId()); - uint64_t messageId (msg->getPersistenceId()); + uint64_t queueId (queue_.getPersistenceId()); if (queueId == 0) { - THROW_STORE_EXCEPTION("Queue not created: " + queue.getName()); + THROW_STORE_EXCEPTION("Queue not created: " + queue_.getName()); } TxnCtxt implicit; TxnCtxt* txn = 0; - if (ctxt) { - txn = check(ctxt); + if (ctxt_) { + txn = check(ctxt_); } else { txn = &implicit; } - bool newId = false; - if (messageId == 0) { - messageId = messageIdSequence.next(); - msg->setPersistenceId(messageId); - newId = true; + if (msg_->getPersistenceId() == 0) { + msg_->setPersistenceId(messageIdSequence.next()); } - store(&queue, txn, msg, newId); + store(&queue_, txn, msg_); // add queue* to the txn map.. - if (ctxt) txn->addXidRecord(queue.getExternalQueueStore()); -*/ - msg_->enqueueComplete();// DEBUG: only while null fns in use + if (ctxt_) txn->addXidRecord(queue_.getExternalQueueStore()); } uint64_t MessageStoreImpl::msgEncode(std::vector<char>& buff_, @@ -1329,91 +1268,85 @@ uint64_t MessageStoreImpl::msgEncode(std::vector<char>& buff_, } void MessageStoreImpl::store(const qpid::broker::PersistableQueue* queue_, - TxnCtxt* /*txn*/, - const boost::intrusive_ptr<qpid::broker::PersistableMessage>& /*message*/, - bool /*newId*/) + TxnCtxt* txn_, + const boost::intrusive_ptr<qpid::broker::PersistableMessage>& message_) { - QLS_LOG(info, "*** MessageStoreImpl::store() queue=\"" << queue_->getName() << "\""); -/* + //QLS_LOG(info, "*** MessageStoreImpl::store() queue=\"" << queue_->getName() << "\""); std::vector<char> buff; - uint64_t size = msgEncode(buff, message); + uint64_t size = msgEncode(buff, message_); try { - if (queue) { + if (queue_) { boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl); dtokp->addRef(); - dtokp->setSourceMessage(message); + dtokp->setSourceMessage(message_); dtokp->set_external_rid(true); - dtokp->set_rid(message->getPersistenceId()); // set the messageID into the Journal header (record-id) + dtokp->set_rid(message_->getPersistenceId()); // set the messageID into the Journal header (record-id) - JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore()); - if (txn->getXid().empty()) { - jc->enqueue_data_record(&buff[0], size, size, dtokp.get(), !message->isPersistent()); + JournalImpl* jc = static_cast<JournalImpl*>(queue_->getExternalQueueStore()); + if (txn_->getXid().empty()) { + jc->enqueue_data_record(&buff[0], size, size, dtokp.get(), !message_->isPersistent()); } else { - jc->enqueue_txn_data_record(&buff[0], size, size, dtokp.get(), txn->getXid(), !message->isPersistent()); + jc->enqueue_txn_data_record(&buff[0], size, size, dtokp.get(), txn_->getXid(), !message_->isPersistent()); } } else { THROW_STORE_EXCEPTION(std::string("MessageStoreImpl::store() failed: queue NULL.")); } } catch (const qpid::qls_jrnl::jexception& e) { - THROW_STORE_EXCEPTION(std::string("Queue ") + queue->getName() + ": MessageStoreImpl::store() failed: " + + THROW_STORE_EXCEPTION(std::string("Queue ") + queue_->getName() + ": MessageStoreImpl::store() failed: " + e.what()); } -*/ } -void MessageStoreImpl::dequeue(qpid::broker::TransactionContext* /*ctxt*/, +void MessageStoreImpl::dequeue(qpid::broker::TransactionContext* ctxt_, const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg_, - const qpid::broker::PersistableQueue& /*queue*/) + const qpid::broker::PersistableQueue& queue_) { -// QLS_LOG(info, "*** MessageStoreImpl::dequeue() queue=\"" << queue.getName() << "\""); -/* + //QLS_LOG(info, "*** MessageStoreImpl::dequeue() queue=\"" << queue_.getName() << "\""); checkInit(); - uint64_t queueId (queue.getPersistenceId()); - uint64_t messageId (msg->getPersistenceId()); + uint64_t queueId (queue_.getPersistenceId()); + uint64_t messageId (msg_->getPersistenceId()); if (queueId == 0) { - THROW_STORE_EXCEPTION("Queue \"" + queue.getName() + "\" has null queue Id (has not been created)"); + THROW_STORE_EXCEPTION("Queue \"" + queue_.getName() + "\" has null queue Id (has not been created)"); } if (messageId == 0) { - THROW_STORE_EXCEPTION("Queue \"" + queue.getName() + "\": Dequeuing message with null persistence Id."); + THROW_STORE_EXCEPTION("Queue \"" + queue_.getName() + "\": Dequeuing message with null persistence Id."); } TxnCtxt implicit; TxnCtxt* txn = 0; - if (ctxt) { - txn = check(ctxt); + if (ctxt_) { + txn = check(ctxt_); } else { txn = &implicit; } // add queue* to the txn map.. - if (ctxt) txn->addXidRecord(queue.getExternalQueueStore()); - async_dequeue(ctxt, msg, queue); -*/ + if (ctxt_) txn->addXidRecord(queue_.getExternalQueueStore()); + async_dequeue(ctxt_, msg_, queue_); msg_->dequeueComplete(); } -void MessageStoreImpl::async_dequeue(qpid::broker::TransactionContext* /*ctxt*/, - const boost::intrusive_ptr<qpid::broker::PersistableMessage>& /*msg*/, +void MessageStoreImpl::async_dequeue(qpid::broker::TransactionContext* ctxt_, + const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg_, const qpid::broker::PersistableQueue& queue_) { - QLS_LOG(info, "*** MessageStoreImpl::async_dequeue() queue=\"" << queue_.getName() << "\""); -/* + //QLS_LOG(info, "*** MessageStoreImpl::async_dequeue() queue=\"" << queue_.getName() << "\""); boost::intrusive_ptr<DataTokenImpl> ddtokp(new DataTokenImpl); - ddtokp->setSourceMessage(msg); + ddtokp->setSourceMessage(msg_); ddtokp->set_external_rid(true); ddtokp->set_rid(messageIdSequence.next()); - ddtokp->set_dequeue_rid(msg->getPersistenceId()); + ddtokp->set_dequeue_rid(msg_->getPersistenceId()); ddtokp->set_wstate(DataTokenImpl::ENQ); std::string tid; - if (ctxt) { - TxnCtxt* txn = check(ctxt); + if (ctxt_) { + TxnCtxt* txn = check(ctxt_); tid = txn->getXid(); } // Manually increase the ref count, as raw pointers are used beyond this point ddtokp->addRef(); try { - JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore()); + JournalImpl* jc = static_cast<JournalImpl*>(queue_.getExternalQueueStore()); if (tid.empty()) { jc->dequeue_data_record(ddtokp.get()); } else { @@ -1421,9 +1354,8 @@ void MessageStoreImpl::async_dequeue(qpid::broker::TransactionContext* /*ctxt*/, } } catch (const qpid::qls_jrnl::jexception& e) { ddtokp->release(); - THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": async_dequeue() failed: " + e.what()); + THROW_STORE_EXCEPTION(std::string("Queue ") + queue_.getName() + ": async_dequeue() failed: " + e.what()); } -*/ } uint32_t MessageStoreImpl::outstandingQueueAIO(const qpid::broker::PersistableQueue& /*queue_*/) @@ -1661,11 +1593,10 @@ std::string MessageStoreImpl::getTplBaseDir() return dir.str(); } -std::string MessageStoreImpl::getJrnlDir(const qpid::broker::PersistableQueue& queue_) //for exmaple /var/rhm/ + queueDir/ +std::string MessageStoreImpl::getJrnlDir(const std::string& queueName_) { - /*return getJrnlHashDir(queue_.getName().c_str());*/ std::ostringstream oss; - oss << getJrnlBaseDir() << queue_.getName(); + oss << getJrnlBaseDir() << queueName_; return oss.str(); } @@ -1679,8 +1610,8 @@ void MessageStoreImpl::journalDeleted(JournalImpl& j_) { MessageStoreImpl::StoreOptions::StoreOptions(const std::string& name_) : qpid::Options(name_), truncateFlag(defTruncateFlag), - wCachePageSizeKib(defWCachePageSize), - tplWCachePageSizeKib(defTplWCachePageSize), + wCachePageSizeKib(defWCachePageSizeKib), + tplWCachePageSizeKib(defTplWCachePageSizeKib), efpPartition(defEfpPartition), efpFileSizeKib(defEfpFileSizeKib) { diff --git a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h index 8d88e9da97..0028b809e9 100644 --- a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h +++ b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h @@ -101,10 +101,10 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem // Default store settings static const bool defTruncateFlag = false; - static const uint32_t defWCachePageSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_SBLK_SIZE / 1024; - static const uint32_t defTplWCachePageSize = defWCachePageSize / 8; + static const uint32_t defWCachePageSizeKib = JRNL_WMGR_DEF_PAGE_SIZE_KIB; + static const uint32_t defTplWCachePageSizeKib = defWCachePageSizeKib / 8; static const uint16_t defEfpPartition = 1; - static const uint64_t defEfpFileSizeKib = 512 * JRNL_SBLK_SIZE / 1024; + static const uint64_t defEfpFileSizeKib = 512 * JRNL_SBLK_SIZE_KIB; static const std::string storeTopLevelDir; static qpid::sys::Duration defJournalGetEventsTimeout; @@ -133,7 +133,7 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem IdSequence messageIdSequence; std::string storeDir; qpid::qls_jrnl::efpPartitionNumber_t defaultEfpPartitionNumber; - qpid::qls_jrnl::efpFileSizeKib_t defaultEfpFileSizeKib; + qpid::qls_jrnl::efpDataSize_kib_t defaultEfpFileSize_kib; bool truncateFlag; uint32_t wCachePgSizeSblks; uint16_t wCacheNumPages; @@ -156,7 +156,7 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem static uint16_t getJrnlWrNumPages(const uint32_t wrPageSizeKiB); static qpid::qls_jrnl::efpPartitionNumber_t chkEfpPartition(const qpid::qls_jrnl::efpPartitionNumber_t partition, const std::string& paramName); - static qpid::qls_jrnl::efpFileSizeKib_t chkEfpFileSizeKiB(const qpid::qls_jrnl::efpFileSizeKib_t efpFileSizeKiB, + static qpid::qls_jrnl::efpDataSize_kib_t chkEfpFileSizeKiB(const qpid::qls_jrnl::efpDataSize_kib_t efpFileSizeKiB, const std::string& paramName); void init(); @@ -202,8 +202,7 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem uint64_t msgEncode(std::vector<char>& buff, const boost::intrusive_ptr<qpid::broker::PersistableMessage>& message); void store(const qpid::broker::PersistableQueue* queue, TxnCtxt* txn, - const boost::intrusive_ptr<qpid::broker::PersistableMessage>& message, - bool newId); + const boost::intrusive_ptr<qpid::broker::PersistableMessage>& message); void async_dequeue(qpid::broker::TransactionContext* ctxt, const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg, const qpid::broker::PersistableQueue& queue); @@ -231,8 +230,8 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem // journal functions void createJrnlQueue(const qpid::broker::PersistableQueue& queue); - std::string getJrnlDir(const qpid::broker::PersistableQueue& queue); //for exmaple /var/rhm/ + queueDir/ - qpid::qls_jrnl::EmptyFilePool* getEmptyFilePool(const qpid::qls_jrnl::efpPartitionNumber_t p, const qpid::qls_jrnl::efpFileSizeKib_t s); + std::string getJrnlDir(const std::string& queueName); + qpid::qls_jrnl::EmptyFilePool* getEmptyFilePool(const qpid::qls_jrnl::efpPartitionNumber_t p, const qpid::qls_jrnl::efpDataSize_kib_t s); qpid::qls_jrnl::EmptyFilePool* getEmptyFilePool(const qpid::framing::FieldTable& args); std::string getStoreTopLevelDir(); std::string getJrnlBaseDir(); @@ -268,10 +267,10 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem bool init(const std::string& dir, qpid::qls_jrnl::efpPartitionNumber_t efpPartition = defEfpPartition, - qpid::qls_jrnl::efpFileSizeKib_t efpFileSizeKib = defEfpFileSizeKib, + qpid::qls_jrnl::efpDataSize_kib_t efpFileSizeKib = defEfpFileSizeKib, const bool truncateFlag = false, - uint32_t wCachePageSize = defWCachePageSize, - uint32_t tplWCachePageSize = defTplWCachePageSize); + uint32_t wCachePageSize = defWCachePageSizeKib, + uint32_t tplWCachePageSize = defTplWCachePageSizeKib); void truncateInit(); @@ -279,6 +278,8 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem void finalize(); + // --- Implementation of qpid::broker::MessageStore --- + void create(qpid::broker::PersistableQueue& queue, const qpid::framing::FieldTable& args); @@ -344,6 +345,8 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem void abort(qpid::broker::TransactionContext& ctxt); + // --- Implementation of qpid::management::Managable --- + qpid::management::ManagementObject::shared_ptr GetManagementObject (void) const { return mgmtObject; } diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/AtomicCounter.h b/qpid/cpp/src/qpid/linearstore/jrnl/AtomicCounter.h new file mode 100644 index 0000000000..cdae41f944 --- /dev/null +++ b/qpid/cpp/src/qpid/linearstore/jrnl/AtomicCounter.h @@ -0,0 +1,137 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifndef QPID_LINEARSTORE_ATOMICCOUNTER_H_ +#define QPID_LINEARSTORE_ATOMICCOUNTER_H_ + +#include "qpid/linearstore/jrnl/slock.h" + +namespace qpid { +namespace qls_jrnl { + +template <class T> +class AtomicCounter +{ +private: + T count; + mutable smutex countMutex; + +public: + AtomicCounter(const T& initValue = T(0)) : count(initValue) {} + + virtual ~AtomicCounter() {} + + T get() const { + slock l(countMutex); + return count; + } + + T increment() { + slock l(countMutex); + return ++count; + } + + T add(const T& a) { + slock l(countMutex); + count += a; + return count; + } + + T addLimit(const T& a, const T& limit, const uint32_t jerr) { + slock l(countMutex); + if (count + a > limit) throw jexception(jerr, "AtomicCounter", "addLimit"); + count += a; + return count; + } + + T decrement() { + slock l(countMutex); + return --count; + } + + T decrementLimit(const T& limit = T(0), const uint32_t jerr = jerrno::JERR__UNDERFLOW) { + slock l(countMutex); + if (count < limit + 1) { + throw jexception(jerr, "AtomicCounter", "decrementLimit"); + } + return --count; + } + + T subtract(const T& s) { + slock l(countMutex); + count -= s; + return count; + } + + T subtractLimit(const T& s, const T& limit = T(0), const uint32_t jerr = jerrno::JERR__UNDERFLOW) { + slock l(countMutex); + if (count < limit + s) throw jexception(jerr, "AtomicCounter", "subtractLimit"); + count -= s; + return count; + } + + bool operator==(const T& o) const { + slock l(countMutex); + return count == o; + } + + bool operator<(const T& o) const { + slock l(countMutex); + return count < o; + } + + bool operator<=(const T& o) const { + slock l(countMutex); + return count <= o; + } + + friend T operator-(const T& a, const AtomicCounter& b) { + slock l(b.countMutex); + return a - b.count; + } + + friend T operator-(const AtomicCounter& a, const T& b) { + slock l(a.countMutex); + return a.count - b; + } + + friend T operator-(const AtomicCounter&a, const AtomicCounter& b) { + slock l1(a.countMutex); + slock l2(b.countMutex); + return a.count - b.count; + } + +/* + friend std::ostream& operator<<(std::ostream& out, const AtomicCounter& a) { + T temp; // Use temp so lock is not held while streaming to out. + { + slock l(a.countMutex); + temp = a.count; + } + out << temp; + return out; + } +*/ +}; + +}} // namespace qpid::qls_jrnl + +#endif // QPID_LINEARSTORE_ATOMICCOUNTER_H_ diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.cpp index 90e04df7ed..8d9cf8ce43 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.cpp +++ b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.cpp @@ -40,7 +40,7 @@ namespace qls_jrnl { EmptyFilePool::EmptyFilePool(const std::string& efpDirectory_, const EmptyFilePoolPartition* partitionPtr_) : efpDirectory(efpDirectory_), - efpFileSizeKib(fileSizeKbFromDirName(efpDirectory_, partitionPtr_->partitionNumber())), + efpDataSize_kib(fileSizeKbFromDirName(efpDirectory_, partitionPtr_->partitionNumber())), partitionPtr(partitionPtr_) {} @@ -50,7 +50,7 @@ void EmptyFilePool::initialize() { //std::cout << "Reading " << efpDirectory << std::endl; // DEBUG std::vector<std::string> dirList; - jdir::read_dir(efpDirectory, dirList, false, true, false); + jdir::read_dir(efpDirectory, dirList, false, true, false, false); for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) { size_t dotPos = i->rfind("."); if (dotPos != std::string::npos) { @@ -65,9 +65,24 @@ EmptyFilePool::initialize() { //std::cout << "Found " << emptyFileList.size() << " files" << std::endl; // DEBUG } -efpFileSizeKib_t -EmptyFilePool::fileSizeKib() const { - return efpFileSizeKib; +efpDataSize_kib_t +EmptyFilePool::dataSize_kib() const { + return efpDataSize_kib; +} + +efpFileSize_kib_t +EmptyFilePool::fileSize_kib() const { + return efpDataSize_kib + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_KIB); +} + +efpDataSize_sblks_t +EmptyFilePool::dataSize_sblks() const { + return efpDataSize_kib / JRNL_SBLK_SIZE_KIB; +} + +efpFileSize_sblks_t +EmptyFilePool::fileSize_sblks() const { + return (efpDataSize_kib / JRNL_SBLK_SIZE_KIB) + QLS_JRNL_FHDR_RES_SIZE_SBLKS; } efpFileCount_t @@ -76,10 +91,10 @@ EmptyFilePool::numEmptyFiles() const { return efpFileCount_t(emptyFileList.size()); } -efpFileSizeKib_t -EmptyFilePool::cumFileSizeKib() const { +efpDataSize_kib_t +EmptyFilePool::cumFileSize_kib() const { slock l(emptyFileListMutex); - return efpFileSizeKib_t(emptyFileList.size()) * efpFileSizeKib; + return efpDataSize_kib_t(emptyFileList.size()) * efpDataSize_kib; } efpPartitionNumber_t @@ -94,7 +109,7 @@ EmptyFilePool::getPartition() const { const efpIdentity_t EmptyFilePool::getIdentity() const { - return efpIdentity_t(partitionPtr->partitionNumber(), efpFileSizeKib); + return efpIdentity_t(partitionPtr->partitionNumber(), efpDataSize_kib); } std::string @@ -112,9 +127,9 @@ EmptyFilePool::takeEmptyFile(const std::string& destDirectory) { bool EmptyFilePool::returnEmptyFile(const JournalFile* srcFile) { - std::string emptyFileName(efpDirectory + srcFile->fileName()); + std::string emptyFileName(efpDirectory + srcFile->getFileName()); // TODO: reset file here - if (::rename(srcFile->fqFileName().c_str(), emptyFileName.c_str())) { + if (::rename(srcFile->getFqFileName().c_str(), emptyFileName.c_str())) { std::ostringstream oss; oss << "file=\"" << srcFile << "\" dest=\"" << emptyFileName << "\"" << FORMAT_SYSERR(errno); throw jexception(jerrno::JERR_JDIR_FMOVE, oss.str(), "EmptyFilePool", "returnEmptyFile"); @@ -152,14 +167,13 @@ EmptyFilePool::popEmptyFile() { void EmptyFilePool::createEmptyFile() { - file_hdr_t fh; - ::file_hdr_create(&fh, QLS_FILE_MAGIC, QLS_JRNL_VERSION, QLS_JRNL_FHDRSIZESBLKS, partitionPtr->partitionNumber(), - efpFileSizeKib); + ::file_hdr_t fh; + ::file_hdr_create(&fh, QLS_FILE_MAGIC, QLS_JRNL_VERSION, QLS_JRNL_FHDR_RES_SIZE_SBLKS, partitionPtr->partitionNumber(), efpDataSize_kib); std::string efpfn = getEfpFileName(); std::ofstream ofs(efpfn.c_str(), std::ofstream::out | std::ofstream::binary); if (ofs.good()) { - ofs.write((char*)&fh, sizeof(file_hdr_t)); - uint64_t rem = ((efpFileSizeKib + (QLS_JRNL_FHDRSIZESBLKS * JRNL_SBLK_SIZE_KIB)) * 1024) - sizeof(file_hdr_t); + ofs.write((char*)&fh, sizeof(::file_hdr_t)); + uint64_t rem = ((efpDataSize_kib + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_KIB)) * 1024) - sizeof(::file_hdr_t); while (rem--) ofs.put('\0'); ofs.close(); @@ -180,8 +194,8 @@ EmptyFilePool::validateEmptyFile(const std::string& emptyFileName_) const { oss << "stat: file=\"" << emptyFileName_ << "\"" << FORMAT_SYSERR(errno); throw jexception(jerrno::JERR_JDIR_STAT, oss.str(), "EmptyFilePool", "validateEmptyFile"); } - efpFileSizeKib_t expectedSize = (JRNL_SBLK_SIZE_KIB + efpFileSizeKib) * 1024; - if ((efpFileSizeKib_t)s.st_size != expectedSize) { + efpDataSize_kib_t expectedSize = (JRNL_SBLK_SIZE_KIB + efpDataSize_kib) * 1024; + if ((efpDataSize_kib_t)s.st_size != expectedSize) { //std::cout << "ERROR: File " << emptyFileName << ": Incorrect size: Expected=" << expectedSize << "; actual=" << s.st_size << std::endl; // DEBUG return false; } @@ -194,8 +208,8 @@ EmptyFilePool::validateEmptyFile(const std::string& emptyFileName_) const { const uint8_t fhFileNameBuffLen = 50; char fhFileNameBuff[fhFileNameBuffLen]; - file_hdr_t fh; - ifs.read((char*)&fh, sizeof(file_hdr_t)); + ::file_hdr_t fh; + ifs.read((char*)&fh, sizeof(::file_hdr_t)); uint16_t fhFileNameLen = fh._queue_name_len > fhFileNameBuffLen ? fhFileNameBuffLen : fh._queue_name_len; ifs.read(fhFileNameBuff, fhFileNameLen); std::string fhFileName(fhFileNameBuff, fhFileNameLen); @@ -204,7 +218,7 @@ EmptyFilePool::validateEmptyFile(const std::string& emptyFileName_) const { if (fh._rhdr._magic != QLS_FILE_MAGIC || fh._rhdr._version != QLS_JRNL_VERSION || fh._efp_partition != partitionPtr->partitionNumber() || - fh._file_size_kib != efpFileSizeKib || + fh._file_size_kib != efpDataSize_kib || !::is_file_hdr_reset(&fh)) { //std::cout << "ERROR: File " << emptyFileName << ": Invalid file header" << std::endl; @@ -227,7 +241,7 @@ EmptyFilePool::getEfpFileName() { // protected // static -efpFileSizeKib_t +efpDataSize_kib_t EmptyFilePool::fileSizeKbFromDirName(const std::string& dirName_, const efpPartitionNumber_t partitionNumber_) { // Check for dirName format 'NNNk', where NNN is a number, convert NNN into an integer. NNN cannot be 0. @@ -243,7 +257,7 @@ EmptyFilePool::fileSizeKbFromDirName(const std::string& dirName_, valid = n[charNum] == 'k'; } } - efpFileSizeKib_t s = ::atol(n.c_str()); + efpDataSize_kib_t s = ::atol(n.c_str()); if (!valid || s == 0 || s % JRNL_SBLK_SIZE_KIB != 0) { std::ostringstream oss; oss << "Partition: " << partitionNumber_ << "; EFP directory: \'" << n << "\'"; diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.h b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.h index 1958bb7647..4de12fdbf3 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.h @@ -47,7 +47,7 @@ protected: typedef emptyFileList_t::iterator emptyFileListItr_t; const std::string efpDirectory; - const efpFileSizeKib_t efpFileSizeKib; + const efpDataSize_kib_t efpDataSize_kib; const EmptyFilePoolPartition* partitionPtr; private: @@ -60,9 +60,12 @@ public: virtual ~EmptyFilePool(); void initialize(); - efpFileSizeKib_t fileSizeKib() const; + efpDataSize_kib_t dataSize_kib() const; + efpFileSize_kib_t fileSize_kib() const; + efpDataSize_sblks_t dataSize_sblks() const; + efpFileSize_sblks_t fileSize_sblks() const; efpFileCount_t numEmptyFiles() const; - efpFileSizeKib_t cumFileSizeKib() const; + efpDataSize_kib_t cumFileSize_kib() const; efpPartitionNumber_t getPartitionNumber() const; const EmptyFilePoolPartition* getPartition() const; const efpIdentity_t getIdentity() const; @@ -76,8 +79,8 @@ protected: void createEmptyFile(); bool validateEmptyFile(const std::string& emptyFileName_) const; std::string getEfpFileName(); - static efpFileSizeKib_t fileSizeKbFromDirName(const std::string& dirName_, - const efpPartitionNumber_t partitionNumber_); + static efpDataSize_kib_t fileSizeKbFromDirName(const std::string& dirName_, + const efpPartitionNumber_t partitionNumber_); }; }} // namespace qpid::qls_jrnl diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.cpp index 1bc716f912..dfee91f6b5 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.cpp +++ b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.cpp @@ -49,7 +49,7 @@ void EmptyFilePoolManager::findEfpPartitions() { //std::cout << "*** Reading " << qlsStorePath << std::endl; // DEBUG std::vector<std::string> dirList; - jdir::read_dir(qlsStorePath, dirList, true, false, true); + jdir::read_dir(qlsStorePath, dirList, true, false, true, false); for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) { if ((*i)[0] == 'p' && i->length() == 4) { // Filter: look only at names pNNN efpPartitionNumber_t pn = ::atoi(i->c_str() + 1); @@ -90,15 +90,15 @@ EmptyFilePoolManager::getEfpPartition(const efpPartitionNumber_t partitionNumber void EmptyFilePoolManager::getEfpPartitionNumbers(std::vector<efpPartitionNumber_t>& partitionNumberList, - const efpFileSizeKib_t efpFileSizeKb) const { + const efpDataSize_kib_t efpFileSizeKb) const { slock l(partitionMapMutex); for (partitionMapConstItr_t i=partitionMap.begin(); i!=partitionMap.end(); ++i) { if (efpFileSizeKb == 0) { partitionNumberList.push_back(i->first); } else { - std::vector<efpFileSizeKib_t> efpFileSizeList; + std::vector<efpDataSize_kib_t> efpFileSizeList; i->second->getEmptyFilePoolSizesKb(efpFileSizeList); - for (std::vector<efpFileSizeKib_t>::iterator j=efpFileSizeList.begin(); j!=efpFileSizeList.end(); ++j) { + for (std::vector<efpDataSize_kib_t>::iterator j=efpFileSizeList.begin(); j!=efpFileSizeList.end(); ++j) { if (*j == efpFileSizeKb) { partitionNumberList.push_back(i->first); break; @@ -110,15 +110,15 @@ EmptyFilePoolManager::getEfpPartitionNumbers(std::vector<efpPartitionNumber_t>& void EmptyFilePoolManager::getEfpPartitions(std::vector<EmptyFilePoolPartition*>& partitionList, - const efpFileSizeKib_t efpFileSizeKb) { + const efpDataSize_kib_t efpFileSizeKb) { slock l(partitionMapMutex); for (partitionMapConstItr_t i=partitionMap.begin(); i!=partitionMap.end(); ++i) { if (efpFileSizeKb == 0) { partitionList.push_back(i->second); } else { - std::vector<efpFileSizeKib_t> efpFileSizeList; + std::vector<efpDataSize_kib_t> efpFileSizeList; i->second->getEmptyFilePoolSizesKb(efpFileSizeList); - for (std::vector<efpFileSizeKib_t>::iterator j=efpFileSizeList.begin(); j!=efpFileSizeList.end(); ++j) { + for (std::vector<efpDataSize_kib_t>::iterator j=efpFileSizeList.begin(); j!=efpFileSizeList.end(); ++j) { if (*j == efpFileSizeKb) { partitionList.push_back(i->second); break; @@ -129,7 +129,7 @@ EmptyFilePoolManager::getEfpPartitions(std::vector<EmptyFilePoolPartition*>& par } void -EmptyFilePoolManager::getEfpFileSizes(std::vector<efpFileSizeKib_t>& efpFileSizeList, +EmptyFilePoolManager::getEfpFileSizes(std::vector<efpDataSize_kib_t>& efpFileSizeList, const efpPartitionNumber_t efpPartitionNumber) const { if (efpPartitionNumber == 0) { for (partitionMapConstItr_t i=partitionMap.begin(); i!=partitionMap.end(); ++i) { @@ -160,7 +160,7 @@ EmptyFilePoolManager::getEmptyFilePools(std::vector<EmptyFilePool*>& emptyFilePo EmptyFilePool* EmptyFilePoolManager::getEmptyFilePool(const efpPartitionNumber_t partitionNumber, - const efpFileSizeKib_t efpFileSizeKib) { + const efpDataSize_kib_t efpFileSizeKib) { EmptyFilePoolPartition* efppp = getEfpPartition(partitionNumber); if (efppp != 0) return efppp->getEmptyFilePool(efpFileSizeKib); diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.h b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.h index e34eb2c0f3..b25f5ac5e5 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.h @@ -48,13 +48,13 @@ public: uint16_t getNumEfpPartitions() const; EmptyFilePoolPartition* getEfpPartition(const efpPartitionNumber_t partitionNumber); - void getEfpPartitionNumbers(std::vector<efpPartitionNumber_t>& partitionNumberList, const efpFileSizeKib_t efpFileSizeKb = 0) const; - void getEfpPartitions(std::vector<EmptyFilePoolPartition*>& partitionList, const efpFileSizeKib_t efpFileSizeKb = 0); + void getEfpPartitionNumbers(std::vector<efpPartitionNumber_t>& partitionNumberList, const efpDataSize_kib_t efpFileSizeKb = 0) const; + void getEfpPartitions(std::vector<EmptyFilePoolPartition*>& partitionList, const efpDataSize_kib_t efpFileSizeKb = 0); - void getEfpFileSizes(std::vector<efpFileSizeKib_t>& efpFileSizeList, const efpPartitionNumber_t efpPartitionNumber = 0) const; + void getEfpFileSizes(std::vector<efpDataSize_kib_t>& efpFileSizeList, const efpPartitionNumber_t efpPartitionNumber = 0) const; void getEmptyFilePools(std::vector<EmptyFilePool*>& emptyFilePoolList, const efpPartitionNumber_t efpPartitionNumber = 0); - EmptyFilePool* getEmptyFilePool(const efpPartitionNumber_t partitionNumber, const efpFileSizeKib_t efpFileSizeKb); + EmptyFilePool* getEmptyFilePool(const efpPartitionNumber_t partitionNumber, const efpDataSize_kib_t efpFileSizeKb); EmptyFilePool* getEmptyFilePool(const efpIdentity_t efpIdentity); }; diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp index 6e6bbc4abd..70840d5e8d 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp +++ b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp @@ -63,7 +63,7 @@ void EmptyFilePoolPartition::findEmptyFilePools() { //std::cout << "Reading " << partitionDir << std::endl; // DEBUG std::vector<std::string> dirList; - jdir::read_dir(partitionDir, dirList, true, false, false); + jdir::read_dir(partitionDir, dirList, true, false, false, false); bool foundEfpDir = false; for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) { if (i->compare(efpTopLevelDir) == 0) { @@ -75,15 +75,14 @@ EmptyFilePoolPartition::findEmptyFilePools() { std::string efpDir(partitionDir + "/" + efpTopLevelDir); //std::cout << "Reading " << efpDir << std::endl; // DEBUG dirList.clear(); - jdir::read_dir(efpDir, dirList, true, false, false); + jdir::read_dir(efpDir, dirList, true, false, false, true); for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) { - std::string efpSizeDir(efpDir + "/" + (*i)); EmptyFilePool* efpp = 0; try { - efpp = new EmptyFilePool(efpSizeDir, this); + efpp = new EmptyFilePool(*i, this); { slock l(efpMapMutex); - efpMap[efpp->fileSizeKib()] = efpp; + efpMap[efpp->dataSize_kib()] = efpp; } } catch (const std::exception& e) { @@ -110,7 +109,7 @@ EmptyFilePoolPartition::partitionDirectory() const { } EmptyFilePool* -EmptyFilePoolPartition::getEmptyFilePool(const efpFileSizeKib_t efpFileSizeKb) { +EmptyFilePoolPartition::getEmptyFilePool(const efpDataSize_kib_t efpFileSizeKb) { efpMapItr_t i = efpMap.find(efpFileSizeKb); if (i == efpMap.end()) return 0; @@ -118,7 +117,7 @@ EmptyFilePoolPartition::getEmptyFilePool(const efpFileSizeKib_t efpFileSizeKb) { } void -EmptyFilePoolPartition::getEmptyFilePoolSizesKb(std::vector<efpFileSizeKib_t>& efpFileSizesKbList) const { +EmptyFilePoolPartition::getEmptyFilePoolSizesKb(std::vector<efpDataSize_kib_t>& efpFileSizesKbList) const { for (efpMapConstItr_t i=efpMap.begin(); i!=efpMap.end(); ++i) { efpFileSizesKbList.push_back(i->first); } diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.h b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.h index 2013884b38..1172c84ab7 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.h @@ -44,7 +44,7 @@ class EmptyFilePoolPartition public: static const std::string efpTopLevelDir; protected: - typedef std::map<efpFileSizeKib_t, EmptyFilePool*> efpMap_t; + typedef std::map<efpDataSize_kib_t, EmptyFilePool*> efpMap_t; typedef efpMap_t::iterator efpMapItr_t; typedef efpMap_t::const_iterator efpMapConstItr_t; @@ -62,8 +62,8 @@ public: efpPartitionNumber_t partitionNumber() const; std::string partitionDirectory() const; - EmptyFilePool* getEmptyFilePool(const efpFileSizeKib_t efpFileSizeKb); - void getEmptyFilePoolSizesKb(std::vector<efpFileSizeKib_t>& efpFileSizesKbList) const; + EmptyFilePool* getEmptyFilePool(const efpDataSize_kib_t efpFileSizeKb); + void getEmptyFilePoolSizesKb(std::vector<efpDataSize_kib_t>& efpFileSizesKbList) const; void getEmptyFilePools(std::vector<EmptyFilePool*>& efpList); }; diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolTypes.h b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolTypes.h index de91bdc06a..bdb77d8c8d 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolTypes.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolTypes.h @@ -23,14 +23,18 @@ #define QPID_QLS_JRNL_EMPTYFILEPOOLTYPES_H_ #include <stdint.h> +#include <utility> // std::pair namespace qpid { namespace qls_jrnl { - typedef uint64_t efpFileSizeKib_t; + typedef uint64_t efpDataSize_kib_t; // Size of data part of file (excluding file header) in kib + typedef uint64_t efpFileSize_kib_t; // Size of file (header + data) in kib + typedef uint32_t efpDataSize_sblks_t; // Size of data part of file (excluding file header) in sblks + typedef uint32_t efpFileSize_sblks_t; // Size of file (header + data) in sblks typedef uint32_t efpFileCount_t; typedef uint16_t efpPartitionNumber_t; - typedef std::pair<efpPartitionNumber_t, efpFileSizeKib_t> efpIdentity_t; + typedef std::pair<efpPartitionNumber_t, efpDataSize_kib_t> efpIdentity_t; }} // namespace qpid::qls_jrnl diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp index 87ecdfb7a8..af953aa4ef 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp +++ b/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp @@ -21,34 +21,271 @@ #include "qpid/linearstore/jrnl/JournalFile.h" +#include <fcntl.h> +#include "qpid/linearstore/jrnl/jcfg.h" +#include "qpid/linearstore/jrnl/jexception.h" +#include "qpid/linearstore/jrnl/pmgr.h" +#include "qpid/linearstore/jrnl/utils/file_hdr.h" +#include <unistd.h> + namespace qpid { namespace qls_jrnl { -JournalFile::JournalFile(const std::string& fqFileName_) : - fqfn(fqFileName_) +JournalFile::JournalFile(const std::string& fqFileName_, + const uint64_t fileSeqNum_, + const uint32_t fileSize_kib_) : + fqFileName(fqFileName_), + fileSeqNum(fileSeqNum_), + fileHandle(-1), + fileCloseFlag(false), + fileHeaderBasePtr (0), + fileHeaderPtr(0), + aioControlBlockPtr(0), + fileSizeDblks(((fileSize_kib_ * 1024) + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_BYTES)) / JRNL_DBLK_SIZE_BYTES), + enqueuedRecordCount(0), + submittedDblkCount(0), + completedDblkCount(0), + outstandingAioOpsCount(0) {} -JournalFile::~JournalFile() {} +JournalFile::~JournalFile() { + finalize(); +} + +void +JournalFile::initialize() { + if (::posix_memalign(&fileHeaderBasePtr, QLS_AIO_ALIGN_BOUNDARY, QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_KIB * 1024)) + { + std::ostringstream oss; + oss << "posix_memalign(): blksize=" << QLS_AIO_ALIGN_BOUNDARY << " size=" << (QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_KIB * 1024); + oss << FORMAT_SYSERR(errno); + throw jexception(jerrno::JERR__MALLOC, oss.str(), "JournalFile", "initialize"); + } + fileHeaderPtr = (::file_hdr_t*)fileHeaderBasePtr; + aioControlBlockPtr = new aio_cb; +} + +void +JournalFile::finalize() { + if (fileHeaderBasePtr != 0) { + std::free(fileHeaderBasePtr); + fileHeaderBasePtr = 0; + fileHeaderPtr = 0; + } + if (aioControlBlockPtr != 0) { + std::free(aioControlBlockPtr); + aioControlBlockPtr = 0; + } +} const std::string -JournalFile::directory() const { - return fqfn.substr(0, fqfn.rfind('/')); +JournalFile::getDirectory() const { + return fqFileName.substr(0, fqFileName.rfind('/')); } const std::string -JournalFile::fileName() const { - return fqfn.substr(fqfn.rfind('/')); +JournalFile::getFileName() const { + return fqFileName.substr(fqFileName.rfind('/')+1); } const std::string -JournalFile::fqFileName() const { - return fqfn; +JournalFile::getFqFileName() const { + return fqFileName; +} + +uint64_t +JournalFile::getFileSeqNum() const { + return fileSeqNum; +} + +int +JournalFile::open() { + fileHandle = ::open(fqFileName.c_str(), O_WRONLY | O_DIRECT, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); // 0644 -rw-r--r-- + if (fileHandle < 0) { + std::ostringstream oss; + oss << "file=\"" << fqFileName << "\"" << FORMAT_SYSERR(errno); + throw jexception(jerrno::JERR_JNLF_OPEN, oss.str(), "JournalFile", "open"); + } + return fileHandle; } bool -JournalFile::empty() const { - // TODO: return true if no still-enqueued records (or parts of records) exist in this file - return true; +JournalFile::isOpen() const { + return fileHandle >= 0; +} + +void +JournalFile::close() { + if (fileHandle >= 0) { + if (getOutstandingAioDblks()) { + fileCloseFlag = true; // Close later when all outstanding AIOs have returned + } else { + int res = ::close(fileHandle); + fileHandle = -1; + if (res != 0) { + std::ostringstream oss; + oss << "file=\"" << fqFileName << "\"" << FORMAT_SYSERR(errno); + throw jexception(jerrno::JERR_JNLF_CLOSE, oss.str(), "JournalFile", "open"); + } + } + } +} + +void +JournalFile::asyncFileHeaderWrite(io_context_t ioContextPtr_, + const efpPartitionNumber_t efpPartitionNumber_, + const efpDataSize_kib_t efpDataSize_kib_, + const uint16_t userFlags_, + const uint64_t recordId_, + const uint64_t firstRecordOffset_, + const std::string queueName_) { + ::file_hdr_create(fileHeaderPtr, QLS_FILE_MAGIC, QLS_JRNL_VERSION, QLS_JRNL_FHDR_RES_SIZE_SBLKS, efpPartitionNumber_, efpDataSize_kib_); + ::file_hdr_init(fileHeaderBasePtr, QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_KIB * 1024, userFlags_, recordId_, firstRecordOffset_, fileSeqNum, queueName_.size(), queueName_.data()); + aio::prep_pwrite(aioControlBlockPtr, fileHandle, (void*)fileHeaderBasePtr, QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_KIB * 1024, 0UL); + if (aio::submit(ioContextPtr_, 1, &aioControlBlockPtr) < 0) + throw jexception(jerrno::JERR__AIO, "JournalFile", "asyncPageWrite"); + addSubmittedDblkCount(QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_DBLKS); + incrOutstandingAioOperationCount(); +} + +void +JournalFile::asyncPageWrite(io_context_t ioContextPtr_, + aio_cb* aioControlBlockPtr_, + void* data_, + uint32_t dataSize_dblks_) { + aio::prep_pwrite_2(aioControlBlockPtr_, fileHandle, data_, dataSize_dblks_ * JRNL_DBLK_SIZE_BYTES, submittedDblkCount.get() * JRNL_DBLK_SIZE_BYTES); + pmgr::page_cb* pcbp = (pmgr::page_cb*)(aioControlBlockPtr_->data); // This page's control block (pcb) + pcbp->_wdblks = dataSize_dblks_; + pcbp->_jfp = this; + if (aio::submit(ioContextPtr_, 1, &aioControlBlockPtr_) < 0) + throw jexception(jerrno::JERR__AIO, "JournalFile", "asyncPageWrite"); + addSubmittedDblkCount(dataSize_dblks_); + incrOutstandingAioOperationCount(); +} + +uint32_t +JournalFile::getEnqueuedRecordCount() const { + return enqueuedRecordCount.get(); +} + +uint32_t +JournalFile::incrEnqueuedRecordCount() { + return enqueuedRecordCount.increment(); +} + +uint32_t +JournalFile::addEnqueuedRecordCount(const uint32_t a) { + return enqueuedRecordCount.add(a); +} + +uint32_t +JournalFile::decrEnqueuedRecordCount() { + return enqueuedRecordCount.decrementLimit(); +} + +uint32_t +JournalFile::subtrEnqueuedRecordCount(const uint32_t s) { + return enqueuedRecordCount.subtractLimit(s); +} + +uint32_t +JournalFile::getSubmittedDblkCount() const { + return submittedDblkCount.get(); +} + +uint32_t +JournalFile::addSubmittedDblkCount(const uint32_t a) { + return submittedDblkCount.addLimit(a, fileSizeDblks, jerrno::JERR_JNLF_FILEOFFSOVFL); +} + +uint32_t +JournalFile::getCompletedDblkCount() const { + return completedDblkCount.get(); +} + +uint32_t +JournalFile::addCompletedDblkCount(const uint32_t a) { + return completedDblkCount.addLimit(a, submittedDblkCount.get(), jerrno::JERR_JNLF_CMPLOFFSOVFL); +} + +uint16_t JournalFile::getOutstandingAioOperationCount() const { + return outstandingAioOpsCount.get(); +} + +uint16_t JournalFile::incrOutstandingAioOperationCount() { + return outstandingAioOpsCount.increment(); +} + +uint16_t JournalFile::decrOutstandingAioOperationCount() { + uint16_t r = outstandingAioOpsCount.decrementLimit(); + if (fileCloseFlag && outstandingAioOpsCount == 0) { // Delayed close + close(); + } + return r; +} + +bool +JournalFile::isEmpty() const { + return submittedDblkCount == 0; +} + +bool +JournalFile::isDataEmpty() const { + return submittedDblkCount <= QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_DBLKS; +} + +u_int32_t +JournalFile::dblksRemaining() const { + return fileSizeDblks - submittedDblkCount; +} + +bool +JournalFile::isFull() const { + return submittedDblkCount == fileSizeDblks; +} + +bool +JournalFile::isFullAndComplete() const { + return completedDblkCount == fileSizeDblks; +} + +u_int32_t +JournalFile::getOutstandingAioDblks() const { + return submittedDblkCount - completedDblkCount; +} + +bool +JournalFile::getNextFile() const { + return isFull(); +} + +bool +JournalFile::isNoEnqueuedRecordsRemaining() const { + return !isDataEmpty() && // Must be written to, not empty + enqueuedRecordCount == 0; // No remaining enqueued records +} + +const std::string +JournalFile::status_str(const uint8_t indentDepth_) const { + std::string indent((size_t)indentDepth_, '.'); + std::ostringstream oss; + oss << indent << "JournalFile: fileName=" << getFileName() << std::endl; + oss << indent << " directory=" << getDirectory() << std::endl; + oss << indent << " fileSizeDblks=" << fileSizeDblks << std::endl; + oss << indent << " open=" << (isOpen() ? "T" : "F") << std::endl; + oss << indent << " fileHandle=" << fileHandle << std::endl; + oss << indent << " enqueuedRecordCount=" << getEnqueuedRecordCount() << std::endl; + oss << indent << " submittedDblkCount=" << getSubmittedDblkCount() << std::endl; + oss << indent << " completedDblkCount=" << getCompletedDblkCount() << std::endl; + oss << indent << " outstandingAioOpsCount=" << getOutstandingAioOperationCount() << std::endl; + oss << indent << " isEmpty()=" << (isEmpty() ? "T" : "F") << std::endl; + oss << indent << " isDataEmpty()=" << (isDataEmpty() ? "T" : "F") << std::endl; + oss << indent << " dblksRemaining()=" << dblksRemaining() << std::endl; + oss << indent << " isFull()=" << (isFull() ? "T" : "F") << std::endl; + oss << indent << " isFullAndComplete()=" << (isFullAndComplete() ? "T" : "F") << std::endl; + oss << indent << " getOutstandingAioDblks()=" << getOutstandingAioDblks() << std::endl; + oss << indent << " getNextFile()=" << (getNextFile() ? "T" : "F") << std::endl; + return oss.str(); } }} // namespace qpid::qls_jrnl diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.h b/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.h index de587d94e3..302c53619c 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.h @@ -22,23 +22,90 @@ #ifndef QPID_LINEARSTORE_JOURNALFILE_H_ #define QPID_LINEARSTORE_JOURNALFILE_H_ +#include "qpid/linearstore/jrnl/aio.h" +#include "qpid/linearstore/jrnl/AtomicCounter.h" +#include "qpid/linearstore/jrnl/EmptyFilePoolTypes.h" +#include <stdint.h> #include <string> +class file_hdr_t; + namespace qpid { namespace qls_jrnl { class JournalFile { protected: - const std::string fqfn; + const std::string fqFileName; + const uint64_t fileSeqNum; + int fileHandle; + bool fileCloseFlag; + void* fileHeaderBasePtr; + ::file_hdr_t* fileHeaderPtr; + aio_cb* aioControlBlockPtr; + uint32_t fileSizeDblks; ///< File size in data blocks, including file header + AtomicCounter<uint32_t> enqueuedRecordCount; ///< Count of enqueued records + AtomicCounter<uint32_t> submittedDblkCount; ///< Write file count (data blocks) for submitted AIO + AtomicCounter<uint32_t> completedDblkCount; ///< Write file count (data blocks) for completed AIO + AtomicCounter<uint16_t> outstandingAioOpsCount; ///< Outstanding AIO operations on this file + public: - JournalFile(const std::string& fqFileName_); + JournalFile(const std::string& fqFileName, + const uint64_t fileSeqNum, + const uint32_t fileSize_kib); virtual ~JournalFile(); - const std::string directory() const; - const std::string fileName() const; - const std::string fqFileName() const; - bool empty() const; + void initialize(); + void finalize(); + + const std::string getDirectory() const; + const std::string getFileName() const; + const std::string getFqFileName() const; + uint64_t getFileSeqNum() const; + + int open(); + bool isOpen() const; + void close(); + void asyncFileHeaderWrite(io_context_t ioContextPtr, + const efpPartitionNumber_t efpPartitionNumber, + const efpDataSize_kib_t efpDataSize_kib, + const uint16_t userFlags, + const uint64_t recordId, + const uint64_t firstRecordOffset, + const std::string queueName); + void asyncPageWrite(io_context_t ioContextPtr, + aio_cb* aioControlBlockPtr, + void* data, + uint32_t dataSize_dblks); + + uint32_t getEnqueuedRecordCount() const; + uint32_t incrEnqueuedRecordCount(); + uint32_t addEnqueuedRecordCount(const uint32_t a); + uint32_t decrEnqueuedRecordCount(); + uint32_t subtrEnqueuedRecordCount(const uint32_t s); + + uint32_t getSubmittedDblkCount() const; + uint32_t addSubmittedDblkCount(const uint32_t a); + + uint32_t getCompletedDblkCount() const; + uint32_t addCompletedDblkCount(const uint32_t a); + + uint16_t getOutstandingAioOperationCount() const; + uint16_t incrOutstandingAioOperationCount(); + uint16_t decrOutstandingAioOperationCount(); + + // Status helper functions + bool isEmpty() const; ///< True if no writes of any kind have occurred + bool isDataEmpty() const; ///< True if only file header written, data is still empty + u_int32_t dblksRemaining() const; ///< Dblks remaining until full + bool isFull() const; ///< True if all possible dblks have been submitted (but may not yet have returned from AIO) + bool isFullAndComplete() const; ///< True if all submitted dblks have returned from AIO + u_int32_t getOutstandingAioDblks() const; ///< Dblks still to be written + bool getNextFile() const; ///< True when next file is needed + bool isNoEnqueuedRecordsRemaining() const; ///< True when all enqueued records (or parts) have been dequeued + + // Debug aid + const std::string status_str(const uint8_t indentDepth) const; }; }} // namespace qpid::qls_jrnl diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/JournalFileController.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/JournalFileController.cpp deleted file mode 100644 index 6167a641c9..0000000000 --- a/qpid/cpp/src/qpid/linearstore/jrnl/JournalFileController.cpp +++ /dev/null @@ -1,142 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/linearstore/jrnl/JournalFileController.h" - -#include <fstream> -#include "qpid/linearstore/jrnl/EmptyFilePool.h" -#include "qpid/linearstore/jrnl/jcfg.h" -#include "qpid/linearstore/jrnl/JournalFile.h" -#include "qpid/linearstore/jrnl/slock.h" -#include "qpid/linearstore/jrnl/utils/file_hdr.h" - -#include <iostream> // DEBUG - -namespace qpid { -namespace qls_jrnl { - -JournalFileController::JournalFileController(const std::string& dir_, - EmptyFilePool* efpp_) : - dir(dir_), - efpp(efpp_), - fileSeqCounter(0) -{ - //std::cout << "*** JournalFileController::JournalFileController() dir=" << dir << std::endl; -} - -JournalFileController::~JournalFileController() {} - -void -JournalFileController::pullEmptyFileFromEfp(const uint64_t recId_, - const uint64_t firstRecOffs_, - const std::string& queueName_) { - std::string ef = efpp->takeEmptyFile(dir); - //std::cout << "*** JournalFileController::pullEmptyFileFromEfp() qn=" << queueName_ << " ef=" << ef << std::endl; - const JournalFile* jfp = new JournalFile(ef/*efpp->takeEmptyFile(dir)*/); - initialzeFileHeader(jfp->fqFileName(), recId_, firstRecOffs_, getNextFileSeqNum(), queueName_); - { - slock l(journalFileListMutex); - journalFileList.push_back(jfp); - } -} - -void -JournalFileController::purgeFilesToEfp() { - slock l(journalFileListMutex); - while (journalFileList.front()->empty()) { - - efpp->returnEmptyFile(journalFileList.front()); - delete journalFileList.front(); - journalFileList.pop_front(); - } -} - -void -JournalFileController::finalize() {} - -void -JournalFileController::setFileSeqNum(const uint64_t fileSeqNum) { - fileSeqCounter = fileSeqNum; -} - -// protected - -std::string -JournalFileController::readFileHeader(file_hdr_t* fhdr_, - const std::string& fileName_) { - //std::cout << "*** JournalFileController::readFileHeader() fn=" << fileName_ << std::endl; - char buff[JRNL_SBLK_SIZE]; - std::ifstream ifs(fileName_.c_str(), std::ifstream::in | std::ifstream::binary); - if (ifs.good()) { - ifs.read(buff, JRNL_SBLK_SIZE); - ifs.close(); - std::memcpy(fhdr_, buff, sizeof(file_hdr_t)); - return std::string(buff + sizeof(file_hdr_t), fhdr_->_queue_name_len); - } else { - std::cerr << "ERROR: Could not open file \"" << fileName_ << "\" for reading." << std::endl; - } - return std::string(""); -} - -void -JournalFileController::writeFileHeader(const file_hdr_t* fhdr_, - const std::string& queueName_, - const std::string& fileName_) { - //std::cout << "*** JournalFileController::writeFileHeader() qn=" << queueName_ << " fn=" << fileName_ << std::endl; - std::fstream fs(fileName_.c_str(), std::fstream::in | std::fstream::out | std::fstream::binary); - if (fs.good()) { - fs.seekp(0); - fs.write((const char*)fhdr_, sizeof(file_hdr_t)); - fs.write(queueName_.data(), fhdr_->_queue_name_len); - fs.close(); - } else { - std::cerr << "ERROR: Could not open file \"" << fileName_ << "\" for writing." << std::endl; - } -} - -void -JournalFileController::resetFileHeader(const std::string& fileName_) { - //std::cout << "*** JournalFileController::resetFileHeader() fn=" << fileName_ << std::endl; - file_hdr_t fhdr; - readFileHeader(&fhdr, fileName_); - ::file_hdr_reset(&fhdr); - writeFileHeader(&fhdr, std::string(""), fileName_); -} - -void -JournalFileController::initialzeFileHeader(const std::string& fileName_, - const uint64_t recId_, - const uint64_t firstRecOffs_, - const uint64_t fileSeqNum_, - const std::string& queueName_) { - //std::cout << "*** JournalFileController::initialzeFileHeader() fn=" << fileName_ << " sn=" << fileSeqNum_ << " qn=" << queueName_ << std::endl; - file_hdr_t fhdr; - readFileHeader(&fhdr, fileName_); - ::file_hdr_init(&fhdr, 0, recId_, firstRecOffs_, fileSeqNum_, queueName_.length(), queueName_.data()); - writeFileHeader(&fhdr, queueName_, fileName_); -} - -uint64_t -JournalFileController::getNextFileSeqNum() { - return __sync_add_and_fetch(&fileSeqCounter, 1); // GCC atomic increment, not portable -} - -}} // namespace qpid::qls_jrnl diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/JournalFileController.h b/qpid/cpp/src/qpid/linearstore/jrnl/JournalFileController.h deleted file mode 100644 index cd23afd959..0000000000 --- a/qpid/cpp/src/qpid/linearstore/jrnl/JournalFileController.h +++ /dev/null @@ -1,68 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#ifndef QPID_LINEARSTORE_JOURNALFILECONTROLLER_H_ -#define QPID_LINEARSTORE_JOURNALFILECONTROLLER_H_ - -#include <deque> -#include "qpid/linearstore/jrnl/smutex.h" - -struct file_hdr_t; -namespace qpid { -namespace qls_jrnl { -class EmptyFilePool; -class JournalFile; -//typedef struct file_hdr_t file_hdr_t; - -class JournalFileController -{ -protected: - typedef std::deque<const JournalFile*> JournalFileList_t; - typedef JournalFileList_t::iterator JournalFileListItr_t; - - const std::string dir; - EmptyFilePool* efpp; - uint64_t fileSeqCounter; - JournalFileList_t journalFileList; - smutex journalFileListMutex; - -public: - JournalFileController(const std::string& dir, - EmptyFilePool* efpp); - virtual ~JournalFileController(); - - void pullEmptyFileFromEfp(const uint64_t recId, const uint64_t firstRecOffs, const std::string& queueName); - void purgeFilesToEfp(); - void finalize(); - void setFileSeqNum(const uint64_t fileSeqNum); - -protected: - std::string readFileHeader(file_hdr_t* fhdr, const std::string& fileName); - void writeFileHeader(const file_hdr_t* fhdr, const std::string& queueName, const std::string& fileName); - void resetFileHeader(const std::string& fileName); - void initialzeFileHeader(const std::string& fileName, const uint64_t recId, const uint64_t firstRecOffs, - const uint64_t fileSeqNum, const std::string& queueName); - uint64_t getNextFileSeqNum(); -}; - -}} // namespace qpid::qls_jrnl - -#endif // QPID_LINEARSTORE_JOURNALFILECONTROLLER_H_ diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.cpp index 22a6412c21..4cabefec32 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.cpp +++ b/qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.cpp @@ -38,7 +38,7 @@ void JournalLog::log(log_level_t ll, const char* jid, const char* const log_stmt) const { if (ll > LOG_ERROR) { std::cerr << log_level_str(ll) << ": Journal \"" << jid << "\": " << log_stmt << std::endl; - } else if (ll > LOG_INFO) { + } else if (ll >= LOG_INFO) { std::cout << log_level_str(ll) << ": Journal \"" << jid << "\": " << log_stmt << std::endl; } diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.cpp new file mode 100644 index 0000000000..161e6deb3b --- /dev/null +++ b/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.cpp @@ -0,0 +1,324 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/linearstore/jrnl/LinearFileController.h" + +#include <fstream> +#include "qpid/linearstore/jrnl/EmptyFilePool.h" +#include "qpid/linearstore/jrnl/jcfg.h" +#include "qpid/linearstore/jrnl/jcntl.h" +#include "qpid/linearstore/jrnl/JournalFile.h" +#include "qpid/linearstore/jrnl/slock.h" +#include "qpid/linearstore/jrnl/utils/file_hdr.h" + +#include <iostream> // DEBUG + +namespace qpid { +namespace qls_jrnl { + +LinearFileController::LinearFileController(jcntl& jcntlRef_) : + jcntlRef(jcntlRef_), + emptyFilePoolPtr(0), + currentJournalFilePtr(0), + fileSeqCounter(0), + recordIdCounter(0) +{} + +LinearFileController::~LinearFileController() {} + +void +LinearFileController::initialize(const std::string& journalDirectory_, + EmptyFilePool* emptyFilePoolPtr_) { + journalDirectory.assign(journalDirectory_); + emptyFilePoolPtr = emptyFilePoolPtr_; +} + +void +LinearFileController::finalize() { + while (!journalFileList.empty()) { + delete journalFileList.front(); + journalFileList.pop_front(); + } +} + +void +LinearFileController::pullEmptyFileFromEfp() { + if (currentJournalFilePtr) + currentJournalFilePtr->close(); + std::string ef = emptyFilePoolPtr->takeEmptyFile(journalDirectory); // Moves file from EFP only, returns new file name + std::cout << "*** LinearFileController::pullEmptyFileFromEfp() qn=" << jcntlRef.id() << " ef=" << ef << std::endl; // DEBUG + currentJournalFilePtr = new JournalFile(ef, getNextFileSeqNum(), emptyFilePoolPtr->dataSize_kib()); + currentJournalFilePtr->initialize(); + { + slock l(journalFileListMutex); + journalFileList.push_back(currentJournalFilePtr); + } + currentJournalFilePtr->open(); +} + +void +LinearFileController::purgeFilesToEfp() { + slock l(journalFileListMutex); + while (journalFileList.front()->isNoEnqueuedRecordsRemaining()) { + emptyFilePoolPtr->returnEmptyFile(journalFileList.front()); + delete journalFileList.front(); + journalFileList.pop_front(); + } +} + +efpDataSize_kib_t +LinearFileController::dataSize_kib() const { + return emptyFilePoolPtr->dataSize_kib(); +} + +efpFileSize_kib_t +LinearFileController::fileSize_kib() const { + return emptyFilePoolPtr->fileSize_kib(); +} + +efpDataSize_sblks_t +LinearFileController::dataSize_sblks() const { + return emptyFilePoolPtr->dataSize_sblks(); +} + +efpFileSize_sblks_t +LinearFileController::fileSize_sblks() const { + return emptyFilePoolPtr->fileSize_sblks(); +} + +uint64_t +LinearFileController::getNextRecordId() { + return recordIdCounter.increment(); +} + +uint32_t +LinearFileController::decrEnqueuedRecordCount(const efpFileCount_t fileSeqNumber) { + slock l(journalFileListMutex); + return find(fileSeqNumber)->decrEnqueuedRecordCount(); +} + +uint32_t +LinearFileController::addWriteCompletedDblkCount(const efpFileCount_t fileSeqNumber, const uint32_t a) { + slock l(journalFileListMutex); + return find(fileSeqNumber)->addCompletedDblkCount(a); +} + +uint16_t +LinearFileController::decrOutstandingAioOperationCount(const efpFileCount_t fileSeqNumber) { + slock l(journalFileListMutex); + return find(fileSeqNumber)->decrOutstandingAioOperationCount(); +} + +void +LinearFileController::asyncFileHeaderWrite(io_context_t ioContextPtr, + const uint16_t userFlags, + const uint64_t recordId, + const uint64_t firstRecordOffset) { + currentJournalFilePtr->asyncFileHeaderWrite(ioContextPtr, + emptyFilePoolPtr->getPartitionNumber(), + emptyFilePoolPtr->dataSize_kib(), + userFlags, + recordId, + firstRecordOffset, + jcntlRef.id()); +} + +void +LinearFileController::asyncPageWrite(io_context_t ioContextPtr, + aio_cb* aioControlBlockPtr, + void* data, + uint32_t dataSize_dblks) { + assertCurrentJournalFileValid("asyncPageWrite"); + currentJournalFilePtr->asyncPageWrite(ioContextPtr, aioControlBlockPtr, data, dataSize_dblks); +} + +uint64_t +LinearFileController::getCurrentFileSeqNum() const { + assertCurrentJournalFileValid("getCurrentFileSeqNum"); + return currentJournalFilePtr->getFileSeqNum(); +} + +uint32_t +LinearFileController::getEnqueuedRecordCount() const { + assertCurrentJournalFileValid("getEnqueuedRecordCount"); + return currentJournalFilePtr->getEnqueuedRecordCount(); +} + +uint32_t +LinearFileController::incrEnqueuedRecordCount() { + assertCurrentJournalFileValid("incrEnqueuedRecordCount"); + return currentJournalFilePtr->incrEnqueuedRecordCount(); +} + +uint32_t +LinearFileController::addEnqueuedRecordCount(const uint32_t a) { + assertCurrentJournalFileValid("addEnqueuedRecordCount"); + return currentJournalFilePtr->addEnqueuedRecordCount(a); +} + +uint32_t +LinearFileController::decrEnqueuedRecordCount() { + assertCurrentJournalFileValid("decrEnqueuedRecordCount"); + return currentJournalFilePtr->decrEnqueuedRecordCount(); +} + +uint32_t +LinearFileController::subtrEnqueuedRecordCount(const uint32_t s) { + assertCurrentJournalFileValid("subtrEnqueuedRecordCount"); + return currentJournalFilePtr->subtrEnqueuedRecordCount(s); +} + +uint32_t +LinearFileController::getWriteSubmittedDblkCount() const { + assertCurrentJournalFileValid("getWriteSubmittedDblkCount"); + return currentJournalFilePtr->getSubmittedDblkCount(); +} + +uint32_t +LinearFileController::addWriteSubmittedDblkCount(const uint32_t a) { + assertCurrentJournalFileValid("addWriteSubmittedDblkCount"); + return currentJournalFilePtr->addSubmittedDblkCount(a); +} + +uint32_t +LinearFileController::getWriteCompletedDblkCount() const { + assertCurrentJournalFileValid("getWriteCompletedDblkCount"); + return currentJournalFilePtr->getCompletedDblkCount(); +} + +uint32_t +LinearFileController::addWriteCompletedDblkCount(const uint32_t a) { + assertCurrentJournalFileValid("addWriteCompletedDblkCount"); + return currentJournalFilePtr->addCompletedDblkCount(a); +} + +uint16_t +LinearFileController::getOutstandingAioOperationCount() const { + assertCurrentJournalFileValid("getOutstandingAioOperationCount"); + return currentJournalFilePtr->getOutstandingAioOperationCount(); +} + +uint16_t +LinearFileController::incrOutstandingAioOperationCount() { + assertCurrentJournalFileValid("incrOutstandingAioOperationCount"); + return currentJournalFilePtr->incrOutstandingAioOperationCount(); +} + +uint16_t +LinearFileController::decrOutstandingAioOperationCount() { + assertCurrentJournalFileValid("decrOutstandingAioOperationCount"); + return currentJournalFilePtr->decrOutstandingAioOperationCount(); +} + +bool +LinearFileController::isEmpty() const { + assertCurrentJournalFileValid("isEmpty"); + return currentJournalFilePtr->isEmpty(); +} + +bool +LinearFileController::isDataEmpty() const { + assertCurrentJournalFileValid("isDataEmpty"); + return currentJournalFilePtr->isDataEmpty(); +} + +u_int32_t +LinearFileController::dblksRemaining() const { + assertCurrentJournalFileValid("dblksRemaining"); + return currentJournalFilePtr->dblksRemaining(); +} + +bool +LinearFileController::isFull() const { + assertCurrentJournalFileValid("isFull"); + return currentJournalFilePtr->isFull(); +} + +bool +LinearFileController::isFullAndComplete() const { + assertCurrentJournalFileValid("isFullAndComplete"); + return currentJournalFilePtr->isFullAndComplete(); +} + +u_int32_t +LinearFileController::getOutstandingAioDblks() const { + assertCurrentJournalFileValid("getOutstandingAioDblks"); + return currentJournalFilePtr->getOutstandingAioDblks(); +} + +bool +LinearFileController::getNextFile() const { + assertCurrentJournalFileValid("getNextFile"); + return currentJournalFilePtr->getNextFile(); +} + +const std::string +LinearFileController::status(const uint8_t indentDepth) const { + std::string indent((size_t)indentDepth, '.'); + std::ostringstream oss; + oss << indent << "LinearFileController: queue=" << jcntlRef.id() << std::endl; + oss << indent << " journalDirectory=" << journalDirectory << std::endl; + oss << indent << " fileSeqCounter=" << fileSeqCounter.get() << std::endl; + oss << indent << " recordIdCounter=" << recordIdCounter.get() << std::endl; + oss << indent << " journalFileList.size=" << journalFileList.size() << std::endl; + if (checkCurrentJournalFileValid()) { + oss << currentJournalFilePtr->status_str(indentDepth+2); + } else { + oss << indent << " <No current journal file>" << std::endl; + } + return oss.str(); +} + +// protected + +bool +LinearFileController::checkCurrentJournalFileValid() const { + return currentJournalFilePtr != 0; +} + +void +LinearFileController::assertCurrentJournalFileValid(const char* const functionName) const { + if (!checkCurrentJournalFileValid()) { + throw jexception(jerrno::JERR__NULL, "LinearFileController", functionName); + } +} + +// NOTE: NOT THREAD SAFE - journalFileList is accessed by multiple threads - use under external lock +JournalFile* +LinearFileController::find(const efpFileCount_t fileSeqNumber) { + if (currentJournalFilePtr != 0 && currentJournalFilePtr->getFileSeqNum() == fileSeqNumber) + return currentJournalFilePtr; + for (JournalFileListItr_t i=journalFileList.begin(); i!=journalFileList.end(); ++i) { + if ((*i)->getFileSeqNum() == fileSeqNumber) { + return *i; + } + } + std::ostringstream oss; + oss << "fileSeqNumber=" << fileSeqNumber; + throw jexception(jerrno::JERR_LFCR_SEQNUMNOTFOUND, oss.str(), "LinearFileController", "find"); +} + +uint64_t +LinearFileController::getNextFileSeqNum() { + return fileSeqCounter.increment(); +} + +}} // namespace qpid::qls_jrnl diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.h b/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.h new file mode 100644 index 0000000000..ff880a2647 --- /dev/null +++ b/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.h @@ -0,0 +1,123 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifndef QPID_LINEARSTORE_LINEARFILECONTROLLER_H_ +#define QPID_LINEARSTORE_LINEARFILECONTROLLER_H_ + +#include <deque> +#include "qpid/linearstore/jrnl/aio.h" +#include "qpid/linearstore/jrnl/AtomicCounter.h" +#include "qpid/linearstore/jrnl/EmptyFilePoolTypes.h" +#include "qpid/linearstore/jrnl/smutex.h" + +struct file_hdr_t; +namespace qpid { +namespace qls_jrnl { +class EmptyFilePool; +class jcntl; +class JournalFile; + +class LinearFileController +{ +protected: + typedef std::deque<JournalFile*> JournalFileList_t; + typedef JournalFileList_t::iterator JournalFileListItr_t; + + jcntl& jcntlRef; + std::string journalDirectory; + EmptyFilePool* emptyFilePoolPtr; + JournalFile* currentJournalFilePtr; + AtomicCounter<uint64_t> fileSeqCounter; + AtomicCounter<uint64_t> recordIdCounter; + + JournalFileList_t journalFileList; + smutex journalFileListMutex; + +public: + LinearFileController(jcntl& jcntlRef_); + virtual ~LinearFileController(); + + void initialize(const std::string& journalDirectory_, EmptyFilePool* emptyFilePoolPtr_); + void finalize(); + + void pullEmptyFileFromEfp(); + void purgeFilesToEfp(); + efpDataSize_kib_t dataSize_kib() const; + efpFileSize_kib_t fileSize_kib() const; + efpDataSize_sblks_t dataSize_sblks() const; + efpFileSize_sblks_t fileSize_sblks() const; + + uint64_t getNextRecordId(); + + // Functions for manipulating counts of non-current JournalFile instances in journalFileList + uint32_t decrEnqueuedRecordCount(const efpFileCount_t fileSeqNumber); + uint32_t addWriteCompletedDblkCount(const efpFileCount_t fileSeqNumber, const uint32_t a); + uint16_t decrOutstandingAioOperationCount(const efpFileCount_t fileSeqNumber); + + // Pass-through functions for JournalFile class + void asyncFileHeaderWrite(io_context_t ioContextPtr, + const uint16_t userFlags, + const uint64_t recordId, + const uint64_t firstRecordOffset); + void asyncPageWrite(io_context_t ioContextPtr, + aio_cb* aioControlBlockPtr, + void* data, + uint32_t dataSize_dblks); + + uint64_t getCurrentFileSeqNum() const; + + uint32_t getEnqueuedRecordCount() const; + uint32_t incrEnqueuedRecordCount(); + uint32_t addEnqueuedRecordCount(const uint32_t a); + uint32_t decrEnqueuedRecordCount(); + uint32_t subtrEnqueuedRecordCount(const uint32_t s); + + uint32_t getWriteSubmittedDblkCount() const; + uint32_t addWriteSubmittedDblkCount(const uint32_t a); + + uint32_t getWriteCompletedDblkCount() const; + uint32_t addWriteCompletedDblkCount(const uint32_t a); + + uint16_t getOutstandingAioOperationCount() const; + uint16_t incrOutstandingAioOperationCount(); + uint16_t decrOutstandingAioOperationCount(); + + bool isEmpty() const; // True if no writes of any kind have occurred + bool isDataEmpty() const; // True if only file header written, data is still empty + u_int32_t dblksRemaining() const; // Dblks remaining until full + bool isFull() const; // True if all possible dblks have been submitted (but may not yet have returned from AIO) + bool isFullAndComplete() const; // True if all submitted dblks have returned from AIO + u_int32_t getOutstandingAioDblks() const; // Dblks still to be written + bool getNextFile() const; // True when next file is needed + + // Debug aid + const std::string status(const uint8_t indentDepth) const; + +protected: + bool checkCurrentJournalFileValid() const; + void assertCurrentJournalFileValid(const char* const functionName) const; + JournalFile* find(const efpFileCount_t fileSeqNumber); // NOT THREAD SAFE - use under external lock + uint64_t getNextFileSeqNum(); +}; + +}} // namespace qpid::qls_jrnl + +#endif // QPID_LINEARSTORE_LINEARFILECONTROLLER_H_ diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp new file mode 100644 index 0000000000..2c60d70c93 --- /dev/null +++ b/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp @@ -0,0 +1,99 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/linearstore/jrnl/RecoveryManager.h" + +#include <iomanip> +#include "qpid/linearstore/jrnl/jcfg.h" +#include <sstream> + +namespace qpid { +namespace qls_jrnl +{ + +RecoveryManager::RecoveryManager() : _journalFileList(), + _fileNumberNameMap(), + _enqueueCountList(), + _journalEmptyFlag(false), + _firstRecordOffset(0), + _endOffset(0), + _highestRecordId(0ULL), + _lastFileFullFlag(false), + _currentRid(0ULL), + _currentFileNumber(0ULL), + _currentFileName(), + _fileSize(0), + _recordStart(0), + _inFileStream(), + _readComplete(false) +{} + +RecoveryManager::~RecoveryManager() {} + +std::string +RecoveryManager::toString(const std::string& jid, + bool compact) { + std::ostringstream oss; + if (compact) { + oss << "Recovery journal analysis (jid=\"" << jid << "\"):"; + oss << " jfl=["; + for (std::map<uint64_t, std::string>::const_iterator i=_fileNumberNameMap.begin(); i!=_fileNumberNameMap.end(); ++i) { + if (i!=_fileNumberNameMap.begin()) oss << " "; + oss << i->first << ":" << i->second.substr(i->second.rfind('/')+1); + } + oss << "] ecl=[ "; + for (std::vector<uint32_t>::const_iterator j = _enqueueCountList.begin(); j!=_enqueueCountList.end(); ++j) { + if (j != _enqueueCountList.begin()) oss << " "; + oss << *j; + } + oss << " ] empty=" << (_journalEmptyFlag ? "T" : "F"); + oss << " fro=0x" << std::hex << _firstRecordOffset << std::dec << " (" << (_firstRecordOffset/JRNL_DBLK_SIZE_BYTES) << " dblks)"; + oss << " eo=0x" << std::hex << _endOffset << std::dec << " (" << (_endOffset/JRNL_DBLK_SIZE_BYTES) << " dblks)"; + oss << " hrid=0x" << std::hex << _highestRecordId << std::dec; + oss << " lffull=" << (_lastFileFullFlag ? "T" : "F"); + } else { + oss << "Recovery journal analysis (jid=\"" << jid << "\"):" << std::endl; + oss << " Number of journal files = " << _fileNumberNameMap.size() << std::endl; + oss << " Journal File List:" << std::endl; + for (std::map<uint64_t, std::string>::const_iterator i=_fileNumberNameMap.begin(); i!=_fileNumberNameMap.end(); ++i) { + oss << " " << i->first << ": " << i->second.substr(i->second.rfind('/')+1) << std::endl; + } + oss << " Enqueue Counts: [ " << std::endl; + for (std::vector<uint32_t>::const_iterator j = _enqueueCountList.begin(); j!=_enqueueCountList.end(); ++j) { + if (j != _enqueueCountList.begin()) oss << ", "; + oss << *j; + } + oss << " ]" << std::endl; + for (unsigned i=0; i<_enqueueCountList.size(); i++) + oss << " File " << std::setw(2) << i << ": " << _enqueueCountList[i] << std::endl; + oss << " Journal empty (_jempty) = " << (_journalEmptyFlag ? "TRUE" : "FALSE") << std::endl; + oss << " First record offset in first fid (_fro) = 0x" << std::hex << _firstRecordOffset << + std::dec << " (" << (_firstRecordOffset/JRNL_DBLK_SIZE_BYTES) << " dblks)" << std::endl; + oss << " End offset (_eo) = 0x" << std::hex << _endOffset << std::dec << " (" << + (_endOffset/JRNL_DBLK_SIZE_BYTES) << " dblks)" << std::endl; + oss << " Highest rid (_h_rid) = 0x" << std::hex << _highestRecordId << std::dec << std::endl; + oss << " Last file full (_lffull) = " << (_lastFileFullFlag ? "TRUE" : "FALSE") << std::endl; + oss << " Enqueued records (txn & non-txn):" << std::endl; + } + return oss.str(); +} + +}} // namespace qpid::qls_jrnl diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.h b/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.h new file mode 100644 index 0000000000..452b3d144b --- /dev/null +++ b/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.h @@ -0,0 +1,65 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifndef QPID_LINEARSTORE_RECOVERYSTATE_H_ +#define QPID_LINEARSTORE_RECOVERYSTATE_H_ + +#include <fstream> +#include <map> +#include <stdint.h> +#include <vector> + +namespace qpid { +namespace qls_jrnl { + +class RecoveryManager +{ +private: + // Initial journal analysis data + std::vector<std::string> _journalFileList; ///< Journal file list + std::map<uint64_t, std::string> _fileNumberNameMap; ///< File number - name map + std::vector<uint32_t> _enqueueCountList; ///< Number enqueued records found for each file + bool _journalEmptyFlag; ///< Journal data files empty + std::streamoff _firstRecordOffset; ///< First record offset in ffid + std::streamoff _endOffset; ///< End offset (first byte past last record) + uint64_t _highestRecordId; ///< Highest rid found + bool _lastFileFullFlag; ///< Last file is full + + // State for recovery of individual enqueued records + uint64_t _currentRid; + uint64_t _currentFileNumber; + std::string _currentFileName; + std::streamoff _fileSize; + std::streamoff _recordStart; + std::ifstream _inFileStream; + bool _readComplete; + +public: + RecoveryManager(); + virtual ~RecoveryManager(); + + std::string toString(const std::string& jid, + bool compact = true); +}; + +}} // namespace qpid::qls_jrnl + +#endif // QPID_LINEARSTORE_RECOVERYSTATE_H_ diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.cpp index 616dbd6ef5..ab2a763f17 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.cpp +++ b/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.cpp @@ -39,10 +39,10 @@ smutex data_tok::_mutex; data_tok::data_tok(): _wstate(NONE), - _rstate(UNREAD), +// _rstate(UNREAD), _dsize(0), _dblks_written(0), - _dblks_read(0), +// _dblks_read(0), _pg_cnt(0), _fid(0), _rid(0), @@ -106,12 +106,15 @@ data_tok::wstate_str(write_state wstate) return "<wstate unknown>"; } +/* const char* data_tok::rstate_str() const { return rstate_str(_rstate); } +*/ +/* const char* data_tok::rstate_str(read_state rstate) { @@ -129,7 +132,9 @@ data_tok::rstate_str(read_state rstate) } return "<rstate unknown>"; } +*/ +/* void data_tok::set_rstate(const read_state rstate) { @@ -143,15 +148,16 @@ data_tok::set_rstate(const read_state rstate) } _rstate = rstate; } +*/ void data_tok::reset() { _wstate = NONE; - _rstate = UNREAD; +// _rstate = UNREAD; _dsize = 0; _dblks_written = 0; - _dblks_read = 0; +// _dblks_read = 0; _pg_cnt = 0; _fid = 0; _rid = 0; @@ -164,7 +170,7 @@ data_tok::status_str() const { std::ostringstream oss; oss << std::hex << std::setfill('0'); - oss << "dtok id=0x" << _icnt << "; ws=" << wstate_str() << "; rs=" << rstate_str(); + oss << "dtok id=0x" << _icnt << "; ws=" << wstate_str()/* << "; rs=" << rstate_str()*/; oss << "; fid=0x" << _fid << "; rid=0x" << _rid << "; xid="; for (unsigned i=0; i<_xid.size(); i++) { @@ -174,8 +180,8 @@ data_tok::status_str() const oss << "/" << std::setw(2) << (int)((char)_xid[i]); } oss << "; drid=0x" << _dequeue_rid << " extrid=" << (_external_rid?"T":"F"); - oss << "; ds=0x" << _dsize << "; dw=0x" << _dblks_written << "; dr=0x" << _dblks_read; - oss << " pc=0x" << _pg_cnt; + oss << "; ds=0x" << _dsize << "; dw=0x" << _dblks_written/* << "; dr=0x" << _dblks_read*/; + oss << "; pc=0x" << _pg_cnt; return oss.str(); } diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.h b/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.h index bdfa276b0f..39dc1c4a81 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.h @@ -72,6 +72,7 @@ namespace qls_jrnl COMMITTED }; +/* enum read_state { UNREAD, ///< Data block not read @@ -79,18 +80,19 @@ namespace qls_jrnl SKIP_PART, ///< Prev. dequeued dblock is part-skipped; waiting for page buffer to fill READ ///< Data block is fully read }; +*/ protected: static smutex _mutex; static uint64_t _cnt; uint64_t _icnt; write_state _wstate; ///< Enqueued / dequeued state of data - read_state _rstate; ///< Read state of data +// read_state _rstate; ///< Read state of data std::size_t _dsize; ///< Data size in bytes uint32_t _dblks_written; ///< Data blocks read/written - uint32_t _dblks_read; ///< Data blocks read/written +// uint32_t _dblks_read; ///< Data blocks read/written uint32_t _pg_cnt; ///< Page counter - incr for each page containing part of data - uint16_t _fid; ///< FID containing header of enqueue record + uint64_t _fid; ///< FID containing header of enqueue record uint64_t _rid; ///< RID of data set by enqueue operation std::string _xid; ///< XID set by enqueue operation uint64_t _dequeue_rid; ///< RID of data set by dequeue operation @@ -104,16 +106,16 @@ namespace qls_jrnl inline write_state wstate() const { return _wstate; } const char* wstate_str() const; static const char* wstate_str(write_state wstate); - inline read_state rstate() const { return _rstate; } - const char* rstate_str() const; - static const char* rstate_str(read_state rstate); +// inline read_state rstate() const { return _rstate; } +// const char* rstate_str() const; +// static const char* rstate_str(read_state rstate); inline bool is_writable() const { return _wstate == NONE || _wstate == ENQ_PART; } inline bool is_enqueued() const { return _wstate == ENQ; } inline bool is_readable() const { return _wstate == ENQ; } - inline bool is_read() const { return _rstate == READ; } +// inline bool is_read() const { return _rstate == READ; } inline bool is_dequeueable() const { return _wstate == ENQ || _wstate == DEQ_PART; } inline void set_wstate(const write_state wstate) { _wstate = wstate; } - void set_rstate(const read_state rstate); +// void set_rstate(const read_state rstate); inline std::size_t dsize() const { return _dsize; } inline void set_dsize(std::size_t dsize) { _dsize = dsize; } @@ -122,16 +124,16 @@ namespace qls_jrnl { _dblks_written += dblks_written; } inline void set_dblocks_written(uint32_t dblks_written) { _dblks_written = dblks_written; } - inline uint32_t dblocks_read() const { return _dblks_read; } - inline void incr_dblocks_read(uint32_t dblks_read) { _dblks_read += dblks_read; } - inline void set_dblocks_read(uint32_t dblks_read) { _dblks_read = dblks_read; } +// inline uint32_t dblocks_read() const { return _dblks_read; } +// inline void incr_dblocks_read(uint32_t dblks_read) { _dblks_read += dblks_read; } +// inline void set_dblocks_read(uint32_t dblks_read) { _dblks_read = dblks_read; } inline uint32_t pg_cnt() const { return _pg_cnt; } inline uint32_t incr_pg_cnt() { return ++_pg_cnt; } inline uint32_t decr_pg_cnt() { assert(_pg_cnt != 0); return --_pg_cnt; } - inline uint16_t fid() const { return _fid; } - inline void set_fid(const uint16_t fid) { _fid = fid; } + inline uint64_t fid() const { return _fid; } + inline void set_fid(const uint64_t fid) { _fid = fid; } inline uint64_t rid() const { return _rid; } inline void set_rid(const uint64_t rid) { _rid = rid; } inline uint64_t dequeue_rid() const {return _dequeue_rid; } diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.cpp index ee4bd65853..e32452b304 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.cpp +++ b/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.cpp @@ -110,8 +110,8 @@ deq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks) if (_xidp == 0) assert(_deq_hdr._xidsize == 0); - std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE; - std::size_t rem = max_size_dblks * JRNL_DBLK_SIZE; + std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE_BYTES; + std::size_t rem = max_size_dblks * JRNL_DBLK_SIZE_BYTES; std::size_t wr_cnt = 0; if (rec_offs_dblks) // Continuation of split dequeue record (over 2 or more pages) { @@ -162,8 +162,8 @@ deq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks) std::memcpy((char*)wptr + wr_cnt, (char*)&_deq_tail + rec_offs, wsize); wr_cnt += wsize; #ifdef RHM_CLEAN - std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE; - std::size_t dblk_rec_size = size_dblks(rec_size() - rec_offs) * JRNL_DBLK_SIZE; + std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE_BYTES; + std::size_t dblk_rec_size = size_dblks(rec_size() - rec_offs) * JRNL_DBLK_SIZE_BYTES; std::memset((char*)wptr + wr_cnt, RHM_CLEAN_CHAR, dblk_rec_size - wr_cnt); #endif } @@ -206,7 +206,7 @@ deq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks) wr_cnt += sizeof(_deq_tail); } #ifdef RHM_CLEAN - std::size_t dblk_rec_size = size_dblks(rec_size()) * JRNL_DBLK_SIZE; + std::size_t dblk_rec_size = size_dblks(rec_size()) * JRNL_DBLK_SIZE_BYTES; std::memset((char*)wptr + wr_cnt, RHM_CLEAN_CHAR, dblk_rec_size - wr_cnt); #endif } @@ -226,7 +226,7 @@ deq_rec::decode(rec_hdr_t& h, void* rptr, uint32_t rec_offs_dblks, uint32_t max_ const uint32_t hdr_xid_dblks = size_dblks(sizeof(deq_hdr_t) + _deq_hdr._xidsize); const uint32_t hdr_xid_tail_dblks = size_dblks(sizeof(deq_hdr_t) + _deq_hdr._xidsize + sizeof(rec_tail_t)); - const std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE; + const std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE_BYTES; if (hdr_xid_tail_dblks - rec_offs_dblks <= max_size_dblks) { @@ -259,7 +259,7 @@ deq_rec::decode(rec_hdr_t& h, void* rptr, uint32_t rec_offs_dblks, uint32_t max_ const std::size_t xid_rem = _deq_hdr._xidsize - xid_offs; std::memcpy((char*)_buff + xid_offs, rptr, xid_rem); rd_cnt += xid_rem; - const std::size_t tail_rem = (max_size_dblks * JRNL_DBLK_SIZE) - rd_cnt; + const std::size_t tail_rem = (max_size_dblks * JRNL_DBLK_SIZE_BYTES) - rd_cnt; if (tail_rem) { std::memcpy((void*)&_deq_tail, ((char*)rptr + xid_rem), tail_rem); @@ -269,7 +269,7 @@ deq_rec::decode(rec_hdr_t& h, void* rptr, uint32_t rec_offs_dblks, uint32_t max_ else { // Remainder of xid split - const std::size_t xid_cp_size = (max_size_dblks * JRNL_DBLK_SIZE); + const std::size_t xid_cp_size = (max_size_dblks * JRNL_DBLK_SIZE_BYTES); std::memcpy((char*)_buff + rec_offs - sizeof(deq_hdr_t), rptr, xid_cp_size); rd_cnt += xid_cp_size; } @@ -309,7 +309,7 @@ deq_rec::decode(rec_hdr_t& h, void* rptr, uint32_t rec_offs_dblks, uint32_t max_ // Entire header and xid fit within this page, tail split std::memcpy(_buff, (char*)rptr + rd_cnt, _deq_hdr._xidsize); rd_cnt += _deq_hdr._xidsize; - const std::size_t tail_rem = (max_size_dblks * JRNL_DBLK_SIZE) - rd_cnt; + const std::size_t tail_rem = (max_size_dblks * JRNL_DBLK_SIZE_BYTES) - rd_cnt; if (tail_rem) { std::memcpy((void*)&_deq_tail, (char*)rptr + rd_cnt, tail_rem); @@ -319,7 +319,7 @@ deq_rec::decode(rec_hdr_t& h, void* rptr, uint32_t rec_offs_dblks, uint32_t max_ else { // Header fits within this page, xid split - const std::size_t xid_cp_size = (max_size_dblks * JRNL_DBLK_SIZE) - rd_cnt; + const std::size_t xid_cp_size = (max_size_dblks * JRNL_DBLK_SIZE_BYTES) - rd_cnt; std::memcpy(_buff, (char*)rptr + rd_cnt, xid_cp_size); rd_cnt += xid_cp_size; } @@ -381,7 +381,7 @@ deq_rec::rcv_decode(rec_hdr_t h, std::ifstream* ifsp, std::size_t& rec_offs) return false; } } - ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE - rec_size()); + ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE_BYTES - rec_size()); if (_deq_hdr._xidsize) chk_tail(); // Throws if tail invalid or record incomplete assert(!ifsp->fail() && !ifsp->bad()); diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.cpp index e3ba09d392..bc2040ef95 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.cpp +++ b/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.cpp @@ -41,42 +41,33 @@ int16_t enq_map::EMAP_FALSE = 0; int16_t enq_map::EMAP_TRUE = 1; enq_map::enq_map(): - _map(), - _pfid_enq_cnt() -{} + _map(){} enq_map::~enq_map() {} -void -enq_map::set_num_jfiles(const uint16_t num_jfiles) -{ - _pfid_enq_cnt.resize(num_jfiles, 0); -} - -int16_t -enq_map::insert_pfid(const uint64_t rid, const uint16_t pfid) +short +enq_map::insert_pfid(const uint64_t rid, const uint16_t pfid, const std::streampos file_posn) { - return insert_pfid(rid, pfid, false); + return insert_pfid(rid, pfid, file_posn, false); } -int16_t -enq_map::insert_pfid(const uint64_t rid, const uint16_t pfid, const bool locked) +short +enq_map::insert_pfid(const uint64_t rid, const uint16_t pfid, const std::streampos file_posn, const bool locked) { std::pair<emap_itr, bool> ret; - emap_data_struct rec(pfid, locked); + emap_data_struct_t rec(pfid, file_posn, locked); { slock s(_mutex); ret = _map.insert(emap_param(rid, rec)); } if (ret.second == false) return EMAP_DUP_RID; - _pfid_enq_cnt.at(pfid)++; return EMAP_OK; } -int16_t -enq_map::get_pfid(const uint64_t rid) +short +enq_map::get_pfid(const uint64_t rid, int16_t& pfid) { slock s(_mutex); emap_itr itr = _map.find(rid); @@ -84,11 +75,12 @@ enq_map::get_pfid(const uint64_t rid) return EMAP_RID_NOT_FOUND; if (itr->second._lock) return EMAP_LOCKED; - return itr->second._pfid; + pfid = itr->second._pfid; + return EMAP_OK; } -int16_t -enq_map::get_remove_pfid(const uint64_t rid, const bool txn_flag) +short +enq_map::get_remove_pfid(const uint64_t rid, int16_t& pfid, const bool txn_flag) { slock s(_mutex); emap_itr itr = _map.find(rid); @@ -96,10 +88,33 @@ enq_map::get_remove_pfid(const uint64_t rid, const bool txn_flag) return EMAP_RID_NOT_FOUND; if (itr->second._lock && !txn_flag) // locked, but not a commit/abort return EMAP_LOCKED; - uint16_t pfid = itr->second._pfid; + pfid = itr->second._pfid; _map.erase(itr); - _pfid_enq_cnt.at(pfid)--; - return pfid; + return EMAP_OK; +} + +short +enq_map::get_file_posn(const uint64_t rid, std::streampos& file_posn) { + slock s(_mutex); + emap_itr itr = _map.find(rid); + if (itr == _map.end()) // not found in map + return EMAP_RID_NOT_FOUND; + if (itr->second._lock) + return EMAP_LOCKED; + file_posn = itr->second._file_posn; + return EMAP_OK; +} + +short +enq_map::get_data(const uint64_t rid, emap_data_struct_t& eds) { + slock s(_mutex); + emap_itr itr = _map.find(rid); + if (itr == _map.end()) // not found in map + return EMAP_RID_NOT_FOUND; + eds._pfid = itr->second._pfid; + eds._file_posn = itr->second._file_posn; + eds._lock = itr->second._lock; + return EMAP_OK; } bool @@ -114,7 +129,7 @@ enq_map::is_enqueued(const uint64_t rid, bool ignore_lock) return true; } -int16_t +short enq_map::lock(const uint64_t rid) { slock s(_mutex); @@ -125,7 +140,7 @@ enq_map::lock(const uint64_t rid) return EMAP_OK; } -int16_t +short enq_map::unlock(const uint64_t rid) { slock s(_mutex); @@ -136,7 +151,7 @@ enq_map::unlock(const uint64_t rid) return EMAP_OK; } -int16_t +short enq_map::is_locked(const uint64_t rid) { slock s(_mutex); diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.h b/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.h index cb9c039dfb..d3be5f4b56 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.h @@ -64,44 +64,46 @@ namespace qls_jrnl { public: // return/error codes - static int16_t EMAP_DUP_RID; - static int16_t EMAP_LOCKED; - static int16_t EMAP_RID_NOT_FOUND; - static int16_t EMAP_OK; - static int16_t EMAP_FALSE; - static int16_t EMAP_TRUE; + static short EMAP_DUP_RID; + static short EMAP_LOCKED; + static short EMAP_RID_NOT_FOUND; + static short EMAP_OK; + static short EMAP_FALSE; + static short EMAP_TRUE; - private: - - struct emap_data_struct - { - uint16_t _pfid; - bool _lock; - emap_data_struct(const uint16_t pfid, const bool lock) : _pfid(pfid), _lock(lock) {} - }; - typedef std::pair<uint64_t, emap_data_struct> emap_param; - typedef std::map<uint64_t, emap_data_struct> emap; + typedef struct emap_data_struct_t { + uint16_t _pfid; + std::streampos _file_posn; + bool _lock; + emap_data_struct_t() : _pfid(0), _file_posn(0), _lock(false) {} + emap_data_struct_t(const uint16_t pfid, const std::streampos file_posn, const bool lock) : _pfid(pfid), _file_posn(file_posn), _lock(lock) {} + } emqp_data_struct_t; + typedef std::pair<uint64_t, emap_data_struct_t> emap_param; + typedef std::map<uint64_t, emap_data_struct_t> emap; typedef emap::iterator emap_itr; + private: emap _map; smutex _mutex; - std::vector<uint32_t> _pfid_enq_cnt; +// std::vector<uint32_t> _pfid_enq_cnt; public: enq_map(); virtual ~enq_map(); - void set_num_jfiles(const uint16_t num_jfiles); - inline uint32_t get_enq_cnt(const uint16_t pfid) const { return _pfid_enq_cnt.at(pfid); }; +// void set_num_jfiles(const uint16_t num_jfiles); +// inline uint32_t get_enq_cnt(const uint16_t pfid) const { return _pfid_enq_cnt.at(pfid); }; - int16_t insert_pfid(const uint64_t rid, const uint16_t pfid); // 0=ok; -3=duplicate rid; - int16_t insert_pfid(const uint64_t rid, const uint16_t pfid, const bool locked); // 0=ok; -3=duplicate rid; - int16_t get_pfid(const uint64_t rid); // >=0=pfid; -1=rid not found; -2=locked - int16_t get_remove_pfid(const uint64_t rid, const bool txn_flag = false); // >=0=pfid; -1=rid not found; -2=locked + short insert_pfid(const uint64_t rid, const uint16_t pfid, const std::streampos file_posn); // 0=ok; -3=duplicate rid; + short insert_pfid(const uint64_t rid, const uint16_t pfid, const std::streampos file_posn, const bool locked); // 0=ok; -3=duplicate rid; + short get_pfid(const uint64_t rid, int16_t& pfid); // >=0=pfid; -1=rid not found; -2=locked + short get_remove_pfid(const uint64_t rid, int16_t& pfid, const bool txn_flag = false); // >=0=pfid; -1=rid not found; -2=locked + short get_file_posn(const uint64_t rid, std::streampos& file_posn); // -1=rid not found; -2=locked + short get_data(const uint64_t rid, emap_data_struct_t& eds); bool is_enqueued(const uint64_t rid, bool ignore_lock = false); - int16_t lock(const uint64_t rid); // 0=ok; -1=rid not found - int16_t unlock(const uint64_t rid); // 0=ok; -1=rid not found - int16_t is_locked(const uint64_t rid); // 1=true; 0=false; -1=rid not found + short lock(const uint64_t rid); // 0=ok; -1=rid not found + short unlock(const uint64_t rid); // 0=ok; -1=rid not found + short is_locked(const uint64_t rid); // 1=true; 0=false; -1=rid not found inline void clear() { _map.clear(); } inline bool empty() const { return _map.empty(); } inline uint32_t size() const { return uint32_t(_map.size()); } diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.cpp index 8e6fb5cc31..c94e552da9 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.cpp +++ b/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.cpp @@ -109,8 +109,8 @@ enq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks) if (_xidp == 0) assert(_enq_hdr._xidsize == 0); - std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE; - std::size_t rem = max_size_dblks * JRNL_DBLK_SIZE; + std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE_BYTES; + std::size_t rem = max_size_dblks * JRNL_DBLK_SIZE_BYTES; std::size_t wr_cnt = 0; if (rec_offs_dblks) // Continuation of split data record (over 2 or more pages) { @@ -182,8 +182,8 @@ enq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks) std::memcpy((char*)wptr + wr_cnt, (char*)&_enq_tail + rec_offs, wsize); wr_cnt += wsize; #ifdef RHM_CLEAN - std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE; - std::size_t dblk_rec_size = size_dblks(rec_size() - rec_offs) * JRNL_DBLK_SIZE; + std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE_BYTES; + std::size_t dblk_rec_size = size_dblks(rec_size() - rec_offs) * JRNL_DBLK_SIZE_BYTES; std::memset((char*)wptr + wr_cnt, RHM_CLEAN_CHAR, dblk_rec_size - wr_cnt); #endif } @@ -238,7 +238,7 @@ enq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks) std::memcpy((char*)wptr + wr_cnt, (void*)&_enq_tail, sizeof(_enq_tail)); wr_cnt += sizeof(_enq_tail); #ifdef RHM_CLEAN - std::size_t dblk_rec_size = size_dblks(rec_size()) * JRNL_DBLK_SIZE; + std::size_t dblk_rec_size = size_dblks(rec_size()) * JRNL_DBLK_SIZE_BYTES; std::memset((char*)wptr + wr_cnt, RHM_CLEAN_CHAR, dblk_rec_size - wr_cnt); #endif } @@ -260,7 +260,7 @@ enq_rec::decode(rec_hdr_t& h, void* rptr, uint32_t rec_offs_dblks, uint32_t max_ const uint32_t hdr_xid_data_tail_size = hdr_xid_data_size + sizeof(rec_tail_t); const uint32_t hdr_data_dblks = size_dblks(hdr_xid_data_size); const uint32_t hdr_tail_dblks = size_dblks(hdr_xid_data_tail_size); - const std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE; + const std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE_BYTES; const std::size_t offs = rec_offs - sizeof(enq_hdr_t); if (hdr_tail_dblks - rec_offs_dblks <= max_size_dblks) @@ -331,7 +331,7 @@ enq_rec::decode(rec_hdr_t& h, void* rptr, uint32_t rec_offs_dblks, uint32_t max_ std::memcpy((char*)_buff + offs, rptr, data_rem); rd_cnt += data_rem; } - const std::size_t tail_rem = (max_size_dblks * JRNL_DBLK_SIZE) - rd_cnt; + const std::size_t tail_rem = (max_size_dblks * JRNL_DBLK_SIZE_BYTES) - rd_cnt; if (tail_rem) { std::memcpy((void*)&_enq_tail, ((char*)rptr + rd_cnt), tail_rem); @@ -341,7 +341,7 @@ enq_rec::decode(rec_hdr_t& h, void* rptr, uint32_t rec_offs_dblks, uint32_t max_ else { // Since xid and data are contiguous, both fit within current page - copy whole page - const std::size_t data_cp_size = (max_size_dblks * JRNL_DBLK_SIZE); + const std::size_t data_cp_size = (max_size_dblks * JRNL_DBLK_SIZE_BYTES); std::memcpy((char*)_buff + offs, rptr, data_cp_size); rd_cnt += data_cp_size; } @@ -405,7 +405,7 @@ enq_rec::decode(rec_hdr_t& h, void* rptr, uint32_t rec_offs_dblks, uint32_t max_ _enq_hdr._dsize); rd_cnt += _enq_hdr._dsize; } - const std::size_t tail_rem = (max_size_dblks * JRNL_DBLK_SIZE) - rd_cnt; + const std::size_t tail_rem = (max_size_dblks * JRNL_DBLK_SIZE_BYTES) - rd_cnt; if (tail_rem) { std::memcpy((void*)&_enq_tail, (char*)rptr + rd_cnt, tail_rem); @@ -422,7 +422,7 @@ enq_rec::decode(rec_hdr_t& h, void* rptr, uint32_t rec_offs_dblks, uint32_t max_ } if (_enq_hdr._dsize && !::is_enq_external(&_enq_hdr)) { - const std::size_t data_cp_size = (max_size_dblks * JRNL_DBLK_SIZE) - rd_cnt; + const std::size_t data_cp_size = (max_size_dblks * JRNL_DBLK_SIZE_BYTES) - rd_cnt; std::memcpy((char*)_buff + _enq_hdr._xidsize, (char*)rptr + rd_cnt, data_cp_size); rd_cnt += data_cp_size; } @@ -430,7 +430,7 @@ enq_rec::decode(rec_hdr_t& h, void* rptr, uint32_t rec_offs_dblks, uint32_t max_ else { // Header fits within this page, xid split or separated - const std::size_t data_cp_size = (max_size_dblks * JRNL_DBLK_SIZE) - rd_cnt; + const std::size_t data_cp_size = (max_size_dblks * JRNL_DBLK_SIZE_BYTES) - rd_cnt; std::memcpy(_buff, (char*)rptr + rd_cnt, data_cp_size); rd_cnt += data_cp_size; } @@ -516,7 +516,7 @@ enq_rec::rcv_decode(rec_hdr_t h, std::ifstream* ifsp, std::size_t& rec_offs) return false; } } - ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE - rec_size()); + ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE_BYTES - rec_size()); chk_tail(); // Throws if tail invalid or record incomplete assert(!ifsp->fail() && !ifsp->bad()); return true; diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/enums.h b/qpid/cpp/src/qpid/linearstore/jrnl/enums.h index 31fa4e6ba3..824eaa90f4 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/enums.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/enums.h @@ -29,7 +29,7 @@ namespace qls_jrnl // TODO: Change this to flags, as multiple of these conditions may exist simultaneously /** - * \brief Enumeration of possilbe return states from journal read and write operations. + * \brief Enumeration of possible return states from journal read and write operations. */ enum _iores { @@ -37,12 +37,12 @@ namespace qls_jrnl RHM_IORES_PAGE_AIOWAIT, ///< IO operation suspended - next page is waiting for AIO. RHM_IORES_FILE_AIOWAIT, ///< IO operation suspended - next file is waiting for AIO. RHM_IORES_EMPTY, ///< During read operations, nothing further is available to read. - RHM_IORES_RCINVALID, ///< Read page cache is invalid (ie obsolete or uninitialized) - RHM_IORES_ENQCAPTHRESH, ///< Enqueue capacity threshold (limit) reached. - RHM_IORES_FULL, ///< During write operations, the journal files are full. +// RHM_IORES_RCINVALID, ///< Read page cache is invalid (ie obsolete or uninitialized) +// RHM_IORES_ENQCAPTHRESH, ///< Enqueue capacity threshold (limit) reached. +// RHM_IORES_FULL, ///< During write operations, the journal files are full. RHM_IORES_BUSY, ///< Another blocking operation is in progress. RHM_IORES_TXPENDING, ///< Operation blocked by pending transaction. - RHM_IORES_NOTIMPL ///< Function is not yet implemented. + RHM_IORES_NOTIMPL ///< Function is not implemented. }; typedef _iores iores; @@ -54,9 +54,9 @@ namespace qls_jrnl case RHM_IORES_PAGE_AIOWAIT: return "RHM_IORES_PAGE_AIOWAIT"; case RHM_IORES_FILE_AIOWAIT: return "RHM_IORES_FILE_AIOWAIT"; case RHM_IORES_EMPTY: return "RHM_IORES_EMPTY"; - case RHM_IORES_RCINVALID: return "RHM_IORES_RCINVALID"; - case RHM_IORES_ENQCAPTHRESH: return "RHM_IORES_ENQCAPTHRESH"; - case RHM_IORES_FULL: return "RHM_IORES_FULL"; +// case RHM_IORES_RCINVALID: return "RHM_IORES_RCINVALID"; +// case RHM_IORES_ENQCAPTHRESH: return "RHM_IORES_ENQCAPTHRESH"; +// case RHM_IORES_FULL: return "RHM_IORES_FULL"; case RHM_IORES_BUSY: return "RHM_IORES_BUSY"; case RHM_IORES_TXPENDING: return "RHM_IORES_TXPENDING"; case RHM_IORES_NOTIMPL: return "RHM_IORES_NOTIMPL"; diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h b/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h index 110fee1334..c947ff46db 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h @@ -22,70 +22,46 @@ #ifndef QPID_LEGACYSTORE_JRNL_JCFG_H #define QPID_LEGACYSTORE_JRNL_JCFG_H -/* -#if defined(__i386__) little endian, 32 bits -#define JRNL_LITTLE_ENDIAN -#define JRNL_32_BIT -#elif defined(__PPC__) || defined(__s390__) big endian, 32 bits -#define JRNL_BIG_ENDIAN -#define JRNL_32_BIT -#elif defined(__ia64__) || defined(__x86_64__) || defined(__alpha__) little endian, 64 bits -#define JRNL_LITTLE_ENDIAN -#define JRNL_64_BIT -#elif defined(__powerpc64__) || defined(__s390x__) big endian, 64 bits -#define JRNL_BIG_ENDIAN -#define JRNL_64_BIT -#else -#error endian? -#endif -*/ - /** -* <b>Rule:</b> Data block size (JRNL_DBLK_SIZE) MUST be a power of 2 such that +* <b>Rule:</b> Data block size (JRNL_DBLK_SIZE_BYTES) MUST be a power of 2 AND +* a power of 2 factor of the disk softblock size (JRNL_SBLK_SIZE_BYTES): * <pre> -* JRNL_DBLK_SIZE * JRNL_SBLK_SIZE == n * 512 (n = 1,2,3...) +* n * JRNL_DBLK_SIZE_BYTES == JRNL_SBLK_SIZE_BYTES (n = 1,2,4,8...) * </pre> -* (The disk softblock size is 512 for Linux kernels >= 2.6) */ -#define JRNL_DBLK_SIZE 128 /**< Data block size in bytes (CANNOT BE LESS THAN 32!) */ -#define JRNL_SBLK_SIZE_DBLKS 32 /**< Disk softblock size in multiples of JRNL_DBLK_SIZE */ -#define JRNL_SBLK_SIZE JRNL_SBLK_SIZE_DBLKS * JRNL_DBLK_SIZE /**< Disk softblock size in bytes */ -#define JRNL_SBLK_SIZE_KIB JRNL_SBLK_SIZE / 1024 /**< Disk softblock size in KiB */ +#define JRNL_SBLK_SIZE_BYTES 4096 /**< Disk softblock size in bytes */ +#define QLS_AIO_ALIGN_BOUNDARY JRNL_SBLK_SIZE_BYTES +#define JRNL_DBLK_SIZE_BYTES 128 /**< Data block size in bytes (CANNOT BE LESS THAN 32!) */ +#define JRNL_SBLK_SIZE_DBLKS (JRNL_SBLK_SIZE_BYTES / JRNL_DBLK_SIZE_BYTES) /**< Disk softblock size in multiples of JRNL_DBLK_SIZE */ +#define JRNL_SBLK_SIZE_KIB (JRNL_SBLK_SIZE_BYTES / 1024) /**< Disk softblock size in KiB */ //#define JRNL_MIN_FILE_SIZE 128 ///< Min. jrnl file size in sblks (excl. file_hdr) //#define JRNL_MAX_FILE_SIZE 4194176 ///< Max. jrnl file size in sblks (excl. file_hdr) //#define JRNL_MIN_NUM_FILES 4 ///< Min. number of journal files //#define JRNL_MAX_NUM_FILES 64 ///< Max. number of journal files //#define JRNL_ENQ_THRESHOLD 80 ///< Percent full when enqueue connection will be closed // -#define JRNL_RMGR_PAGE_SIZE 128 ///< Journal page size in softblocks -#define JRNL_RMGR_PAGES 16 ///< Number of pages to use in wmgr +//#define JRNL_RMGR_PAGE_SIZE 128 ///< Journal page size in softblocks +//#define JRNL_RMGR_PAGES 16 ///< Number of pages to use in wmgr // -#define JRNL_WMGR_DEF_PAGE_SIZE 64 ///< Journal write page size in softblocks (default) -#define JRNL_WMGR_DEF_PAGES 32 ///< Number of pages to use in wmgr (default) +#define JRNL_WMGR_DEF_PAGE_SIZE_KIB 32 +#define JRNL_WMGR_DEF_PAGE_SIZE_SBLKS (JRNL_WMGR_DEF_PAGE_SIZE_KIB / JRNL_SBLK_SIZE_KIB) ///< Journal write page size in softblocks (default) +#define JRNL_WMGR_DEF_PAGES 32 ///< Number of pages to use in wmgr (default) // -#define JRNL_WMGR_MAXDTOKPP 1024 ///< Max. dtoks (data blocks) per page in wmgr -#define JRNL_WMGR_MAXWAITUS 100 ///< Max. wait time (us) before submitting AIO +#define JRNL_WMGR_MAXDTOKPP 1024 ///< Max. dtoks (data blocks) per page in wmgr +#define JRNL_WMGR_MAXWAITUS 100 ///< Max. wait time (us) before submitting AIO // //#define JRNL_INFO_EXTENSION "jinf" ///< Extension for journal info files //#define JRNL_DATA_EXTENSION "jdat" ///< Extension for journal data files -#define QLS_JRNL_FILE_EXTENSION ".jrnl" /**< Extension for journal data files */ -//#define RHM_JDAT_TXA_MAGIC 0x614d4852 ///< ("RHMa" in little endian) Magic for dtx abort hdrs -#define QLS_TXA_MAGIC 0x61534c51 /**< ("RHMa" in little endian) Magic for dtx abort hdrs */ -//#define RHM_JDAT_TXC_MAGIC 0x634d4852 ///< ("RHMc" in little endian) Magic for dtx commit hdrs -#define QLS_TXC_MAGIC 0x63534c51 ///< ("RHMc" in little endian) Magic for dtx commit hdrs -//#define RHM_JDAT_DEQ_MAGIC 0x644d4852 ///< ("RHMd" in little endian) Magic for deq rec hdrs -#define QLS_DEQ_MAGIC 0x64534c51 /**< ("QLSd" in little endian) Magic for deq rec hdrs */ -//#define RHM_JDAT_ENQ_MAGIC 0x654d4852 ///< ("RHMe" in little endian) Magic for enq rec hdrs -#define QLS_ENQ_MAGIC 0x65534c51 /**< ("QLSe" in little endian) Magic for enq rec hdrs */ -//#define RHM_JDAT_FILE_MAGIC 0x664d4852 ///< ("RHMf" in little endian) Magic for file hdrs -#define QLS_FILE_MAGIC 0x66534c51 /**< ("QLSf" in little endian) Magic for file hdrs */ -//#define RHM_JDAT_EMPTY_MAGIC 0x784d4852 ///< ("RHMx" in little endian) Magic for empty dblk -#define QLS_EMPTY_MAGIC 0x78534c51 /**< ("QLSx" in little endian) Magic for empty dblk */ -//#define RHM_JDAT_VERSION 0x01 ///< Version (of file layout) -#define QLS_JRNL_VERSION 0x0002 /**< Version (of file layout) */ -#define QLS_JRNL_FHDRSIZESBLKS 0x0001 /**< Journal file header size in sblks (as defined by JRNL_SBLK_SIZE) */ -//#define RHM_CLEAN_CHAR 0xff ///< Char used to clear empty space on disk -#define QLS_CLEAN_CHAR 0xff ///< Char used to clear empty space on disk +#define QLS_JRNL_FILE_EXTENSION ".jrnl" /**< Extension for journal data files */ +#define QLS_TXA_MAGIC 0x61534c51 /**< ("RHMa" in little endian) Magic for dtx abort hdrs */ +#define QLS_TXC_MAGIC 0x63534c51 /**< ("RHMc" in little endian) Magic for dtx commit hdrs */ +#define QLS_DEQ_MAGIC 0x64534c51 /**< ("QLSd" in little endian) Magic for deq rec hdrs */ +#define QLS_ENQ_MAGIC 0x65534c51 /**< ("QLSe" in little endian) Magic for enq rec hdrs */ +#define QLS_FILE_MAGIC 0x66534c51 /**< ("QLSf" in little endian) Magic for file hdrs */ +#define QLS_EMPTY_MAGIC 0x78534c51 /**< ("QLSx" in little endian) Magic for empty dblk */ +#define QLS_JRNL_VERSION 2 /**< Version (of file layout) */ +#define QLS_JRNL_FHDR_RES_SIZE_SBLKS 1 /**< Journal file header reserved size in sblks (as defined by JRNL_SBLK_SIZE_BYTES) */ +#define QLS_CLEAN_CHAR 0xff /**< Char used to clear empty space on disk */ // //#define RHM_LENDIAN_FLAG 0 ///< Value of little endian flag on disk //#define RHM_BENDIAN_FLAG 1 ///< Value of big endian flag on disk diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp index 126d3f7da3..280d09f886 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp +++ b/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp @@ -30,10 +30,12 @@ #include <iomanip> #include <iostream> #include <qpid/linearstore/jrnl/EmptyFilePool.h> +#include <qpid/linearstore/jrnl/EmptyFilePoolManager.h> //#include "qpid/linearstore/jrnl/file_hdr.h" #include "qpid/linearstore/jrnl/jerrno.h" //#include "qpid/linearstore/jrnl/jinf.h" -#include "qpid/linearstore/jrnl/JournalFileController.h" +//#include "qpid/linearstore/jrnl/JournalFileController.h" +#include "qpid/linearstore/jrnl/utils/enq_hdr.h" #include <limits> #include <sstream> #include <unistd.h> @@ -72,15 +74,16 @@ jcntl::jcntl(const std::string& jid, const std::string& jdir/*, const std::strin _stop_flag(false), _readonly_flag(false), // _autostop(true), - _jfcp(0), + _linearFileController(*this), + _emptyFilePoolPtr(0), // _jfsize_sblks(0), // _lpmgr(), _emap(), _tmap(), // _rrfc(&_lpmgr), // _wrfc(&_lpmgr), - _rmgr(this, _emap, _tmap/*, _rrfc*/), - _wmgr(this, _emap, _tmap/*, _wrfc*/), +// _rmgr(this, _emap, _tmap/*, _rrfc*/), + _wmgr(this, _emap, _tmap, _linearFileController/*, _wrfc*/), _rcvdat() {} @@ -90,11 +93,7 @@ jcntl::~jcntl() try { stop(true); } catch (const jexception& e) { std::cerr << e << std::endl; } // _lpmgr.finalize(); - if (_jfcp) { - _jfcp->finalize(); - delete _jfcp; - _jfcp = 0; - } + _linearFileController.finalize(); } void @@ -109,11 +108,7 @@ jcntl::initialize(/*const uint16_t num_jfiles, const bool ae, const uint16_t ae_ _emap.clear(); _tmap.clear(); - if (_jfcp) { - _jfcp->finalize(); - delete _jfcp; - _jfcp = 0; - } + _linearFileController.finalize(); // _lpmgr.finalize(); @@ -129,14 +124,15 @@ jcntl::initialize(/*const uint16_t num_jfiles, const bool ae, const uint16_t ae_ // Clear any existing journal files _jdir.clear_dir(); -// _lpmgr.initialize(num_jfiles, ae, ae_max_jfiles, this, &new_fcntl); +// _lpmgr.initialize(num_jfiles, ae, ae_max_jfiles, this, &new_fcntl); // Creates new journal files - _jfcp = new JournalFileController(_jdir.dirname(), efpp); - _jfcp->pullEmptyFileFromEfp(1, 4096, _jid); + _linearFileController.initialize(_jdir.dirname(), efpp); + _linearFileController.pullEmptyFileFromEfp(); + std::cout << _linearFileController.status(2); // _wrfc.initialize(_jfsize_sblks); // _rrfc.initialize(); // _rrfc.set_findex(0); - _rmgr.initialize(cbp); +// _rmgr.initialize(cbp); _wmgr.initialize(cbp, wcache_pgsize_sblks, wcache_num_pages, JRNL_WMGR_MAXDTOKPP, JRNL_WMGR_MAXWAITUS); // Write info file (<basename>.jinf) to disk @@ -146,11 +142,12 @@ jcntl::initialize(/*const uint16_t num_jfiles, const bool ae, const uint16_t ae_ } void -jcntl::recover(/*const uint16_t num_jfiles, const bool ae, const uint16_t ae_max_jfiles, - const uint32_t jfsize_sblks,*/ const uint16_t wcache_num_pages, const uint32_t wcache_pgsize_sblks, -// const rd_aio_cb rd_cb, const wr_aio_cb wr_cb, const std::vector<std::string>* prep_txn_list_ptr, - aio_callback* const cbp, const std::vector<std::string>* prep_txn_list_ptr, - uint64_t& highest_rid) +jcntl::recover(EmptyFilePoolManager* efpm, + const uint16_t wcache_num_pages, + const uint32_t wcache_pgsize_sblks, + aio_callback* const cbp, + const std::vector<std::string>* prep_txn_list_ptr, + uint64_t& highest_rid) { _init_flag = false; _stop_flag = false; @@ -159,6 +156,8 @@ jcntl::recover(/*const uint16_t num_jfiles, const bool ae, const uint16_t ae_max _emap.clear(); _tmap.clear(); + _linearFileController.finalize(); + // _lpmgr.finalize(); // assert(num_jfiles >= JRNL_MIN_NUM_FILES); @@ -171,18 +170,19 @@ jcntl::recover(/*const uint16_t num_jfiles, const bool ae, const uint16_t ae_max _jdir.verify_dir(); // _rcvdat.reset(num_jfiles/*, ae, ae_max_jfiles*/); - rcvr_janalyze(_rcvdat, prep_txn_list_ptr); + rcvr_janalyze(prep_txn_list_ptr, efpm); highest_rid = _rcvdat._h_rid; - if (_rcvdat._jfull) - throw jexception(jerrno::JERR_JCNTL_RECOVERJFULL, "jcntl", "recover"); - this->log(LOG_DEBUG, _jid, _rcvdat.to_log(_jid)); +// if (_rcvdat._jfull) +// throw jexception(jerrno::JERR_JCNTL_RECOVERJFULL, "jcntl", "recover"); + this->log(/*LOG_DEBUG*/LOG_INFO, _jid, _rcvdat.to_log(_jid)); // _lpmgr.recover(_rcvdat, this, &new_fcntl); + _linearFileController.initialize(_jdir.dirname(), _emptyFilePoolPtr); // _wrfc.initialize(_jfsize_sblks, &_rcvdat); // _rrfc.initialize(); // _rrfc.set_findex(_rcvdat.ffid()); - _rmgr.initialize(cbp); +// _rmgr.initialize(cbp); _wmgr.initialize(cbp, wcache_pgsize_sblks, wcache_num_pages, JRNL_WMGR_MAXDTOKPP, JRNL_WMGR_MAXWAITUS, (_rcvdat._lffull ? 0 : _rcvdat._eo)); @@ -200,7 +200,7 @@ jcntl::recover_complete() // _wrfc.initialize(_jfsize_sblks, &_rcvdat); // _rrfc.initialize(); // _rrfc.set_findex(_rcvdat.ffid()); - _rmgr.recover_complete(); +// _rmgr.recover_complete(); _readonly_flag = false; } @@ -208,7 +208,7 @@ void jcntl::delete_jrnl_files() { stop(true); // wait for AIO to complete - _jfcp->purgeFilesToEfp(); + _linearFileController.purgeFilesToEfp(); _jdir.delete_dir(); } @@ -287,8 +287,55 @@ jcntl::discard_data_record(data_tok* const dtokp) iores jcntl::read_data_record(void** const datapp, std::size_t& dsize, void** const xidpp, std::size_t& xidsize, - bool& transient, bool& external, data_tok* const dtokp, bool ignore_pending_txns) + bool& transient, bool& external, data_tok* const dtokp, bool /*ignore_pending_txns*/) { + if (!dtokp->is_readable()) { + std::ostringstream oss; + oss << std::hex << std::setfill('0'); + oss << "dtok_id=0x" << std::setw(8) << dtokp->id(); + oss << "; dtok_rid=0x" << std::setw(16) << dtokp->rid(); + oss << "; dtok_wstate=" << dtokp->wstate_str(); + throw jexception(jerrno::JERR_JCNTL_ENQSTATE, oss.str(), "jcntl", "read_data_record"); + } + std::vector<uint64_t> ridl; + _emap.rid_list(ridl); + enq_map::emap_data_struct_t eds; + for (std::vector<uint64_t>::const_iterator i=ridl.begin(); i!=ridl.end(); ++i) { + short res = _emap.get_data(*i, eds); + if (res == enq_map::EMAP_OK) { + std::ifstream ifs(_rcvdat._fm[eds._pfid].c_str(), std::ifstream::in | std::ifstream::binary); + if (!ifs.good()) { + std::ostringstream oss; + oss << "rid=" << (*i) << " pfid=" << eds._pfid << " file=" << _rcvdat._fm[eds._pfid] << " file_posn=" << eds._file_posn; + throw jexception(jerrno::JERR_JCNTL_OPENRD, oss.str(), "jcntl", "read_data_record"); + } + ifs.seekg(eds._file_posn, std::ifstream::beg); + ::enq_hdr_t eh; + ifs.read((char*)&eh, sizeof(::enq_hdr_t)); + if (!::validate_enq_hdr(&eh, QLS_ENQ_MAGIC, QLS_JRNL_VERSION, *i)) { + std::ostringstream oss; + oss << "rid=" << (*i) << " pfid=" << eds._pfid << " file=" << _rcvdat._fm[eds._pfid] << " file_posn=" << eds._file_posn; + throw jexception(jerrno::JERR_JCNTL_INVALIDENQHDR, oss.str(), "jcntl", "read_data_record"); + } + dsize = eh._dsize; + xidsize = eh._xidsize; + transient = ::is_enq_transient(&eh); + external = ::is_enq_external(&eh); + if (xidsize) { + *xidpp = ::malloc(xidsize); + ifs.read((char*)(*xidpp), xidsize); + } else { + *xidpp = 0; + } + if (dsize) { + *datapp = ::malloc(dsize); + ifs.read((char*)(*datapp), dsize); + } else { + *datapp = 0; + } + } + } +/* check_rstatus("read_data"); iores res = _rmgr.read(datapp, dsize, xidpp, xidsize, transient, external, dtokp, ignore_pending_txns); if (res == RHM_IORES_RCINVALID) @@ -302,6 +349,8 @@ jcntl::read_data_record(void** const datapp, std::size_t& dsize, void** const xi res = _rmgr.read(datapp, dsize, xidpp, xidsize, transient, external, dtokp, ignore_pending_txns); } return res; +*/ + return RHM_IORES_SUCCESS; } iores @@ -370,11 +419,13 @@ jcntl::get_wr_events(timespec* const timeout) return res; } +/* int32_t jcntl::get_rd_events(timespec* const timeout) { return _rmgr.get_events(pmgr::AIO_COMPLETE, timeout); } +*/ void jcntl::stop(const bool block_till_aio_cmpl) @@ -386,26 +437,13 @@ jcntl::stop(const bool block_till_aio_cmpl) _stop_flag = true; if (!_readonly_flag) flush(block_till_aio_cmpl); -// _rrfc.finalize(); -// _lpmgr.finalize(); + _linearFileController.finalize(); } -/* -uint16_t -jcntl::get_earliest_fid() -{ - uint16_t ffid = _wrfc.earliest_index(); - uint16_t fid = _wrfc.index(); - while ( _emap.get_enq_cnt(ffid) == 0 && _tmap.get_txn_pfid_cnt(ffid) == 0 && ffid != fid) - { - if (++ffid >= _lpmgr.num_jfiles()) - ffid = 0; - } - if (!_rrfc.is_active()) - _rrfc.set_findex(ffid); - return ffid; +LinearFileController& +jcntl::getLinearFileControllerRef() { + return _linearFileController; } -*/ iores jcntl::flush(const bool block_till_aio_cmpl) @@ -496,22 +534,6 @@ jcntl::check_rstatus(const char* fn_name) const throw jexception(jerrno::JERR_JCNTL_STOPPED, "jcntl", fn_name); } -/* -void -jcntl::write_infofile() const -{ - timespec ts; - if (::clock_gettime(CLOCK_REALTIME, &ts)) - { - std::ostringstream oss; - oss << FORMAT_SYSERR(errno); - throw jexception(jerrno::JERR__RTCLOCK, oss.str(), "jcntl", "write_infofile"); - } - jinf ji(_jid, _jdir.dirname(), _base_filename, _lpmgr.num_jfiles(), _lpmgr.is_ae(), _lpmgr.ae_max_jfiles(), - _jfsize_sblks, _wmgr.cache_pgsize_sblks(), _wmgr.cache_num_pages(), ts); - ji.write(); -} -*/ void jcntl::aio_cmpl_wait() @@ -530,6 +552,7 @@ jcntl::aio_cmpl_wait() } } + bool jcntl::handle_aio_wait(const iores res, iores& resout, const data_tok* dtp) { @@ -569,64 +592,71 @@ jcntl::handle_aio_wait(const iores res, iores& resout, const data_tok* dtp) return false; } -void -jcntl::rcvr_janalyze(rcvdat& /*rd*/, const std::vector<std::string>* /*prep_txn_list_ptr*/) -{ -/* - jinf ji(_jdir.dirname() + "/" + _base_filename + "." + JRNL_INFO_EXTENSION, true); - // If the number of files does not tie up with the jinf file from the journal being recovered, - // use the jinf data. - if (rd._njf != ji.num_jfiles()) - { - std::ostringstream oss; - oss << "Recovery found " << ji.num_jfiles() << - " files (different from --num-jfiles value of " << rd._njf << ")."; - this->log(LOG_INFO, oss.str()); - rd._njf = ji.num_jfiles(); - _rcvdat._enq_cnt_list.resize(rd._njf); - } - _emap.set_num_jfiles(rd._njf); - _tmap.set_num_jfiles(rd._njf); - if (_jfsize_sblks != ji.jfsize_sblks()) - { +// static +void +jcntl::rcvr_read_jfile(const std::string& jfn, ::file_hdr_t* fh, std::string& queueName) { + const std::size_t headerBlockSize = QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_KIB * 1024; + char buffer[headerBlockSize]; + std::ifstream ifs(jfn.c_str(), std::ifstream::in | std::ifstream::binary); + if (!ifs.good()) { std::ostringstream oss; - oss << "Recovery found file size = " << (ji.jfsize_sblks() / JRNL_RMGR_PAGE_SIZE) << - " (different from --jfile-size-pgs value of " << - (_jfsize_sblks / JRNL_RMGR_PAGE_SIZE) << ")."; - this->log(LOG_INFO, oss.str()); - _jfsize_sblks = ji.jfsize_sblks(); + oss << "File=" << jfn; + throw jexception(jerrno::JERR_JCNTL_OPENRD, oss.str(), "jcntl", "rcvr_read_jfile"); } - if (_jdir.dirname().compare(ji.jdir())) - { + ifs.read(buffer, headerBlockSize); + if (!ifs) { + std::streamsize s = ifs.gcount(); + ifs.close(); std::ostringstream oss; - oss << "Journal file location change: original = \"" << ji.jdir() << - "\"; current = \"" << _jdir.dirname() << "\""; - this->log(LOG_WARN, oss.str()); - ji.set_jdir(_jdir.dirname()); + oss << "File=" << jfn << "; attempted_read_size=" << headerBlockSize << "; actual_read_size=" << s; + throw jexception(jerrno::JERR_JCNTL_READ, oss.str(), "jcntl", "rcvr_read_jfile"); } + ifs.close(); + ::memcpy(fh, buffer, sizeof(::file_hdr_t)); + queueName.assign(buffer + sizeof(::file_hdr_t), fh->_queue_name_len); +} - try - { - rd._ffid = ji.get_first_pfid(); - rd._lfid = ji.get_last_pfid(); - rd._owi = ji.get_initial_owi(); - rd._frot = ji.get_frot(); - rd._jempty = false; - ji.get_normalized_pfid_list(rd._fid_list); // _pfid_list + +void jcntl::rcvr_analyze_fhdrs(EmptyFilePoolManager* efpmp) { + std::string headerQueueName; + ::file_hdr_t fh; + efpIdentity_t efpid; +// std::map<uint64_t, std::string> fileMap; + std::vector<std::string> dirList; + jdir::read_dir(_jdir.dirname(), dirList, false, true, false, true); + for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) { + rcvr_read_jfile(*i, &fh, headerQueueName); + if (headerQueueName.compare(_jid) != 0) { + std::ostringstream oss; + oss << "Journal file " << (*i) << " belongs to queue \"" << headerQueueName << "\": ignoring"; + log(LOG_WARN, _jid, oss.str()); + } else { + _rcvdat._fm[fh._file_number] = *i; + efpid.first = fh._efp_partition; + efpid.second = fh._file_size_kib; + } } - catch (const jexception& e) - { - if (e.err_code() != jerrno::JERR_JINF_JDATEMPTY) throw; + _rcvdat._jfl.clear(); + for (std::map<uint64_t, std::string>::iterator i=_rcvdat._fm.begin(); i!=_rcvdat._fm.end(); ++i) { + _rcvdat._jfl.push_back(i->second); } + _rcvdat._enq_cnt_list.resize(_rcvdat._jfl.size(), 0); + _emptyFilePoolPtr = efpmp->getEmptyFilePool(efpid); +} + + +void jcntl::rcvr_janalyze(const std::vector<std::string>* prep_txn_list_ptr, EmptyFilePoolManager* efpmp) { + // Analyze file headers of existing journal files + rcvr_analyze_fhdrs(efpmp); // Restore all read and write pointers and transactions - if (!rd._jempty) + if (!_rcvdat._jempty) { - uint16_t fid = rd._ffid; + uint16_t fid = 0; std::ifstream ifs; - bool lowi = rd._owi; // local copy of owi to be used during analysis - while (rcvr_get_next_record(fid, &ifs, lowi, rd)) ; + //bool lowi = rd._owi; // local copy of owi to be used during analysis + while (rcvr_get_next_record(fid, &ifs)) ; if (ifs.is_open()) ifs.close(); // Remove all txns from tmap that are not in the prepared list @@ -645,7 +675,7 @@ jcntl::rcvr_janalyze(rcvdat& /*rd*/, const std::vector<std::string>* /*prep_txn_ for (tdl_itr i=tdl.begin(); i<tdl.end(); i++) { if (i->_enq_flag) // enq op - decrement enqueue count - rd._enq_cnt_list[i->_pfid]--; + _rcvdat._enq_cnt_list[i->_pfid]--; else if (_emap.is_enqueued(i->_drid, true)) // deq op - unlock enq record { int16_t ret = _emap.unlock(i->_drid); @@ -662,18 +692,14 @@ jcntl::rcvr_janalyze(rcvdat& /*rd*/, const std::vector<std::string>* /*prep_txn_ } } - // Check for file full condition - add one to _jfsize_sblks to account for file header - rd._lffull = rd._eo == (1 + _jfsize_sblks) * JRNL_SBLK_SIZE; - - // Check for journal full condition - uint16_t next_wr_fid = (rd._lfid + 1) % rd._njf; - rd._jfull = rd._ffid == next_wr_fid && rd._enq_cnt_list[next_wr_fid] && rd._lffull; + // Check for file full condition + _rcvdat._lffull = _rcvdat._eo == _emptyFilePoolPtr->fileSize_kib() * 1024; } -*/ } + bool -jcntl::rcvr_get_next_record(uint16_t& fid, std::ifstream* ifsp/*, bool& lowi*/, rcvdat& rd) +jcntl::rcvr_get_next_record(uint16_t& fid, std::ifstream* ifsp) { std::size_t cum_size_read = 0; void* xidp = 0; @@ -685,7 +711,7 @@ jcntl::rcvr_get_next_record(uint16_t& fid, std::ifstream* ifsp/*, bool& lowi*/, { if (!ifsp->is_open()) { - if (!jfile_cycle(fid, ifsp/*, lowi*/, rd, true)) + if (!jfile_cycle(fid, ifsp, true)) return false; } file_pos = ifsp->tellg(); @@ -694,7 +720,7 @@ jcntl::rcvr_get_next_record(uint16_t& fid, std::ifstream* ifsp/*, bool& lowi*/, hdr_ok = true; else { - if (!jfile_cycle(fid, ifsp/*, lowi*/, rd, true)) + if (!jfile_cycle(fid, ifsp, true)) return false; } } @@ -703,13 +729,14 @@ jcntl::rcvr_get_next_record(uint16_t& fid, std::ifstream* ifsp/*, bool& lowi*/, { case QLS_ENQ_MAGIC: { + std::cout << " e" << std::flush; enq_rec er; uint16_t start_fid = fid; // fid may increment in decode() if record folds over file boundary - if (!decode(er, fid, ifsp, cum_size_read, h/*, lowi*/, rd, file_pos)) + if (!decode(er, fid, ifsp, cum_size_read, h, file_pos)) return false; if (!er.is_transient()) // Ignore transient msgs { - rd._enq_cnt_list[start_fid]++; + _rcvdat._enq_cnt_list[start_fid]++; if (er.xid_size()) { er.get_xid(&xidp); @@ -726,7 +753,7 @@ jcntl::rcvr_get_next_record(uint16_t& fid, std::ifstream* ifsp/*, bool& lowi*/, } else { - if (_emap.insert_pfid(h._rid, start_fid) < enq_map::EMAP_OK) // fail + if (_emap.insert_pfid(h._rid, start_fid, file_pos) < enq_map::EMAP_OK) // fail { // The only error code emap::insert_pfid() returns is enq_map::EMAP_DUP_RID. std::ostringstream oss; @@ -739,9 +766,10 @@ jcntl::rcvr_get_next_record(uint16_t& fid, std::ifstream* ifsp/*, bool& lowi*/, break; case QLS_DEQ_MAGIC: { + std::cout << " d" << std::flush; deq_rec dr; uint16_t start_fid = fid; // fid may increment in decode() if record folds over file boundary - if (!decode(dr, fid, ifsp, cum_size_read, h/*, lowi*/, rd, file_pos)) + if (!decode(dr, fid, ifsp, cum_size_read, h, file_pos)) return false; if (dr.xid_size()) { @@ -762,16 +790,17 @@ jcntl::rcvr_get_next_record(uint16_t& fid, std::ifstream* ifsp/*, bool& lowi*/, } else { - int16_t enq_fid = _emap.get_remove_pfid(dr.deq_rid(), true); - if (enq_fid >= enq_map::EMAP_OK) // ignore not found error - rd._enq_cnt_list[enq_fid]--; + int16_t enq_fid; + if (_emap.get_remove_pfid(dr.deq_rid(), enq_fid, true) == enq_map::EMAP_OK) // ignore not found error + _rcvdat._enq_cnt_list[enq_fid]--; } } break; case QLS_TXA_MAGIC: { + std::cout << " a" << std::flush; txn_rec ar; - if (!decode(ar, fid, ifsp, cum_size_read, h/*, lowi*/, rd, file_pos)) + if (!decode(ar, fid, ifsp, cum_size_read, h, file_pos)) return false; // Delete this txn from tmap, unlock any locked records in emap ar.get_xid(&xidp); @@ -781,7 +810,7 @@ jcntl::rcvr_get_next_record(uint16_t& fid, std::ifstream* ifsp/*, bool& lowi*/, for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++) { if (itr->_enq_flag) - rd._enq_cnt_list[itr->_pfid]--; + _rcvdat._enq_cnt_list[itr->_pfid]--; else _emap.unlock(itr->_drid); // ignore not found error } @@ -790,8 +819,9 @@ jcntl::rcvr_get_next_record(uint16_t& fid, std::ifstream* ifsp/*, bool& lowi*/, break; case QLS_TXC_MAGIC: { + std::cout << " t" << std::flush; txn_rec cr; - if (!decode(cr, fid, ifsp, cum_size_read, h/*, lowi*/, rd, file_pos)) + if (!decode(cr, fid, ifsp, cum_size_read, h, file_pos)) return false; // Delete this txn from tmap, process records into emap cr.get_xid(&xidp); @@ -802,7 +832,7 @@ jcntl::rcvr_get_next_record(uint16_t& fid, std::ifstream* ifsp/*, bool& lowi*/, { if (itr->_enq_flag) // txn enqueue { - if (_emap.insert_pfid(itr->_rid, itr->_pfid) < enq_map::EMAP_OK) // fail + if (_emap.insert_pfid(itr->_rid, itr->_pfid, file_pos) < enq_map::EMAP_OK) // fail { // The only error code emap::insert_pfid() returns is enq_map::EMAP_DUP_RID. std::ostringstream oss; @@ -812,9 +842,9 @@ jcntl::rcvr_get_next_record(uint16_t& fid, std::ifstream* ifsp/*, bool& lowi*/, } else // txn dequeue { - int16_t enq_fid = _emap.get_remove_pfid(itr->_drid, true); - if (enq_fid >= enq_map::EMAP_OK) - rd._enq_cnt_list[enq_fid]--; + int16_t enq_fid; + if (_emap.get_remove_pfid(itr->_drid, enq_fid, true) == enq_map::EMAP_OK) // ignore not found error + _rcvdat._enq_cnt_list[enq_fid]--; } } std::free(xidp); @@ -822,32 +852,40 @@ jcntl::rcvr_get_next_record(uint16_t& fid, std::ifstream* ifsp/*, bool& lowi*/, break; case QLS_EMPTY_MAGIC: { + std::cout << " x" << std::flush; uint32_t rec_dblks = jrec::size_dblks(sizeof(rec_hdr_t)); - ifsp->ignore(rec_dblks * JRNL_DBLK_SIZE - sizeof(rec_hdr_t)); + ifsp->ignore(rec_dblks * JRNL_DBLK_SIZE_BYTES - sizeof(rec_hdr_t)); assert(!ifsp->fail() && !ifsp->bad()); - if (!jfile_cycle(fid, ifsp/*, lowi*/, rd, false)) + if (!jfile_cycle(fid, ifsp, false)) return false; } break; case 0: - check_journal_alignment(fid, file_pos, rd); + std::cout << " 0" << std::endl << std::flush; + check_journal_alignment(fid, file_pos); return false; default: + std::cout << " ?" << std::endl << std::flush; // Stop as this is the overwrite boundary. - check_journal_alignment(fid, file_pos, rd); + check_journal_alignment(fid, file_pos); return false; } return true; } + bool jcntl::decode(jrec& rec, uint16_t& fid, std::ifstream* ifsp, std::size_t& cum_size_read, - rec_hdr_t& h/*, bool& lowi*/, rcvdat& rd, std::streampos& file_offs) + rec_hdr_t& h, std::streampos& file_offs) { uint16_t start_fid = fid; std::streampos start_file_offs = file_offs; -// if (!check_owi(fid, h, lowi, rd, file_offs)) -// return false; + + if (_rcvdat._h_rid == 0) + _rcvdat._h_rid = h._rid; + else if (h._rid - _rcvdat._h_rid < 0x8000000000000000ULL) // RFC 1982 comparison for unsigned 64-bit + _rcvdat._h_rid = h._rid; + bool done = false; while (!done) { @@ -860,64 +898,60 @@ jcntl::decode(jrec& rec, uint16_t& fid, std::ifstream* ifsp, std::size_t& cum_si // fid != (rd._ffid ? rd._ffid - 1 : _num_jfiles - 1)) throw; // Tried this, but did not work // if (e.err_code() != jerrno::JERR_JREC_BADRECTAIL || h._magic != 0) throw; - check_journal_alignment(start_fid, start_file_offs, rd); + check_journal_alignment(start_fid, start_file_offs); // rd._lfid = start_fid; return false; } - if (!done && !jfile_cycle(fid, ifsp/*, lowi*/, rd, false)) + if (!done && !jfile_cycle(fid, ifsp, /*lowi, rd,*/ false)) { - check_journal_alignment(start_fid, start_file_offs, rd); + check_journal_alignment(start_fid, start_file_offs); return false; } } return true; } + bool -jcntl::jfile_cycle(uint16_t& fid, std::ifstream* ifsp/*, bool& lowi*/, rcvdat& rd, const bool jump_fro) +jcntl::jfile_cycle(uint16_t& fid, std::ifstream* ifsp, const bool jump_fro) { if (ifsp->is_open()) { if (ifsp->eof() || !ifsp->good()) { ifsp->clear(); - rd._eo = ifsp->tellg(); // remember file offset before closing - assert(rd._eo != std::numeric_limits<std::size_t>::max()); // Check for error code -1 + _rcvdat._eo = ifsp->tellg(); // remember file offset before closing + assert(_rcvdat._eo != std::numeric_limits<std::size_t>::max()); // Check for error code -1 ifsp->close(); - if (++fid >= rd._njf) - { - fid = 0; -// lowi = !lowi; // Flip local owi - } - if (fid == rd._ffid) // used up all journal files + if (++fid == _rcvdat._jfl.size()) // used up all known journal files return false; } } if (!ifsp->is_open()) { - std::ostringstream oss; - oss << _jdir.dirname() << "/" /*<< _base_filename*/ << "."; // TODO - linear journal name - oss << std::hex << std::setfill('0') << std::setw(4) << fid << QLS_JRNL_FILE_EXTENSION; ifsp->clear(); // clear eof flag, req'd for older versions of c++ - ifsp->open(oss.str().c_str(), std::ios_base::in | std::ios_base::binary); + ifsp->open(_rcvdat._jfl[fid].c_str(), std::ios_base::in | std::ios_base::binary); if (!ifsp->good()) - throw jexception(jerrno::JERR__FILEIO, oss.str(), "jcntl", "jfile_cycle"); + throw jexception(jerrno::JERR__FILEIO, _rcvdat._jfl[fid], "jcntl", "jfile_cycle"); // Read file header + std::cout << " F" << fid << std::flush; file_hdr_t fhdr; ifsp->read((char*)&fhdr, sizeof(fhdr)); assert(ifsp->good()); if (fhdr._rhdr._magic == QLS_FILE_MAGIC) { -// assert(fhdr._lfid == fid); - if (!rd._fro) - rd._fro = fhdr._fro; - std::streamoff foffs = jump_fro ? fhdr._fro : JRNL_SBLK_SIZE; + if (!_rcvdat._fro) + _rcvdat._fro = fhdr._fro; + std::streamoff foffs = jump_fro ? fhdr._fro : JRNL_SBLK_SIZE_BYTES; ifsp->seekg(foffs); } else { ifsp->close(); + if (fid == 0) { + _rcvdat._jempty = true; + } return false; } } @@ -925,46 +959,17 @@ jcntl::jfile_cycle(uint16_t& fid, std::ifstream* ifsp/*, bool& lowi*/, rcvdat& r } -/* -bool -jcntl::check_owi(const uint16_t fid, rec_hdr& h, bool& lowi, rcvdat& rd, std::streampos& file_pos) -{ - if (rd._ffid ? h.get_owi() == lowi : h.get_owi() != lowi) // Overwrite indicator changed - { - uint16_t expected_fid = rd._ffid ? rd._ffid - 1 : rd._njf - 1; - if (fid == expected_fid) - { - check_journal_alignment(fid, file_pos, rd); - return false; - } - std::ostringstream oss; - oss << std::hex << std::setfill('0') << "Magic=0x" << std::setw(8) << h._magic; - oss << " fid=0x" << std::setw(4) << fid << " rid=0x" << std::setw(8) << h._rid; - oss << " foffs=0x" << std::setw(8) << file_pos; - oss << " expected_fid=0x" << std::setw(4) << expected_fid; - throw jexception(jerrno::JERR_JCNTL_OWIMISMATCH, oss.str(), "jcntl", - "check_owi"); - } - if (rd._h_rid == 0) - rd._h_rid = h._rid; - else if (h._rid - rd._h_rid < 0x8000000000000000ULL) // RFC 1982 comparison for unsigned 64-bit - rd._h_rid = h._rid; - return true; -} -*/ - - void -jcntl::check_journal_alignment(const uint16_t fid, std::streampos& file_pos, rcvdat& rd) +jcntl::check_journal_alignment(const uint16_t fid, std::streampos& file_pos/*, rcvdat& rd*/) { - unsigned sblk_offs = file_pos % JRNL_SBLK_SIZE; + unsigned sblk_offs = file_pos % JRNL_SBLK_SIZE_BYTES; if (sblk_offs) { { std::ostringstream oss; oss << std::hex << "Bad record alignment found at fid=0x" << fid; oss << " offs=0x" << file_pos << " (likely journal overwrite boundary); " << std::dec; - oss << (JRNL_SBLK_SIZE_DBLKS - (sblk_offs/JRNL_DBLK_SIZE)) << " filler record(s) required."; + oss << (JRNL_SBLK_SIZE_DBLKS - (sblk_offs/JRNL_DBLK_SIZE_BYTES)) << " filler record(s) required."; this->log(LOG_WARN, _jid, oss.str()); } const uint32_t xmagic = QLS_EMPTY_MAGIC; @@ -976,17 +981,17 @@ jcntl::check_journal_alignment(const uint16_t fid, std::streampos& file_pos, rcv if (!ofsp.good()) throw jexception(jerrno::JERR__FILEIO, oss.str(), "jcntl", "check_journal_alignment"); ofsp.seekp(file_pos); - void* buff = std::malloc(JRNL_DBLK_SIZE); + void* buff = std::malloc(JRNL_DBLK_SIZE_BYTES); assert(buff != 0); std::memcpy(buff, (const void*)&xmagic, sizeof(xmagic)); // Normally, RHM_CLEAN must be set before these fills are done, but this is a recover // situation (i.e. performance is not an issue), and it makes the location of the write // clear should inspection of the file be required. - std::memset((char*)buff + sizeof(xmagic), QLS_CLEAN_CHAR, JRNL_DBLK_SIZE - sizeof(xmagic)); + std::memset((char*)buff + sizeof(xmagic), QLS_CLEAN_CHAR, JRNL_DBLK_SIZE_BYTES - sizeof(xmagic)); - while (file_pos % JRNL_SBLK_SIZE) + while (file_pos % JRNL_SBLK_SIZE_BYTES) { - ofsp.write((const char*)buff, JRNL_DBLK_SIZE); + ofsp.write((const char*)buff, JRNL_DBLK_SIZE_BYTES); assert(!ofsp.fail()); std::ostringstream oss; oss << std::hex << "Recover phase write: Wrote filler record: fid=0x" << fid << " offs=0x" << file_pos; @@ -995,12 +1000,9 @@ jcntl::check_journal_alignment(const uint16_t fid, std::streampos& file_pos, rcv } ofsp.close(); std::free(buff); - rd._lfid = fid; -// if (!rd._frot) -// rd._ffid = (fid + 1) % rd._njf; this->log(LOG_INFO, _jid, "Bad record alignment fixed."); } - rd._eo = file_pos; + _rcvdat._eo = file_pos; } }} diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.h b/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.h index 8a2c178f67..d875ab55c6 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.h @@ -19,8 +19,8 @@ * */ -#ifndef QPID_LEGACYSTORE_JRNL_JCNTL_H -#define QPID_LEGACYSTORE_JRNL_JCNTL_H +#ifndef QPID_LINEARSTORE_JRNL_JCNTL_H +#define QPID_LINEARSTORE_JRNL_JCNTL_H namespace qpid { @@ -31,6 +31,7 @@ namespace qls_jrnl #include <cstddef> #include <deque> +#include <qpid/linearstore/jrnl/LinearFileController.h> #include <qpid/linearstore/jrnl/JournalLog.h> #include "qpid/linearstore/jrnl/jdir.h" //#include "qpid/linearstore/jrnl/fcntl.h" @@ -46,8 +47,8 @@ namespace qpid { namespace qls_jrnl { -class EmptyFilePool; -class JournalFileController; + class EmptyFilePool; + class EmptyFilePoolManager; /** * \brief Access and control interface for the journal. This is the top-level class for the @@ -127,17 +128,14 @@ class JournalFileController; //bool _autostop; ///< Autostop flag - stops journal when overrun occurs // Journal control structures - JournalFileController* _jfcp;///< Journal File Controller - //uint32_t _jfsize_sblks; ///< Journal file size in sblks - //lpmgr _lpmgr; ///< LFID-PFID manager tracks inserted journal files - enq_map _emap; ///< Enqueue map for low water mark management - txn_map _tmap; ///< Transaction map open transactions - //rrfc _rrfc; ///< Read journal rotating file controller - //wrfc _wrfc; ///< Write journal rotating file controller - rmgr _rmgr; ///< Read page manager which manages AIO - wmgr _wmgr; ///< Write page manager which manages AIO - rcvdat _rcvdat; ///< Recovery data used for recovery - smutex _wr_mutex; ///< Mutex for journal writes + LinearFileController _linearFileController; ///< Linear File Controller + EmptyFilePool* _emptyFilePoolPtr; ///< Pointer to Empty File Pool for this queue + enq_map _emap; ///< Enqueue map for low water mark management + txn_map _tmap; ///< Transaction map open transactions + //rmgr _rmgr; ///< Read page manager which manages AIO + wmgr _wmgr; ///< Write page manager which manages AIO + rcvdat _rcvdat; ///< Recovery data used for recovery + smutex _wr_mutex; ///< Mutex for journal writes public: static timespec _aio_cmpl_timeout; ///< Timeout for blocking libaio returns @@ -230,9 +228,12 @@ class JournalFileController; * * \exception TODO */ - void recover(/*const uint16_t num_jfiles, const bool auto_expand, const uint16_t ae_max_jfiles, - const uint32_t jfsize_sblks,*/ const uint16_t wcache_num_pages, const uint32_t wcache_pgsize_sblks, - aio_callback* const cbp, const std::vector<std::string>* prep_txn_list_ptr, uint64_t& highest_rid); + void recover(EmptyFilePoolManager* efpm, + const uint16_t wcache_num_pages, + const uint32_t wcache_pgsize_sblks, + aio_callback* const cbp, + const std::vector<std::string>* prep_txn_list_ptr, + uint64_t& highest_rid); /** * \brief Notification to the journal that recovery is complete and that normal operation @@ -532,7 +533,7 @@ class JournalFileController; * operations, but if these operations cease, then this call needs to be made to force the * processing of any outstanding AIO operations. */ - int32_t get_rd_events(timespec* const timeout); +// int32_t get_rd_events(timespec* const timeout); /** * \brief Stop the journal from accepting any further requests to read or write data. @@ -554,11 +555,11 @@ class JournalFileController; */ iores flush(const bool block_till_aio_cmpl = false); - inline uint32_t get_enq_cnt() const { return _emap.size(); } + inline uint32_t get_enq_cnt() const { return _emap.size(); } // TODO: Thread safe? inline uint32_t get_wr_aio_evt_rem() const { slock l(_wr_mutex); return _wmgr.get_aio_evt_rem(); } - inline uint32_t get_rd_aio_evt_rem() const { return _rmgr.get_aio_evt_rem(); } +// inline uint32_t get_rd_aio_evt_rem() const { return _rmgr.get_aio_evt_rem(); } inline uint32_t get_wr_outstanding_aio_dblks() const; /*{ return _wrfc.aio_outstanding_dblks(); }*/ @@ -575,6 +576,7 @@ class JournalFileController; // inline uint16_t get_rd_fid() const { return _rrfc.index(); } // inline uint16_t get_wr_fid() const { return _wrfc.index(); } // uint16_t get_earliest_fid(); + LinearFileController& getLinearFileControllerRef(); /** * \brief Check if a particular rid is enqueued. Note that this function will return @@ -692,22 +694,25 @@ class JournalFileController; /** * \brief Analyze journal for recovery. */ - void rcvr_janalyze(rcvdat& rd, const std::vector<std::string>* prep_txn_list_ptr); + static void rcvr_read_jfile(const std::string& jfn, ::file_hdr_t* fh, std::string& queueName); - bool rcvr_get_next_record(uint16_t& fid, std::ifstream* ifsp/*, bool& lowi*/, rcvdat& rd); + void rcvr_analyze_fhdrs(EmptyFilePoolManager* efpmp); + + void rcvr_janalyze(const std::vector<std::string>* prep_txn_list_ptr, EmptyFilePoolManager* efpmp); + + bool rcvr_get_next_record(uint16_t& fid, std::ifstream* ifsp/*, bool& lowi, rcvdat& rd*/); bool decode(jrec& rec, uint16_t& fid, std::ifstream* ifsp, std::size_t& cum_size_read, - rec_hdr_t& h/*, bool& lowi*/, rcvdat& rd, std::streampos& rec_offset); + rec_hdr_t& h, /*bool& lowi, rcvdat& rd,*/ std::streampos& rec_offset); - bool jfile_cycle(uint16_t& fid, std::ifstream* ifsp/*, bool& lowi*/, rcvdat& rd, - const bool jump_fro); + bool jfile_cycle(uint16_t& fid, std::ifstream* ifsp, /*bool& lowi, rcvdat& rd,*/ const bool jump_fro); //bool check_owi(const uint16_t fid, rec_hdr_t& h, bool& lowi, rcvdat& rd, // std::streampos& read_pos); - void check_journal_alignment(const uint16_t fid, std::streampos& rec_offset, rcvdat& rd); + void check_journal_alignment(const uint16_t fid, std::streampos& rec_offset/*, rcvdat& rd*/); }; }} -#endif // ifndef QPID_LEGACYSTORE_JRNL_JCNTL_H +#endif // ifndef QPID_LINEARSTORE_JRNL_JCNTL_H diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jdir.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/jdir.cpp index 89bce926e0..977c75e37a 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/jdir.cpp +++ b/qpid/cpp/src/qpid/linearstore/jrnl/jdir.cpp @@ -409,7 +409,7 @@ jdir::exists(const std::string& name) } void -jdir::read_dir(const std::string& name, std::vector<std::string>& dir_list, const bool incl_dirs, const bool incl_files, const bool incl_links) { +jdir::read_dir(const std::string& name, std::vector<std::string>& dir_list, const bool incl_dirs, const bool incl_files, const bool incl_links, const bool return_fqfn) { struct stat s; if (is_dir(name)) { DIR* dir = ::opendir(name.c_str()); @@ -425,8 +425,13 @@ jdir::read_dir(const std::string& name, std::vector<std::string>& dir_list, cons oss << "stat: file=\"" << full_name << "\"" << FORMAT_SYSERR(errno); throw jexception(jerrno::JERR_JDIR_STAT, oss.str(), "jdir", "delete_dir"); } - if ((S_ISREG(s.st_mode) && incl_files) || (S_ISDIR(s.st_mode) && incl_dirs) || (S_ISLNK(s.st_mode) && incl_links)) - dir_list.push_back(entry->d_name); + if ((S_ISREG(s.st_mode) && incl_files) || (S_ISDIR(s.st_mode) && incl_dirs) || (S_ISLNK(s.st_mode) && incl_links)) { + if (return_fqfn) { + dir_list.push_back(name + "/" + entry->d_name); + } else { + dir_list.push_back(entry->d_name); + } + } } } } diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jdir.h b/qpid/cpp/src/qpid/linearstore/jrnl/jdir.h index 53519b1f2c..01a08c57fc 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/jdir.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/jdir.h @@ -336,7 +336,7 @@ namespace qls_jrnl */ static bool exists(const std::string& name); - static void read_dir(const std::string& name, std::vector<std::string>& dir_list, const bool incl_dirs, const bool incl_files, const bool incl_links); + static void read_dir(const std::string& name, std::vector<std::string>& dir_list, const bool incl_dirs, const bool incl_files, const bool incl_links, const bool return_fqfn); /** * \brief Stream operator diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp index ecd5469d8b..8793a882f7 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp +++ b/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp @@ -42,6 +42,7 @@ const uint32_t jerrno::JERR__TIMEOUT = 0x0107; const uint32_t jerrno::JERR__UNEXPRESPONSE = 0x0108; const uint32_t jerrno::JERR__RECNFOUND = 0x0109; const uint32_t jerrno::JERR__NOTIMPL = 0x010a; +const uint32_t jerrno::JERR__NULL = 0x010b; // class jcntl const uint32_t jerrno::JERR_JCNTL_STOPPED = 0x0200; @@ -49,8 +50,11 @@ const uint32_t jerrno::JERR_JCNTL_READONLY = 0x0201; const uint32_t jerrno::JERR_JCNTL_AIOCMPLWAIT = 0x0202; const uint32_t jerrno::JERR_JCNTL_UNKNOWNMAGIC = 0x0203; const uint32_t jerrno::JERR_JCNTL_NOTRECOVERED = 0x0204; -const uint32_t jerrno::JERR_JCNTL_RECOVERJFULL = 0x0205; -const uint32_t jerrno::JERR_JCNTL_OWIMISMATCH = 0x0206; +const uint32_t jerrno::JERR_JCNTL_OPENRD = 0x0205; +const uint32_t jerrno::JERR_JCNTL_READ = 0x0206; +const uint32_t jerrno::JERR_JCNTL_ENQSTATE = 0x0207; +const uint32_t jerrno::JERR_JCNTL_INVALIDENQHDR = 0x0208; + // class jdir const uint32_t jerrno::JERR_JDIR_NOTDIR = 0x0300; @@ -65,21 +69,14 @@ const uint32_t jerrno::JERR_JDIR_STAT = 0x0308; const uint32_t jerrno::JERR_JDIR_UNLINK = 0x0309; const uint32_t jerrno::JERR_JDIR_BADFTYPE = 0x030a; -// class fcntl -//const uint32_t jerrno::JERR_FCNTL_OPENWR = 0x0400; -//const uint32_t jerrno::JERR_FCNTL_WRITE = 0x0401; -//const uint32_t jerrno::JERR_FCNTL_CLOSE = 0x0402; -//const uint32_t jerrno::JERR_FCNTL_FILEOFFSOVFL = 0x0403; -//const uint32_t jerrno::JERR_FCNTL_CMPLOFFSOVFL = 0x0404; -//const uint32_t jerrno::JERR_FCNTL_RDOFFSOVFL = 0x0405; - -// class lfmgr -//const uint32_t jerrno::JERR_LFMGR_BADAEFNUMLIM = 0x0500; -//const uint32_t jerrno::JERR_LFMGR_AEFNUMLIMIT = 0x0501; -//const uint32_t jerrno::JERR_LFMGR_AEDISABLED = 0x0502; +// class JournalFile +const uint32_t jerrno::JERR_JNLF_OPEN = 0x0400; +const uint32_t jerrno::JERR_JNLF_CLOSE = 0x0401; +const uint32_t jerrno::JERR_JNLF_FILEOFFSOVFL = 0x0402; +const uint32_t jerrno::JERR_JNLF_CMPLOFFSOVFL = 0x0403; -// class rrfc -//const uint32_t jerrno::JERR_RRFC_OPENRD = 0x0600; +// class LinearFileController +const uint32_t jerrno::JERR_LFCR_SEQNUMNOTFOUND = 0x0500; // class jrec, enq_rec, deq_rec, txn_rec const uint32_t jerrno::JERR_JREC_BADRECHDR = 0x0700; @@ -91,13 +88,14 @@ const uint32_t jerrno::JERR_WMGR_BADDTOKSTATE = 0x0802; const uint32_t jerrno::JERR_WMGR_ENQDISCONT = 0x0803; const uint32_t jerrno::JERR_WMGR_DEQDISCONT = 0x0804; const uint32_t jerrno::JERR_WMGR_DEQRIDNOTENQ = 0x0805; +const uint32_t jerrno::JERR_WMGR_BADFH = 0x0806; -// class rmgr -const uint32_t jerrno::JERR_RMGR_UNKNOWNMAGIC = 0x0900; -const uint32_t jerrno::JERR_RMGR_RIDMISMATCH = 0x0901; -//const uint32_t jerrno::JERR_RMGR_FIDMISMATCH = 0x0902; -const uint32_t jerrno::JERR_RMGR_ENQSTATE = 0x0903; -const uint32_t jerrno::JERR_RMGR_BADRECTYPE = 0x0904; +//// class rmgr +//const uint32_t jerrno::JERR_RMGR_UNKNOWNMAGIC = 0x0900; +//const uint32_t jerrno::JERR_RMGR_RIDMISMATCH = 0x0901; +////const uint32_t jerrno::JERR_RMGR_FIDMISMATCH = 0x0902; +//const uint32_t jerrno::JERR_RMGR_ENQSTATE = 0x0903; +//const uint32_t jerrno::JERR_RMGR_BADRECTYPE = 0x0904; // class data_tok const uint32_t jerrno::JERR_DTOK_ILLEGALSTATE = 0x0a00; @@ -108,19 +106,6 @@ const uint32_t jerrno::JERR_MAP_DUPLICATE = 0x0b00; const uint32_t jerrno::JERR_MAP_NOTFOUND = 0x0b01; const uint32_t jerrno::JERR_MAP_LOCKED = 0x0b02; -// class jinf -//const uint32_t jerrno::JERR_JINF_CVALIDFAIL = 0x0c00; -//const uint32_t jerrno::JERR_JINF_NOVALUESTR = 0x0c01; -//const uint32_t jerrno::JERR_JINF_BADVALUESTR = 0x0c02; -//const uint32_t jerrno::JERR_JINF_JDATEMPTY = 0x0c03; -//const uint32_t jerrno::JERR_JINF_TOOMANYFILES = 0x0c04; -//const uint32_t jerrno::JERR_JINF_INVALIDFHDR = 0x0c05; -//const uint32_t jerrno::JERR_JINF_STAT = 0x0c06; -//const uint32_t jerrno::JERR_JINF_NOTREGFILE = 0x0c07; -//const uint32_t jerrno::JERR_JINF_BADFILESIZE = 0x0c08; -//const uint32_t jerrno::JERR_JINF_OWIBAD = 0x0c09; -//const uint32_t jerrno::JERR_JINF_ZEROLENFILE = 0x0c0a; - // EFP errors const uint32_t jerrno::JERR_EFP_BADPARTITIONNAME = 0x0d01; const uint32_t jerrno::JERR_EFP_BADPARTITIONDIR = 0x0d02; @@ -150,6 +135,7 @@ jerrno::__init() _err_map[JERR__UNEXPRESPONSE] = "JERR__UNEXPRESPONSE: Unexpected response to call or event."; _err_map[JERR__RECNFOUND] = "JERR__RECNFOUND: Record not found."; _err_map[JERR__NOTIMPL] = "JERR__NOTIMPL: Not implemented"; + _err_map[JERR__NULL] = "JERR__NULL: Operation on null pointer"; // class jcntl _err_map[JERR_JCNTL_STOPPED] = "JERR_JCNTL_STOPPED: Operation on stopped journal."; @@ -157,8 +143,10 @@ jerrno::__init() _err_map[JERR_JCNTL_AIOCMPLWAIT] = "JERR_JCNTL_AIOCMPLWAIT: Timeout waiting for AIOs to complete."; _err_map[JERR_JCNTL_UNKNOWNMAGIC] = "JERR_JCNTL_UNKNOWNMAGIC: Found record with unknown magic."; _err_map[JERR_JCNTL_NOTRECOVERED] = "JERR_JCNTL_NOTRECOVERED: Operation requires recover() to be run first."; - _err_map[JERR_JCNTL_RECOVERJFULL] = "JERR_JCNTL_RECOVERJFULL: Journal data files full, cannot write."; - _err_map[JERR_JCNTL_OWIMISMATCH] = "JERR_JCNTL_OWIMISMATCH: Overwrite Indicator (OWI) change found in unexpected location."; + _err_map[JERR_JCNTL_OPENRD] = "JERR_JCNTL_OPENRD: Unable to open file for write"; + _err_map[JERR_JCNTL_READ] = "JERR_JCNTL_READ: Read error: no or insufficient data to read"; + _err_map[JERR_JCNTL_ENQSTATE] = "JERR_JCNTL_ENQSTATE: Read error: Record not in ENQ state"; + _err_map[JERR_JCNTL_INVALIDENQHDR] = "JERR_JCNTL_INVALIDENQHDR: Invalid ENQ header"; // class jdir _err_map[JERR_JDIR_NOTDIR] = "JERR_JDIR_NOTDIR: Directory name exists but is not a directory."; @@ -173,21 +161,14 @@ jerrno::__init() _err_map[JERR_JDIR_UNLINK] = "JERR_JDIR_UNLINK: File delete failed."; _err_map[JERR_JDIR_BADFTYPE] = "JERR_JDIR_BADFTYPE: Bad or unknown file type (stat mode)."; - // class fcntl -// _err_map[JERR_FCNTL_OPENWR] = "JERR_FCNTL_OPENWR: Unable to open file for write."; -// _err_map[JERR_FCNTL_WRITE] = "JERR_FCNTL_WRITE: Unable to write to file."; -// _err_map[JERR_FCNTL_CLOSE] = "JERR_FCNTL_CLOSE: File close failed."; -// _err_map[JERR_FCNTL_FILEOFFSOVFL] = "JERR_FCNTL_FILEOFFSOVFL: Attempted increase file offset past file size."; -// _err_map[JERR_FCNTL_CMPLOFFSOVFL] = "JERR_FCNTL_CMPLOFFSOVFL: Attempted increase completed file offset past submitted offset."; -// _err_map[JERR_FCNTL_RDOFFSOVFL] = "JERR_FCNTL_RDOFFSOVFL: Attempted increase read offset past write offset."; + // class JournalFile + _err_map[JERR_JNLF_OPEN] = "JERR_JNLF_OPEN: Unable to open file for write"; + _err_map[JERR_JNLF_CLOSE] = "JERR_JNLF_CLOSE: Unable to close file"; + _err_map[JERR_JNLF_FILEOFFSOVFL] = "JERR_JNLF_FILEOFFSOVFL: Attempted to increase submitted offset past file size."; + _err_map[JERR_JNLF_CMPLOFFSOVFL] = "JERR_JNLF_CMPLOFFSOVFL: Attempted to increase completed file offset past submitted offset."; - // class lfmgr -// _err_map[JERR_LFMGR_BADAEFNUMLIM] = "JERR_LFMGR_BADAEFNUMLIM: Auto-expand file number limit lower than initial number of journal files."; -// _err_map[JERR_LFMGR_AEFNUMLIMIT] = "JERR_LFMGR_AEFNUMLIMIT: Exceeded auto-expand file number limit."; -// _err_map[JERR_LFMGR_AEDISABLED] = "JERR_LFMGR_AEDISABLED: Attempted to expand with auto-expand disabled."; - - // class rrfc -// _err_map[JERR_RRFC_OPENRD] = "JERR_RRFC_OPENRD: Unable to open file for read."; + // class LinearFileController + _err_map[JERR_LFCR_SEQNUMNOTFOUND] = "JERR_LFCR_SEQNUMNOTFOUND: File sequence number not found"; // class jrec, enq_rec, deq_rec, txn_rec _err_map[JERR_JREC_BADRECHDR] = "JERR_JREC_BADRECHDR: Invalid data record header."; @@ -199,13 +180,14 @@ jerrno::__init() _err_map[JERR_WMGR_ENQDISCONT] = "JERR_WMGR_ENQDISCONT: Enqueued new dtok when previous enqueue returned partly completed (state ENQ_PART)."; _err_map[JERR_WMGR_DEQDISCONT] = "JERR_WMGR_DEQDISCONT: Dequeued new dtok when previous dequeue returned partly completed (state DEQ_PART)."; _err_map[JERR_WMGR_DEQRIDNOTENQ] = "JERR_WMGR_DEQRIDNOTENQ: Dequeue rid is not enqueued."; + _err_map[JERR_WMGR_BADFH] = "JERR_WMGR_BADFH: Bad file handle."; - // class rmgr - _err_map[JERR_RMGR_UNKNOWNMAGIC] = "JERR_RMGR_UNKNOWNMAGIC: Found record with unknown magic."; - _err_map[JERR_RMGR_RIDMISMATCH] = "JERR_RMGR_RIDMISMATCH: RID mismatch between current record and dtok RID"; - //_err_map[JERR_RMGR_FIDMISMATCH] = "JERR_RMGR_FIDMISMATCH: FID mismatch between emap and rrfc"; - _err_map[JERR_RMGR_ENQSTATE] = "JERR_RMGR_ENQSTATE: Attempted read when data token wstate was not ENQ"; - _err_map[JERR_RMGR_BADRECTYPE] = "JERR_RMGR_BADRECTYPE: Attempted operation on inappropriate record type"; +// // class rmgr +// _err_map[JERR_RMGR_UNKNOWNMAGIC] = "JERR_RMGR_UNKNOWNMAGIC: Found record with unknown magic."; +// _err_map[JERR_RMGR_RIDMISMATCH] = "JERR_RMGR_RIDMISMATCH: RID mismatch between current record and dtok RID"; +// //_err_map[JERR_RMGR_FIDMISMATCH] = "JERR_RMGR_FIDMISMATCH: FID mismatch between emap and rrfc"; +// _err_map[JERR_RMGR_ENQSTATE] = "JERR_RMGR_ENQSTATE: Attempted read when data token wstate was not ENQ"; +// _err_map[JERR_RMGR_BADRECTYPE] = "JERR_RMGR_BADRECTYPE: Attempted operation on inappropriate record type"; // class data_tok _err_map[JERR_DTOK_ILLEGALSTATE] = "JERR_MTOK_ILLEGALSTATE: Attempted to change to illegal state."; @@ -216,19 +198,6 @@ jerrno::__init() _err_map[JERR_MAP_NOTFOUND] = "JERR_MAP_NOTFOUND: Key not found in map."; _err_map[JERR_MAP_LOCKED] = "JERR_MAP_LOCKED: Record ID locked by a pending transaction."; - // class jinf -// _err_map[JERR_JINF_CVALIDFAIL] = "JERR_JINF_CVALIDFAIL: Journal compatibility validation failure."; -// _err_map[JERR_JINF_NOVALUESTR] = "JERR_JINF_NOVALUESTR: No value attribute found in jinf file."; -// _err_map[JERR_JINF_BADVALUESTR] = "JERR_JINF_BADVALUESTR: Bad format for value attribute in jinf file"; -// _err_map[JERR_JINF_JDATEMPTY] = "JERR_JINF_JDATEMPTY: Journal data files empty."; -// _err_map[JERR_JINF_TOOMANYFILES] = "JERR_JINF_TOOMANYFILES: Too many journal data files."; -// _err_map[JERR_JINF_INVALIDFHDR] = "JERR_JINF_INVALIDFHDR: Invalid journal data file header"; -// _err_map[JERR_JINF_STAT] = "JERR_JINF_STAT: Error while trying to stat a journal data file"; -// _err_map[JERR_JINF_NOTREGFILE] = "JERR_JINF_NOTREGFILE: Target journal data file is not a regular file"; -// _err_map[JERR_JINF_BADFILESIZE] = "JERR_JINF_BADFILESIZE: Journal data file is of incorrect or unexpected size"; -// _err_map[JERR_JINF_OWIBAD] = "JERR_JINF_OWIBAD: Journal data files have inconsistent OWI flags; >1 transition found in non-auto-expand or min-size journal"; -// _err_map[JERR_JINF_ZEROLENFILE] = "JERR_JINF_ZEROLENFILE: Journal info file zero length"; - // EFP errors _err_map[JERR_EFP_BADPARTITIONNAME] = "JERR_EFP_BADPARTITIONNAME: Invalid partition name (must be \'pNNN\' where NNN is a non-zero number)"; _err_map[JERR_EFP_BADEFPDIRNAME] = "JERR_EFP_BADEFPDIRNAME: Bad Empty File Pool directory name (must be \'NNNk\', where NNN is a number which is a multiple of 4)"; diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h b/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h index 2ebabd84b8..ecf969a344 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h @@ -61,6 +61,7 @@ namespace qls_jrnl static const uint32_t JERR__UNEXPRESPONSE; ///< Unexpected response to call or event static const uint32_t JERR__RECNFOUND; ///< Record not found static const uint32_t JERR__NOTIMPL; ///< Not implemented + static const uint32_t JERR__NULL; ///< Operation on null pointer // class jcntl static const uint32_t JERR_JCNTL_STOPPED; ///< Operation on stopped journal @@ -68,8 +69,10 @@ namespace qls_jrnl static const uint32_t JERR_JCNTL_AIOCMPLWAIT; ///< Timeout waiting for AIOs to complete static const uint32_t JERR_JCNTL_UNKNOWNMAGIC; ///< Found record with unknown magic static const uint32_t JERR_JCNTL_NOTRECOVERED; ///< Req' recover() to be called first - static const uint32_t JERR_JCNTL_RECOVERJFULL; ///< Journal data files full, cannot write - static const uint32_t JERR_JCNTL_OWIMISMATCH; ///< OWI change found in unexpected location + static const uint32_t JERR_JCNTL_OPENRD; ///< Unable to open file for read + static const uint32_t JERR_JCNTL_READ; ///< Read error: no or insufficient data to read + static const uint32_t JERR_JCNTL_ENQSTATE; ///< Read error: Record not in ENQ state + static const uint32_t JERR_JCNTL_INVALIDENQHDR;///< Invalid ENQ header // class jdir static const uint32_t JERR_JDIR_NOTDIR; ///< Exists but is not a directory @@ -84,21 +87,14 @@ namespace qls_jrnl static const uint32_t JERR_JDIR_UNLINK; ///< File delete failed static const uint32_t JERR_JDIR_BADFTYPE; ///< Bad or unknown file type (stat mode) - // class fcntl -// static const uint32_t JERR_FCNTL_OPENWR; ///< Unable to open file for write -// static const uint32_t JERR_FCNTL_WRITE; ///< Unable to write to file -// static const uint32_t JERR_FCNTL_CLOSE; ///< File close failed -// static const uint32_t JERR_FCNTL_FILEOFFSOVFL; ///< Increased offset past file size -// static const uint32_t JERR_FCNTL_CMPLOFFSOVFL; ///< Increased cmpl offs past subm offs -// static const uint32_t JERR_FCNTL_RDOFFSOVFL; ///< Increased read offs past write offs + // class JournalFile + static const uint32_t JERR_JNLF_OPEN; ///< Unable to open file for write + static const uint32_t JERR_JNLF_CLOSE; ///< Unable to close file + static const uint32_t JERR_JNLF_FILEOFFSOVFL; ///< Increased offset past file size + static const uint32_t JERR_JNLF_CMPLOFFSOVFL; ///< Increased cmpl offs past subm offs - // class lfmgr -// static const uint32_t JERR_LFMGR_BADAEFNUMLIM; ///< Bad auto-expand file number limit -// static const uint32_t JERR_LFMGR_AEFNUMLIMIT; ///< Exceeded auto-expand file number limit -// static const uint32_t JERR_LFMGR_AEDISABLED; ///< Attempted to expand with auto-expand disabled - - // class rrfc -// static const uint32_t JERR_RRFC_OPENRD; ///< Unable to open file for read + // class LinearFileController + static const uint32_t JERR_LFCR_SEQNUMNOTFOUND;///< File sequence number not found // class jrec, enq_rec, deq_rec, txn_rec static const uint32_t JERR_JREC_BADRECHDR; ///< Invalid data record header @@ -110,13 +106,14 @@ namespace qls_jrnl static const uint32_t JERR_WMGR_ENQDISCONT; ///< Enq. new dtok when previous part compl. static const uint32_t JERR_WMGR_DEQDISCONT; ///< Deq. new dtok when previous part compl. static const uint32_t JERR_WMGR_DEQRIDNOTENQ; ///< Deq. rid not enqueued + static const uint32_t JERR_WMGR_BADFH; ///< Bad file handle - // class rmgr - static const uint32_t JERR_RMGR_UNKNOWNMAGIC; ///< Found record with unknown magic - static const uint32_t JERR_RMGR_RIDMISMATCH; ///< RID mismatch between rec and dtok - //static const uint32_t JERR_RMGR_FIDMISMATCH; ///< FID mismatch between emap and rrfc - static const uint32_t JERR_RMGR_ENQSTATE; ///< Attempted read when wstate not ENQ - static const uint32_t JERR_RMGR_BADRECTYPE; ///< Attempted op on incorrect rec type +// // class rmgr +// static const uint32_t JERR_RMGR_UNKNOWNMAGIC; ///< Found record with unknown magic +// static const uint32_t JERR_RMGR_RIDMISMATCH; ///< RID mismatch between rec and dtok +// //static const uint32_t JERR_RMGR_FIDMISMATCH; ///< FID mismatch between emap and rrfc +// static const uint32_t JERR_RMGR_ENQSTATE; ///< Attempted read when wstate not ENQ +// static const uint32_t JERR_RMGR_BADRECTYPE; ///< Attempted op on incorrect rec type // class data_tok static const uint32_t JERR_DTOK_ILLEGALSTATE; ///< Attempted to change to illegal state @@ -127,19 +124,6 @@ namespace qls_jrnl static const uint32_t JERR_MAP_NOTFOUND; ///< Key not found in map static const uint32_t JERR_MAP_LOCKED; ///< rid locked by pending txn - // class jinf -// static const uint32_t JERR_JINF_CVALIDFAIL; ///< Compatibility validation failure -// static const uint32_t JERR_JINF_NOVALUESTR; ///< No value attr found in jinf file -// static const uint32_t JERR_JINF_BADVALUESTR; ///< Bad format for value attr in jinf file -// static const uint32_t JERR_JINF_JDATEMPTY; ///< Journal data files empty -// static const uint32_t JERR_JINF_TOOMANYFILES; ///< Too many journal data files -// static const uint32_t JERR_JINF_INVALIDFHDR; ///< Invalid file header -// static const uint32_t JERR_JINF_STAT; ///< Error while trying to stat a file -// static const uint32_t JERR_JINF_NOTREGFILE; ///< Target file is not a regular file -// static const uint32_t JERR_JINF_BADFILESIZE; ///< File is of incorrect or unexpected size -// static const uint32_t JERR_JINF_OWIBAD; ///< OWI inconsistent (>1 transition in non-ae journal) -// static const uint32_t JERR_JINF_ZEROLENFILE; ///< Journal info file is zero length (empty). - // EFP errors static const uint32_t JERR_EFP_BADPARTITIONNAME; ///< Partition name invalid or of value 0 static const uint32_t JERR_EFP_BADEFPDIRNAME; ///< Empty File Pool directory name invalid diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/jrec.h b/qpid/cpp/src/qpid/linearstore/jrnl/jrec.h index 59c5df2ff2..c77daa0cb4 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/jrec.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/jrec.h @@ -148,9 +148,9 @@ namespace qls_jrnl virtual std::size_t rec_size() const = 0; inline virtual uint32_t rec_size_dblks() const { return size_dblks(rec_size()); } static inline uint32_t size_dblks(const std::size_t size) - { return size_blks(size, JRNL_DBLK_SIZE); } + { return size_blks(size, JRNL_DBLK_SIZE_BYTES); } static inline uint32_t size_sblks(const std::size_t size) - { return size_blks(size, JRNL_SBLK_SIZE); } + { return size_blks(size, JRNL_SBLK_SIZE_BYTES); } static inline uint32_t size_blks(const std::size_t size, const std::size_t blksize) { return (size + blksize - 1)/blksize; } virtual uint64_t rid() const = 0; diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp index 40611692cf..f61a2bb970 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp +++ b/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp @@ -29,7 +29,6 @@ #include "qpid/linearstore/jrnl/jerrno.h" #include <sstream> - namespace qpid { namespace qls_jrnl @@ -44,6 +43,7 @@ pmgr::page_cb::page_cb(uint16_t index): _pdtokl(0), // _wfh(0), // _rfh(0), + _jfp(0), _pbuff(0) {} @@ -64,7 +64,8 @@ pmgr::page_cb::state_str() const return "<unknown>"; } -const uint32_t pmgr::_sblksize = JRNL_SBLK_SIZE; +// static +const uint32_t pmgr::_sblkSizeBytes = JRNL_SBLK_SIZE_BYTES; pmgr::pmgr(jcntl* jc, enq_map& emap, txn_map& tmap): _cache_pgsize_sblks(0), @@ -107,32 +108,33 @@ pmgr::initialize(aio_callback* const cbp, const uint32_t cache_pgsize_sblks, con _cbp = cbp; // 1. Allocate page memory (as a single block) - std::size_t cache_pgsize = _cache_num_pages * _cache_pgsize_sblks * _sblksize; - if (::posix_memalign(&_page_base_ptr, _sblksize, cache_pgsize)) + std::size_t cache_pgsize = _cache_num_pages * _cache_pgsize_sblks * _sblkSizeBytes; + if (::posix_memalign(&_page_base_ptr, QLS_AIO_ALIGN_BOUNDARY, cache_pgsize)) { clean(); std::ostringstream oss; - oss << "posix_memalign(): blksize=" << _sblksize << " size=" << cache_pgsize; + oss << "posix_memalign(): alignment=" << QLS_AIO_ALIGN_BOUNDARY << " size=" << cache_pgsize; oss << FORMAT_SYSERR(errno); throw jexception(jerrno::JERR__MALLOC, oss.str(), "pmgr", "initialize"); } + // 2. Allocate array of page pointers _page_ptr_arr = (void**)std::malloc(_cache_num_pages * sizeof(void*)); MALLOC_CHK(_page_ptr_arr, "_page_ptr_arr", "pmgr", "initialize"); - // 3. Allocate and initilaize page control block (page_cb) array + // 3. Allocate and initialize page control block (page_cb) array _page_cb_arr = (page_cb*)std::malloc(_cache_num_pages * sizeof(page_cb)); MALLOC_CHK(_page_cb_arr, "_page_cb_arr", "pmgr", "initialize"); std::memset(_page_cb_arr, 0, _cache_num_pages * sizeof(page_cb)); - // 5. Allocate IO control block (iocb) array + // 4. Allocate IO control block (iocb) array _aio_cb_arr = (aio_cb*)std::malloc(_cache_num_pages * sizeof(aio_cb)); MALLOC_CHK(_aio_cb_arr, "_aio_cb_arr", "pmgr", "initialize"); - // 6. Set page pointers in _page_ptr_arr, _page_cb_arr and iocbs to pages within page block + // 5. Set page pointers in _page_ptr_arr, _page_cb_arr and iocbs to pages within page block for (uint16_t i=0; i<_cache_num_pages; i++) { - _page_ptr_arr[i] = (void*)((char*)_page_base_ptr + _cache_pgsize_sblks * _sblksize * i); + _page_ptr_arr[i] = (void*)((char*)_page_base_ptr + _cache_pgsize_sblks * _sblkSizeBytes * i); _page_cb_arr[i]._index = i; _page_cb_arr[i]._state = UNUSED; _page_cb_arr[i]._pbuff = _page_ptr_arr[i]; @@ -141,12 +143,12 @@ pmgr::initialize(aio_callback* const cbp, const uint32_t cache_pgsize_sblks, con _aio_cb_arr[i].data = (void*)&_page_cb_arr[i]; } - // 7. Allocate io_event array, max one event per cache page plus one for each file - const uint16_t max_aio_evts = _cache_num_pages /*+ _jc->num_jfiles()*/; // TODO find replacement here for linear store + // 6. Allocate io_event array, max one event per cache page plus one for each file + const uint16_t max_aio_evts = _cache_num_pages + 1; // One additional event for file header writes _aio_event_arr = (aio_event*)std::malloc(max_aio_evts * sizeof(aio_event)); MALLOC_CHK(_aio_event_arr, "_aio_event_arr", "pmgr", "initialize"); - // 8. Initialize AIO context + // 7. Initialize AIO context if (int ret = aio::queue_init(max_aio_evts, &_ioctx)) { std::ostringstream oss; @@ -158,7 +160,7 @@ pmgr::initialize(aio_callback* const cbp, const uint32_t cache_pgsize_sblks, con void pmgr::clean() { - // clean up allocated memory here + // Clean up allocated memory here if (_ioctx) aio::queue_release(_ioctx); diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.h b/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.h index ebd3ddf181..e9764aeeac 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.h @@ -45,6 +45,7 @@ namespace qpid { namespace qls_jrnl { +class JournalFile; /** * \brief Abstract class for managing either read or write page cache of arbitrary size and @@ -64,7 +65,6 @@ namespace qls_jrnl AIO_COMPLETE ///< An AIO request is complete. }; - protected: /** * \brief Page control block, carries control and state information for each page in the * cache. @@ -79,13 +79,15 @@ namespace qls_jrnl std::deque<data_tok*>* _pdtokl; ///< Page message tokens list //fcntl* _wfh; ///< File handle for incrementing write compl counts //fcntl* _rfh; ///< File handle for incrementing read compl counts + JournalFile* _jfp; ///< Journal file for incrementing compl counts void* _pbuff; ///< Page buffer page_cb(uint16_t index); ///< Convenience constructor const char* state_str() const; ///< Return state as string for this pcb }; - static const uint32_t _sblksize; ///< Disk softblock size + protected: + static const uint32_t _sblkSizeBytes; ///< Disk softblock size uint32_t _cache_pgsize_sblks; ///< Size of page cache cache_num_pages uint16_t _cache_num_pages; ///< Number of page cache cache_num_pages jcntl* _jc; ///< Pointer to journal controller diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/rcvdat.h b/qpid/cpp/src/qpid/linearstore/jrnl/rcvdat.h index 57c80b13b4..46541e7f31 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/rcvdat.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/rcvdat.h @@ -37,98 +37,42 @@ namespace qls_jrnl struct rcvdat { - uint16_t _njf; ///< Number of journal files -// bool _ae; ///< Auto-expand mode -// uint16_t _aemjf; ///< Auto-expand mode max journal files -// bool _owi; ///< Overwrite indicator -// bool _frot; ///< First rotation flag - bool _jempty; ///< Journal data files empty - uint16_t _ffid; ///< First file id - std::size_t _fro; ///< First record offset in ffid - uint16_t _lfid; ///< Last file id - std::size_t _eo; ///< End offset (first byte past last record) - uint64_t _h_rid; ///< Highest rid found - bool _lffull; ///< Last file is full - bool _jfull; ///< Journal is full -// std::vector<uint16_t> _fid_list; ///< Fid-lid mapping - list of fids in order of lid - std::vector<uint32_t> _enq_cnt_list; ///< Number enqueued records found for each file + std::vector<std::string> _jfl; ///< Journal file list + std::map<uint64_t, std::string> _fm; ///< File number - name map + std::vector<uint32_t> _enq_cnt_list; ///< Number enqueued records found for each file + bool _jempty; ///< Journal data files empty + std::size_t _fro; ///< First record offset in ffid + std::size_t _eo; ///< End offset (first byte past last record) + uint64_t _h_rid; ///< Highest rid found + bool _lffull; ///< Last file is full rcvdat(): - _njf(0), -// _ae(false), -// _aemjf(0), -// _owi(false), -// _frot(false), - _jempty(true), - _ffid(0), + _jfl(), + _fm(), + _enq_cnt_list(), + _jempty(false), _fro(0), - _lfid(0), _eo(0), _h_rid(0), - _lffull(false), - _jfull(false), -// _fid_list(), - _enq_cnt_list() + _lffull(false) {} - void reset(const uint16_t num_jfiles/*, const bool auto_expand, const uint16_t ae_max_jfiles*/) - { - _njf = num_jfiles; -// _ae = auto_expand; -// _aemjf = ae_max_jfiles; -// _owi = false; -// _frot = false; - _jempty = true; - _ffid = 0; - _fro = 0; - _lfid = 0; - _eo = 0; - _h_rid = 0; - _lffull = false; - _jfull = false; -// _fid_list.clear(); - _enq_cnt_list.clear(); - _enq_cnt_list.resize(num_jfiles, 0); - } - - // Find first fid with enqueued records - uint16_t ffid() - { - uint16_t index = _ffid; - while (index != _lfid && _enq_cnt_list[index] == 0) - { - if (++index >= _njf) - index = 0; - } - return index; - } - std::string to_string(const std::string& jid) { std::ostringstream oss; oss << "Recover file analysis (jid=\"" << jid << "\"):" << std::endl; - oss << " Number of journal files (_njf) = " << _njf << std::endl; -// oss << " Auto-expand mode (_ae) = " << (_ae ? "TRUE" : "FALSE") << std::endl; -// if (_ae) oss << " Auto-expand mode max journal files (_aemjf) = " << _aemjf << std::endl; -// oss << " Overwrite indicator (_owi) = " << (_owi ? "TRUE" : "FALSE") << std::endl; -// oss << " First rotation (_frot) = " << (_frot ? "TRUE" : "FALSE") << std::endl; + oss << " Number of journal files = " << _fm.size() << std::endl; + oss << " Journal File List (_jfl):"; + for (std::map<uint64_t, std::string>::const_iterator i=_fm.begin(); i!=_fm.end(); ++i) { + oss << " " << i->first << ": " << i->second.substr(i->second.rfind('/')+1) << std::endl; + } oss << " Journal empty (_jempty) = " << (_jempty ? "TRUE" : "FALSE") << std::endl; - oss << " First (earliest) fid (_ffid) = " << _ffid << std::endl; oss << " First record offset in first fid (_fro) = 0x" << std::hex << _fro << - std::dec << " (" << (_fro/JRNL_DBLK_SIZE) << " dblks)" << std::endl; - oss << " Last (most recent) fid (_lfid) = " << _lfid << std::endl; + std::dec << " (" << (_fro/JRNL_DBLK_SIZE_BYTES) << " dblks)" << std::endl; oss << " End offset (_eo) = 0x" << std::hex << _eo << std::dec << " (" << - (_eo/JRNL_DBLK_SIZE) << " dblks)" << std::endl; + (_eo/JRNL_DBLK_SIZE_BYTES) << " dblks)" << std::endl; oss << " Highest rid (_h_rid) = 0x" << std::hex << _h_rid << std::dec << std::endl; oss << " Last file full (_lffull) = " << (_lffull ? "TRUE" : "FALSE") << std::endl; - oss << " Journal full (_jfull) = " << (_jfull ? "TRUE" : "FALSE") << std::endl; - oss << " Normalized fid list (_fid_list) = ["; -// for (std::vector<uint16_t>::const_iterator i = _fid_list.begin(); i < _fid_list.end(); i++) -// { -// if (i != _fid_list.begin()) oss << ", "; -// oss << *i; -// } - oss << "]" << std::endl; oss << " Enqueued records (txn & non-txn):" << std::endl; for (unsigned i=0; i<_enq_cnt_list.size(); i++) oss << " File " << std::setw(2) << i << ": " << _enq_cnt_list[i] << @@ -140,28 +84,25 @@ namespace qls_jrnl { std::ostringstream oss; oss << "Recover file analysis (jid=\"" << jid << "\"):"; - oss << " njf=" << _njf; -// oss << " ae=" << (_owi ? "T" : "F"); -// oss << " aemjf=" << _aemjf; -// oss << " owi=" << (_ae ? "T" : "F"); -// oss << " frot=" << (_frot ? "T" : "F"); + oss << " jfl=["; + for (std::map<uint64_t, std::string>::const_iterator i=_fm.begin(); i!=_fm.end(); ++i) { + if (i!=_fm.begin()) oss << " "; + oss << i->first << ":" << i->second.substr(i->second.rfind('/')+1); + } + oss << "]"; + oss << " _enq_cnt_list: [ "; + for (unsigned i=0; i<_enq_cnt_list.size(); i++) { + if (i) oss << " "; + oss << _enq_cnt_list[i]; + } + oss << " ]"; oss << " jempty=" << (_jempty ? "T" : "F"); - oss << " ffid=" << _ffid; oss << " fro=0x" << std::hex << _fro << std::dec << " (" << - (_fro/JRNL_DBLK_SIZE) << " dblks)"; - oss << " lfid=" << _lfid; + (_fro/JRNL_DBLK_SIZE_BYTES) << " dblks)"; oss << " eo=0x" << std::hex << _eo << std::dec << " (" << - (_eo/JRNL_DBLK_SIZE) << " dblks)"; + (_eo/JRNL_DBLK_SIZE_BYTES) << " dblks)"; oss << " h_rid=0x" << std::hex << _h_rid << std::dec; oss << " lffull=" << (_lffull ? "T" : "F"); - oss << " jfull=" << (_jfull ? "T" : "F"); - oss << " Enqueued records (txn & non-txn): [ "; - for (unsigned i=0; i<_enq_cnt_list.size(); i++) - { - if (i) oss << " "; - oss << "fid_" << std::setw(2) << std::setfill('0') << i << "=" << _enq_cnt_list[i]; - } - oss << " ]"; return oss.str(); } }; diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/rmgr.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/rmgr.cpp index 9722b78e81..f0ab4fe5cf 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/rmgr.cpp +++ b/qpid/cpp/src/qpid/linearstore/jrnl/rmgr.cpp @@ -53,10 +53,10 @@ rmgr::initialize(aio_callback* const cbp) pmgr::initialize(cbp, JRNL_RMGR_PAGE_SIZE, JRNL_RMGR_PAGES); clean(); // Allocate memory for reading file header - if (::posix_memalign(&_fhdr_buffer, _sblksize, _sblksize)) + if (::posix_memalign(&_fhdr_buffer, _sblkSizeBytes, _sblkSizeBytes)) { std::ostringstream oss; - oss << "posix_memalign(): blksize=" << _sblksize << " size=" << _sblksize; + oss << "posix_memalign(): blksize=" << _sblkSizeBytes << " size=" << _sblkSizeBytes; oss << FORMAT_SYSERR(errno); throw jexception(jerrno::JERR__MALLOC, oss.str(), "rmgr", "initialize"); } diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.cpp index 721bb3a966..d1972ea1f2 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.cpp +++ b/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.cpp @@ -50,23 +50,27 @@ txn_data_struct::txn_data_struct(const uint64_t rid, const uint64_t drid, const {} txn_map::txn_map(): - _map(), - _pfid_txn_cnt() + _map()/*, + _pfid_txn_cnt()*/ {} txn_map::~txn_map() {} +/* void txn_map::set_num_jfiles(const uint16_t num_jfiles) { _pfid_txn_cnt.resize(num_jfiles, 0); } +*/ +/* uint32_t txn_map::get_txn_pfid_cnt(const uint16_t pfid) const { return _pfid_txn_cnt.at(pfid); } +*/ bool txn_map::insert_txn_data(const std::string& xid, const txn_data& td) @@ -84,7 +88,7 @@ txn_map::insert_txn_data(const std::string& xid, const txn_data& td) } else itr->second.push_back(td); - _pfid_txn_cnt.at(td._pfid)++; +// _pfid_txn_cnt.at(td._pfid)++; return ok; } @@ -113,8 +117,8 @@ txn_map::get_remove_tdata_list(const std::string& xid) return _empty_data_list; txn_data_list list = itr->second; _map.erase(itr); - for (tdl_itr i=list.begin(); i!=list.end(); i++) - _pfid_txn_cnt.at(i->_pfid)--; +// for (tdl_itr i=list.begin(); i!=list.end(); i++) +// _pfid_txn_cnt.at(i->_pfid)--; return list; } diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.h b/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.h index 0cbafd66b0..b69d17da1c 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.h @@ -112,15 +112,15 @@ namespace qls_jrnl xmap _map; smutex _mutex; - std::vector<uint32_t> _pfid_txn_cnt; +// std::vector<uint32_t> _pfid_txn_cnt; const txn_data_list _empty_data_list; public: txn_map(); virtual ~txn_map(); - void set_num_jfiles(const uint16_t num_jfiles); - uint32_t get_txn_pfid_cnt(const uint16_t pfid) const; +// void set_num_jfiles(const uint16_t num_jfiles); +// uint32_t get_txn_pfid_cnt(const uint16_t pfid) const; bool insert_txn_data(const std::string& xid, const txn_data& td); const txn_data_list get_tdata_list(const std::string& xid); const txn_data_list get_remove_tdata_list(const std::string& xid); diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.cpp index 886877976b..e6f6f238f1 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.cpp +++ b/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.cpp @@ -94,8 +94,8 @@ txn_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks) assert(max_size_dblks > 0); assert(_xidp != 0 && _txn_hdr._xidsize > 0); - std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE; - std::size_t rem = max_size_dblks * JRNL_DBLK_SIZE; + std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE_BYTES; + std::size_t rem = max_size_dblks * JRNL_DBLK_SIZE_BYTES; std::size_t wr_cnt = 0; if (rec_offs_dblks) // Continuation of split dequeue record (over 2 or more pages) { @@ -146,8 +146,8 @@ txn_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks) std::memcpy((char*)wptr + wr_cnt, (char*)&_txn_tail + rec_offs, wsize); wr_cnt += wsize; #ifdef RHM_CLEAN - std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE; - std::size_t dblk_rec_size = size_dblks(rec_size() - rec_offs) * JRNL_DBLK_SIZE; + std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE_BYTES; + std::size_t dblk_rec_size = size_dblks(rec_size() - rec_offs) * JRNL_DBLK_SIZE_BYTES; std::memset((char*)wptr + wr_cnt, RHM_CLEAN_CHAR, dblk_rec_size - wr_cnt); #endif } @@ -187,7 +187,7 @@ txn_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks) std::memcpy((char*)wptr + wr_cnt, (void*)&_txn_tail, sizeof(_txn_tail)); wr_cnt += sizeof(_txn_tail); #ifdef RHM_CLEAN - std::size_t dblk_rec_size = size_dblks(rec_size()) * JRNL_DBLK_SIZE; + std::size_t dblk_rec_size = size_dblks(rec_size()) * JRNL_DBLK_SIZE_BYTES; std::memset((char*)wptr + wr_cnt, RHM_CLEAN_CHAR, dblk_rec_size - wr_cnt); #endif } @@ -206,7 +206,7 @@ txn_rec::decode(rec_hdr_t& h, void* rptr, uint32_t rec_offs_dblks, uint32_t max_ { const uint32_t hdr_xid_dblks = size_dblks(sizeof(txn_hdr_t) + _txn_hdr._xidsize); const uint32_t hdr_xid_tail_dblks = size_dblks(sizeof(txn_hdr_t) + _txn_hdr._xidsize + sizeof(rec_tail_t)); - const std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE; + const std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE_BYTES; if (hdr_xid_tail_dblks - rec_offs_dblks <= max_size_dblks) { @@ -239,7 +239,7 @@ txn_rec::decode(rec_hdr_t& h, void* rptr, uint32_t rec_offs_dblks, uint32_t max_ const std::size_t xid_rem = _txn_hdr._xidsize - xid_offs; std::memcpy((char*)_buff + xid_offs, rptr, xid_rem); rd_cnt += xid_rem; - const std::size_t tail_rem = (max_size_dblks * JRNL_DBLK_SIZE) - rd_cnt; + const std::size_t tail_rem = (max_size_dblks * JRNL_DBLK_SIZE_BYTES) - rd_cnt; if (tail_rem) { std::memcpy((void*)&_txn_tail, ((char*)rptr + xid_rem), tail_rem); @@ -249,7 +249,7 @@ txn_rec::decode(rec_hdr_t& h, void* rptr, uint32_t rec_offs_dblks, uint32_t max_ else { // Remainder of xid split - const std::size_t xid_cp_size = (max_size_dblks * JRNL_DBLK_SIZE); + const std::size_t xid_cp_size = (max_size_dblks * JRNL_DBLK_SIZE_BYTES); std::memcpy((char*)_buff + rec_offs - sizeof(txn_hdr_t), rptr, xid_cp_size); rd_cnt += xid_cp_size; } @@ -288,7 +288,7 @@ txn_rec::decode(rec_hdr_t& h, void* rptr, uint32_t rec_offs_dblks, uint32_t max_ // Entire header and xid fit within this page, tail split std::memcpy(_buff, (char*)rptr + rd_cnt, _txn_hdr._xidsize); rd_cnt += _txn_hdr._xidsize; - const std::size_t tail_rem = (max_size_dblks * JRNL_DBLK_SIZE) - rd_cnt; + const std::size_t tail_rem = (max_size_dblks * JRNL_DBLK_SIZE_BYTES) - rd_cnt; if (tail_rem) { std::memcpy((void*)&_txn_tail, (char*)rptr + rd_cnt, tail_rem); @@ -298,7 +298,7 @@ txn_rec::decode(rec_hdr_t& h, void* rptr, uint32_t rec_offs_dblks, uint32_t max_ else { // Header fits within this page, xid split - const std::size_t xid_cp_size = (max_size_dblks * JRNL_DBLK_SIZE) - rd_cnt; + const std::size_t xid_cp_size = (max_size_dblks * JRNL_DBLK_SIZE_BYTES) - rd_cnt; std::memcpy(_buff, (char*)rptr + rd_cnt, xid_cp_size); rd_cnt += xid_cp_size; } @@ -357,7 +357,7 @@ txn_rec::rcv_decode(rec_hdr_t h, std::ifstream* ifsp, std::size_t& rec_offs) return false; } } - ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE - rec_size()); + ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE_BYTES - rec_size()); chk_tail(); // Throws if tail invalid or record incomplete assert(!ifsp->fail() && !ifsp->bad()); return true; diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.c b/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.c index 235828ef5a..b8d2da7722 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.c +++ b/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.c @@ -54,3 +54,10 @@ void set_enq_external(enq_hdr_t *eh, const bool external) { eh->_rhdr._uflag = external ? eh->_rhdr._uflag | ENQ_HDR_EXTERNAL_MASK : eh->_rhdr._uflag & (~ENQ_HDR_EXTERNAL_MASK); } + +bool validate_enq_hdr(enq_hdr_t *eh, const uint32_t magic, const uint16_t version, const uint64_t rid) { + return eh->_rhdr._magic == magic && + eh->_rhdr._version == version && + rid > 0 ? eh->_rhdr._rid == rid /* If rid == 0, don't compare rids */ + : true; +} diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.h b/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.h index e194dafb9a..1beaef1db8 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.h @@ -70,6 +70,7 @@ bool is_enq_transient(const enq_hdr_t *eh); void set_enq_transient(enq_hdr_t *eh, const bool transient); bool is_enq_external(const enq_hdr_t *eh); void set_enq_external(enq_hdr_t *eh, const bool external); +bool validate_enq_hdr(enq_hdr_t *eh, const uint32_t magic, const uint16_t version, const uint64_t rid); #pragma pack() diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c b/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c index e8ebe9db94..8689ca3097 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c +++ b/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c @@ -36,19 +36,21 @@ void file_hdr_create(file_hdr_t* dest, const uint32_t magic, const uint16_t vers dest->_queue_name_len = 0; } -int file_hdr_init(file_hdr_t* dest, const uint16_t uflag, const uint64_t rid, const uint64_t fro, +int file_hdr_init(void* dest, const uint64_t dest_len, const uint16_t uflag, const uint64_t rid, const uint64_t fro, const uint64_t file_number, const uint16_t queue_name_len, const char* queue_name) { - dest->_rhdr._uflag = uflag; - dest->_rhdr._rid = rid; - dest->_fro = fro; - dest->_file_number = file_number; + file_hdr_t* fhp = (file_hdr_t*)dest; + fhp->_rhdr._uflag = uflag; + fhp->_rhdr._rid = rid; + fhp->_fro = fro; + fhp->_file_number = file_number; if (sizeof(file_hdr_t) + queue_name_len < MAX_FILE_HDR_LEN) { - dest->_queue_name_len = queue_name_len; + fhp->_queue_name_len = queue_name_len; } else { - dest->_queue_name_len = MAX_FILE_HDR_LEN - sizeof(file_hdr_t); + fhp->_queue_name_len = MAX_FILE_HDR_LEN - sizeof(file_hdr_t); } - dest->_queue_name_len = queue_name_len; - memcpy(dest + sizeof(file_hdr_t), queue_name, queue_name_len); + fhp->_queue_name_len = queue_name_len; + memcpy((char*)dest + sizeof(file_hdr_t), queue_name, queue_name_len); + memset((char*)dest + sizeof(file_hdr_t) + queue_name_len, 0, dest_len - sizeof(file_hdr_t) - queue_name_len); return set_time_now(dest); } diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h b/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h index 6a2cae4ba4..efdd97b624 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h @@ -89,7 +89,7 @@ typedef struct file_hdr_t { void file_hdr_create(file_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t fhdr_size_sblks, const uint16_t efp_partition, const uint64_t file_size); -int file_hdr_init(file_hdr_t* dest, const uint16_t uflag, const uint64_t rid, const uint64_t fro, +int file_hdr_init(void* dest, const uint64_t dest_len, const uint16_t uflag, const uint64_t rid, const uint64_t fro, const uint64_t file_number, const uint16_t queue_name_len, const char* queue_name); void file_hdr_reset(file_hdr_t* target); int is_file_hdr_reset(file_hdr_t* target); diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp index 3062f059cd..f61da49a1b 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp +++ b/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp @@ -28,26 +28,23 @@ #include "qpid/linearstore/jrnl/utils/file_hdr.h" #include "qpid/linearstore/jrnl/jcntl.h" #include "qpid/linearstore/jrnl/jerrno.h" +#include "qpid/linearstore/jrnl/JournalFile.h" #include <sstream> #include <stdint.h> +//#include <iostream> // DEBUG + namespace qpid { namespace qls_jrnl { -wmgr::wmgr(jcntl* jc, enq_map& emap, txn_map& tmap/*, wrfc& wrfc*/): +wmgr::wmgr(jcntl* jc, enq_map& emap, txn_map& tmap, LinearFileController& lfc): pmgr(jc, emap, tmap), -// _wrfc(wrfc), + _lfc(lfc), _max_dtokpp(0), _max_io_wait_us(0), - _fhdr_base_ptr(0), - _fhdr_ptr_arr(0), - _fhdr_aio_cb_arr(0), _cached_offset_dblks(0), -// _jfsize_dblks(0), -// _jfsize_pgs(0), -// _num_jfiles(0), _enq_busy(false), _deq_busy(false), _abort_busy(false), @@ -55,19 +52,12 @@ wmgr::wmgr(jcntl* jc, enq_map& emap, txn_map& tmap/*, wrfc& wrfc*/): _txn_pending_set() {} -wmgr::wmgr(jcntl* jc, enq_map& emap, txn_map& tmap/*, wrfc& wrfc*/, - const uint32_t max_dtokpp, const uint32_t max_iowait_us): +wmgr::wmgr(jcntl* jc, enq_map& emap, txn_map& tmap, LinearFileController& lfc, const uint32_t max_dtokpp, const uint32_t max_iowait_us): pmgr(jc, emap, tmap /* , dtoklp */), -// _wrfc(wrfc), + _lfc(lfc), _max_dtokpp(max_dtokpp), _max_io_wait_us(max_iowait_us), - _fhdr_base_ptr(0), - _fhdr_ptr_arr(0), - _fhdr_aio_cb_arr(0), _cached_offset_dblks(0), -// _jfsize_dblks(0), -// _jfsize_pgs(0), -// _num_jfiles(0), _enq_busy(false), _deq_busy(false), _abort_busy(false), @@ -94,14 +84,10 @@ wmgr::initialize(aio_callback* const cbp, const uint32_t wcache_pgsize_sblks, initialize(cbp, wcache_pgsize_sblks, wcache_num_pages); -// _jfsize_dblks = _jc->jfsize_sblks() * JRNL_SBLK_SIZE_DBLKS; -// _jfsize_pgs = _jc->jfsize_sblks() / _cache_pgsize_sblks; -// assert(_jc->jfsize_sblks() % JRNL_RMGR_PAGE_SIZE == 0); - if (eo) { const uint32_t wr_pg_size_dblks = _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS; - uint32_t data_dblks = (eo / JRNL_DBLK_SIZE) - 4; // 4 dblks for file hdr + uint32_t data_dblks = (eo / JRNL_DBLK_SIZE_BYTES) - 4; // 4 dblks for file hdr _pg_cntr = data_dblks / wr_pg_size_dblks; _pg_offset_dblks = data_dblks - (_pg_cntr * wr_pg_size_dblks); } @@ -138,7 +124,7 @@ wmgr::enqueue(const void* const data_buff, const std::size_t tot_data_len, } } - uint64_t rid = (dtokp->external_rid() | cont) ? dtokp->rid() : /*_wrfc.get_incr_rid()*/0; // TODO: replace for linearstore: _wrfc + uint64_t rid = (dtokp->external_rid() | cont) ? dtokp->rid() : _lfc.getNextRecordId();/*_wrfc.get_incr_rid()*/ _enq_rec.reset(rid, data_buff, tot_data_len, xid_ptr, xid_len/*, _wrfc.owi()*/, transient, external); if (!cont) @@ -155,15 +141,15 @@ wmgr::enqueue(const void* const data_buff, const std::size_t tot_data_len, while (!done) { assert(_pg_offset_dblks < _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS); - void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE); + void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE_BYTES); uint32_t data_offs_dblks = dtokp->dblocks_written(); uint32_t ret = _enq_rec.encode(wptr, data_offs_dblks, (_cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS) - _pg_offset_dblks); // Remember fid which contains the record header in case record is split over several files - // TODO: replace for linearstore: _wrfc -// if (data_offs_dblks == 0) -// dtokp->set_fid(_wrfc.index()); + if (data_offs_dblks == 0) { + dtokp->set_fid(_lfc.getCurrentFileSeqNum()); + } _pg_offset_dblks += ret; _cached_offset_dblks += ret; dtokp->incr_dblocks_written(ret); @@ -180,7 +166,7 @@ wmgr::enqueue(const void* const data_buff, const std::size_t tot_data_len, // long multi-page messages have their token on the page containing the END of the // message. AIO callbacks will then only process this token when entire message is // enqueued. - //_wrfc.incr_enqcnt(dtokp->fid()); // TODO: replace for linearstore: _wrfc + _lfc.incrEnqueuedRecordCount(); if (xid_len) // If part of transaction, add to transaction map { @@ -189,7 +175,7 @@ wmgr::enqueue(const void* const data_buff, const std::size_t tot_data_len, } else { - if (_emap.insert_pfid(rid, dtokp->fid()) < enq_map::EMAP_OK) // fail + if (_emap.insert_pfid(rid, dtokp->fid(), 0) < enq_map::EMAP_OK) // fail { // The only error code emap::insert_pfid() returns is enq_map::EMAP_DUP_RID. std::ostringstream oss; @@ -260,15 +246,15 @@ wmgr::dequeue(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_ while (!done) { assert(_pg_offset_dblks < _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS); - void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE); + void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE_BYTES); uint32_t data_offs_dblks = dtokp->dblocks_written(); uint32_t ret = _deq_rec.encode(wptr, data_offs_dblks, (_cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS) - _pg_offset_dblks); // Remember fid which contains the record header in case record is split over several files - // TODO: replace for linearstore: _wrfc -// if (data_offs_dblks == 0) -// dtokp->set_fid(_wrfc.index()); + if (data_offs_dblks == 0) { + dtokp->set_fid(_lfc.getCurrentFileSeqNum()); + } _pg_offset_dblks += ret; _cached_offset_dblks += ret; dtokp->incr_dblocks_written(ret); @@ -290,23 +276,24 @@ wmgr::dequeue(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_ } else { - int16_t fid = _emap.get_remove_pfid(dtokp->dequeue_rid()); - if (fid < enq_map::EMAP_OK) // fail + int16_t fid; + short eres = _emap.get_remove_pfid(dtokp->dequeue_rid(), fid); + if (eres < enq_map::EMAP_OK) // fail { - if (fid == enq_map::EMAP_RID_NOT_FOUND) + if (eres == enq_map::EMAP_RID_NOT_FOUND) { std::ostringstream oss; oss << std::hex << "rid=0x" << rid; throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr", "dequeue"); } - if (fid == enq_map::EMAP_LOCKED) + if (eres == enq_map::EMAP_LOCKED) { std::ostringstream oss; oss << std::hex << "rid=0x" << rid; throw jexception(jerrno::JERR_MAP_LOCKED, oss.str(), "wmgr", "dequeue"); } } -// _wrfc.decr_enqcnt(fid); // TODO: replace for linearstore: _wrfc + _lfc.decrEnqueuedRecordCount(); } done = true; @@ -348,7 +335,7 @@ wmgr::abort(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_le } } - uint64_t rid = (dtokp->external_rid() | cont) ? dtokp->rid() :/* _wrfc.get_incr_rid()*/0; // TODO: replace for linearstore: _wrfc + uint64_t rid = (dtokp->external_rid() | cont) ? dtokp->rid() : _lfc.getNextRecordId(); _txn_rec.reset(QLS_TXA_MAGIC, rid, xid_ptr, xid_len/*, _wrfc.owi()*/); if (!cont) { @@ -362,15 +349,14 @@ wmgr::abort(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_le while (!done) { assert(_pg_offset_dblks < _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS); - void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE); + void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE_BYTES); uint32_t data_offs_dblks = dtokp->dblocks_written(); uint32_t ret = _txn_rec.encode(wptr, data_offs_dblks, (_cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS) - _pg_offset_dblks); // Remember fid which contains the record header in case record is split over several files - // TODO: replace for linearstore: _wrfc -// if (data_offs_dblks == 0) -// dtokp->set_fid(_wrfc.index()); + if (data_offs_dblks == 0) + dtokp->set_fid(_lfc.getCurrentFileSeqNum()); _pg_offset_dblks += ret; _cached_offset_dblks += ret; dtokp->incr_dblocks_written(ret); @@ -389,9 +375,8 @@ wmgr::abort(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_le { if (!itr->_enq_flag) _emap.unlock(itr->_drid); // ignore rid not found error - // TODO: replace for linearstore: _wrfc -// if (itr->_enq_flag) -// _wrfc.decr_enqcnt(itr->_pfid); + if (itr->_enq_flag) + _lfc.decrEnqueuedRecordCount(itr->_pfid); } std::pair<std::set<std::string>::iterator, bool> res = _txn_pending_set.insert(xid); if (!res.second) @@ -440,7 +425,7 @@ wmgr::commit(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_l } } - uint64_t rid = (dtokp->external_rid() | cont) ? dtokp->rid() : /*_wrfc.get_incr_rid()*/0; // TODO: replace for linearstore: _wrfc + uint64_t rid = (dtokp->external_rid() | cont) ? dtokp->rid() : _lfc.getNextRecordId(); _txn_rec.reset(QLS_TXC_MAGIC, rid, xid_ptr, xid_len/*, _wrfc.owi()*/); if (!cont) { @@ -454,15 +439,14 @@ wmgr::commit(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_l while (!done) { assert(_pg_offset_dblks < _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS); - void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE); + void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE_BYTES); uint32_t data_offs_dblks = dtokp->dblocks_written(); uint32_t ret = _txn_rec.encode(wptr, data_offs_dblks, (_cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS) - _pg_offset_dblks); // Remember fid which contains the record header in case record is split over several files - // TODO: replace for linearstore: _wrfc -// if (data_offs_dblks == 0) -// dtokp->set_fid(_wrfc.index()); + if (data_offs_dblks == 0) + dtokp->set_fid(_lfc.getCurrentFileSeqNum()); _pg_offset_dblks += ret; _cached_offset_dblks += ret; dtokp->incr_dblocks_written(ret); @@ -481,7 +465,7 @@ wmgr::commit(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_l { if (itr->_enq_flag) // txn enqueue { - if (_emap.insert_pfid(itr->_rid, itr->_pfid) < enq_map::EMAP_OK) // fail + if (_emap.insert_pfid(itr->_rid, itr->_pfid, 0) < enq_map::EMAP_OK) // fail { // The only error code emap::insert_pfid() returns is enq_map::EMAP_DUP_RID. std::ostringstream oss; @@ -491,23 +475,24 @@ wmgr::commit(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_l } else // txn dequeue { - int16_t fid = _emap.get_remove_pfid(itr->_drid, true); - if (fid < enq_map::EMAP_OK) // fail + int16_t fid; + short eres = _emap.get_remove_pfid(itr->_drid, fid, true); + if (eres < enq_map::EMAP_OK) // fail { - if (fid == enq_map::EMAP_RID_NOT_FOUND) + if (eres == enq_map::EMAP_RID_NOT_FOUND) { std::ostringstream oss; oss << std::hex << "rid=0x" << rid; throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr", "dequeue"); } - if (fid == enq_map::EMAP_LOCKED) + if (eres == enq_map::EMAP_LOCKED) { std::ostringstream oss; oss << std::hex << "rid=0x" << rid; throw jexception(jerrno::JERR_MAP_LOCKED, oss.str(), "wmgr", "dequeue"); } } -// _wrfc.decr_enqcnt(fid); // TODO: replace for linearstore: _wrfc + _lfc.decrEnqueuedRecordCount(fid); } } std::pair<std::set<std::string>::iterator, bool> res = _txn_pending_set.insert(xid); @@ -532,30 +517,26 @@ wmgr::commit(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_l } void -wmgr::file_header_check(const uint64_t /*rid*/, const bool /*cont*/, const uint32_t /*rec_dblks_rem*/) +wmgr::file_header_check(const uint64_t rid, const bool cont, const uint32_t rec_dblks_rem) { - // Has the file header been written (i.e. write pointers still at 0)? - // TODO: replace for linearstore: _wrfc -/* - if (_wrfc.is_void()) // TODO: replace for linearstore: _wrfc + if (_lfc.isEmpty()) // File never written (i.e. no header or data) { - bool file_fit = rec_dblks_rem <= _jfsize_dblks; - bool file_full = rec_dblks_rem == _jfsize_dblks; std::size_t fro = 0; - if (cont) - { - if (file_fit && !file_full) - fro = (rec_dblks_rem + JRNL_SBLK_SIZE_DBLKS) * JRNL_DBLK_SIZE; + if (cont) { + bool file_fit = rec_dblks_rem <= _lfc.dataSize_sblks() * JRNL_SBLK_SIZE_DBLKS; // Will fit within this journal file + bool file_full = rec_dblks_rem == _lfc.dataSize_sblks() * JRNL_SBLK_SIZE_DBLKS; // Will exactly fill this journal file + if (file_fit && !file_full) { + fro = (rec_dblks_rem + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_DBLKS)) * JRNL_DBLK_SIZE_BYTES; + } + } else { + fro = QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_BYTES; } - else - fro = JRNL_SBLK_SIZE; - write_fhdr(rid, _wrfc.index(), _wrfc.index(), fro); // TODO: replace for linearstore: _wrfc + _lfc.asyncFileHeaderWrite(_ioctx, 0, rid, fro); } -*/ } void -wmgr::flush_check(iores& res, bool& /*cont*/, bool& done) +wmgr::flush_check(iores& res, bool& cont, bool& done) { // Is page is full, flush if (_pg_offset_dblks >= _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS) @@ -569,22 +550,15 @@ wmgr::flush_check(iores& res, bool& /*cont*/, bool& done) done = true; } -/* // If file is full, rotate to next file - if (_pg_cntr >= _jfsize_pgs) + uint32_t fileSize_pgs = _lfc.fileSize_sblks() / _cache_pgsize_sblks; + if (_pg_cntr >= fileSize_pgs) { - iores rfres = rotate_file(); - if (rfres != RHM_IORES_SUCCESS) - res = rfres; - if (!done) - { - if (rfres == RHM_IORES_SUCCESS) - cont = true; - else - done = true; + get_next_file(); + if (!done) { + cont = true; } } -*/ } } @@ -592,14 +566,10 @@ iores wmgr::flush() { iores res = write_flush(); -/* - if (_pg_cntr >= _jfsize_pgs) - { - iores rfres = rotate_file(); - if (rfres != RHM_IORES_SUCCESS) - res = rfres; + uint32_t fileSize_pgs = _lfc.fileSize_sblks() / _cache_pgsize_sblks; + if (res == RHM_IORES_SUCCESS && _pg_cntr >= fileSize_pgs) { + get_next_file(); } -*/ return res; } @@ -607,7 +577,6 @@ iores wmgr::write_flush() { iores res = RHM_IORES_SUCCESS; -/* // Don't bother flushing an empty page or one that is still in state AIO_PENDING if (_cached_offset_dblks) { @@ -629,18 +598,9 @@ wmgr::write_flush() // if necessary. dblk_roundup(); - std::size_t pg_offs = (_pg_offset_dblks - _cached_offset_dblks) * JRNL_DBLK_SIZE; + std::size_t pg_offs = (_pg_offset_dblks - _cached_offset_dblks) * JRNL_DBLK_SIZE_BYTES; aio_cb* aiocbp = &_aio_cb_arr[_pg_index]; - aio::prep_pwrite_2(aiocbp, _wrfc.fh(), - (char*)_page_ptr_arr[_pg_index] + pg_offs, _cached_offset_dblks * JRNL_DBLK_SIZE, - _wrfc.subm_offs()); - page_cb* pcbp = (page_cb*)(aiocbp->data); // This page control block (pcb) - pcbp->_wdblks = _cached_offset_dblks; - pcbp->_wfh = _wrfc.file_controller(); - if (aio::submit(_ioctx, 1, &aiocbp) < 0) - throw jexception(jerrno::JERR__AIO, "wmgr", "write_flush"); - _wrfc.add_subm_cnt_dblks(_cached_offset_dblks); - _wrfc.incr_aio_cnt(); + _lfc.asyncPageWrite(_ioctx, aiocbp, (char*)_page_ptr_arr[_pg_index] + pg_offs, _cached_offset_dblks); _aio_evt_rem++; _cached_offset_dblks = 0; _jc->instr_incr_outstanding_aio_cnt(); @@ -653,21 +613,14 @@ wmgr::write_flush() get_events(UNUSED, 0); if (_page_cb_arr[_pg_index]._state == UNUSED) _page_cb_arr[_pg_index]._state = IN_USE; -*/ return res; } -iores -wmgr::rotate_file() +void +wmgr::get_next_file() { - // TODO: replace for linearstore: _wrfc -/* _pg_cntr = 0; - iores res = _wrfc.rotate(); - _jc->chk_wr_frot(); - return res; -*/ - return RHM_IORES_SUCCESS; + _lfc.pullEmptyFileFromEfp(); } int32_t @@ -702,17 +655,15 @@ wmgr::get_events(page_state state, timespec* const timeout, bool flush) aio_cb* aiocbp = _aio_event_arr[i].obj; // This I/O control block (iocb) page_cb* pcbp = (page_cb*)(aiocbp->data); // This page control block (pcb) long aioret = (long)_aio_event_arr[i].res; - if (aioret < 0) - { + if (aioret < 0) { std::ostringstream oss; oss << "AIO write operation failed: " << std::strerror(-aioret) << " (" << aioret << ") ["; - if (pcbp) + if (pcbp) { oss << "pg=" << pcbp->_index; - else - { - // TODO: replace for linearstore: fhp->_pfid -// file_hdr_t* fhp = (file_hdr_t*)aiocbp->u.c.buf; -// oss << "fid=" << fhp->_pfid; + } else { + file_hdr_t* fhp = (file_hdr_t*)aiocbp->u.c.buf; + oss << "fnum=" << fhp->_file_number; + oss << " qname=" << std::string((char*)fhp + sizeof(file_hdr_t), fhp->_queue_name_len); } oss << " size=" << aiocbp->u.c.nbytes; oss << " offset=" << aiocbp->u.c.offset << " fh=" << aiocbp->aio_fildes << "]"; @@ -790,16 +741,15 @@ wmgr::get_events(page_state state, timespec* const timeout, bool flush) oss << "dtok_id=" << dtokp->id() << " dtok_state=" << dtokp->wstate_str(); throw jexception(jerrno::JERR_WMGR_BADDTOKSTATE, oss.str(), "wmgr", "get_events"); - } // switch - } // if - } // for + } + } + } // Increment the completed write offset // NOTE: We cannot use _wrfc here, as it may have rotated since submitting count. // Use stored pointer to fcntl in the pcb instead. - // TODO: replace for linearstore: pcbp->_wfh -// pcbp->_wfh->add_wr_cmpl_cnt_dblks(pcbp->_wdblks); -// pcbp->_wfh->decr_aio_cnt(); + pcbp->_jfp->addCompletedDblkCount(pcbp->_wdblks); + pcbp->_jfp->decrOutstandingAioOperationCount(); _jc->instr_decr_outstanding_aio_cnt(); // Clean up this pcb's data_tok list @@ -812,16 +762,10 @@ wmgr::get_events(page_state state, timespec* const timeout, bool flush) } else // File header writes have no pcb { - // get lfid from original file header record, update info for that lfid - // TODO: replace for linearstore: lfid -/* - file_hdr_t* fhp = (file_hdr*)aiocbp->u.c.buf; - uint32_t lfid = fhp->_lfid; - fcntl* fcntlp = _jc->get_fcntlp(lfid); - fcntlp->add_wr_cmpl_cnt_dblks(JRNL_SBLK_SIZE_DBLKS); - fcntlp->decr_aio_cnt(); - fcntlp->set_wr_fhdr_aio_outstanding(false); -*/ + file_hdr_t* fhp = (file_hdr_t*)aiocbp->u.c.buf; + _lfc.addWriteCompletedDblkCount(fhp->_file_number, QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_DBLKS); + _lfc.decrOutstandingAioOperationCount(fhp->_file_number); + //fcntlp->set_wr_fhdr_aio_outstanding(false); // TODO: Do we need this? } } @@ -840,35 +784,15 @@ wmgr::is_txn_synced(const std::string& xid) } void -wmgr::initialize(aio_callback* const /*cbp*/, const uint32_t /*wcache_pgsize_sblks*/, const uint16_t /*wcache_num_pages*/) +wmgr::initialize(aio_callback* const cbp, const uint32_t wcache_pgsize_sblks, const uint16_t wcache_num_pages) { -/* + pmgr::initialize(cbp, wcache_pgsize_sblks, wcache_num_pages); wmgr::clean(); - _num_jfiles = _jc->num_jfiles(); // TODO: replace for linearstore: _jc->num_jfiles() - if (::posix_memalign(&_fhdr_base_ptr, _sblksize, _sblksize * _num_jfiles)) - { - wmgr::clean(); - std::ostringstream oss; - oss << "posix_memalign(): blksize=" << _sblksize << " size=" << _sblksize; - oss << FORMAT_SYSERR(errno); - throw jexception(jerrno::JERR__MALLOC, oss.str(), "wmgr", "initialize"); - } - _fhdr_ptr_arr = (void**)std::malloc(_num_jfiles * sizeof(void*)); - MALLOC_CHK(_fhdr_ptr_arr, "_fhdr_ptr_arr", "wmgr", "initialize"); - _fhdr_aio_cb_arr = (aio_cb**)std::malloc(sizeof(aio_cb*) * _num_jfiles); - MALLOC_CHK(_fhdr_aio_cb_arr, "_fhdr_aio_cb_arr", "wmgr", "initialize"); - std::memset(_fhdr_aio_cb_arr, 0, sizeof(aio_cb*) * _num_jfiles); - for (uint16_t i=0; i<_num_jfiles; i++) - { - _fhdr_ptr_arr[i] = (void*)((char*)_fhdr_base_ptr + _sblksize * i); - _fhdr_aio_cb_arr[i] = new aio_cb; - } _page_cb_arr[0]._state = IN_USE; _ddtokl.clear(); _cached_offset_dblks = 0; _enq_busy = false; -*/ } iores @@ -877,7 +801,7 @@ wmgr::pre_write_check(const _op_type op, const data_tok* const dtokp, ) const { // Check status of current file - // TODO: replace for linearstore: _wrfc + // TODO: Replace for LFC /* if (!_wrfc.is_wr_reset()) { @@ -907,13 +831,6 @@ wmgr::pre_write_check(const _op_type op, const data_tok* const dtokp, { case WMGR_ENQUEUE: { - // Check for enqueue reaching cutoff threshold - // TODO: replace for linearstore: _wrfc -/* - uint32_t size_dblks = jrec::size_dblks(enq_rec::rec_size(xidsize, dsize, external)); - if (!_enq_busy && _wrfc.enq_threshold(_cached_offset_dblks + size_dblks)) - return RHM_IORES_ENQCAPTHRESH; -*/ if (!dtokp->is_writable()) { std::ostringstream oss; @@ -948,25 +865,22 @@ wmgr::dequeue_check(const std::string& xid, const uint64_t drid) { // First check emap bool found = false; - int16_t fid = _emap.get_pfid(drid); - if (fid < enq_map::EMAP_OK) // fail - { - if (fid == enq_map::EMAP_RID_NOT_FOUND) - { - if (xid.size()) + int16_t fid; + short eres = _emap.get_pfid(drid, fid); + if (eres < enq_map::EMAP_OK) { // fail + if (eres == enq_map::EMAP_RID_NOT_FOUND) { + if (xid.size()) { found = _tmap.data_exists(xid, drid); - } - else if (fid == enq_map::EMAP_LOCKED) - { + } + } else if (eres == enq_map::EMAP_LOCKED) { std::ostringstream oss; oss << std::hex << "drid=0x" << drid; throw jexception(jerrno::JERR_MAP_LOCKED, oss.str(), "wmgr", "dequeue_check"); } - } - else + } else { found = true; - if (!found) - { + } + if (!found) { std::ostringstream oss; oss << "jrnl=" << _jc->id() << " drid=0x" << std::hex << drid; throw jexception(jerrno::JERR_WMGR_DEQRIDNOTENQ, oss.str(), "wmgr", "dequeue_check"); @@ -980,10 +894,10 @@ wmgr::dblk_roundup() uint32_t wdblks = jrec::size_blks(_cached_offset_dblks, JRNL_SBLK_SIZE_DBLKS) * JRNL_SBLK_SIZE_DBLKS; while (_cached_offset_dblks < wdblks) { - void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE); + void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE_BYTES); std::memcpy(wptr, (const void*)&xmagic, sizeof(xmagic)); #ifdef RHM_CLEAN - std::memset((char*)wptr + sizeof(xmagic), RHM_CLEAN_CHAR, JRNL_DBLK_SIZE - sizeof(xmagic)); + std::memset((char*)wptr + sizeof(xmagic), RHM_CLEAN_CHAR, JRNL_DBLK_SIZE_BYTES - sizeof(xmagic)); #endif _pg_offset_dblks++; _cached_offset_dblks++; @@ -991,28 +905,6 @@ wmgr::dblk_roundup() } void -wmgr::write_fhdr(uint64_t rid, uint16_t fid, uint16_t /*lid*/, std::size_t fro) -{ - file_hdr_t fhdr/*(QLS_FILE_MAGIC, QLS_JRNL_VERSION, rid, fid, lid, fro, _wrfc.owi(), true)*/; - /*int err =*/ ::file_hdr_init(&fhdr, 0, rid, fro, 0, _jc->id().length(), _jc->id().c_str()); - std::memcpy(_fhdr_ptr_arr[fid], &fhdr, sizeof(fhdr)); -#ifdef RHM_CLEAN - std::memset((char*)_fhdr_ptr_arr[fid] + sizeof(fhdr), RHM_CLEAN_CHAR, _sblksize - sizeof(fhdr)); -#endif - aio_cb* aiocbp = _fhdr_aio_cb_arr[fid]; -// aio::prep_pwrite(aiocbp, _wrfc.fh(), _fhdr_ptr_arr[fid], _sblksize, 0); // TODO: replace for linearstore: _wrfc - if (aio::submit(_ioctx, 1, &aiocbp) < 0) - throw jexception(jerrno::JERR__AIO, "wmgr", "write_fhdr"); - _aio_evt_rem++; - // TODO: replace for linearstore: _wrfc -/* - _wrfc.add_subm_cnt_dblks(JRNL_SBLK_SIZE_DBLKS); - _wrfc.incr_aio_cnt(); - _wrfc.file_controller()->set_wr_fhdr_aio_outstanding(true); -*/ -} - -void wmgr::rotate_page() { _page_cb_arr[_pg_index]._state = AIO_PENDING; @@ -1026,21 +918,8 @@ wmgr::rotate_page() } void -wmgr::clean() -{ - std::free(_fhdr_base_ptr); - _fhdr_base_ptr = 0; - - std::free(_fhdr_ptr_arr); - _fhdr_ptr_arr = 0; - - if (_fhdr_aio_cb_arr) - { -// for (uint32_t i=0; i<_num_jfiles; i++) -// delete _fhdr_aio_cb_arr[i]; - std::free(_fhdr_aio_cb_arr); - _fhdr_aio_cb_arr = 0; - } +wmgr::clean() { + // Clean up allocated memory here } const std::string @@ -1063,7 +942,7 @@ wmgr::status_str() const default: oss << _page_cb_arr[i]._state; } } - oss << "] " /*<< _wrfc.status_str()*/; // TODO: replace for linearstore: _wrfc + oss << "] " << _lfc.status(0); return oss.str(); } diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h b/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h index a4afd8974c..e859f17063 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h +++ b/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h @@ -30,15 +30,18 @@ class wmgr; }} #include <cstring> +#include "qpid/linearstore/jrnl/EmptyFilePoolTypes.h" #include "qpid/linearstore/jrnl/enums.h" #include "qpid/linearstore/jrnl/pmgr.h" -//#include "qpid/linearstore/jrnl/wrfc.h" #include <set> +class file_hdr_t; + namespace qpid { namespace qls_jrnl { + class LinearFileController; /** * \brief Class for managing a write page cache of arbitrary size and number of pages. @@ -59,17 +62,11 @@ namespace qls_jrnl class wmgr : public pmgr { private: -// wrfc& _wrfc; ///< Ref to write rotating file controller + LinearFileController& _lfc; ///< Linear File Controller ref uint32_t _max_dtokpp; ///< Max data writes per page uint32_t _max_io_wait_us; ///< Max wait in microseconds till submit - void* _fhdr_base_ptr; ///< Base pointer to file header memory - void** _fhdr_ptr_arr; ///< Array of pointers to file headers memory - aio_cb** _fhdr_aio_cb_arr; ///< Array of iocb pointers for file header writes uint32_t _cached_offset_dblks; ///< Amount of unwritten data in page (dblocks) std::deque<data_tok*> _ddtokl; ///< Deferred dequeue data_tok list -// uint32_t _jfsize_dblks; ///< Journal file size in dblks (NOT sblks!) -// uint32_t _jfsize_pgs; ///< Journal file size in cache pages -// uint16_t _num_jfiles; ///< Number of files used in iocb mallocs // TODO: Convert _enq_busy etc into a proper threadsafe lock // TODO: Convert to enum? Are these encodes mutually exclusive? @@ -87,9 +84,8 @@ namespace qls_jrnl std::set<std::string> _txn_pending_set; ///< Set containing xids of pending commits/aborts public: - wmgr(jcntl* jc, enq_map& emap, txn_map& tmap/*, wrfc& wrfc*/); - wmgr(jcntl* jc, enq_map& emap, txn_map& tmap/*, wrfc& wrfc*/, const uint32_t max_dtokpp, - const uint32_t max_iowait_us); + wmgr(jcntl* jc, enq_map& emap, txn_map& tmap, LinearFileController& lfc); + wmgr(jcntl* jc, enq_map& emap, txn_map& tmap, LinearFileController& lfc, const uint32_t max_dtokpp, const uint32_t max_iowait_us); virtual ~wmgr(); void initialize(aio_callback* const cbp, const uint32_t wcache_pgsize_sblks, @@ -106,7 +102,6 @@ namespace qls_jrnl int32_t get_events(page_state state, timespec* const timeout, bool flush = false); bool is_txn_synced(const std::string& xid); inline bool curr_pg_blocked() const { return _page_cb_arr[_pg_index]._state != UNUSED; } -// inline bool curr_file_blocked() const; { return _wrfc.aio_cnt() > 0; } inline uint32_t unflushed_dblks() { return _cached_offset_dblks; } // Debug aid @@ -122,9 +117,8 @@ namespace qls_jrnl void file_header_check(const uint64_t rid, const bool cont, const uint32_t rec_dblks_rem); void flush_check(iores& res, bool& cont, bool& done); iores write_flush(); - iores rotate_file(); + void get_next_file(); void dblk_roundup(); - void write_fhdr(uint64_t rid, uint16_t fid, uint16_t lid, std::size_t fro); void rotate_page(); void clean(); }; diff --git a/qpid/cpp/src/tests/linearstore/linearstoredirsetup.sh b/qpid/cpp/src/tests/linearstore/linearstoredirsetup.sh new file mode 100755 index 0000000000..b4b8721edd --- /dev/null +++ b/qpid/cpp/src/tests/linearstore/linearstoredirsetup.sh @@ -0,0 +1,24 @@ +#!/bin/bash + +STORE_DIR=/tmp +LINEARSTOREDIR=~/RedHat/linearstore + +rm -rf $STORE_DIR/qls +rm -rf $STORE_DIR/p002 +rm $STORE_DIR/p004 + +mkdir $STORE_DIR/qls +mkdir $STORE_DIR/p002 +touch $STORE_DIR/p004 +mkdir $STORE_DIR/qls/p001 +touch $STORE_DIR/qls/p003 +ln -s $STORE_DIR/p002 $STORE_DIR/qls/p002 +ln -s $STORE_DIR/p004 $STORE_DIR/qls/p004 + +${LINEARSTOREDIR}/tools/src/py/linearstore/efptool.py $STORE_DIR/qls/ -a -p 1 -s 2048 -n 25 +${LINEARSTOREDIR}/tools/src/py/linearstore/efptool.py $STORE_DIR/qls/ -a -p 1 -s 512 -n 25 +${LINEARSTOREDIR}/tools/src/py/linearstore/efptool.py $STORE_DIR/qls/ -a -p 2 -s 2048 -n 25 + +${LINEARSTOREDIR}/tools/src/py/linearstore/efptool.py $STORE_DIR/qls/ -l +tree -la $STORE_DIR/qls + |